stringtranslate.com

Монитор (синхронизация)

В параллельном программировании монитор это конструкция синхронизации, которая предотвращает одновременный доступ потоков к состоянию общего объекта и позволяет им ждать изменения состояния. Они обеспечивают механизм, позволяющий потокам временно отказываться от монопольного доступа, чтобы дождаться выполнения какого-либо условия, прежде чем восстановить монопольный доступ и возобновить выполнение своей задачи. Монитор состоит из мьютекса (блокировки) и как минимум одной условной переменной . Условная переменная явно «сигнализирует», когда состояние объекта изменяется, временно передавая мьютекс другому потоку, «ожидающему» условной переменной.

Другое определение монитора — это потокобезопасный класс , объект или модуль , который оборачивается вокруг мьютекса , чтобы безопасно разрешить доступ к методу или переменной более чем одному потоку . Определяющей характеристикой монитора является то, что его методы выполняются с принципом взаимного исключения : в каждый момент времени не более одного потока может выполнять любой из его методов . Используя одну или несколько переменных условия, он также может предоставить потокам возможность ожидать определенного условия (таким образом, используя приведенное выше определение «монитора»). В оставшейся части статьи этот смысл «монитора» будет называться «потокобезопасным объектом/классом/модулем».

Мониторы были изобретены Пером Бринчом Хансеном [1] и CAR Hoare , [2] и впервые были реализованы на языке Concurrent Pascal Бринч Хансена . [3]

Взаимное исключение

Когда поток выполняет метод потокобезопасного объекта, говорят, что он занимает объект, удерживая его мьютекс (блокировку) . Потокобезопасные объекты реализованы для обеспечения того, чтобы в каждый момент времени объект мог занимать не более одного потока . Блокировка, которая изначально разблокирована, блокируется в начале каждого общедоступного метода и разблокируется при каждом возврате из каждого общедоступного метода.

После вызова одного из методов поток должен дождаться, пока ни один другой поток не выполнит ни один из методов потокобезопасного объекта, прежде чем начать выполнение своего метода. Обратите внимание, что без этого взаимного исключения два потока могут привести к потере или получению денег без всякой причины. Например, два потока, снимающие 1000 со счета, могут оба вернуть true, в то время как баланс упадет только на 1000, следующим образом: сначала оба потока извлекают текущий баланс, находят его больше 1000 и вычитают из него 1000; затем оба потока сохраняют баланс и возвращаются.

Условные переменные

Постановка задачи

Для многих приложений взаимного исключения недостаточно. Потокам, пытающимся выполнить операцию, возможно, придется подождать, пока некоторое условие P не станет истинным. Напряженный цикл ожидания

пока  нет ( P ) пропускай  _

не будет работать, поскольку взаимное исключение не позволит любому другому потоку войти в монитор, чтобы условие стало истинным. Существуют и другие «решения», такие как цикл, который разблокирует монитор, ждет определенное время, блокирует монитор и проверяет условие P . Теоретически работает и тупика не будет, но возникают проблемы. Трудно определить подходящее время ожидания: слишком маленькое — и поток перегружает процессор, слишком большое — и он, очевидно, перестанет отвечать. Что необходимо, так это способ сигнализировать потоку, когда условие P истинно (или может быть истинным).

Практический пример: классическая ограниченная проблема производителя/потребителя

Классическая проблема параллелизма — это проблема ограниченного производителя/потребителя , в которой существует очередь или кольцевой буфер задач максимального размера, при этом один или несколько потоков являются потоками «производителей», которые добавляют задачи в очередь, а один или несколько потоков — «производителей», добавляющих задачи в очередь. другие потоки являются «потребительскими» потоками, которые извлекают задачи из очереди. Предполагается, что очередь сама по себе не является потокобезопасной и может быть пустой, полной или между пустой и полной. Всякий раз, когда очередь полна задач, нам нужно, чтобы потоки-производители блокировались до тех пор, пока не останется место для задач, выводящих из очереди потоки-потребители. С другой стороны, всякий раз, когда очередь пуста, нам нужно, чтобы потоки-потребители блокировались до тех пор, пока не станут доступны новые задачи из-за их добавления потоками-производителями.

Поскольку очередь является параллельным объектом, разделяемым между потоками, доступ к ней должен быть атомарным , поскольку в ходе доступа к очереди очередь может быть переведена в несогласованное состояние , которое никогда не должно раскрываться между потоками. Таким образом, любой код, обращающийся к очереди, представляет собой критическую секцию , которую необходимо синхронизировать путем взаимного исключения. Если инструкции кода и процессора в критических разделах кода, имеющих доступ к очереди, могут чередоваться из-за произвольных переключений контекста между потоками на одном и том же процессоре или из-за одновременного выполнения потоков на нескольких процессорах, то существует риск выявления несогласованного состояния и возникновения условий гонки . .

Неверно без синхронизации

Наивный подход заключается в разработке кода с ожиданием занятости и отсутствием синхронизации, что делает код подверженным условиям гонки:

глобальная очередь RingBuffer ; // Потокобезопасный кольцевой буфер задач.   // Метод, представляющий поведение каждого потока-производителя: public метод Producer () { while ( true ) { Task myTask = ...; // Производитель добавляет новую задачу. while ( queue . isFull ()) {} // Ожидание занятости, пока очередь не заполнится. очередь . поставить в очередь ( myTask ); // Добавляем задачу в очередь. } }                  // Метод, представляющий поведение каждого потребительского потока: public метод Consumer () { while ( true ) { while ( queue . isEmpty ()) {} // Занято — ожидание, пока очередь не станет непустой. мояЗадача = очередь . удалить из очереди (); // Убираем задачу из очереди. сделатьStuff ( myTask ); // Выходим и делаем что-нибудь с задачей. } }                 

В этом коде есть серьезная проблема: доступ к очереди может прерываться и чередоваться с доступом к очереди других потоков. Методыqueue.enqueue иqueue.dequeue , скорее всего, содержат инструкции по обновлению переменных-членов очереди, таких как ее размер, начальная и конечная позиции, назначение и распределение элементов очереди и т. д. Кроме того, очереди.isEmpty ( ) и очередь.isFull () также читают это общее состояние. Если потокам-производителям и потребителям разрешено чередование во время вызовов постановки в очередь или удаления из очереди, то может проявиться несогласованное состояние очереди, что приведет к условиям гонки. Кроме того, если один потребитель опустошает очередь между выходом другого потребителя из режима ожидания занятости и вызовом «удаления из очереди», то второй потребитель попытается исключить из очереди пустую очередь, что приведет к ошибке. Аналогично, если производитель заполняет очередь между выходом другого производителя из режима ожидания занятости и вызовом «очереди», тогда второй производитель попытается добавить к полной очереди, что приведет к ошибке.

Ожидание вращения

Одним из наивных подходов к достижению синхронизации, как упоминалось выше, является использование « ожидания вращения », при котором мьютекс используется для защиты критических разделов кода, а ожидание занятости по-прежнему используется, при этом блокировка захватывается и снимается в между каждой проверкой занятости-ожидания.

глобальная очередь RingBuffer ; // Потокобезопасный кольцевой буфер задач. глобальная блокировка очередиLock ; // Мьютекс для кольцевого буфера задач.      // Метод, представляющий поведение каждого потока-производителя: public метод Producer () { while ( true ) { Task myTask = ...; // Производитель добавляет новую задачу.            очередьLock . приобретать (); // Получение блокировки для начальной проверки ожидания занятости. while ( queue . isFull ()) { // Занято – ожидание, пока очередь не заполнится. очередьLock . выпускать (); // Временно отмените блокировку, чтобы дать возможность другим потокам // нуждающимся в запуске очереди QueueLock, чтобы потребитель мог принять задачу. очередьLock . приобретать (); // Повторно получаем блокировку для следующего вызова "queue.isFull()". }            очередь . поставить в очередь ( myTask ); // Добавляем задачу в очередь. очередьLock . выпускать (); // Снимаем блокировку очереди до тех пор, пока она нам снова не понадобится для добавления следующей задачи. } }    // Метод, представляющий поведение каждого потребительского потока: public метод Consumer () { while ( true ) { queueLock . приобретать (); // Получение блокировки для начальной проверки ожидания занятости. while ( queue . isEmpty ()) { // Занято – ожидание, пока очередь не станет непустой. очередьLock . выпускать (); // Временно отмените блокировку, чтобы дать возможность другим потокам // нуждаться в запуске очереди QueueLock, чтобы производитель мог добавить задачу. очередьLock . приобретать (); // Повторно получаем блокировку для следующего вызова "queue.isEmpty()". } MyTask = очередь . удалить из очереди (); // Убираем задачу из очереди. очередьLock . выпускать (); // Снимаем блокировку очереди до тех пор, пока она нам снова не понадобится для выполнения следующей задачи. сделатьStuff ( myTask ); // Выходим и делаем что-нибудь с задачей. } }                           

Этот метод гарантирует, что несогласованное состояние не возникнет, но тратит ресурсы ЦП из-за ненужного ожидания занятости. Даже если очередь пуста и потокам-производителям нечего добавить в течение длительного времени, потоки-потребители всегда заняты ожиданием без необходимости. Аналогично, даже если потребители на долгое время заблокированы при обработке своих текущих задач и очередь заполнена, производители всегда заняты ожиданием. Это расточительный механизм. Что необходимо, так это способ блокировать потоки-производители до тех пор, пока очередь не переполнится, и способ блокировать потоки-потребители до тех пор, пока очередь не станет непустой.

(Примечание: мьютексы сами по себе также могут представлять собой спин-блокировки , которые включают ожидание занятости для получения блокировки, но чтобы решить проблему нерациональной траты ресурсов ЦП, мы предполагаем, что очередь QueueLock не является спин-блокировкой и правильно использует блокировку. заблокировать саму очередь.)

Условные переменные

Решение состоит в использовании переменных условия . Концептуально условная переменная представляет собой очередь потоков, связанную с мьютексом, в которой поток может ожидать выполнения некоторого условия. Таким образом, каждая переменная условия c связана с утверждением P c . Пока поток ожидает условную переменную, считается, что этот поток не занимает монитор, и поэтому другие потоки могут войти в монитор, чтобы изменить его состояние. В большинстве типов мониторов эти другие потоки могут сигнализировать переменную условия c , чтобы указать, что утверждение P c истинно в текущем состоянии.

Таким образом, над условными переменными выполняются три основные операции:

Как правило, несколько условных переменных могут быть связаны с одним и тем же мьютексом, но не наоборот. (Это соответствие « один ко многим» .) Это связано с тем, что предикат P c одинаков для всех потоков, использующих монитор, и должен быть защищен взаимным исключением от всех других потоков, которые могут вызвать изменение условия или которые могут прочитайте его, пока рассматриваемый поток вызывает его изменение, но могут быть разные потоки, которые хотят дождаться другого условия для одной и той же переменной, требующей использования одного и того же мьютекса. В описанном выше примере производитель-потребитель очередь должна быть защищена уникальным объектом мьютекса m. Потоки-производители захотят ожидать на мониторе, используя блокировку mи переменную условия , которая блокируется до тех пор, пока очередь не переполнится. «Потребительские» потоки захотят ожидать на другом мониторе, используя тот же мьютекс, но с другой переменной условия , которая блокируется до тех пор, пока очередь не станет непустой. (Обычно) никогда не имеет смысла иметь разные мьютексы для одной и той же условной переменной, но этот классический пример показывает, почему часто имеет смысл иметь несколько условных переменных, использующих один и тот же мьютекс. Мьютекс, используемый одной или несколькими условными переменными (одним или несколькими мониторами), также может использоваться совместно с кодом, который не использует условные переменные (и который просто получает/освобождает его без каких-либо операций ожидания/сигнала), если эти критические разделы не происходят. требовать ожидания определенного условия для одновременных данных.m

Мониторинг использования

Правильное базовое использование монитора:

приобрести ( м ); // Получите блокировку этого монитора. while ( ! p ) { // Пока ожидаемое условие/предикат/утверждение не истинно... wait ( m , cv ); // Ожидание блокировки и условной переменной этого монитора. } // ... Здесь находится критическая часть кода ... signal ( cv2 ); // Или: Broadcast(cv2); // cv2 может быть таким же, как cv, или отличаться. выпуск ( м ); // Снимите блокировку этого монитора.         

Точнее, это тот же псевдокод, но с более подробными комментариями, чтобы лучше объяснить, что происходит:

// ... (предыдущий код) // Скоро вход в монитор. // Получите консультативный мьютекс (блокировку), связанный с одновременными // данными, которые совместно используются потоками, // чтобы гарантировать, что никакие два потока не могут быть упреждающе чередованы или // выполняться одновременно на разных ядрах во время выполнения в критических // разделах, которые читать или записывать одни и те же одновременные данные. Если другой // поток удерживает этот мьютекс, то этот поток // будет переведен в режим сна (заблокирован) и помещен в очередь ожидания m. (Мьютекс "m" не должен // быть спин-блокировкой.) Acquire ( m ); // Теперь мы удерживаем блокировку и можем проверить условие в первый раз.// В первый раз, когда мы выполняем условие цикла while после вышеописанного // "получения", мы спрашиваем: "Условие/предикат/утверждение , // которое мы ожидаем, уже истинно?"while ( ! p ()) // "p" — это любое выражение (например, переменная или // вызов функции), которое проверяет условие и // принимает логическое значение. Это сам по себе критический // раздел, поэтому вы *ДОЛЖНЫ* удерживать блокировку при // выполнении этого условия цикла «пока»! // Если условие «пока» проверяется не в первый раз, // тогда мы задаем вопрос: «Теперь, когда другой поток, использующий этот // монитор, уведомил меня и разбудил меня, и я был контекстно- переключился // обратно, осталось ли условие/предикат/утверждение, которого мы ожидаем, // истинным между моментом, когда меня разбудили, и моментом, когда я повторно получил // блокировку внутри вызова "wait" в последнем итерации этого цикла, или // какой-то другой поток заставил // тем временем снова стать ложным условие, что сделало это ложным пробуждением?  { // Если это первая итерация цикла, то ответ // «нет» — условие еще не готово. В противном случае ответ: // последнее. Это было ложное пробуждение, сначала произошел // какой-то другой поток, из-за которого условие снова стало ложным, и мы должны // снова подождать.подожди ( м , резюме ); // Временно запретить любому другому потоку на любом ядре выполнять // операции над m или cv. // Release(m) // Атомарно освобождаем блокировку "m", чтобы другой // // код, использующий эти параллельные данные , // // мог работать, переместите этот поток в // // очередь ожидания cv, чтобы он был уведомлен // // когда-нибудь, когда условие станет // // истинным, и засыпаем этот поток. Повторно включите // // другие потоки и ядра для выполнения // // операций над m и cv. // // Переключение контекста происходит на этом ядре. // // В какой-то момент в будущем условие, которого мы ожидаем, станет // истинным, и другой поток, использующий этот монитор (m, cv), подаст либо // сигнал, который разбудит этот поток, либо // широковещательную передачу это будит нас, а это означает, что нас исключили // из очереди ожидания cv. // // В течение этого времени другие потоки могут привести к тому, что условие // снова станет ложным, или условие может переключиться один или несколько // раз, или оно может остаться истинным. // // Этот поток снова переключается на какое-то ядро. // //acquire(m) // Блокировка "m" получена повторно. // Завершим итерацию цикла и повторно проверим условие цикла while, // чтобы убедиться, что предикат по-прежнему истинен. } // Условие, которого мы ждем, истинно! // Мы все еще удерживаем блокировку либо перед входом в монитор, либо с // последнего выполнения "wait".// Здесь находится критический раздел кода, в котором есть предварительное условие, что наш предикат // должен быть истинным. // Этот код может сделать условие cv ложным и/или сделать предикаты // других переменных условия истинными.// Сигнал вызова или широковещательная рассылка, в зависимости от того, // какие предикаты условных переменных (которые совместно используют мьютекс m) стали истинными или могли стать истинными, // и используемый семантический тип монитора.for ( cv_x в cvs_to_signal ) { сигнал ( cv_x ); // Или: Broadcast(cv_x); } // Один или несколько потоков были разбужены, но заблокируются, как только попытаются // получить m.     // Освободите мьютекс, чтобы уведомленные потоки и другие могли войти в свои критические // секции. выпуск ( м );

Решение ограниченной задачи производителя/потребителя

Введя использование условных переменных, давайте воспользуемся ими, чтобы вернуться к классической проблеме ограниченного производителя/потребителя и решить ее. Классическое решение — использовать два монитора, содержащие две условные переменные, совместно использующие одну блокировку в очереди:

глобальная нестабильная очередь RingBuffer ; // Потокобезопасный кольцевой буфер задач. глобальная блокировка очередиLock ; // Мьютекс для кольцевого буфера задач. (Не спин-блокировка.) global CV QueueEmptyCV ; // Условная переменная для потребительских потоков, ожидающих, пока // очередь не станет непустой. Связанная с ним блокировка — «queueLock». глобальная очередь CVFullCV ; // Условная переменная для потоков-производителей, ожидающих, пока очередь // не станет неполной. Связанная с ним блокировка также является «queueLock».               // Метод, представляющий поведение каждого потока-производителя: public метод Producer () { while ( true ) { // Производитель создает новую задачу, которую нужно добавить. задача myTask = ...;            // Получаем "queueLock" для начальной проверки предиката. очередьLock . приобретать ();  // Критическая секция, которая проверяет, не заполнена ли очередь. while ( queue . isFull ()) { // Освободите «queueLock», поставьте этот поток в очередь на «queueFullCV» и засните этот поток. подождите ( queueLock , queueFullCV ); // Когда этот поток просыпается, повторно получаем «queueLock» для следующей проверки предиката. }         // Критическая секция, добавляющая задачу в очередь (обратите внимание, что мы держим «queueLock»). очередь . поставить в очередь ( myTask );  // Пробуждаем один или все потоки-потребители, ожидающие, пока очередь не станет непустой // теперь, когда это гарантировано, чтобы поток-потребитель взял на себя задачу. сигнал ( queueEmptyCV ); // Или: Broadcast(queueEmptyCV); // Конец критических секций.     // Отпускаем «queueLock», пока он нам снова не понадобится для добавления следующей задачи. очередьLock . выпускать (); } }  // Метод, представляющий поведение каждого потока-потребителя: public метод Consumer () { while ( true ) { // Получение «queueLock» для начальной проверки предиката. очередьLock . приобретать ();         // Критическая секция, проверяющая, не пуста ли очередь. while ( queue . isEmpty ()) { // Освободите «queueLock», поставьте этот поток в очередь на «queueEmptyCV» и засните этот поток. подождите ( queueLock , queueEmptyCV ); // Когда этот поток просыпается, повторно получаем «queueLock» для следующей проверки предиката. }         // Критическая секция, которая удаляет задачу из очереди (обратите внимание, что мы удерживаем «queueLock»). мояЗадача = очередь . удалить из очереди ();    // Пробуждаем один или все потоки-производители, ожидающие заполнения очереди // теперь, когда это гарантировано, чтобы поток-производитель добавил задачу. сигнал ( queueFullCV ); // Или: Broadcast(queueFullCV); // Конец критических секций.     // Отпускаем «queueLock», пока он нам снова не понадобится для выполнения следующей задачи. очередьLock . выпускать ();  // Выходим и делаем что-нибудь с задачей. сделатьStuff ( myTask ); } }  

Это обеспечивает параллелизм между потоками производителя и потребителя, совместно использующими очередь задач, и блокирует потоки, которым нечего делать, вместо ожидания занятости, как показано в вышеупомянутом подходе с использованием спин-блокировок.

Вариант этого решения может использовать одну переменную условия как для производителей, так и для потребителей, возможно, с именем «queueFullOrEmptyCV» или «queueSizeChangedCV». В этом случае с переменной условия связано более одного условия, так что переменная условия представляет собой более слабое условие, чем условия, проверяемые отдельными потоками. Условная переменная представляет потоки, ожидающие, пока очередь не заполнится , и потоки, ожидающие, пока она не станет пустой. Однако для этого потребуется использовать широковещательную рассылку во всех потоках с использованием условной переменной и нельзя использовать обычный сигнал . Это связано с тем, что обычный сигнал может разбудить поток неправильного типа, условие которого еще не выполнено, и этот поток снова перейдет в спящий режим без получения сигнала потока правильного типа. Например, производитель может заполнить очередь и разбудить другого производителя вместо потребителя, а проснувшийся производитель снова перейдет в режим сна. В дополнительном случае потребитель может опустошить очередь и разбудить другого потребителя вместо производителя, и потребитель снова перейдет в режим сна. Использование широковещательной рассылки гарантирует, что некоторый поток правильного типа будет работать так, как ожидается в условии задачи.

Вот вариант, использующий только одну условную переменную и трансляцию:

глобальная нестабильная очередь RingBuffer ; // Потокобезопасный кольцевой буфер задач. глобальная блокировка очередиLock ; // Мьютекс для кольцевого буфера задач. (Не спин-блокировка.) global CV QueueFullOrEmptyCV ; // Одна переменная условия, когда очередь не готова для какого-либо потока // т.е. для потоков-производителей, ожидающих, пока очередь станет неполной, // и потоков-потребителей, ожидающих, пока очередь станет непустой. // Связанная с ним блокировка — «queueLock». // Небезопасно использовать обычный «сигнал», поскольку он связан // с несколькими условиями предиката (утверждениями).               // Метод, представляющий поведение каждого потока-производителя: public метод Producer () { while ( true ) { // Производитель создает новую задачу, которую нужно добавить. задача myTask = ...;            // Получаем "queueLock" для начальной проверки предиката. очередьLock . приобретать ();  // Критическая секция, которая проверяет, не заполнена ли очередь. while ( queue . isFull ()) { // Освободите «queueLock», поставьте этот поток в очередь «queueFullOrEmptyCV» и засните этот поток. подождите ( queueLock , queueFullOrEmptyCV ); // Когда этот поток просыпается, повторно получаем «queueLock» для следующей проверки предиката. }         // Критическая секция, добавляющая задачу в очередь (обратите внимание, что мы держим «queueLock»). очередь . поставить в очередь ( myTask );  // Пробуждаем все потоки-производители и потребители, ожидающие, пока очередь не станет соответственно // не полной и не пустой, поскольку последнее гарантировано, чтобы поток-потребитель взял на себя задачу. трансляция ( queueFullOrEmptyCV ); // Не используйте «сигнал» (так как он может разбудить только другой поток-производитель). // Конец критических секций.     // Отпускаем «queueLock», пока он нам снова не понадобится для добавления следующей задачи. очередьLock . выпускать (); } }  // Метод, представляющий поведение каждого потока-потребителя: public метод Consumer () { while ( true ) { // Получение «queueLock» для начальной проверки предиката. очередьLock . приобретать ();         // Критическая секция, проверяющая, не пуста ли очередь. while ( queue . isEmpty ()) { // Освободите «queueLock», поставьте этот поток в очередь на «queueFullOrEmptyCV» и засните этот поток. подождите ( queueLock , queueFullOrEmptyCV ); // Когда этот поток просыпается, повторно получаем «queueLock» для следующей проверки предиката. }         // Критическая секция, которая удаляет задачу из очереди (обратите внимание, что мы удерживаем «queueLock»). мояЗадача = очередь . удалить из очереди ();    // Пробуждаем все потоки-производители и потребители, ожидающие, пока очередь не станет соответственно // не полной и не пустой, поскольку первый гарантирован, так что поток-производитель добавит задачу. трансляция ( queueFullOrEmptyCV ); // Не используйте «сигнал» (так как он может разбудить только другой потребительский поток). // Конец критических секций.     // Отпускаем «queueLock», пока он нам снова не понадобится для выполнения следующей задачи. очередьLock . выпускать ();  // Выходим и делаем что-нибудь с задачей. сделатьStuff ( myTask ); } }  

Примитивы синхронизации

Мониторы реализованы с использованием атомарного примитива чтения-изменения-записи и примитива ожидания. Примитив чтения-изменения-записи (обычно «проверка-установка» или «сравнение-и-замена») обычно имеет форму инструкции блокировки памяти, предоставляемой ISA , но также может состоять из неблокирующих инструкций на одном языке. процессорные устройства, когда прерывания отключены. Примитив ожидания может быть циклом ожидания занятости или примитивом, предоставляемым ОС, который предотвращает планирование потока до тех пор, пока он не будет готов продолжить работу.

Вот пример реализации псевдокода частей системы потоков, мьютексов и условных переменных в стиле Mesa с использованием политики « тестируй и устанавливай» и политики «первым пришел — первым обслужен»:

Пример реализации Mesa-монитора с помощью Test-and-Set

// Основные части системы потоков: // Предположим, что "ThreadQueue" поддерживает произвольный доступ. общедоступная изменчивая ThreadQueue ReadyQueue ; // Потокобезопасная очередь готовых потоков. Элементы: (Поток*). общественный изменчивый глобальный поток * currentThread ; // Предположим, что эта переменная относится к числу ядер. (Остальные общие.)         // Реализует спин-блокировку только для синхронизированного состояния самой системы потоков. // Используется с test-and-set в качестве примитива синхронизации. public voluting global bool threadingSystemBusy = false ;      // Подпрограмма обслуживания прерывания переключения контекста (ISR): // На текущем ядре ЦП упреждающее переключение на другой поток. публичный метод contextSwitchISR () { if ( testAndSet ( threadingSystemBusy )) { return ; // Невозможно сейчас переключить контекст. }          // Обеспечиваем, чтобы это прерывание не повторилось и не нарушило переключение контекста: systemCall_disableInterrupts ();  // Получить все регистры текущего процесса. // Для счетчика программ (ПК) нам понадобится расположение инструкции // в метке «возобновить» ниже. Получение значений регистров зависит от платформы и может включать // чтение текущего кадра стека, инструкции JMP/CALL и т. д. (Подробности выходят за рамки этой темы.) currentThread -> Registers = getAllRegisters (); // Сохраняем регистры объекта currentThread в памяти. currentThread -> регистры . ПК = резюме ; // В этом методе для следующего компьютера установите метку «возобновить» ниже.            готовочередь . постановка в очередь ( currentThread ); // Помещаем этот поток обратно в очередь готовности для последующего выполнения. Тема * otherThread = ReadyQueue . удалить из очереди (); // Удалить и запустить следующий поток из очереди готовности. currentThread = OtherThread ; // Заменяем значение глобального указателя текущего потока, чтобы оно было готово для следующего потока.             // Восстанавливаем регистры из currentThread/otherThread, включая переход к сохраненному ПК другого потока // (в разделе «резюме» ниже). Опять же, подробности того, как это делается, выходят за рамки этой задачи. восстановления регистров ( otherThread . регистры );   // *** Сейчас выполняется «otherThread» (теперь «currentThread»)! Исходный поток теперь «спит». *** резюме : // Это место, где другой вызов contextSwitch() должен установить ПК при переключении контекста здесь.  // Возврат к тому месту, где остановился другой поток. ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. }     // Метод сна потока: // На текущем ядре ЦП синхронное переключение контекста на другой поток без помещения // текущего потока в очередь готовности. // Должен содержать "threadingSystemBusy" и отключенные прерывания, чтобы этот метод // не прерывался таймером переключения потоков, который вызывал бы contextSwitchISR(). // После возврата из этого метода необходимо очистить «threadingSystemBusy». public метод threadSleep () { // Получаем все регистры текущего процесса. // Для счетчика программ (ПК) нам понадобится расположение инструкции // в метке «возобновить» ниже. Получение значений регистров зависит от платформы и может включать // чтение текущего кадра стека, инструкции JMP/CALL и т. д. (Подробности выходят за рамки этой темы.) currentThread -> Registers = getAllRegisters (); // Сохраняем регистры объекта currentThread в памяти. currentThread -> регистры . ПК = резюме ; // В этом методе для следующего компьютера установите метку «возобновить» ниже.                // В отличие от contextSwitchISR(), мы не будем помещать currentThread обратно в ReadyQueue. // Вместо этого он уже помещен в очередь мьютекса или условной переменной. Тема * otherThread = ReadyQueue . удалить из очереди (); // Удалить и запустить следующий поток из очереди готовности. currentThread = OtherThread ; // Заменяем значение глобального указателя текущего потока, чтобы оно было готово для следующего потока.             // Восстанавливаем регистры из currentThread/otherThread, включая переход к сохраненному ПК другого потока // (в разделе «резюме» ниже). Опять же, подробности того, как это делается, выходят за рамки этой задачи. восстановления регистров ( otherThread . регистры );   // *** Сейчас выполняется «otherThread» (теперь «currentThread»)! Исходный поток теперь «спит». *** резюме : // Это место, где другой вызов contextSwitch() должен установить ПК при переключении контекста здесь.  // Возврат к тому месту, где остановился другой поток. }public метод wait ( Mutex m , ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "hold" и "threadQueue" или "readyQueue" этого объекта. while ( testAndSet ( threadingSystemBusy )) {} // Обратите внимание: «threadingSystemBusy» теперь истинно. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызывал бы contextSwitchISR(). // Выполнено вне threadSleep() для большей эффективности, чтобы этот поток перешел в сон // сразу после перехода в очередь с переменными условия. systemCall_disableInterrupts (); утверждать м . держал ; // (В частности, этот поток должен содержать его.) m . выпускать (); в . ожидание Темы . постановка в очередь ( currentThread ); потокSleep (); // Поток спит... Поток просыпается от сигнала/трансляции. ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. // Стиль Mesa: // Теперь здесь могут происходить переключения контекста, что делает предикат вызывающего клиента ложным. м . приобретать (); }                                         сигнал публичного метода ( ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "hold" и "threadQueue" или "readyQueue" этого объекта . while ( testAndSet ( threadingSystemBusy )) {} // Обратите внимание: «threadingSystemBusy» теперь истинно. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызывал бы contextSwitchISR(). // Выполнено вне threadSleep() для большей эффективности, чтобы этот поток перешел в сон // сразу после перехода в очередь с переменными условия. systemCall_disableInterrupts (); если ( ! c . waitThreads . isEmpty ()) { wakenThread = c . ожидание Темы . удалить из очереди (); готовочередь . постановка в очередь ( wakenThread ); } ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. // Стиль Mesa: // Пробужденный поток не имеет никакого приоритета. }                                   общедоступный метод Broadcast ( ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "hold" и "threadQueue" или "readyQueue" этого объекта . while ( testAndSet ( threadingSystemBusy )) {} // Обратите внимание: «threadingSystemBusy» теперь истинно. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызывал бы contextSwitchISR(). // Выполнено вне threadSleep() для большей эффективности, чтобы этот поток перешел в сон // сразу после перехода в очередь с переменными условия. systemCall_disableInterrupts (); в то время как ( ! c . waitThreads . isEmpty ()) { wakenThread = c . ожидание Темы . удалить из очереди (); готовочередь . постановка в очередь ( wakenThread ); } ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. // Стиль Mesa: // Пробужденные потоки не имеют никакого приоритета. }                                   класс Mutex { protected voluty bool Hold = false ; частный изменчивый ThreadQueue BlockingThreads ; // Потокобезопасная очередь заблокированных потоков. Элементы: (Поток*). public методacquire () { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "hold" и "threadQueue" или "readyQueue" этого объекта . while ( testAndSet ( threadingSystemBusy )) {} // Обратите внимание: «threadingSystemBusy» теперь истинно. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который будет вызывать contextSwitchISR(). // Выполнено вне threadSleep() для большей эффективности, чтобы этот поток перешел в сон // сразу после перехода в очередь блокировки. systemCall_disableInterrupts ();                               утверждать ! блокировка потоков . содержит ( currentThread );  if ( hold ) { // Помещаем "currentThread" в очередь этой блокировки, чтобы // он считался "спящим" для этой блокировки. // Обратите внимание, что "currentThread" по-прежнему должен обрабатываться функцией threadSleep(). готовочередь . удалить ( currentThread ); блокировка потоков . постановка в очередь ( currentThread ); потокSleep (); // Теперь мы проснулись, что должно быть потому, что "hold" стало ложным. утверждать ! держал ; утверждать ! блокировка потоков . содержит ( currentThread ); } Удержано = правда ; ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. } public метод Release () { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "hold" и "threadQueue" или "readyQueue" этого объекта. while ( testAndSet ( threadingSystemBusy )) {} // Обратите внимание: «threadingSystemBusy» теперь истинно. // Системный вызов для отключения прерываний на этом ядре для повышения эффективности. systemCall_disableInterrupts (); утверждать, что проведено ; // (Освобождение должно выполняться только во время удержания блокировки.)                                               удержано = ложь ; если ( ! blockingThreads . isEmpty ()) { Тема * unblockedThread = blockingThreads . удалить из очереди (); готовочередь . постановка в очередь ( unblockedThread ); } ThreadingSystemBusy = ложь ; // Должно быть атомарное присваивание. systemCall_enableInterrupts (); // Снова включаем упреждающее переключение на этом ядре. } }                    struct ConditionVariable { изменчивый ThreadQueue waitThreads ; }     

Блокирующие переменные условия

Первоначальные предложения К.А.Р. Хоара и Пера Бринча Хансена касались блокировки условных переменных . При использовании переменной условия блокировки сигнальный поток должен ждать за пределами монитора (по крайней мере), пока сигнальный поток не освободит монитор, либо вернувшись, либо снова ожидая условную переменную. Мониторы, использующие переменные условия блокировки, часто называют мониторами в стиле Хоара или мониторами сигналов и срочного ожидания .

Монитор в стиле Хоара с двумя переменными состояния aи b. После того, как Бур и др.

Мы предполагаем, что с каждым объектом монитора связаны две очереди потоков.

Кроме того, мы предполагаем, что для каждой условной переменной c существует очередь

Обычно гарантируется справедливость всех очередей , а в некоторых реализациях может быть гарантировано, что они будут первыми пришли — первым обслужены .

Реализация каждой операции осуществляется следующим образом. (Мы предполагаем, что каждая операция выполняется во взаимном исключении других; таким образом, перезапущенные потоки не начинают выполнение до завершения операции.)

войдите в монитор: введите метод если монитор заблокирован добавить эту тему в эл. заблокировать эту тему еще заблокировать монитороставь монитор: расписание возврат из методаподожди  с : добавить эту тему в c.q расписание заблокировать эту темусигнал  c : если есть поток, ожидающий c .q выберите и удалите один такой поток t из c.q (t называется «сигнализированным потоком») добавить эту тему в s перезапустить т (так что следующим я займу монитор) заблокировать эту темурасписание: если есть тема на s выберите и удалите один поток из s и перезапустите его (эта ветка займет монитор дальше) иначе, если есть нить на e выберите и удалите один поток из e и перезапустите его (эта ветка займет монитор дальше) еще разблокировать монитор (монитор станет пустым)

Процедура scheduleвыбирает следующий поток, который займет монитор, или, при отсутствии потоков-кандидатов, разблокирует монитор.

Получающаяся в результате дисциплина сигнализации известна как «сигнальное и срочное ожидание», поскольку сигнализатор должен ждать, но ему дается приоритет над потоками во входной очереди. Альтернативой является «сигнал и ожидание», при котором очередь отсутствует s, а сигнализатор вместо этого ожидает в eочереди.

Некоторые реализации предоставляют операцию сигнала и возврата , которая сочетает в себе сигнализацию и возврат из процедуры.

сигнал  c  и возврат : если есть поток, ожидающий c .q выберите и удалите один такой поток t из c.q (t называется «сигнализированным потоком») перезапустить т (так что следующим я займу монитор) еще расписание возврат из метода

В любом случае («сигнал и срочное ожидание» или «сигнал и ожидание»), когда сигнализируется условная переменная и есть хотя бы один поток, ожидающий условной переменной, сигнальный поток плавно передает занятость сигнальному потоку, поэтому что никакой другой поток не может занять место между ними. Если P c истинно в начале каждой операции сигнала c , оно будет истинным и в конце каждой операции ожидания c . Это суммировано следующими контрактами . В этих контрактах I является инвариантом монитора .

войдите в монитор: постусловие  Iоставь монитор: при условии, что  яwait  c : предварительное условие  I  изменяет состояние постусловия монитора P c  и  Iсигнал  c : предварительное условие  P c  и  I  изменяет состояние постусловия монитора I.сигнал  c  и возврат : предварительное условие  P c  и  I

В этих контрактах предполагается, что I и P c не зависят от содержимого или длины каких-либо очередей.

(Когда у условной переменной можно запросить количество потоков, ожидающих в ее очереди, можно задать более сложные контракты. Например, полезная пара контрактов, позволяющая передавать занятость без установления инварианта, такова:

wait  c : предварительное условие  I  изменяет состояние постусловия монитора P c Предварительное условие сигнала c ( не пустое( c ) и P c ) или (пустое ( c ) и I ) изменяет состояние постусловия монитора I   

( Подробнее см. Ховард [4] и Бур и др. [5] ).

Здесь важно отметить, что утверждение P c полностью зависит от программиста; ему или ей просто нужно быть последовательным в том, что это такое.

Мы завершаем этот раздел примером потокобезопасного класса, использующего блокирующий монитор, реализующий ограниченный поточно-безопасный стек .

класс монитора  SharedStack { Private const емкость := 10 частный  int [емкость] Частный размер  int := 0 инвариант 0 <= размер и размер <= емкость Private  BlockingCondition theStackIsNotEmpty /* связан с 0 < размер и размер <= емкость */  частный  BlockingCondition theStackIsNotFull /* связан с 0 <= размер и размер < емкость */ общедоступный метод push ( целое значение) { если размер = емкость , то  подождите, пока theStackIsNotFull утверждает 0 <= размер и размер < емкость A[размер] := значение; размер := размер + 1 утверждать 0 <размер и размер <= сигнал емкости theStackIsNotEmpty и возвращать } публичный метод  int pop() { если размер = 0 , подождите  , пока StackIsNotEmpty утверждает 0 <размер и размер <= емкость размер := размер - 1; утверждать 0 <= размер и размер < сигнал емкости theStackIsNotFull и возвращать A[размер] }}

Обратите внимание, что в этом примере потокобезопасный стек внутренне предоставляет мьютекс, который, как и в предыдущем примере производителя/потребителя, используется обеими переменными условия, которые проверяют разные условия для одних и тех же параллельных данных. Единственное отличие состоит в том, что в примере производителя/потребителя предполагалась обычная непотокобезопасная очередь и использовались автономные мьютексы и условные переменные, без абстрагирования этих деталей монитора, как в данном случае. В этом примере, когда вызывается операция «ожидание», она должна каким-то образом снабжаться мьютексом потокобезопасного стека, например, если операция «ожидание» является интегрированной частью «класса монитора». Помимо такого рода абстрактной функциональности, когда используется «необработанный» монитор, он всегда должен включать мьютекс и условную переменную с уникальным мьютексом для каждой условной переменной.

Неблокирующие переменные условия

При использовании неблокирующих условных переменных (также называемых условными переменными в стиле Mesa или условными переменными «сигнал и продолжение» ) передача сигналов не приводит к тому, что поток сигнализации теряет занятость монитора. Вместо этого сигнальные потоки перемещаются в eочередь. Очереди не нужны s.

Монитор в стиле Mesa с двумя переменными состояния aиb

При использовании неблокирующих условных переменных операцию сигнала часто называют уведомлением — этой терминологии мы будем следовать здесь. Также часто используется операция notify all , которая перемещает все потоки, ожидающие условной переменной, в очередь e.

Здесь дается значение различных операций. (Мы предполагаем, что каждая операция выполняется во взаимном исключении других; таким образом, перезапущенные потоки не начинают выполнение до завершения операции.)

войдите в монитор: введите метод если монитор заблокирован добавить эту тему в эл. заблокировать эту тему еще заблокировать монитороставь монитор: расписание возврат из методаподожди  с : добавить эту тему в c.q расписание заблокировать эту темууведомить  С : если есть поток, ожидающий c .q выберите и удалите один поток t из c.q (t называется «уведомленный поток») переместить т в еуведомить всех  c : переместить все потоки, ожидающие c .q, в eрасписание : если есть тема на е выберите и удалите один поток из e и перезапустите его еще разблокировать монитор

В качестве разновидности этой схемы уведомленный поток может быть перемещен в очередь с именем w, имеющая приоритет над e. См. Howard [4] и Buhr et al. [5] для дальнейшего обсуждения.

Можно связать утверждение P c с каждой условной переменной c так, что P c обязательно будет истинным по возвращении из . Однако необходимо гарантировать, что P c сохраняется с момента, когда уведомляющий поток покидает занятость, до тех пор, пока уведомляемый поток не будет выбран для повторного входа в монитор. В это время могут наблюдаться активности других жильцов. Таким образом, обычно P c просто истинно .wait c

По этой причине обычно необходимо заключать каждую операцию ожидания в такой цикл:

пока  нет ( P ) подожди c _

где P — некоторое условие, более сильное, чем P c . Операции и рассматриваются как «подсказки» о том, что P может быть истинным для некоторого ожидающего потока. Каждая итерация такого цикла после первой представляет собой потерю уведомления; таким образом, при использовании неблокирующих мониторов необходимо быть осторожным, чтобы не потерять слишком много уведомлений.notify cnotify all c

В качестве примера «намека» рассмотрим банковский счет, на котором поток вывода средств будет ждать, пока на счету не будет достаточно средств, прежде чем продолжить.

класс монитора  Account { частный  int баланс := 0 инвариантный баланс >= 0 частный  NonblockingCondition балансMayBeBigEnough общедоступный метод вывода ( int sum) сумма предварительного условия >= 0 { while Balance <сумма подождите BalanceMayBeBigEnough утвердить баланс > = сумма баланс := баланс - сумма } публичный метод депозита ( int sum) сумма предварительного условия >= 0 { баланс := баланс + сумма уведомить весь балансMayBeBigEnough }}

В этом примере ожидаемое условие является функцией суммы, подлежащей снятию, поэтому поток внесения депозита не может знать, что он сделал такое условие истинным. В этом случае имеет смысл разрешить каждому ожидающему потоку подключаться к монитору (по одному) для проверки истинности его утверждения.

Мониторы неявных переменных условий

Монитор в стиле Java

В языке Java каждый объект может использоваться в качестве монитора. Методы, требующие взаимного исключения, должны быть явно отмечены ключевым словом Synchronized . Блоки кода также могут быть отмечены синхронизированными . [6]

Вместо явных переменных условий каждый монитор (т. е. объект) оснащен одной очередью ожидания в дополнение к своей очереди на вход. Все ожидания выполняются в этой единственной очереди ожидания, и все операции notify и notifyAll применяются к этой очереди. [7] Этот подход был принят в других языках, например C# .

Неявная сигнализация

Другой подход к передаче сигналов — опустить операцию сигнала . Всякий раз, когда поток покидает монитор (путем возврата или ожидания), утверждения всех ожидающих потоков оцениваются до тех пор, пока одно из них не окажется истинным. В такой системе условные переменные не нужны, но утверждения должны быть явно закодированы. Контракт на ожидание

wait  P : предварительное условие  I  изменяет состояние монитора, постусловие  P  и  I

История

Бринч Хансен и Хоар разработали концепцию монитора в начале 1970-х годов, основываясь на своих более ранних идеях и идеях Эдсгера Дейкстры . [8] Бринч Хансен опубликовал первую нотацию монитора, приняв концепцию классов Simula 67 , [1] и изобрел механизм организации очередей. [9] Хоар уточнил правила возобновления процесса. [2] Бринч Хансен создал первую реализацию мониторов в Concurrent Pascal . [8] Хоар продемонстрировал их эквивалентность семафорам .

Мониторы (и Concurrent Pascal) вскоре стали использоваться для структурирования синхронизации процессов в операционной системе Solo. [10] [11]

Языки программирования, поддерживающие мониторы, включают:

Был написан ряд библиотек, позволяющих создавать мониторы на языках, которые не поддерживают их изначально. Когда используются вызовы библиотеки, программист должен явно отметить начало и конец кода, выполняемого с взаимным исключением. Pthreads — одна из таких библиотек.

Смотрите также

Примечания

  1. ^ аб Бринч Хансен, Пер (1973). «Концепция класса 7.2» (PDF) . Принципы операционной системы . Прентис Холл. ISBN 978-0-13-637843-3.
  2. ^ Аб Хоар, ЦАР (октябрь 1974 г.). «Мониторы: концепция структурирования операционной системы». Комм. АКМ . 17 (10): 549–557. CiteSeerX 10.1.1.24.6394 . дои : 10.1145/355620.361161. S2CID  1005769. 
  3. ^ Хансен, ПБ (июнь 1975 г.). «Язык программирования Concurrent Pascal» (PDF) . IEEE Транс. Программное обеспечение англ. СЭ-1 (2): 199–207. дои : 10.1109/TSE.1975.6312840. S2CID  2000388.
  4. ^ аб Ховард, Джон Х. (1976). «Сигнализация в мониторах». ICSE '76 Материалы 2-й международной конференции по программной инженерии . Международная конференция по программной инженерии. Лос-Аламитос, Калифорния, США: Издательство IEEE Computer Society Press. стр. 47–52.
  5. ^ аб Бур, Питер А.; Фортье, Мишель; Гроб, Майкл Х. (март 1995 г.). «Классификация мониторов». Обзоры вычислительной техники ACM . 27 (1): 63–107. дои : 10.1145/214037.214100 . S2CID  207193134.
  6. ^ Блох 2018, с. 311-316, §Пункт 11: Синхронизация доступа к общим изменяемым данным.
  7. ^ Блох 2018, с. 325-329, §Глава 11, пункт 81. Предпочитайте, чтобы утилиты параллелизма ждали и уведомляли.
  8. ^ Аб Хансен, Пер Бринч (1993). «Мониторы и параллельный Паскаль: личная история». HOPL-II: Вторая конференция ACM SIGPLAN по истории языков программирования . История языков программирования. Нью-Йорк, штат Нью-Йорк, США: ACM . стр. 1–35. дои : 10.1145/155360.155361. ISBN 0-89791-570-4.
  9. ^ Бринч Хансен, Пер (июль 1972 г.). «Структурированное мультипрограммирование (приглашенный доклад)». Коммуникации АКМ . 15 (7): 574–578. дои : 10.1145/361454.361473 . S2CID  14125530.
  10. ^ Бринч Хансен, Пер (апрель 1976 г.). «Операционная система Solo: программа Concurrent Pascal» (PDF) . Программное обеспечение: практика и опыт .
  11. ^ Бринч Хансен, Пер (1977). Архитектура параллельных программ . Прентис Холл. ISBN 978-0-13-044628-2.
  12. ^ «синхронизация — язык программирования Go» . golang.org . Проверено 17 июня 2021 г.
  13. ^ "Что такое "sync.Cond" | dtyler.io" . dtyler.io . Архивировано из оригинала 01 октября 2021 г. Проверено 17 июня 2021 г.

дальнейшее чтение

Внешние ссылки