Задача производителя потребителя
Сейчас мы рассмотрим одну из общих задач параллельных вычислений — задачу производителя/потребителя. Вот ее обобщенная формулировка. Имеются один или несколько производителей, генерирующих данные некоторого типа (записи, символы и т.п.) и помещающих их в буфер, а также единственный потребитель, который извлекает помещенные в буфер элементы по одному. Требуется защитить систему от перекрытия операций с буфером, т.е. обеспечить, чтобы одновременно получить доступ к буферу мог только один процесс (производитель или потребитель). Мы рассмотрим несколько решений этой задачи, с тем чтобы проиллюстрировать как мощь семафоров, так и встречающиеся при их использовании ловушки.
Для начала предположим, что буфер бесконечен и представляет собой линейный массив элементов. Говоря абстрактно, мы можем определить функции производителя и потребителя следующим образом:
/* Производитель */
while(true)
{
/* Производство элемента v */;
b[in] = v;
in++;
} |
/* Потребитель */
while(true)
{
while (in <= out)
/* Бездействие*/
w = b[out];
out++;
/* Потребление w*/
} |
На рис. 5.4 показана структура буфера b. Производитель может генерировать данные и сохранять их в буфере со своей индивидуальной скоростью. Всякий раз при сохранении увеличивается индекс in. Потребитель поступает аналогично, с тем отличием, что он не должен считывать данные из пустого буфера. Следовательно, перед выполнением считывания он должен убедиться, что производитель обогнал его (in > out).
Примечание. Занятая часть буфера заштрихована
Рис. 5.4. Бесконечный буфер задачи производитель/потребитель
Попытаемся реализовать нашу систему с использованием бинарных семафоров. В листинге 5.8 приведена первая попытка реализации. Вместо работы с индексами in и out мы можем просто отслеживать количество элементов в буфере посредством целой переменной n = in - out. Для осуществления взаимного исключения используется семафор s; семафор delay применяется для ожидания потребителя при пустом буфере.
Листинг 5.8. [Неверное] решение задачи производитель/потребитель с использованием бинарных семафоров
int n;
binary_semaphore s = 1;
binary_semaphore delay = 0;
void producer ()
{
while(true)
{
produce();
waitB (s);
append();
n++;
if (n == 1) signals(delay);
signals (s);
}
}
void consumer()
{
waitB(delay);
while(true)
{
waitB (s);
take();
n--;
signalB(s);
consume();
if (n == 0) waitB(delay);
}
}
void main()
{
n = 0;
parbegin (producer, consumer) ;
}
Решение представляется достаточно простым и очевидным. Производитель может добавлять данные в буфер в любой момент времени. Перед добавлением он выполняет waitB (s), а после добавления— signalB (s), чтобы предотвратить обращение к буферу других производителей или потребителя на все время операции добавления данных в буфер. Кроме того, при работе в критическом разделе производитель увеличивает значение п. Если n = 1, то перед этим добавлением данных в буфер он был пуст, так что производитель выполняет signalB (delay), чтобы сообщить об этом потребителю. Потребитель начинает с ожидания производства первого элемента, используя вызов waitB (delay). Затем потребитель получает данные из буфера и уменьшает значение n в своем критическом разделе. Если производители опережают потребителя (достаточно распространенная ситуация), то потребитель будет редко блокирован семафором delay, поскольку п обычно положительно. Следовательно, благополучно работают и производитель, и потребитель.
Тем не менее в предложенной программе имеется один изъян. Когда потребитель исчерпывает буфер, он должен сбросить семафор delay с помощью инструкции if (n == 0) waits (delay) ;, чтобы дождаться размещения данных в буфере производителем. Рассмотрим сценарий, приведенный в табл. 5.2. В строке 14 потребитель не выполняет операцию waitB. Он действительно исчерпал буфер и установил п равным 0 в строке 8, но до проверки значения п в строке 14 оно было изменено производителем. В результате signals не соответствует предшествующему waitB. Значение п, равное -1 в строке 20, означает, что потребитель пытается извлечь из буфера несуществующий элемент. Простое перемещение проверки в критический раздел потребителя недопустимо, так как может привести к взаимоблокировке (например, после строки 8).
Решение проблемы заключается во введении вспомогательной переменной, значение которой присваивается в критическом разделе и используется вне его, как показано в листинге 5.9. Внимательно рассмотрев приведенный код, вы убедитесь в отсутствии возможных взаимоблокировок.
Глава 5. Параллельные вычисления: взаимоисключения... 275 |
Таблица 5.2. Возможный сценарий работы программы из листинга
Листинг 5.9. Верное решение задачи производитель/потребитель с использованием бинарных семафоров
int n;
binary_semaphore s = 1;
binary_semaphore delay = 0;
void producer()
{
while(true)
{
proceduce();
waitB(s);
append();
n++;
if (n == 1) signalB(delay);
signalB(s);
}
}
void consumer()
{
int m; /* Локальная переменная */
waitB(delay);
while(true)
{
waitB(s);
take ();
n--;
m = n;
signalB(s);
consume();
if (m == 0) waitB(delay);
}
}
void main()
{
n = 0;
parbegin(producer,consumer);
}
Несколько более простое решение (приведенное в листинге 5.10) можно получить при использовании обобщенных семафоров (именуемых также семафорами со счетчиками). Переменная n в этом случае является семафором; ее значение остается равным количеству элементов в буфере. Предположим теперь, что при переписывании этой программы произошла ошибка, и операции signal (s) и signal (n) оказались взаимозамененными. Это может привести к тому, что операция signal (n) будет выполняться в критическом разделе производителя без прерывания потребителя или другого производителя. Повлияет ли это на выполнение программы? Нет, поскольку потребитель в любом случае должен ожидать установки обоих семафоров перед продолжением работы.
Листинг 5.10. Решение задачи производитель/потребитель с использованием семафоров
semaphore n = 0;
semaphore s = 1;
void producer ()
{
while(true)
{
produce ();
wait(s) ;
append();
signal(s);
signal(n);
}
}
void consumer ()
{
while(true)
{
wait(n);
wait (s);
take ();
signal (s);
consume();
}
}
void main ()
{
parbegin(producer, consumer);
}
Теперь предположим, что взаимозаменены операции wait(n) и wait(s). Это приведет к фатальным последствиям. Если пользователь войдет в критический раздел, когда буфер пуст (n.count = 0), то ни один производитель не сможет добавить данные в буфер и система окажется в состоянии взаимной блокировки. Это хороший пример тонкости работы с семафорами и сложности корректной разработки параллельно работающих процессов.
А теперь добавим к нашей задаче новое, достаточно реалистичное ограничение — конечность буфера. Буфер рассматривается нами как циклическое хранилище (см. рис. 5.5), при работе с которым значения указателей должны выражаться по модулю размера буфера. При этом выполняются следующие условия.
Блокировка |
Деблокировка |
Производитель: вставка в полный буфер |
Потребитель: удаление элемента из буфера |
Потребитель: удаление из пустого буфера |
Производитель: вставка элемента в буфер |