stringtranslate.com

Проблема производитель-потребитель

В вычислительной технике проблема производителя -потребителя (также известная как проблема ограниченного буфера ) представляет собой семейство проблем, описанных Эдсгером В. Дейкстрой с 1965 года.

Дейкстра нашел решение проблемы производителя-потребителя, работая консультантом на компьютерах Electrologica X1 и X8: «Первое использование производителя-потребителя было частично программным, частично аппаратным: компонент, отвечающий за транспортировку информации между хранилищем и периферийным устройством, назывался «каналом»… Синхронизация контролировалась двумя счетными семафорами в том, что мы теперь знаем как схему производителя/потребителя: один семафор, указывающий длину очереди, увеличивался (в V) ЦП и уменьшался (в P) каналом, другой, подсчитывающий количество неподтвержденных завершений, увеличивался каналом и уменьшался ЦП. [Второй семафор, будучи положительным, поднимал соответствующий флаг прерывания.]» [1]

Дейкстра писал о случае неограниченного буфера: «Мы рассматриваем два процесса, которые называются «производителем» и «потребителем» соответственно. Производитель — это циклический процесс, и каждый раз, когда он проходит свой цикл, он производит определенную порцию информации, которая должна быть обработана потребителем. Потребитель также является циклическим процессом, и каждый раз, когда он проходит свой цикл, он может обрабатывать следующую порцию информации, которая была произведена производителем... Мы предполагаем, что два процесса соединены для этой цели через буфер с неограниченной емкостью». [2]

Он писал о случае ограниченного буфера: «Мы изучили производителя и потребителя, связанных через буфер с неограниченной емкостью... Отношение становится симметричным, если они связаны через буфер конечного размера, скажем, N порций» [3]

А о случае множественного производителя-потребителя: «Мы рассматриваем несколько пар производитель/потребитель, где пара i связана через информационный поток, содержащий n i порций. Мы предполагаем... конечный буфер, который должен содержать все порции всех потоков, имеет емкость „tot“ порций». [4]

Пер Бринч Хансен и Никлаус Вирт вскоре увидели проблему семафоров: «Я пришел к такому же выводу относительно семафоров, а именно, что они не подходят для языков более высокого уровня. Вместо этого естественными событиями синхронизации являются обмены сообщениями ». [5]

Решение ограниченного буфера Дейкстры

Исходное решение с буфером, ограниченным семафором, было написано в стиле ALGOL . Буфер может хранить N порций или элементов. Семафор «количество порций очереди» подсчитывает заполненные ячейки в буфере, семафор «количество пустых позиций» подсчитывает пустые ячейки в буфере, а семафор «манипулирование буфером» работает как мьютекс для операций буфера put и get. Если буфер заполнен, то есть количество пустых позиций равно нулю, поток-производитель будет ждать в операции P(количество пустых позиций). Если буфер пуст, то есть количество порций очереди равно нулю, поток-потребитель будет ждать в операции P(количество порций очереди). Операции V() освобождают семафоры. В качестве побочного эффекта поток может перейти из очереди ожидания в очередь готовности. Операция P() уменьшает значение семафора до нуля. Операция V() увеличивает значение семафора. [6]

begin integer количество порций в очереди , количество пустых позиций , буферная манипуляция ; количество порций в очереди : = 0 ; количество пустых позиций : = N ; буферная манипуляция : = 1 ; parbegin производитель : начать снова 1 : создать следующую порцию ; P ( количество пустых позиций ) ; P ( буферная манипуляция ) ; добавить порцию в буфер ; V ( буферная манипуляция ) ; V ( количество порций в очереди ) ; перейти снова 1 end ; потребитель : начать снова 2 : P ( количество порций в очереди ) ; P ( буферная манипуляция ) ; взять порцию из буфера ; V ( буферная манипуляция ) ; V ( количество пустых позиций ) ; обработать взятую порцию ; перейти снова 2 end parend end                                                                                 

Начиная с C++ 20, семафоры являются частью языка. Решение Дейкстры можно легко написать на современном C++. Переменная buffer_manipulation является мьютексом. Функция семафора для захвата в одном потоке и освобождения в другом потоке не нужна. Оператор lock_guard() вместо пары lock() и unlock() — это C++ RAII . Деструктор lock_guard обеспечивает освобождение блокировки в случае исключения. Это решение может обрабатывать несколько потоков-потребителей и/или несколько потоков-производителей.

#include <поток> #include <мьютекс> #include <семафор>   std :: counting_semaphore < N > количество_очередей_порций { 0 }; std :: counting_semaphore < N > количество_пустых_позиций { N } ; std :: управление_буфером_мьютекса ;   void produce () { for ( ;;) { Порция порция = produce_next_portion ( ); number_of_empty_positions.acquire ( ); { std :: lock_guard < std :: mutex > g ( buffer_manipulation ) ; add_portion_to_buffer ( part ); } number_of_queueing_portions.release (); } }                 void consumer () { for (;;) { number_of_queueing_portions.acquire ( ); Порция порция ; { std :: lock_guard < std :: mutex > g ( buffer_manipulation ) ; порция = take_portion_from_buffer (); } number_of_empty_positions.release ( ); process_portion_taken ( partion ) ; } }                  int main () { std :: thread t1 ( производитель ); std :: thread t2 ( потребитель ); t1.join ( ) ; t2.join ( ) ; }        

[ неправильный синтез? ]

Использование мониторов

Пер Бринч Хансен определил монитор: Я буду использовать термин монитор для обозначения общей переменной и набора значимых операций над ней. Цель монитора — контролировать планирование ресурсов среди отдельных процессов в соответствии с определенной политикой. [7] Тони Хоар заложил теоретическую основу для монитора. [8]

ограниченный буфер : монитор начало буфер : массив 0 .. N - 1 из порция ; голова , хвост : 0 .. N - 1 ; количество : 0 .. N ; непусто , не полно : условие ; процедура добавления ( x : порция ) ; начало если количество = N , то не полно . ожидание ; примечание 0 <= количество < N ; буфер [ хвост ] := x ; хвост := хвост ( + ) 1 ; количество := количество + 1 ; не пусто . сигнал конец добавления ; процедура удаления ( результат x : порция ) ; начало если количество = 0, то не пусто . ожидание ; примечание 0 < количество <= N ; x := буфер [ голова ] ; голова := голова ( + ) 1 ; количество := количество - 1 ; не полно . сигнал конец удаления ; голова := 0 ; хвост := 0 ; count := 0 ; конец ограниченного буфера ;                                                                                            

Монитор — это объект, содержащий переменные buffer, head, tailи countдля реализации циклического буфера , переменные условия nonemptyи nonfullдля синхронизации и методы appendи removeдля доступа к ограниченному буферу. Операция монитора wait соответствует операции семафора P или acquire, signal соответствует V или release. Обведенная операция (+) берется по модулю N. Представленный псевдокод в стиле Pascal показывает монитор Хоара. Монитор Mesa использует while countвместо if count. Версия языка программирования C++:

class Bounded_buffer { Portion buffer [ N ]; // 0..N-1 беззнаковый head , tail ; // 0..N-1 беззнаковый count ; // 0..N std :: condition_variable nonempty , nonfull ; std :: mutex mtx ; public : void append ( Portion x ) { std :: unique_lock < std :: mutex > lck ( mtx ); nonfull.wait ( lck , [ & ] { return ! ( N == count ); }); assert ( 0 <= count && count < N ); buffer [ tail ++ ] = x ; tail % = N ; ++ count ; nonempty.notify_one ( ); } Portion remove ( ) { std :: unique_lock < std :: mutex > lck ( mtx ); nonempty . wait ( lck , [ & ]{ return ! ( 0 == count ); }); assert ( 0 < count && count <= N ); Порция x = буфер [ head ++ ]; head %= N ; -- count ; nonfull.notify_one (); return x ; } Ограниченный_буфер ( ) { head = 0 ; tail = 0 ; count = 0 ; } } ;                                                                                          

Версия C++ нуждается в дополнительном мьютексе по техническим причинам. Она использует assert для обеспечения предварительных условий для операций добавления и удаления буфера.

Использование каналов

Самое первое решение производитель-потребитель в компьютерах Electrologica использовало «каналы». Хоар определил каналы : альтернативой явному именованию источника и назначения было бы наименование порта, через который должна осуществляться связь. Имена портов были бы локальными для процессов, а способ, которым пары портов должны быть соединены каналами, мог быть объявлен в заголовке параллельной команды. [9] Бринч Хансен реализовал каналы в языках программирования Joyce и Super Pascal . Язык программирования операционной системы Plan 9 Alef , язык программирования операционной системы Inferno Limbo имеют каналы. Следующий исходный код C компилируется в Plan 9 из пространства пользователя :

#include "uh" #include "libc.h" #include "thread.h"   перечисление { СТЕК = 8192 };     void produce ( void * v ) { Channel * ch = v ; for ( uint i = 1 ;; ++ i ) { sleep ( 400 ) ; print ( "p %d \n " , i ); sendul ( ch , i ); } } void consumer ( void * v ) { Channel * ch = v ; for (;;) { uint p = recvul ( ch ); print ( " \t\t c %d \n " , p ); sleep ( 200 + nrand ( 600 )); } } void threadmain ( int argc , char ** argv ) { int ( * mk )( void ( * fn )( void * ), void * arg , uint stack ); mk = threadcreate ; Channel * ch = chancreate ( sizeof ( ulong ), 1 ); mk ( production , ch , STACK ); mk ( consumer , ch , STACK ); recvp ( chancreate ( sizeof ( void * ), 0 )); threadexitsall ( 0 ); }                                                                      

Точка входа в программу находится в function threadmain. Вызов функции ch = chancreate(sizeof(ulong), 1)создает канал, вызов функции sendul(ch, i)отправляет значение в канал, а вызов функции p = recvul(ch)получает значение из канала. В языке программирования Go тоже есть каналы. Пример Go:

основной пакет импорт ( "fmt" "math/rand" "time" ) var sendMsg = 0   func produceMessage ( ) int { time.Sleep ( 400 * time.Millisecond ) sendMsg ++ fmt.Printf ( "sendMsg = %v\n" , sendMsg ) return sendMsg } func consumerMessage ( recvMsg int ) { fmt.Printf ( " \ t \trecvMsg = %v \ n " , recvMsg ) time.Sleep ( time.Duration ( 200 + rand.Intn ( 600 ) ) * time.Millisecond ) } func main ( ) { ch : = make ( chan int , 3 ) go func ( ) { for { ch < - produceMessage ( ) } } ( ) for recvMsg : = range ch { consumerMessage ( recvMsg ) } }                             

Решение Go производитель-потребитель использует основную процедуру Go для потребителя и создает новую, безымянную процедуру Go для производителя. Две процедуры Go связаны с каналом ch. Этот канал может ставить в очередь до трех значений int. Оператор ch := make(chan int, 3)создает канал, оператор ch <- produceMessage()отправляет значение в канал, а оператор recvMsg := range chполучает значение из канала. [10] Выделение ресурсов памяти, выделение ресурсов обработки и синхронизация ресурсов выполняются языком программирования автоматически.

Без семафоров и мониторов

Лесли Лэмпорт задокументировал решение производителя-потребителя с ограниченным буфером для одного производителя и одного потребителя: Мы предполагаем, что буфер может содержать не более b сообщений, b >= 1. В нашем решении мы даем k быть константой, большей b, и пусть s и r будут целочисленными переменными, принимающими значения от 0 до k-1. Мы предполагаем, что изначально s=r и буфер пуст. Выбрав k как кратное b, буфер можно реализовать как массив B [0: b - 1]. Производитель просто помещает каждое новое сообщение в B[s mod b], а потребитель берет каждое сообщение из B[r mod b]. [11] Алгоритм показан ниже, обобщенный для бесконечного k.

Производитель : L : if ( s - r ) mod k = b then goto L fi ; поместить сообщение в буфер ; s := ( s + 1 ) mod k ; goto L ; Потребитель : L : if ( s - r ) mod k = 0 then goto L fi ; взять сообщение из буфера ; r := ( r + 1 ) mod k ; goto L ;                                                    

Решение Лампорта использует активное ожидание в потоке вместо ожидания в планировщике. Это решение игнорирует влияние переключения потока планировщика в неудобное время. Если первый поток прочитал значение переменной из памяти, планировщик переключается на второй поток, который изменяет значение переменной, а планировщик переключается обратно на первый поток, после чего первый поток использует старое значение переменной, а не текущее значение. Атомарные операции чтения-изменения-записи решают эту проблему. Современный C++ предлагает atomicпеременные и операции для многопоточного программирования. Следующее решение активного ожидания C++11 для одного производителя и одного потребителя использует атомарные операции чтения-изменения-записи fetch_addи fetch_subатомарной переменной count.

enum { N = 4 }; Message buffer [ N ]; std :: atomic < unsigned > count { 0 }; void produce () { unsigned tail { 0 }; for (;;) { Message message = produceMessage (); while ( N == count ) ; // занято ожидание buffer [ tail ++ ] = message ; tail %= N ; count . fetch_add ( 1 , std :: memory_order_relaxed ); } } void consumer () { unsigned head { 0 }; for (;;) { while ( 0 == count ) ; // занято ожидание Message message = buffer [ head ++ ]; head %= N ; count . fetch_sub ( 1 , std :: memory_order_relaxed ); consumerMessage ( message ); } } int main () { std :: thread t1 ( producer ); std :: thread t2 ( consumer ); t1 . присоединиться (); t2 . присоединиться (); }                                                                     

Переменные индекса циклического буфера headи tailявляются локальными для потока и, следовательно, не имеют отношения к согласованности памяти. Переменная countуправляет активным ожиданием потока-производителя и потока-потребителя.

Смотрите также

Ссылки

  1. ^ Дейкстра; 2000; EWD1303 Мои воспоминания о разработке операционных систем
  2. ^ Дейкстра; 1965; EWD123 Взаимодействующие последовательные процессы, раздел 4.1. Типичное использование общего семафора.
  3. ^ Дейкстра; 1965; EWD123 Взаимодействующие последовательные процессы, раздел 4.3. Ограниченный буфер.
  4. ^ Дейкстра; 1972; EWD329 Информационные потоки, совместно использующие конечный буфер
  5. Вирт; 1969; Письмо Никлауса Вирта от 14 июля 1969 г. в Brinch Hansen; 2004; История программиста, глава 4 Молодой человек в спешке
  6. ^ Дейкстра; 1965; EWD123 Взаимодействующие последовательные процессы, раздел 4.3. Ограниченный буфер.
  7. ^ Пер Бринч Хансен; 1973; Принципы операционной системы, 3.4.7. Очереди событий
  8. ^ CAR Hoare; 1974; Мониторы: концепция структурирования операционной системы, 4. Пример: ограниченный буфер
  9. ^ Хоар; 1978; Взаимодействие последовательных процессов, 7.3 Имена портов
  10. ^ Экскурсия по Go, каналы
  11. ^ Лампорт, Лесли; 1977; Доказательство корректности многопроцессных программ, пример производитель/потребитель

Дальнейшее чтение