В вычислительной технике проблема производителя -потребителя (также известная как проблема ограниченного буфера ) представляет собой семейство проблем, описанных Эдсгером В. Дейкстрой с 1965 года.
Дейкстра нашел решение проблемы производитель-потребитель, работая консультантом по компьютерам Electrologica X1 и X8: «Первое использование производителя-потребителя было частично программным, частично аппаратным: компонент, обеспечивающий передачу информации между магазином и магазином. периферийное устройство называлось «каналом»... Синхронизация контролировалась двумя счетными семафорами в том, что мы теперь знаем как механизм производитель/потребитель: один семафор, указывающий длину очереди, увеличивался (в V) процессором и уменьшается (в P) каналом, другой, подсчитывая количество неподтвержденных завершений, увеличивается каналом и уменьшается процессором. [Второй семафор, будучи положительным, поднимет соответствующий флаг прерывания.]" [1 ]
Дейкстра писал о случае неограниченного буфера: «Мы рассматриваем два процесса, которые называются «производителем» и «потребителем» соответственно. Производитель — это циклический процесс, и каждый раз, когда он проходит свой цикл, он производит определенную порцию информации. который должен быть обработан потребителем.Потребитель также представляет собой циклический процесс, и каждый раз, когда он проходит свой цикл, он может обрабатывать следующую порцию информации, как это было произведено производителем... Мы предполагаем, что эти два процесса быть подключен для этой цели через буфер с неограниченной емкостью». [2]
Он писал о случае ограниченного буфера: «Мы изучали производителя и потребителя, связанных через буфер с неограниченной емкостью... Отношение становится симметричным, если они связаны через буфер конечного размера, скажем, N порций» [3 ] ]
А о случае с несколькими производителями-потребителями: «Мы рассматриваем несколько пар производитель/потребитель, где пара i связана через информационный поток, содержащий n i частей. Мы предполагаем... конечный буфер, который должен содержать все части всех потоков. иметь вместимость «всего» порций». [4]
Пер Бринч Хансен и Никлаус Вирт вскоре увидели проблему семафоров: «Я пришел к тому же выводу в отношении семафоров, а именно, что они не подходят для языков более высокого уровня. Вместо этого естественными событиями синхронизации являются обмены сообщениями » . [5]
Исходное решение с ограниченным семафором буфером было написано в стиле ALGOL . Буфер может хранить N частей или элементов. Семафор «количество частей в очереди» подсчитывает заполненные ячейки в буфере, семафор «количество пустых позиций» подсчитывает пустые ячейки в буфере, а семафор «манипулирование буфером» работает как мьютекс для операций размещения и получения буфера. Если буфер заполнен, то есть количество пустых позиций равно нулю, поток-производитель будет ожидать операции P (количество пустых позиций). Если буфер пуст, то есть количество частей в очереди равно нулю, потребительский поток будет ожидать операции P (количество частей в очереди). Операции V() освобождают семафоры. В качестве побочного эффекта поток может перейти из очереди ожидания в очередь готовности. Операция P() уменьшает значение семафора до нуля. Операция V() увеличивает значение семафора. [6]
начало целое число порций в очереди , количество пустых позиций , манипуляции с буфером ; количество частей в очереди := 0 ; количество пустых позиций : = N ; манипуляция буфером := 1 ; производитель parbegin : начать заново 1 : произвести следующую порцию ; P ( количество пустых позиций ) ; _ P ( манипулирование буфером ) ; добавить часть в буфер ; V ( манипулирование буфером ) ; V ( количество частей в очереди ) ; перейти снова 1 конец ; потребитель : начать заново 2 : P ( количество частей в очереди ) ; P ( манипулирование буфером ) ; взять часть из буфера ; V ( манипулирование буфером ) ; V ( количество пустых позиций ) ; _ принята часть процесса ; перейти еще раз 2 end parend end
Начиная с C++ 20, семафоры являются частью языка. Решение Дейкстры можно легко написать на современном C++. Переменная buffer_manipulation является мьютексом. Функция семафора для получения данных в одном потоке и освобождения в другом потоке не требуется. Оператор lock_guard() вместо пары lock() и unlock() — это C++ RAII . Деструктор lock_guard обеспечивает снятие блокировки в случае исключения. Это решение может обрабатывать несколько потоков потребителей и/или несколько потоков производителей.
#include <поток> #include <мьютекс> #include <семафор> std :: counting_semaphore < N > число_частей_очередей {0} ; _ std :: counting_semaphore < N > число_пустых_позиций { N }; std :: мьютекс buffer_manipulation ; void Producer () { for (;;) { Порционная часть = Produce_next_portion (); число_пустых_позиций . приобретать (); { std :: lock_guard < std :: mutex > g ( buffer_manipulation ); add_portion_to_buffer ( часть ); } число_частей_очередей . выпускать (); } } void потребитель () { для (;;) { number_of_queueing_portions . приобретать (); Порционная порция ; { std :: lock_guard < std :: mutex > g ( buffer_manipulation ); часть = take_portion_from_buffer (); } число_пустых_позиций . выпускать (); process_portion_taken ( часть ); } } int main () { std :: thread t1 ( продюсер ); std :: поток t2 ( потребитель ); т1 . присоединиться (); т2 . присоединиться (); }
Пер Бринч Хансен определил монитор: Я буду использовать термин «монитор» для обозначения общей переменной и набора значимых операций над ней. Целью монитора является управление планированием ресурсов между отдельными процессами в соответствии с определенной политикой. [7] Тони Хоар заложил теоретическую основу монитора. [8]
ограниченный буфер : монитор начала буфера : массив 0 .. N - 1 порции ; _ голова , хвост : 0..N - 1 ; _ _ счет : 0 .. Н ; непустой , неполный : условие ; процедура добавления ( x : часть ) ; начать , если count = N , то неполный . ждать ; примечание 0 <= count < N ; буфер [ хвост ] := х ; хвост := хвост ( + ) 1 ; счет := счет + 1 ; непустой . конец сигнала , добавление ; процедура удаления ( результат x : часть ) ; начать , если count = 0 , то непусто . ждать ; примечание 0 < счет <= N ; х := буфер [ голова ] ; голова := голова ( + ) 1 ; кол := кол - 1 ; неполный . конец сигнала удалить ; голова := 0 ; хвост := 0 ; счет := 0 ; конец ограниченного буфера ;
Монитор — это объект, который содержит переменные buffer
, head
и для реализации циклического буфера , переменные условия , и для синхронизации tail
, и методы , и для доступа к ограниченному буферу. Ожидание операции монитора соответствует операции семафора P или захвату, сигнал соответствует V или освобождению. Операция, обведенная кружком (+), берется по модулю N. Представленный псевдокод в стиле Паскаля показывает монитор Хоара. Монитор Mesa использует вместо . Версия языка программирования C++:count
nonempty
nonfull
append
remove
while count
if count
класс Bounded_buffer { Буфер порции [ N ]; // 0..N-1 беззнаковая голова , хвост ; // 0..N-1 беззнаковый счетчик ; // 0..N std :: condition_variable непустой , неполный ; стандартный :: мьютекс mtx ; public : void add ( Partion x ) { std :: unique_lock < std :: mutex > lck ( mtx ); неполный . подождите ( lck , [ & ]{ return ! ( N == count ); }); утверждать ( 0 <= счетчик && счетчик < N ); буфер [ хвост ++ ] = х ; хвост %= N ; ++ количество ; непустой . notify_one (); } Portion Remove () { std :: unique_lock < std :: mutex > lck ( mtx ); непустой . подождите ( lck , [ & ]{ return ! ( 0 == count ); }); утверждать ( 0 < счет && счетчик <= N ); Часть x = буфер [ head ++ ]; голова %= N ; -- считать ; неполный . notify_one (); вернуть х ; } Bounded_buffer () { head = 0 ; хвост = 0 ; счет = 0 ; } };
Версия C++ требует дополнительного мьютекса по техническим причинам. Он использует утверждение для обеспечения выполнения предварительных условий для операций добавления и удаления буфера.
Самое первое решение производитель-потребитель в компьютерах Electrologica использовало «каналы». Каналы , определенные Хоаром . Альтернативой явному указанию источника и пункта назначения может быть указание порта, через который будет осуществляться связь. Имена портов будут локальными для процессов, а способ соединения пар портов по каналам может быть объявлен в заголовке параллельной команды. [9] Бринч Хансен реализовал каналы на языках программирования Joyce и Super Pascal . Язык программирования операционной системы Plan 9 Alef и язык программирования операционной системы Inferno Limbo имеют каналы. Следующий исходный код C компилируется в Plan 9 из пользовательского пространства :
#include "uh" #include "libc.h" #include "thread.h" перечисление { СТЕК = 8192 }; недействительный производитель ( void * v ) { Channel * ch = v ; для ( uint я знак равно 1 ; ; ++ я ) { сон ( 400 ); print ( "p %d \n " , i ); сендул ( ч , я ); } } void потребитель ( void * v ) { Channel * ch = v ; для (;;) { uint p = Recvul ( ch ); print ( " \t\t c %d \n " , p ); сон ( 200 + nrand ( 600 )); } } void threadmain ( int argc , char ** argv ) { int ( * mk ) ( void ( * fn ) ( void * ), void * arg , uint stack ); МК = создание потока ; Канал * ch = chancreate ( sizeof ( ulong ), 1 ); МК ( продюсер , канал , СТЕК ); МК ( потребитель , ch , СТЕК ); Recvp ( chancreate ( sizeof ( void * ), 0 )); потокexitsall ( 0 ); }
Точка входа в программу находится в функции threadmain
. Вызов функции ch = chancreate(sizeof(ulong), 1)
создает канал, вызов функции sendul(ch, i)
отправляет значение в канал, а вызов функции p = recvul(ch)
получает значение из канала. В языке программирования Go тоже есть каналы. Пример Го:
пакет основной импорт ( «fmt» «math/rand» «время» ) вар sendMsg = 0 func ProduceMessage () int { время . Сон ( 400 * время.миллисекунды ) sendMsg ++ fmt . _ _ Printf ( "sendMsg = %v\n" , sendMsg ) return sendMsg } func ConsumerMessage ( recvMsg int ) { fmt . Printf ( "\t\trecvMsg = %v\n" , RecvMsg ) время . Sleep ( time . Duration ( 200 + rand . Intn ( 600 )) * time . Milliсекунда ) } func main () { ch := make ( chan int , 3 ) go func () { for { ch <- ProduceMessage () } }() для RecvMsg := диапазон ch { ConsumerMessage ( RecvMsg ) } }
Решение Go производитель-потребитель использует основную процедуру Go для потребителя и создает новую безымянную процедуру Go для производителя. Две процедуры Go связаны с каналом ch. Этот канал может поставить в очередь до трех значений int. Оператор ch := make(chan int, 3)
создает канал, оператор ch <- produceMessage()
отправляет значение в канал, а оператор recvMsg := range ch
получает значение из канала. [10] Распределение ресурсов памяти, выделение ресурсов обработки и синхронизация ресурсов выполняются языком программирования автоматически.
Лесли Лэмпорт задокументировал решение «производитель-потребитель» с ограниченным буфером для одного производителя и одного потребителя: Мы предполагаем, что буфер может содержать не более b сообщений, b >= 1. В нашем решении мы позволяем k быть константой, большей, чем b, и пусть s и r — целочисленные переменные, принимающие значения от 0 до k-1. Мы предполагаем, что изначально s=r и буфер пуст. Выбрав k кратным b, буфер можно реализовать как массив B [0: b - 1]. Производитель просто помещает каждое новое сообщение в B[s mod b], а потребитель берет каждое сообщение из B[r mod b]. [11] Ниже показан алгоритм, обобщенный на бесконечный k.
Производитель : L : если ( s - r ) mod k = b , то перейти к L fi ; поместить сообщение в буфер ; s := ( s + 1 ) mod k ; перейти к Л ; Потребитель : L : если ( s - r ) mod k = 0 , то перейти к L fi ; взять сообщение из буфера ; р := ( р + 1 ) мод k ; перейти к Л ;
Решение Лампорта использует ожидание занятости в потоке вместо ожидания в планировщике. Это решение не учитывает влияние переключения потоков планировщика в неподходящее время. Если первый поток прочитал значение переменной из памяти, планировщик переключается на второй поток, который меняет значение переменной, а планировщик переключается обратно на первый поток, тогда первый поток использует старое значение переменной, а не текущее значение. . Атомное чтение-изменение-запись решает эту проблему. Современный C++ предлагает atomic
переменные и операции для многопоточного программирования. Следующее занятое ожидание C++11 для одного производителя и одного потребителя использует атомарные операции чтения-изменения-записи fetch_add
и fetch_sub
атомарную переменную count
.
перечисление { N = 4 }; Буфер сообщений [ N ]; std :: atomic < беззнаковый > count {0} ; _ недействительный производитель ( ) { беззнаковый хвост {0} ; for (;;) { Сообщение message = ProduceMessage (); в то время как ( N == счетчик ) ; // занят буфер ожидания [ tail ++ ] = message ; хвост %= N ; считать . fetch_add ( 1 , std :: memory_order_relaxed ); } } void потребитель ( ) { беззнаковая голова {0} ; for (;;) { while ( 0 == count ) ; // занято ожидание Message message = buffer [ head ++ ]; голова %= N ; считать . fetch_sub ( 1 , std :: memory_order_relaxed ); ConsumerMessage ( сообщение ); } } int main () { std :: поток t1 ( производитель ); std :: поток t2 ( потребитель ); т1 . присоединиться (); т2 . присоединиться (); }
Переменные индекса кольцевого буфера head
и tail
являются локальными для потока и поэтому не влияют на согласованность памяти. Переменная count
контролирует занятое ожидание потока производителя и потребителя.