/// <summary>
/// Пара потоков «наперегонки» заполняет одну очередь.
/// Эти потоки синхронизуются посредством критичесих секций кода,
/// связанных с разделяемым ресурсом - общей очередью.
/// Третий поток читает из этой очереди.
/// Этот поток синхронизуется посредством монитора.
/// Методы Enter(...) и Exit(...) обеспечивают вход в критическую секцию кода,
/// связанную с конкретным разделяемым объектом и тем самым блокируют
/// одновременное выполнение какого-либо связанного с данным ресурсом кода в другом потоке.
/// Толчея потоков сопровождается генерацией исключений.
/// </summary>
using System;
using System.Threading;
using System.Collections;
namespace CommunicatingThreadsQueue
{//----------------------------------------------------------------------------------
public delegate void CallBackFromStartClass (string param);
//===================================================================================
// Данные. Предмет и основа взаимодействия двух потоков.
class CommonData
{
private int iVal;
public int iValProp
{
get{return iVal;}
set{iVal = value;}
}
public CommonData(int key)
{
iVal = key;
}
}
// Классы Sender и Receiver: основа взаимодействующих потоков.
class Sender
{
Queue cdQueue;
CallBackFromStartClass callBack;
int threadIndex;
// Конструктор...
public Sender(ref Queue queueKey, CallBackFromStartClass cbKey, int iKey)
{
cdQueue = queueKey;
callBack = cbKey;
threadIndex = iKey;
}
public void startSender()
{
DoIt();
}
// Тело рабочей функции...
public void DoIt()
{//===============================================================
Console.WriteLine(“Sender{0}.DoIt()”, threadIndex);
int i;
for (i = 0; i < 100; i++)
{//================================================================
try
{
lock(cdQueue.SyncRoot)
{//__________________________________________________________________
Console.WriteLine(“Sender{0}.”, threadIndex);
cdQueue.Enqueue(new CommonData(i));
Console.WriteLine(“>> Sender{0} >> {1}.”, threadIndex,cdQueue.Count);
foreach(CommonData cd in cdQueue)
{
Console.Write(“\rS{0}:{1} “, threadIndex, cd.iValProp);
}
Console.WriteLine(“__ Sender{0} __”, threadIndex);
}//__________________________________________________________________
}
catch (ThreadAbortException e)
{
Console.WriteLine(“~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~”);
Console.WriteLine(“AbortException from Sender{0}.”, threadIndex);
Console.WriteLine(e.ToString());
Console.WriteLine(“~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~”);
}
catch (Exception e)
{
Console.WriteLine(“__________________________________________________”);
Console.WriteLine(“Exception from Sender{0}.”, threadIndex);
Console.WriteLine(e.ToString());
Console.WriteLine(“__________________________________________________”);
callBack(threadIndex.ToString());
}
}//================================================================
callBack(string.Format(“Sender{0}”,threadIndex.ToString()));
}//================================================================
}
class Receiver
{
Queue cdQueue;
CallBackFromStartClass callBack;
int threadIndex;
// Конструктор...
public Receiver(ref Queue queueKey, CallBackFromStartClass cbKey, int iKey)
{
cdQueue = queueKey;
callBack = cbKey;
threadIndex = iKey;
}
public void startReceiver()
{
DoIt();
}
// Тело рабочей функции...
public void DoIt()
{
Console.WriteLine(“Receiver.DoIt()”);
int i = 0;
CommonData cd;
while (i < 200)
{
try
{
Monitor.Enter(cdQueue.SyncRoot);
Console.WriteLine(“Receiver.”);
if (cdQueue.Count > 0)
{
cd = (CommonData)cdQueue.Dequeue();
Console.WriteLine(“Receiver.current:{0},in queue:{1}.”, cd.iValProp,cdQueue.Count);
foreach(CommonData cdW in cdQueue)
{
Console.Write(“\rR:{0}.”, cdW.iValProp);
}
Console.WriteLine(“__ Receiver __”);
i++;
}
Monitor.Exit(cdQueue.SyncRoot);
}
catch (ThreadAbortException e)
{
Console.WriteLine(“~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~”);
Console.WriteLine(“AbortException from Receiver.”);
Console.WriteLine(e.ToString());
Console.WriteLine(“~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~”);
}
catch (Exception e)
{
Console.WriteLine(“__________________________________________________”);
Console.WriteLine(“Exception from Receiver.”);
Console.WriteLine(e.ToString());
Console.WriteLine(“__________________________________________________”);
callBack(threadIndex.ToString());
}
}
callBack(“Receiver”);
}
}
class StartClass
{
Thread th0, th1, th2;
Queue queueX;
string[] report = new string[3];
ThreadStart t0, t1, t2;
Sender sender0;
Sender sender1;
Receiver receiver;
static void Main(string[] args)
{
StartClass sc = new StartClass();
sc.go();
}
void go()
{
// Простая очередь.
// queueX = new Queue();
// Синхронизированная очередь. Строится на основе простой очереди.
// Свойство синхронизированности дополнительно устанавливается в true
// посредством метода Synchronized.
queueX = Queue.Synchronized(new Queue());
// Но на самом деле никакой разницы между двумя версиями очереди
// (между несинхронизированной очередью и синхронизирванной оболочкой вокруг
// несинхронизированной очереди) мною замечено не было. И в том и в другом
// случае соответствующий код, который обеспечивает перебор элементов очереди
// должен быть закрыт посредством lock блока, с явным указанием ссылки на
// объект синхронизации.
sender0 = new Sender(ref queueX, new CallBackFromStartClass(StopMain), 0);
sender1 = new Sender(ref queueX, new CallBackFromStartClass(StopMain), 1);
receiver = new Receiver(ref queueX, new CallBackFromStartClass(StopMain), 2);
// Стартовые функции потоков должны соответствовать сигнатуре
// класса делегата ThreadStart. Поэтому они не имеют параметров.
t0 = new ThreadStart(sender0.startSender);
t1 = new ThreadStart(sender1.startSender);
t2 = new ThreadStart(receiver.startReceiver);
// Созданы вторичные потоки.
th0 = new Thread(t0);
th1 = new Thread(t1);
th2 = new Thread(t2);
th0.Start();
th1.Start();
th2.Start();
th0.Join();
th1.Join();
th2.Join();
Console.WriteLine
(“Main(): “ + report[0] + “...” + report[1] + “...” + report[2] + «... Bye.»);
}
// Функция-член класса StartClass выполняется во ВТОРИЧНОМ потоке!
public void StopMain(string param)
{
Console.WriteLine(«StopMain: « + param);
// Остановка рабочих потоков. Её выполняет функция-член
// класса StartClass. Этой функции в силу своего определения
// известно ВСЁ о вторичных потоках. Но выполняется она
// в ЧУЖИХ (вторичных) потоках.
if (param.Equals(“Sender0”))
{
report[0] = “Sender0 all did.”;
th0.Abort();
}
if (param.Equals(“Sender1”))
{
report[1] = “Sender1 all did.”;
th1.Abort();
}
if (param.Equals(“Receiver”))
{
report[2] = “Receiver all did.”;
th2.Abort();
}
if (param.Equals(“0”))
{
th1.Abort();
th2.Abort();
th0.Abort();
}
if (param.Equals(“1”))
{
th0.Abort();
th2.Abort();
th1.Abort();
}
if (param.Equals(“2”))
{
th0.Abort();
th1.Abort();
th2.Abort();
}
// Этот оператор не выполняется! Поток, в котором выполняется
// метод-член класса StartClass StopMain() остановлен.
Console.WriteLine(“StopMain(): bye.”);
}
}
}//----------------------------------------------------------------------------------