Добро пожаловать на домашнюю страницу языка программирования MC#!
Проект MC#
Главная страница 
Язык MC# 
Документация 
Публикации 
 Примеры 
FAQ 

 
Дистрибутивы
Система программирования MC# 

 
Ссылки
Parallel C# 
Polyphonic C# 
Проект SKIF 

 
Контакты
 Контакты 


Mono powered

Microsoft .Net powered


 
   Программирование с помощью каналов на языке MC#

 

Механизмы взаимодействия объектов в параллельном объектно-ориентированном языке программирования MC#

 

 

Гузев В. Б.

Российский Университет Дружбы Народов,

Москва, Россия,

e-mail: vadim@joker.botik.ru

Сердюк Ю.П.

Институт Программных Систем РАН,

Переславль-Залесский, Россия

e-mail: Yury@serdyuk.botik.ru

Декабрь, 2003

 

 

Механизмы взаимодействия объектов в параллельном объектно-ориентированном языке программирования MC#. 1

Введение. 1

Общий обзор языка программирования  MC#. 2

Классификация каналов в языке MC#. 2

Описание типов каналов с примерами их применения.. 3

Channel 3

BDChannel 5

BoundedChannel 5

TransientChannel 6

SynchronizedChannel 7

Дополнительные встроенные классы... 8

Activity. 8

Redirector. 9

Multiplier. 10

Создание пользовательских каналов.. 11

Фильтрация данных. 11

Пример аккумулирующего потока. 12

Каналы с приоритетами. 13

Обзор других языков, использующих конструкцию “канал” для взаимодействия процессов/объектов.. 17

Список литературы... 19

Приложение. 20

 

 

Введение

Базовым механизмом взаимодействия параллельных процессов в
современных языках программирования для кластерных архитектур
является асинхронная передача сообщений. Её суть заключается в том, что передача сообщений производится в отдельном потоке, причём пославший сообщение  поток, не дожидается получения этого сообщения, а продолжает свою работу. В различных приложениях кроме базовых асинхронных механизмов передачи сообщений требуются специальные методы общения между параллельными процессами. Например, в качестве защиты от перегрузок могут понадобиться средства ограничения длины очереди сообщений. Другим примером может послужить необходимость задания приоритетов для сообщений в системах реального времени, таких как биржи. Цель данной работы продемонстрировать возможности языка MC# для решения подобных задач, а именно, описать механизмы работы с каналами. Предполагается, что читатель уже знаком с основами языка MC# (начальные сведения по языку MC# можно найти в статье [MCS2], а также на сайте [MCSH]).

 

 

Общий обзор языка программирования  MC#

 

Язык MC# базируется на модели программирования, предложенной  Н. Бентоном (N.Benton), Л. Карделли ( L. Cardelli ) и Ц. Фурнье ( C. Fournet )  в языке Polyphonic C# [POLY]. Целью создания этого языка было добавление высокоуровневых средств асинхронного параллельного программирования в язык C#, входящий в состав программной платформы .NET фирмы Microsoft.

Ключевая особенность языка Polyphonic C# заключается в добавлении к обычным синхронным методам, так называемых "асинхронных" методов, которые предназначены играть в программе две основные роли:

1)      автономных методов, исполняемых в отдельных потоках;

2)      методов, предназначенных для доставки данных обычным, синхронным методам.

Для синхронизации методов обоих видов в язык C#, кроме того, были добавлены новые конструкции, получившие название связок (chords).

Связки – это набор заголовков функций, объединённых символами  ‘&&’, причём у связки существует только одно тело функции. Тело функции исполняется только тогда, когда будут вызваны все методы, входящие в заголовок связки. В связке может участвовать не более одного синхронного метода, который может возвращать какое-либо значение – результат исполнения тела связки. Более подробно ознакомиться с этими конструкциями можно на странице [POLY].

Основным нововведением в MC# стало введение дополнительных “перемещаемых” методов (movable-методов), которые могут исполняться на любом из узлов кластера (или GRID-сети), а также каналов для возврата результатов из movable-функций и способа объединения каналов в связки с обычными синхронными методами. Как оказалось, этого вполне достаточно, чтобы создавать простые, но в тоже время полнофункциональные программы, которые к тому же могут использовать всю мощь кластерных технологий. Кроме того, Runtime-система MC# обеспечивает динамическое распараллеливание, т.е. только на этапе исполнения в зависимости от загруженности узлов решается вопрос, на каком из них должна исполняться конкретная movable-функция. Ранее такой подход уже использовался в других проектах (например, в T-системе [TSYS], [ABRA]) и доказал свою эффективность.

 

 

Классификация каналов в языке MC#

 

В MC# для передачи данных между объектами, располагающимися на разных узлах, используются каналы. Каналы обязательно привязаны к узлу, на котором они были созданы, т.е. все сообщения, отсылаемые по каналу, отправляются сначала на узел привязки. Копии каналов могут быть переданы на другой узел в качестве параметров movable-методам и впоследствии, использоваться на удалённом узле для получения и отправления сообщений. Таким образом, в MC# каналы являются средством общения между объектами на разных узлах. При “переходе” на другой узел канал регистрируется локально и может быть использован повторно.

Все каналы подразделяются на однонаправленные и двунаправленные каналы. Пользователь может только посылать свои данные в однонаправленные каналы, но не может с них “напрямую” читать – чтение производится при срабатывании связок, в которых участвует данный канал. Над двунаправленными каналами можно производить обе операции - как чтение, так и запись.

Один и тот же канал может участвовать в нескольких связках, что позволяет создавать каналы с приоритетами - задействована будет та связка, которая объявлена раньше в исходном коде (однако, такой способ имеет свои недостатки, о которых будет рассказано в главе “Каналы с приоритетами”).

Далее, по типу очередей каналы можно подразделить на следующие подтипы:

-         каналы с ограниченной ёмкостью накопителя;

-         с неограниченной ёмкостью накопителя;

-         каналы без накопителей.

Каналы без накопителей, как таковых, обычно содержат только последний прибывший объект, который не удаляется при чтении, но заменяется другим объектом при поступлении нового сообщения.

Каналы могут быть асинхронными и синхронными. В первом случае, сразу же после вызова Send (метода отправки какого-либо объекта), поток не дожидаясь, пока сообщение будет прочитано, продолжает своё исполнение. Во втором, поток приостанавливает свою работу до тех пор, пока сообщение не будет прочитано каким-либо другим процессом.

По умолчанию, в MC# по каналам могут быть переданы объекты любых типов (поддерживающих сериализацию). Но канал может быть типизированным, т.е. по нему можно послать данные только определённых типов. Более того, они могут фильтровать данные или производить какие-либо действия с проходящими через них данными.

В MC# пользователь может сам создавать свои собственные каналы, наделяя их той или иной функциональностью, размером очередей и т.д. (более подробно об этом см. в разделе “Создание пользовательских каналов”).

 

 

Описание типов каналов с примерами их применения

Channel

Класс Channel является основным в иерархии однонаправленных каналов языка MC#. Все остальные однонаправленные каналы должны быть унаследованы от этого класса. Отличие однонаправленных каналов от двунаправленных в том, что пользователь не может “вручную” произвести из него чтение. Фактически чтение из каналов, унаследованных от класса Channel, может производиться только с помощью связок (т.е. чтение производится, когда будет задействована та или иная связка).

Как и асинхронные методы в Polyphonic C#, каналы могут участвовать в связках. Связка состоит из заголовка и тела метода, где заголовок – несколько объявлений методов, разделённых символом ‘&’:

 

int Get() & Channel c1( int x ) & Channel c2( int y )

{

 return x + y;

}

 

Тело связки выполняется только когда все методы, входящие в связку были вызваны. Одиночные вызовы методов выстраиваются в очередь до тех пор, пока не будут удовлетворять заголовку какой-либо связки.

В любой связке только один метод может быть синхронным. В потоке, связанным с этим методом исполняется тело связки. Возвращаемое в теле связки значение становится возвращаемым значением синхронного метода.

 

В случае если в связке отсутствует синхронный метод, а по всем каналам, входящим в данную связку поступили новые сообщения, то автоматически будет исполнено тело связки в новом потоке. При этом тело связки не возвращает никуда результат своей работы, однако эта особенность может оказаться очень удобной, например, для прорисовки пользовательского интерфейса. В следующем примере при вызове функции mfun должны распечататься числа от 0 до 99:

 

movable mfun( Channel screen )

{

 for ( int i = 0; i < 100; i++ )

 {

  screen.Send( i );

 }

}

 

Channel RefreshScreen( int x )

{

 Console.WriteLine( “Current value is “ + x );

}

 

void fun() {

 mfun( this.RefreshScreen );

}

 

В языке Polyphonic C# не существовало понятия “канал”, а ключевое слово async использовалось для синхронизации потоков с помощью связок. Если класс Channel используется только локально (т.е. из разных потоков, но на одной машине), то его функциональность полностью совпадает с async из языка Polyphonic C#, а именно:

  1. отправка сообщений происходит практически мгновенно
  2. каналы можно объединять в связки и использовать для синхронизации.

Отличие каналов от async лишь в том, что канал может быть переправлен на другие узлы, и сообщения в этот канал могут отправляться с любого узла, где имеется копия этого канала.

 

Каждый канал имеет два уникальных идентификатора: primaryGuid и guid. Первый из них (primaryGuid) – это идентификатор данного канала, на узле, где он впервые был создан. При отправлении какого-либо сообщения по каналу, оно становится в канальную очередь на узле первоначального создания (поиск канала на этом узле производится по primaryGuid). Идентификатор guid генерируется автоматически при каждом “мигрировании” канала на новый узел. Этот идентификатор используется при чтении из удалённого источника.

При отправлении какого-либо сообщения по каналу, оно обязательно отправляется на тот узел, где первоначально был создан канал. Этот факт следует учитывать при написании программ на языке MC#.

По каналу можно послать сообщения при помощи метода Send. Метод Send возвращает true, если объект был доставлен, в противном случае он возвращает false. Метод Put используется для постановки какого-либо объекта в локальную очередь – его можно переопределить при написании пользовательских типов каналов.

Из приведённого ниже фрагмента интерфейса класса Channel видно, что, вообще говоря, по каналу может быть отправлен объект любого типа (типизация каналов будет добавлена с выходом платформы .Net, поддерживающей темплейты в языке C#). Для проверки на соответствие типов (а также для проверки значений отсылаемых значений) можно использовать метод CheckValues, который вызывается каждый раз перед отправлением какого-либо сообщения. Если CheckValues возвращает значение true, то данные отсылаются на другой узел и становятся на том узле в локальную очередь.

 

[Serializable()]

public class Channel

{

 [NonSerialized] public Queue objects;

 [NonSerialized] public ThreadQ readers;

 [NonSerialized] public bool CreatedOnThisMachine = false;

 [NonSerialized] protected string privateGuid;

 protected string privatePrimaryGuid;

 public string guid { get { return this.privateGuid; } }

 public string primaryGuid { get { return this.privatePrimaryGuid } }

 public string host;

 public int port;

 

 public virtual bool CheckValues( object parameters ) { return true; }

 public virtual object BeforeSend( object parameters ) { return parameters; };

 public virtual object AfterSend( object parameters ) { return parameters; };

 public virtual object BeforeReceive( object parameters ) { return parameters;};

 public virtual object AfterReceive( object parameters ) { return parameters; };

 

 public bool Send( object parameters )

 {

  if ( CheckValues( parameters ) != true )

   return false;

  // Отправка parameters на узел, где первоначально был создан канал

  // и постановка локальная постановка в очередь

  // ...

  return true;

 }

 

 // Добавление parameters в локальную очередь

 private void Put( object parameters ) { ... }

 

 // Другие методы класса Channel ...

}

 

BDChannel

Класс BDChannel является основным для всех двунаправленных каналов в иерархии каналов MC#. В классе BDChannel дублируются все свойства и методы класса Channel. Единственным добавлением в интерфейсе этого класса является метод Receive, производящий чтение из потока. В случае если канальная очередь пуста, вызов этого метода блокирует поток до тех пор, пока не поступит новый объект и не произведётся чтение из канала. Экземпляры класса BDChannel можно также передавать в качестве параметров movable-функциям сколько угодно раз.

Следует заметить, что двунаправленные каналы, в отличие от однонаправленных каналов, не могут участвовать в связках.

Ниже приведён пример создания и передачи канала в movable-метод.

 

movable mfun( BDChannel bdc )

{

 bdc.Send( “Hello world!” );

}

 

void fun()

{

 BDChannel bdc = new BDChannel();

 Console.WriteLine( bdc.Receive() );

}

 

Данный пример должен напечатать “Hello world!”

BoundedChannel

В определённом типе задач часто необходимо ограничить ёмкость накопителя какого-либо канала. Для этих целей существует класс BoundedChannel. Типичный пример использования канала, у которого максимальная длина очереди равна 10, выглядит следующим образом:

 

int Get() & BoundedChannel( 10 ) bc( int x )

{

 return x;

}

 

void fun1()

{

 for ( int i = 0; i < 100; i++ ) bc( i );

 for ( int i = 0; i < 10; i++ ) Console.Write( Get() + “ “ );

}

 

Этот код должен вывести последние десять чисел, поступившие в канал (т.е. числа от 89 до 99). Аналогично, двунаправленный канал с длиной очереди равной 10, создаётся следующим образом:

 

BoundedBDChannel bdc = new BoundedBDChannel( 10 );

 

Использовать BoundedBDChannel можно так же, как и обычный BDChannel.

 

TransientChannel

Иногда не нужно хранить все заявки в очереди, а достаточно только хранения последней поступившей заявки. Для этой цели служат каналы типа TransientChannel. Особенность этого типа каналов в том, что очередь у него состоит всего из одного элемента, но при чтении из этого канала этот элемент не удаляется (в этом и состоит его отличие от типа каналов BoundedChannel с длиной очереди равной 1). Если кто-либо посылает сообщение по этому каналу, то старое значение заменяется новым.

Пусть, например, у нас имеется movable-функция Stock (биржа), поставляющая каждую секунду котировки по какому-либо финансовому инструменту (в нашем примере это случайные числа). Задача состоит в том, чтобы при каждом изменении котировки необходимо распечатывать новую цену.

 

movable Stock( TransientChannel client )

{

 Random r  = new Random();

 while ( true )

 {

  client( r.Next( 100 ) );

  Thread.Sleep( 1000 );

 }

}

 

TransientChannel client( double price )

{

 Console.WriteLine( price );

}

 

void mainFun()

{

 Stock( this.client );

 Console.ReadLine();

}

 

Приведённый выше код при вызове функции mainFun будет печатать текущую цену каждую секунду до тех пор, пока не будет произведён ввод какой-либо строки с консоли (если только текущая цена отличается от предыдущей цены).

Аналогично, можно создать двунаправленный канал, у которого не будет очереди сообщений:

 

TransientBDChannel tbdc = new TransientBDChannel();

 

SynchronizedChannel

Базовые каналы языка МС# являются асинхронными. При посылке сообщения по каналам типа Channel оно становится в очередь на узле первоначальной “регистрации” канала, а поток продолжает исполняться. Однако, иногда необходимо при передаче сообщений между процессами Р1 и Р2, чтобы операция посылки сообщения Send заканчивалась только тогда, когда сопутствующий процесс выдает команду Receive, и наоборот.

Для этих целей в MC# служат каналы типа SynchronizedChannel. При вызове метода Send поток, который отсылает сообщение, блокируется до тех пор, пока сообщение на другом узле не будет прочитано (т.е. пока не будет задействована связка, в которой данный канал участвует).

На самом деле, синхронизированные каналы - это исходный тип каналов, который был принят в p-исчислении, когда оно появилось (и даже в более ранних исчислениях). Однако затем появились варианты исчислений с асинхронными каналами, что объяснялось двумя причинами:

а) во-первых, асинхронные каналы являются более базовыми единицами, чем  синхронные каналы, другими словами, синхронные каналы выразимы через асинхронные каналы, но не наоборот (пример этого см. ниже);

б) во-вторых, асинхронная передача сообщений лежит в основе механизма взаимодействия  процессов (машин),  принятого в реальных компьютерах.

 

class  SyncChannel  

{

 private int Get() & private Channel c( int x ) 

 {

  return ( x );

 }

 

 private void Get_Ack() & private Channel ack() 

 {

  return;

 }

 

 public void Send( int x ) 

 {

  c( x );

  Get_Ack();

 }

 

 public int Receive()

 {

  int x = Get();

  ack();

  return ( x );

 }

}

 

В следующем примере имеется две movable-функции fun1 и fun2, параметрами которых являются два синхронизированных канала, связанные между собой связкой. По каналам посылаются новые данные только после того, как произведётся чтение из них, т.е. при срабатывании связки c1/c2. В результате исполнения приведённого ниже фрагмента должна выводиться строка “Hello world!” до тех пор, пока не будет нажата клавиша “Enter”.

 

SynchronizedChannel c1( string s1 ) & SynchronizedChannel c2(string s2 )

{

 Console.WriteLine( s1 + s2 );

}

 

movable fun1( SynchronizedChannel c1 )

{

 while ( true )

 {

  c1.Send( “Hello ” );

 }

}

 

movable fun2( SynchronizedChannel c2 )

{

 while ( true )

 {

  c2.Send( “world!” );

 }

}

// ...

void test()

{

 fun1( this.c1 );

 fun2( this.c2 );

 Console.ReadLine();

}

 

Дополнительные встроенные классы

 

Для удобства программирования в язык был добавлен набор классов, обеспечивающий наиболее часто используемые функции, оперирующие с каналами, такие как перенаправление потоков данных, дублирование потоков и т.д. Все классы встроены в Runtime-систему и в основном реализованы на уровне протокола передачи данных в виде отдельных объектов.

Activity

Ранее, в разделе, описывающем тип Channel, был рассмотрен пример программы, в котором при получении какого-либо сообщения по каналу, оно автоматически обрабатывалось на узле кластера, где первоначально был создан канал. Но как сделать так, чтобы автоматическое обслуживание могло производиться и на других узлах, где имеется копия данного канала? Класс Activity помогает решить данную проблему. Основные принципы работы Activity можно описать следующим образом:

  1. Каждый из экземпляров Activity может быть закреплён только за одним каналом, однако несколько экземпляров Activity могут быть “подключены” к одному и тому же каналу
  2. Activity прослушивает этот канал и “ждёт” поступления данных
  3. При поступлении данных тут же происходит их считывание, и эти данные передаются всем экземплярам Activity, “подключённых” к данному каналу. В каждом из экземпляров производится запуск метода Run, параметром которого являются считанные данные
  4. Для создания собственного класса MyActivity необходимо его унаследовать от класса Activity и переопределить метод Run(object parameters)
  5. Экземпляры Activity могут быть динамически подключены и отключены от любого двунаправленного канала.

Ниже приведён пример использования класса Activity:

 

using System;

using System.Runtime.Serialization;

using MCSharp;

using System.Threading;

 

[Serializable]

public class MyActivity : Activity

{

 public override void Run( object parameters )

 {

  Console.WriteLine( "MyActivity is running... " + parameters );

 }

}

 

[Serializable]

public class ActivityDemo

{

 public void fun1( BDChannel bdc )

 {

  MyActivity ma = new MyActivity();

  ma.Register( bdc );

  MyActivity ma2 = new MyActivity();

  ma2.Register( bdc );

  bdc.Send( 1 );

  bdc.Send( 2 );

  Thread.Sleep( 3000 );

  ma2.Unregister( bdc );

  bdc.Send( 3 );

 }

 

 public static void Main( string[] args )

 {

  BDChannel bdc = new BDChannel();

  ActivityDemo ad = new ActivityDemo( bdc );

  ad.fun1();

 }

}

 

Данный пример должен выдать следующее:

 

MyActivity is running... 1

MyActivity is running... 1

MyActivity is running... 2

MyActivity is running... 2

MyActivity is running... 3

 

Redirector

При программировании определённых задач иногда необходимо перенаправить весь поток данных из одного канала (канала-поставщика) в другой (канал-потребитель). Для этой цели можно использовать объект класса Redirector, в конструкторе которого первым аргументом указывается входящий поток, а вторым - выходящий.

 

BDChannel inChannel = new BDChannel();

BDChannel outChannel = new BDChannel();

Redirector r = new Redirector( inChannel, outChannel );

inChannel.Send( “some object” );

Console.WriteLine( outChannel.Receive() );

 

Приведённый фрагмент должен напечатать строку “some object”.

Multiplier

Класс Multiplier является обобщением класса Redirector, на случай, когда есть несколько потребителей и несколько поставщиков. Причём этот класс позволяет динамически подключать и отключать как каналы-потребители, так и каналы-поставщики. Для этого существуют методы RegisterConsumer, RegisterProducer, UnregisterConsumer, UnregisterProducer, параметром которых служит регистрируемый канал. Потоки данных всех производителей суммируются и передаются всем потребителям одновременно. Приведём пример использования этого класса:

 

using System;

using System.Threading;

 

public class MultipliersDemo

{

 public void fun1( BDChannel bdc, BDChannel bdc2 )

 {

  Multiplier m = new Multiplier();

  m.RegisterProducer( bdc );

 

  BDChannel bdc3 = new BDChannel();

  m.RegisterConsumer( bdc2 );

  m.RegisterConsumer( bdc3 );

 

  fun3( bdc3 );

 

  Thread.Sleep( 6000 );

 

  for ( int i = 0; i < 10; i++ )

   bdc.Send( i );

 }

 

 public void fun2( BDChannel bdc2 )

 {

  Console.WriteLine( "fun2 is running..." );

  while ( true )

   Console.WriteLine( "fun2: " + (int) bdc2.Receive() );

 }

 

 public void fun3( BDChannel bdc3 )

 {

  Console.WriteLine( "fun3 is running..." );

  while ( true )

   Console.WriteLine( "fun3: " + (int) bdc3.Receive() );

 }

 

 public static void Main( string[] args )

 {

  MultipliersDemo md = new MultipliersDemo();

  BDChannel bdc = new BDChannel();

  BDChannel bdc2 = new BDChannel();

  md.fun2( bdc2 );

  md.fun1( bdc1, bdc2 );

  Console.ReadLine();

 }

}

 

Приведённый код должен выдать ответ, в котором печатаются числа от 0 до 9 в функциях fun2 и fun3 (распечатанные числа не обязательно должны быть упорядоченными, более того порядок может меняться при каждом запуске, т.к. все действия производятся параллельно):

 

fun2 is running...

fun3 is running...

fun3: 0

fun3: 1

fun3: 2

fun3: 3

fun2: 0

fun2: 1

fun2: 2

fun2: 3

fun2: 4

fun2: 5

fun2: 6

fun2: 7

fun2: 8

fun2: 9

fun3: 4

fun3: 5

fun3: 6

fun3: 7

fun3: 8

fun3: 9

 

Создание пользовательских каналов

 

В языке MC# можно создавать свои собственные каналы, наследуясь от уже созданных каналов и  добавляя или изменяя логику проверки, хранения и пересылки данных. Пользовательские однонаправленные каналы также могут участвовать в связках, как и встроенные каналы. Пользовательские каналы также можно передавать с одного узла на другой – все необходимые действия, связанные с локальной регистрацией каналов уже реализованы в классе Channel.

Фильтрация данных

Иногда необходимо предотвратить поступление каких-либо данных по каналу. Это можно сделать вручную, т.е. постфактум, когда данные уже поступили на узел, где впервые был зарегистрирован канал и производится чтение из этого канала. Но в таком случае происходит ненужное срабатывание связок.

Чтобы предотвратить это в языке MC# имеется возможность создавать свои собственные каналы, в которых проверка данных происходит ещё при отправке данных. Если данные не удовлетворяют определённым критериям, то данные не отсылаются, и, следовательно, не происходит срабатывания связок.

Следующий пример демонстрирует создание, и использование однонаправленного канала, фильтрующего все  отрицательные числа:

 

[Serializable]

public class ConditionalChannel : Channel

{

 public override bool CheckValues( object parameters )

 {

  return (int) parameters >= 0 ? true : false;

 }

}

// ...

int Get() & ConditionalChannel cc( int x ) { return x; }

 

public void Test()

{

 Console.WriteLine( cc.Send( -1 ) );

 Console.WriteLine( Get() ); // поток "приостанавливается"

}

 

В данном фрагменте связка с Get не будет задействована при вызове метода cc.Send и метод Send должен возвратить значение false. Аналогично можно создавать двунаправленные каналы, которые будут производить аналогичную фильтрацию:

 

[Serializable]

public class ConditionalBDChannel : BDChannel

{

 public override bool CheckValues( object parameters )

 {

  return (int) parameters >= 0 ? true : false;

 }

}

 

// ...

public void Test()

{

 ConditionalBDChannel cbdc = new ConditionalBDChannel();

 Console.WriteLine( cbdc.Send( 1 ) ); // печатает "true"

 Console.WriteLine( cbdc.Send( -1 ) ); // печатает "false"

 Console.WriteLine( cbdc.Receive() ); // печатает "1"

 Console.WriteLine( cbdc.Receive() ); // очередь пуста, поток "блокируется"

}

 

Пример аккумулирующего потока

Потоки по мере своей работы могут накапливать определённую статистическую информацию. Например, это может быть подсчёт количества проходящих сообщений, размера переданных данных. Для этих целей в классе Channel есть четыре виртуальных метода, которые можно переопределить:

-         BeforeReceive – вызывается перед тем, как начнёт исполнение метод Receive;

-         AfterReceive – вызывается после отработки метода Receive;

-         BeforeSend – вызывается перед тем, как начнёт исполнение метод Send;

-         AfterSend – вызывается после отработки метода Send.

Приведём фрагмент кода, демонстрирующий использование метода AfterReceive для подсчёта количества “чтений” из канала:

 

[Serializable]

public class StatisticChannel : Channel

{

 private int pSum = 0;

 public int Sum

 {

  get

  {

   return pSum;

  }

 }

 public override object AfterReceive( object parameters )

 {

  pSum++;

  return parameters;

 }

}

 

// ...

 

StatisticChannel sc( int i )

{

 Console.WriteLine( “Received “ + i );

}

 

public void test()

{

 for ( int i = 0; i < 5; i++) sc( i );

 Console.WriteLine( “Total packets received: “ + sc.Sum );

}

 

При вызове функции test должно распечататься следующее:

 

Received 0

Received 1

Received 2

Received 3

Received 4

Received 5

Total packets received: 5

 

Каналы с приоритетами

В данном разделе демонстрируется пример эмулирования каналов с приоритетами с помощью связок на примере задачи Санта Клауса.

 

Формулировка задачи

Первоначально сформулированная Trono [TRON], задача Санта Клауса является интересным (и забавным) упражнением в параллельном программировании, которое привлекает к себе внимание чаще, чем традиционные задачи на взаимное исключение, т.к. оно включает в себя три вида процессов, а также некоторое количество процессов для взаимодействия. Задача может быть сформулирована следующим образом:

 

Санта Клаус постоянно спит, до тех пор, пока не будет разбужен либо всеми девятью северными оленями, вернувшимися из своего отпуска, либо любыми тремя эльфами из его десяти эльфов. Если Санту разбудили олени, то он запрягает их всех в свои сани, доставляет игрушки и, наконец, распрягает оленей (отправляя их обратно в отпуск). Если же его разбудила группа эльфов, он провожает каждого из них в свой кабинет, консультируется с ними по поводу игрушек. После этого он выводит каждого эльфа из своего кабинета (отправляя их обратно работать).

Санта должен отдавать приоритет оленям, в случае если его ожидают сразу и олени, и группа эльфов.

 

Ошибки, с которыми легко можно столкнуться, пытаясь решить эту задачу, включают случаи, когда не учитывается, что лишние эльфы могут проникнуть в группу эльфов как раз в тот момент, когда Санта начал провожать её в свой кабинет. Или, например, случай, когда Санта может отправиться развозить игрушки, в то время, когда некоторые из оленей всё ещё дожидаются его на стоянке. Ben-Ari [BENA] указывая на ошибку (второго вида) в первоначальном решении Trono, базирующемся на семафорах, показал, как задача может быть аккуратно решена с использованием параллельных примитивов языка Ada, и сравнил его с менее эллегантным и менее эффективным решением на языке Java.

 

Вспомогательный класс

Мы начнём с описания класса nway, который позволит одному потоку синхронизироваться с другими потоками. Поток может вызвать метод acceptn( m ) какого-либо экземпляра класса nway, после чего будет заблокирован до тех пор, пока m других потоков не вызовут метод entry(). В свою очередь, вызовы метода entry() будут блокироваться до тех пор,

пока не будет вызван метод acceptn( m ).

 

public class nway

{

 public void entry() & private async tokens( int n )

 {

  if ( n == 1 ) allgone();

  else tokens( n - 1 );

 }

 

 public void acceptn( int n )

 {

  tokens( n );

  wait();

 }

 

 private void wait() & private async allgone() {}

}

 

 

Определение класса nway демонстрирует обычный подход в Polyphonic C# и MC# - использование приватных сообщений для фиксации состояния объекта. Вызов метода acceptn( m ) генерирует новое приватное сообщение tokens( m ). Каждый вызов метода entry() ожидает появление сообщения tokens( k ), после появления которого генерируется новое сообщение вида tokens( k - 1 ) или allgone(). Сообщение allgone() при срабатывания связки wait/allgone снимает блок с вызова метода wait().

 

Основное решение

Мы будем использовать экземпляры класса nway для синхронизации Санты со всеми оленями на стадии, когда их запрягают и распрягают, и со всеми эльфами в группе в то время, когда их провожают в офис и из офиса:

 

static nway harness = new nway();

static nway unharness = new nway();

static nway roomin = new nway();

static nway roomout = new nway();

 

Теперь, каждый из потоков-эльфов исполняет следующий код:

 

while ( true )

{

 Work();             // Работаем пока не столкнёмся с какой-нибудь проблемой

 elfqueue();         // Попытаемся присоединиться к группе из 3-х эльфов

 roomin.entry();     // Ждём пока Санта пригласит к себе в кабинет

 ConsultWithSanta(); // Консультируемся до тех пор, пока не решим проблему

 roomout.entry();    // Санта провожает эльфа

}

 

Вызов метода elfqueue() используется для контроля над объединением эльфов в группы по трое и пробуждением Санты Клауса. Он синхронизируется с другим приватным сообщением, содержащим информацию о количестве ожидающих эльфов:

 

static void elfqueue() & static async elveswaiting( int e )

{

 if ( e == 2 ) elvesready(); // Это третий эльф в группе

 else elveswaiting( e + 1 );

}

 

Первоначально поступает сообщение elveswaiting( 0 ). Каждый из эльфов, вызывая метод elfqueue(), будет ожидать поступления сообщения elveswaiting( k ) после чего либо отправит сообщение elveswaiting( k + 1 ), либо elvesready(), что послужит сигналом для пробуждения Санты. Заметим, что в последнем случае сообщение elveswaiting() не посылается - другие эльфы, вызвавшие метод elfqueue() будут заблокированы, до тех пор, пока текущую группу не проводят в офис.

Код для потоков-оленей будет приблизительно тем же, что и для эльфов. Одно сообщение reinwaiting( k ) подсчитывает сколько оленей уже вернулось из отпуска, в то время как сообщение reindeerready() используется для уведомления Санты, что все они вернулись:

 

// Каждый поток-олень выполняет этот код

while ( true )

{

 Holiday();         // Пока не надоест - отдыхаем

 reindeerback();    // Присоединяемся к группе, ожидающей Санту

 harness.entry();   // Ждём, пока Санта запрягает оленя в сани

 DeliverToys();     // Доставляем игрушки

 unharness.entry(); // Ждём, пока Санта распрягает оленя

}

 

static void reindeerback() & static async reinwaiting(int r)

{

 if ( r == 8 ) reindeerready(); // Последний олень прибыл

 else reinwaiting( r + 1 );

}

 

Санта просто исполняет следующее:

 

while ( true )

{

 waittobewoken();

}

 

где синхронный метод waittobewoken() определяется двумя связками, одна из которых синхронизирована с reindeerready(), а другая с elvesready():

 

static void waittobewoken() & static async reindeerready()

{

 harness.acceptn(9);   // Запрягаем все девять оленей

 reinwaiting(0);       // Обнуляем счётчик ожидающих оленей

 DeliverToys();        // Доставляем подарки на оленях

 unharness.acceptn(9); // Отпускаем всех оленей

}

 

static void waittobewoken() & static async elvesready()

{

 roomin.acceptn(3);  // Пригласим трёх эльфов в офис

 elveswaiting(0);    // теперь позволим всем остальным

                     // присоединиться к группе эльфов

 ConsultWithElves(); // Консультируемся, пока не решим проблему

 roomout.acceptn(3); // Провожаем эльфов

}

 

Каждое из тел связок содержит соответствующие действия, включая сброс счётчика ожидающих эльфов или оленей. Место размещения вызова reinwaiting(0) не является критическим - можно поместить его где угодно вне связок или даже рядом с вызовом reindeerready() в связке reindeerback(). С другой стороны, чтобы предотвратить ошибку, связанную с изменением размера очереди, нельзя вызывать elveswaiting( 0 ) до того момента, пока всех трёх эльфов из группы не проводят в офис. Размещение этого вызова так, как показано выше, позволяет набирать новую группу, в то время как Санта консультирует предыдущую группу.

 

Приоритеты

В текущей реализации языков Polyphonic C# и MC#, в решении, приведённом в предыдущем пункте, действительно отдаётся предпочтение потокам-оленям, как того требует формулировка задачи. Происходит это потому, что при проверке срабатывания связок связки проверяются последовательно в том порядке, в котором они описаны в исходном коде. А в нашем случае связка соответствующая waittobewoken() вместе с reindeerready() предшествует связке с методом elvesready(). Таким образом, если сообщение elvesready() стояло бы первым, которое отвечает за возобновление работы потока Санты, и если бы появилось сообщение reindeerready() когда поток-Санта только начинает своё исполнение, то будет исполнена связка с оленями. Такое поведение программы наблюдается на практике, если заменить порядок следования связок.

 

Полагаться на алгоритмы проверки срабатывания связок не приемлемо: они не являются частью официальной семантики языков и версия компилятора, пытающегося применить оптимизирующие алгоритмы, наверняка не будет следовать порядку описания связок в исходном тексте. Более того, такой подход неуклюж и является незащищённым от ошибок способом контроля над приоритетами, а в более сложных случаях может и не быть текстовой зависимости, с помощью которой можно получить требуемые приоритеты.

 

К счастью, программная реализация приоритетов довольно легка. Для этого мы введём одно новое сообщение reindeernotready() и добавим его для синхронизации между Сантой и собравшейся группой эльфов:

 

static void waittobewoken()

 & static async elvesready()

 & static async reindeernotready()

{

 reindeernotready();

 // Остальная часть тела функции, описанной выше

}

 

Мы принимаем сообщение reindeernotready() когда олени прибыли:

 

static void reindeerback() & static async reinwaiting( int r )

{

 if ( r == 8 )

 {

  clearreindeernotready();

  reindeerready();

 }

 else reinwaiting( r + 1 );

}

 

 // новая связка

static void clearreindeernotready() & static async reindeernotready() {}

 

и добавим отправку сообщения reindeernotready() в инициализирующий код и в код потоков Санты/оленей, который теперь будет выглядеть следующим образом:

 

static void waittobewoken() & static async reindeerready() {

 harness.acceptn( 9 );

 reindeernotready();

 reinwaiting( 0 );

 DeliverToys();

 unharness.acceptn( 9 );

}

 

Эти дополнительные семь строчек - это всё, что нам надо для того, чтобы контролировать приоритеты вручную. Приведённый выше пример решения задачи Санта-Клауса является локальным.

В более сложном случае, если, допустим, мы захотим, чтобы потоки эльфов или оленей запускались на других узлах, то придётся добавить ещё несколько связок с использованием SynchronizedChannel. Полный текст данной программы, в которой потоки-олени и эльфы реализованы как movable-методы, приведён в приложении [ SantaDistributed.mcs ].

 

Обзор других языков, использующих конструкцию “канал” для взаимодействия процессов/объектов

 

Теоретической основой всех языков программирования с обменом сообщениями по каналам является p-исчисление [PICA]. Фактически, p-исчисление – это математическая модель процессов, взаимодействующих между собой и его использование не ограничивается только программами. Простейшей операцией в p-исчислении является передача канала от одного процесса к другому, принимающая сторона после получения канала может использовать в дальнейшем этот канал для взаимодействия с другими процессами. Это свойство, а также то, что исчисление является число математическим языком для описания процессов, позволяет описывать любые процессы, в которых некоторые затрагиваемые ресурсы изменяются с течением времени.

На базе p-исчисления было создано много других исчислений и языков программирования: PICT, Facile, Join, Ambients, Spi, POOL и т.д. Интересным проектом является TYped Concurrent Objects [TYCO] – расширение p-исчисления с добавлением распределённых структур и рекурсии (вместо репликации p-исчисления). В основном, все эти языки заимствуют семантику или центральные аспекты p-исчисления и нацелены на определённую область приложений.

Существует довольно много диалектов языка ML, которые поддерживают каналы. Из них основными являются JoCaml [JOCA] и Nomadic Pict. Язык JoCaml является функциональным аналогом языка Polyphonic C#. Каналы в нём имеют свои “уникальные” имена. Любой процесс, который узнал имя канала, может послать по нему сообщение. Для каждого канала создаётся специальный процесс, который следит за поступающими сообщениями. Как только приходит сообщение, запускается копия процесса и производится обслуживание сообщения. Аналогично языку MC# в JoCaml имеются средства для синхронизации потоков данных и их перенаправления из одного канала в другой.

С одной стороны в языке JoCaml имеется возможность вызвать какой-либо метод на определённом узле,  что отсутствует в языке MC# (Runtime система сама решает, на каком из узлов должен исполняться данный movable-метод). В JoCaml также имеются встроенные средства для обнаружения сбоев в программе на определённом узле, что так же на данный момент отсутствует в MC# (аналогичные средства появятся в следующих версиях Runtime-системы). С другой стороны недостатком языка JoCaml является отсутствие автоматической балансировки нагрузки между узлами – её должен реализовывать сам программист в каждой отдельной задаче, что само по себе является нетривиальной задачей. К тому же, язык MC# базируется на языке C#, т.е. он может использовать многочисленные библиотеки, написанные на любом из языков, поддерживаемых платформой .Net.


Список литературы

 

[MCSH] – Официальный сайт языка программирования MC#

http://u.pereslavl.ru/~vadim/MCSharp

 

[MCS2]MC#: расширение языка C# для программирования на кластерных и GRID-архитектурах, Технологии C# и .NET '2003, 1-ая Международная конференция технологий C# и .NET   по Алгоритмам, Компьютерной Графике, Визуализации, Распределённым и WEB вычислениям,  Гузев В. Б., Сердюк Ю. П., Чудинов А. М.,  Plzen, Czech Republic.

ISBN 80-903100-3-6

Web-версия: http://u.pereslavl.ru/~vadim/MCSharp/docs/index.ru.html

 

[TSYS] – Официальный сайт проекта SKIF

http://skif.pereslavl.ru

 

[POLY] – Официальный сайт языка программирования Polyphonic C#

http://research.microsoft.com/~nick/polyphony/

 

[CALC] – Calculi for Mobile Processes – сайт, посвящённый p-исчислению.

http://move.to/mobility

 

[PICA] – Joachim Parrow, An introduction to p-Calculus, - chapter to appear in Handbook of Process Algebra, ed. Bergstra, Ponse and Smolka, Elsevier

 

[MODE] – N. Benton, L. Cardelli, C. Fournet "Modern concurrency abstractions for C#", - draft submitted to ACM Transactions on Programming Languages and Systems, July 2002.

 

[ABRA] – С.М.Абрамов, А.И. Адамович "Т-система - среда программирования с поддержкой автоматического динамического распараллеливания программ", - В сб. "Программные системы: Теоретические основы и приложения", под ред. А.К. Айламазяна. М.: Наука. Физматлит, 1999, стр. 201 - 213.

 

[JOCA] – Cedric Fournet, Fabrice Fessant, Luc Maranget, Alan Smith, “JoCaml: a Language for Concurrent Distributed And Mobile Programming”

 

[TYCO] – Сайт проекта TYped Concurrent Objects (TYCO)

http://www.di.fc.ul.pt/~vv/tyco.html

 

[TRON] J. A. Trono. A new exercise in concurrency. SIGCSE Bulletin, 26(3):8-10,     1994. Corrigendum: 26(4):63.

 

[SANT] Nick Benton. Jingle Bells: Solving the Santa Claus Problem in Polyphonic C#, March 20, 2003

 

[BENA] M. Ben-Ari. How to solve the Santa Claus problem. Concurrency: Practice & Experience, 10(6):485-496, 1998.

 

[ECMA] ECMA. Standard ECMA-334: C# Language Specification, December 2001.

 


Приложение

 

SantaDistributed.mcs

 

using System;

using System.Threading;

 

public class nway

{

 public void entry() & async tokens( int n )

 {

  if ( n == 1 ) allgone();

  else tokens( n - 1 );

 }

 

 public void acceptn( int n )

 {

  tokens( n );

  wait();

 }

 

 void wait() & async allgone() {}

}

 

public class SantaDemo

{

 [NonSerialized] nway harness = new nway();

 [NonSerialized] nway unharness = new nway();

 [NonSerialized] nway roomin = new nway();

 [NonSerialized] nway roomout = new nway();

 

 public void Work()

 {

  Console.WriteLine( "Elf is working..." );

  Thread.Sleep( 1000 );

 }

 

 public void Holiday()

 {

  Console.WriteLine( "Going on holiday..." );

  Thread.Sleep( 1000 );

 }

 

 public void ConsultWithElves()

 {

  Console.WriteLine( "Consulting with elves..." );

  Thread.Sleep( 500 );

 }

 

 public void ConsultWithSanta()

 {

  Console.WriteLine( "Consulting with Santa..." );

  Thread.Sleep( 500 );

 }

 

 movable Elf( SynchronizedChannel elfqueueChannel,

              SynchronizedChannel roominChannel,

              SynchronizedChannel roomoutChannel )

 {

  while ( true )

  {

   Work();             // Работаем пока не столкнёмся с какой-нибудь проблемой

   elfqueueChannel();  // Попытаемся присоединиться к группе из 3-х эльфов

   roominChannel();    // Ждём пока Санта пригласит к себе в кабинет

   ConsultWithSanta(); // Консультируемся до тех пор, пока не решим проблему

   roomoutChannel();   // Санта провожает эльфа

  }

 }

 

 movable Reindeer( SynchronizedChannel reindeerbackChannel,

                   SynchronizedChannel harnessChannel,

                   SynchronizedChannel unharnessChannel )

 {

  while ( true )

  {

   Holiday();             // Пока не надоест - отдыхаем

   reindeerbackChannel(); // Присоединяемся к группе, ожидающей Санту

   harnessChannel();      // Ждём, пока Санта запрягает оленя в сани

   DeliverToys();         // Доставляем игрушки

   unharnessChannel();    // Ждём, пока Санта распрягает оленя

  }

 }

 

 async Santa()

 {

  // Пусть Санта для примера обслужит только 1000 вызовов

  for ( int i = 0; i < 1000; i++ )

   waittobewoken();

  done();

 }

 

 void waittobewoken() & async elvesready()

                      & async reindeernotready()

 {

  reindeernotready();

  roomin.acceptn(3);  // Пригласим трёх эльфов в офис

  elveswaiting(0);    // теперь позволим всем остальным

                      // присоединиться к группе эльфов

  ConsultWithElves(); // Консультируемся, пока не решим проблему

  roomout.acceptn(3); // Провожаем эльфов   

 }

 

 void waittobewoken() & async reindeerready()

 {

  harness.acceptn(9);   // Запрягаем все девять оленей

  reindeernotready();

  reinwaiting(0);       // Обнуляем счётчик ожидающих оленей

  DeliverToys();        // Доставляем подарки на оленях

  unharness.acceptn(9); // Отпускаем всех оленей

 }

 

 SynchronizedChannel reindeerbackChannel() & async reinwaiting( int r )

 {

  if ( r == 8 )

  { // Последний олень прибыл

   clearreindeernotready();

   reindeerready();

  }

  else reinwaiting( r + 1 );

 }

 

 void clearreindeernotready() & async reindeernotready() {}

 

 SynchronizedChannel elfqueueChannel() & Channel elveswaiting( int e )

 {

  if ( e == 2 ) elvesready(); // Это третий эльф в группе

  else elveswaiting( e + 1 );

 }

 

 void Wait() & async done() {}

 

 SynchronizedChannel roominChannel()

 {

  roomin.entry();

 }

 

 SynchronizedChannel roomoutChannel()

 {

  roomout.entry();

 }

 

 SynchronizedChannel harnessChannel()

 {

  harness.entry();

 }

 

 SynchronizedChannel unharnessChannel()

 {

  unharness.entry();

 }

 

 public static void Main( string[] args )

 {

  SantaDemo sd = new SantaDemo();

  sd.elveswaiting( 0 );

  sd.reinwaiting( 0 );

  sd.reindeernotready();

  sd.Santa();

  for ( int i = 0; i < 10; i++ )

   sd.Elf( sd.elfqueueChannel, sd.roominChannel, sd.roomoutChannel );

  for ( int i = 0; i < 9; i++ )

   sd.Reindeer( sd.reindeerbackChannel, sd.harnessChannel,

                sd.unharnessChannel );

  sd.Wait();

 }

}

 


Весь Переславль