stringtranslate.com

Монитор (синхронизация)

В параллельном программировании монитор — это конструкция синхронизации, которая не позволяет потокам одновременно получать доступ к состоянию общего объекта и позволяет им ожидать изменения состояния. Они предоставляют потоку механизм, позволяющий временно отказаться от исключительного доступа, чтобы дождаться выполнения некоторого условия, прежде чем восстановить исключительный доступ и возобновить выполнение своей задачи. Монитор состоит из мьютекса (блокировки) и по крайней мере одной условной переменной . Условная переменная явно «сигнализируется», когда состояние объекта изменяется, временно передавая мьютекс другому потоку, «ожидающему» условную переменную.

Другое определение монитора — это потокобезопасный класс , объект или модуль , который оборачивает мьютекс , чтобы безопасно разрешить доступ к методу или переменной более чем одному потоку . Определяющей характеристикой монитора является то, что его методы выполняются с взаимным исключением : в каждый момент времени не более одного потока может выполнять любой из его методов . Используя одну или несколько переменных условий, он также может предоставить потокам возможность ожидать определенного условия (таким образом, используя приведенное выше определение «монитора»). В оставшейся части этой статьи это значение «монитора» будет называться «потокобезопасным объектом/классом/модулем».

Мониторы были изобретены Пером Бринчем Хансеном [1] и К. А. Р. Хоаром [2] и впервые были реализованы на языке Concurrent Pascal Бринка Хансена . [3]

Взаимное исключение

Пока поток выполняет метод потокобезопасного объекта, говорят, что он занимает объект, удерживая его мьютекс (блокировку) . Потокобезопасные объекты реализованы для обеспечения того, чтобы в каждый момент времени не более одного потока могли занимать объект . Блокировка, которая изначально разблокирована, блокируется в начале каждого публичного метода и разблокируется при каждом возврате из каждого публичного метода.

При вызове одного из методов поток должен дождаться, пока никакой другой поток не выполнит ни один из методов потокобезопасного объекта, прежде чем начать выполнение своего метода. Обратите внимание, что без этого взаимного исключения два потока могут привести к потере или получению денег без причины. Например, два потока, снимающие 1000 со счета, могут оба вернуть true, в то же время вызывая уменьшение баланса только на 1000, следующим образом: сначала оба потока извлекают текущий баланс, находят его больше 1000 и вычитают из него 1000; затем оба потока сохраняют баланс и возвращаются.

Условные переменные

Постановка проблемы

Для многих приложений взаимное исключение недостаточно. Потоки, пытающиеся выполнить операцию, могут ожидать, пока не будет выполнено некоторое условие P. Цикл ожидания с активным ожиданием

пока  нет ( P ) пропустить 

не сработает, так как взаимное исключение не позволит любому другому потоку войти в монитор, чтобы условие стало истинным. Существуют и другие «решения», например, цикл, который разблокирует монитор, ждет определенное время, блокирует монитор и проверяет условие P. Теоретически это работает и не приведет к взаимоблокировке, но возникают проблемы. Трудно определить подходящее время ожидания: слишком мало — и поток будет загружать процессор, слишком много — и он, по-видимому, не будет отвечать. Нужен способ сообщить потоку, когда условие P истинно (или может быть истинным).

Пример из практики: классическая ограниченная проблема производителя/потребителя

Классическая проблема параллелизма — это проблема ограниченного производителя/потребителя , в которой есть очередь или кольцевой буфер задач максимального размера, при этом один или несколько потоков являются потоками «производителя», которые добавляют задачи в очередь, а один или несколько других потоков являются потоками «потребителя», которые извлекают задачи из очереди. Предполагается, что очередь сама по себе не является потокобезопасной и может быть пустой, полной или между пустой и полной. Всякий раз, когда очередь заполнена задачами, нам нужно, чтобы потоки-производители блокировались до тех пор, пока не освободится место для потоков-потребителей, вытесняющих задачи из очереди. С другой стороны, всякий раз, когда очередь пуста, нам нужно, чтобы потоки-потребители блокировались до тех пор, пока не станет доступно больше задач из-за добавления их потоками-производителями.

Поскольку очередь является параллельным объектом, разделяемым потоками, доступ к ней должен быть сделан атомарным , поскольку очередь может быть переведена в несогласованное состояние в ходе доступа к очереди, который никогда не должен быть раскрыт между потоками. Таким образом, любой код, который обращается к очереди, представляет собой критическую секцию , которая должна быть синхронизирована путем взаимного исключения. Если инструкции кода и процессора в критических секциях кода, которые обращаются к очереди, могут быть перемежены произвольными переключениями контекста между потоками на одном процессоре или одновременно запущенными потоками на нескольких процессорах, то существует риск раскрытия несогласованного состояния и возникновения условий гонки .

Неверно без синхронизации

Наивный подход заключается в проектировании кода с активным ожиданием и без синхронизации, что делает код подверженным условиям гонки:

глобальная очередь RingBuffer ; // Поточно-небезопасный кольцевой буфер задач.   // Метод, представляющий поведение каждого потока-производителя: public method producer () { while ( true ) { task myTask = ...; // Производитель создает новую задачу для добавления. while ( queue . isFull ()) {} // Ожидание занятости, пока очередь не станет полной. queue . enqueue ( myTask ); // Добавление задачи в очередь. } }                  // Метод , представляющий поведение каждого потока-потребителя: public method consumer () { while ( true ) { while ( queue.isEmpty ()) { } // Ожидание, пока очередь не станет непустой. myTask = queue.dequeue (); // Извлечение задачи из очереди. doStuff ( myTask ); // Отключиться и сделать что-нибудь с задачей. } }                 

Этот код имеет серьезную проблему, заключающуюся в том, что доступ к очереди может прерываться и чередоваться с доступом других потоков к очереди. Методы queue.enqueue и queue.dequeue, вероятно, содержат инструкции по обновлению переменных-членов очереди, таких как ее размер, начальная и конечная позиции, назначение и распределение элементов очереди и т. д. Кроме того, методы queue.isEmpty() и queue.isFull() также считывают это общее состояние. Если потокам производителя/потребителя разрешено чередоваться во время вызовов enqueue/dequeue, то может быть обнаружено несогласованное состояние очереди, что приведет к условиям гонки. Кроме того, если один потребитель сделает очередь пустой между выходом другого потребителя из состояния ожидания и вызовом "dequeue", то второй потребитель попытается выйти из пустой очереди, что приведет к ошибке. Аналогично, если производитель заполнит очередь в промежутке между выходом другого производителя из состояния ожидания и вызовом «enqueue», то второй производитель попытается добавить элемент в полную очередь, что приведет к ошибке.

Ожидание спина

Один из наивных подходов к достижению синхронизации, как упоминалось выше, заключается в использовании « спин-ожидания », при котором для защиты критических участков кода используется мьютекс, а активное ожидание по-прежнему используется, при этом блокировка устанавливается и снимается между каждой проверкой активного ожидания.

global RingBuffer queue ; // Потокобезопасный кольцевой буфер задач. global Lock queueLock ; // Мьютекс для кольцевого буфера задач.      // Метод, представляющий поведение каждого потока-производителя: public method producer () { while ( true ) { task myTask = ...; // Производитель создает новую задачу для добавления.            queueLock . acquire (); // Получить блокировку для первоначальной проверки занятости-ожидания. while ( queue . isFull ()) { // Ожидание занятости, пока очередь не станет полной. queueLock . release (); // Временно снять блокировку, чтобы дать возможность другим потокам , // которым требуется queueLock для выполнения, чтобы потребитель мог принять задачу. queueLock . acquire (); // Повторно получить блокировку для следующего вызова "queue.isFull()". }            queue.enqueue ( myTask ) ; // Добавляем задачу в очередь.queueLock.release ( ); // Снимаем блокировку очереди, пока она нам снова не понадобится для добавления следующей задачи . } }    // Метод, представляющий поведение каждого потока-потребителя: public method consumer () { while ( true ) { queueLock . acquire (); // Получить блокировку для начальной проверки занятости. while ( queue . isEmpty ()) { // Ожидание занятости, пока очередь не станет непустой. queueLock . release (); // Временно снять блокировку, чтобы дать возможность другим потокам , // которым требуется queueLock для выполнения, чтобы производитель мог добавить задачу. queueLock . acquire (); // Повторно получить блокировку для следующего вызова "queue.isEmpty()". } myTask = queue . dequeue (); // Снять задачу из очереди. queueLock . release (); // Снять блокировку очереди, пока она нам снова не понадобится, чтобы снять следующую задачу. doStuff ( myTask ); // Идти и что-нибудь делать с задачей. } }                           

Этот метод гарантирует, что несогласованное состояние не возникнет, но тратит ресурсы ЦП из-за ненужного ожидания. Даже если очередь пуста и потокам-производителям нечего добавить в течение длительного времени, потоки-потребители всегда находятся в состоянии ожидания без необходимости. Аналогично, даже если потребители долгое время блокируются при обработке своих текущих задач, а очередь заполнена, производители всегда находятся в состоянии ожидания. Это расточительный механизм. Нужен способ заставить потоки-производители блокироваться до тех пор, пока очередь не заполнится, и способ заставить потоки-потребители блокироваться до тех пор, пока очередь не станет непустой.

(Примечание: Мьютексы сами по себе также могут быть спин-блокировками , которые подразумевают активное ожидание для получения блокировки, но для решения этой проблемы бесполезной траты ресурсов ЦП мы предполагаем, что queueLock не является спин-блокировкой и правильно использует саму очередь блокирующих блокировок.)

Условные переменные

Решение заключается в использовании переменных условий . Концептуально переменная условий — это очередь потоков, связанная с мьютексом, в которой поток может ожидать, пока некоторое условие не станет истинным. Таким образом, каждая переменная условий c связана с утверждением P c . Пока поток ожидает переменную условий, этот поток не считается занимающим монитор, и поэтому другие потоки могут войти в монитор, чтобы изменить его состояние. В большинстве типов мониторов эти другие потоки могут сигнализировать переменной условий c , чтобы указать, что утверждение P c является истинным в текущем состоянии.

Таким образом, над условными переменными существуют три основные операции:

Как правило проектирования, несколько переменных условий могут быть связаны с одним и тем же мьютексом, но не наоборот. (Это соответствие один ко многим .) Это связано с тем, что предикат P c одинаков для всех потоков, использующих монитор, и должен быть защищен взаимным исключением от всех других потоков, которые могут вызвать изменение условия или которые могут прочитать его, пока рассматриваемый поток вызывает его изменение, но могут быть разные потоки, которые хотят ожидать другого условия для той же переменной, требуя использования того же мьютекса. В примере производитель-потребитель, описанном выше, очередь должна быть защищена уникальным объектом мьютекса, m. Потоки «производителя» захотят ждать на мониторе, используя блокировку mи переменную условия , которая блокирует, пока очередь не заполнится. Потоки «потребителя» захотят ждать на другом мониторе, используя тот же мьютекс , но другую переменную условия , которая блокирует, пока очередь не станет непустой. (Обычно) никогда не имеет смысла иметь разные мьютексы для одной и той же условной переменной, но этот классический пример показывает, почему часто определенно имеет смысл иметь несколько условных переменных, использующих один и тот же мьютекс. Мьютекс, используемый одной или несколькими условными переменными (одним или несколькими мониторами), может также использоваться совместно с кодом, который не использует условные переменные (и который просто получает/освобождает его без каких-либо операций ожидания/сигнала), если эти критические разделы не требуют ожидания определенного условия для параллельных данных.m

Мониторинг использования

Правильное базовое использование монитора:

acquire ( m ); // Получить блокировку этого монитора. while ( ! p ) { // Пока условие/предикат/утверждение, которое мы ожидаем, не является истинным... wait ( m , cv ); // Ожидать блокировки этого монитора и условной переменной. } // ... Здесь находится критический раздел кода ... signal ( cv2 ); // Или: broadcast(cv2); // cv2 может быть таким же, как cv, или другим. release ( m ); // Снять блокировку этого монитора.         

Ниже представлен тот же псевдокод, но с более подробными комментариями, которые лучше объясняют происходящее:

// ... (предыдущий код) // Сейчас войдём в монитор. // Получаем консультативный мьютекс (блокировку), связанный с параллельными данными, // которые совместно используются потоками, // чтобы гарантировать, что никакие два потока не могут быть упреждающе перемежены или // запущены одновременно на разных ядрах при выполнении в критических // секциях, которые считывают или записывают эти же параллельные данные. Если другой // поток удерживает этот мьютекс, то этот поток будет переведен в спящий режим // (заблокирован) и помещен в очередь сна m. (Мьютекс "m" не должен быть // спин-блокировкой.) acquire ( m ); // Теперь мы удерживаем блокировку и можем проверить состояние в // первый раз.// В первый раз, когда мы выполняем условие цикла while после приведенного выше // "acquire", мы спрашиваем: "Не является ли условие/предикат/утверждение, // которое мы ожидаем, уже истинным?"while ( ! p ()) // "p" - это любое выражение (например, переменная или // вызов функции), которое проверяет условие и // возвращает логическое значение. Это само по себе критический раздел, поэтому вы *ДОЛЖНЫ* удерживать блокировку при // выполнении этого условия цикла "while"! // Если это не первый раз, когда проверяется условие "while", // то мы задаем вопрос: "Теперь, когда другой поток, использующий этот // монитор, уведомил меня и разбудил, и я был переключен // обратно в контекст, оставалось ли условие/предикат/утверждение, которое мы ожидаем, // истинным между моментом моего пробуждения и моментом повторного получения // блокировки внутри вызова "wait" в последней итерации этого цикла, или // какой-то другой поток снова заставил условие стать ложным // в это время, тем самым сделав это ложным пробуждением?  { // Если это первая итерация цикла, то ответ: // "нет" — условие еще не готово. В противном случае ответ: // последнее. Это было ложное пробуждение, какой-то другой поток возник первым и снова сделал условие ложным, и мы должны // снова подождать.wait ( m , cv ); // Временно запретить любому другому потоку на любом ядре выполнять // операции с m или cv. // release(m) // Атомарно снять блокировку "m", чтобы другой // // код, использующий эти параллельные данные , // // мог работать, переместить этот поток в очередь ожидания cv, чтобы он был уведомлен // // когда-нибудь, когда условие станет // // истинным, и перевести этот поток в спящий режим. Повторно включить // // другие потоки и ядра для выполнения // // операций с m и cv. // // На этом ядре происходит переключение контекста. // // В какой-то момент в будущем условие, которого мы ждем, становится // истинным, и другой поток, использующий этот монитор (m, cv), либо подает // сигнал, который пробуждает этот поток, либо // широковещательную передачу, которая пробуждает нас, означающую, что мы были выведены // из очереди ожидания cv. // // В это время другие потоки могут снова сделать условие // ложным, или условие может переключиться один или несколько // раз, или оно может остаться истинным. // // Этот поток переключается обратно на какое-то ядро. // // acquire(m) // Блокировка "m" повторно захватывается. // Завершить эту итерацию цикла и повторно проверить условие цикла "while", чтобы убедиться, // что предикат все еще истинен. } // Условие, которого мы ждем, истинно! // Мы все еще удерживаем блокировку, либо до входа в монитор, либо с // последнего выполнения «wait».// Здесь находится критический раздел кода, в котором есть предварительное условие, что наш предикат // должен быть истинным. // Этот код может сделать условие cv ложным и/или сделать предикаты других условных переменных // истинными.// Вызов сигнала или широковещательная передача в зависимости от того, какие предикаты условных переменных // (которые совместно используют мьютекс m) были сделаны истинными или могли быть сделаны истинными, // и от используемого семантического типа монитора.for ( cv_x in cvs_to_signal ) { signal ( cv_x ); // Или: broadcast(cv_x); } // Один или несколько потоков были разбужены, но будут заблокированы, как только попытаются // получить m.     // Освободить мьютекс, чтобы уведомленные потоки и другие могли войти в свои критические // разделы. release ( m );

Решение проблемы ограниченного производителя/потребителя

Введя использование переменных условий, давайте используем его для повторного рассмотрения и решения классической ограниченной проблемы производителя/потребителя. Классическое решение заключается в использовании двух мониторов, включающих две переменные условий, разделяющие одну блокировку в очереди:

global volatile RingBuffer queue ; // Потокобезопасный кольцевой буфер задач. global Lock queueLock ; // Мьютекс для кольцевого буфера задач. (Не спин-блокировка.) global CV queueEmptyCV ; // Условная переменная для потоков-потребителей, ожидающих, пока очередь // не станет непустой. Связанная с ней блокировка — «queueLock». global CV queueFullCV ; // Условная переменная для потоков-производителей, ожидающих, пока очередь // не станет полной. Связанная с ней блокировка также «queueLock».               // Метод, представляющий поведение каждого потока-производителя: public method producer () { while ( true ) { // Производитель создает некоторую новую задачу для добавления. task myTask = ...;            // Получить "queueLock" для начальной проверки предиката. queueLock . acquire ();  // Критическая секция, которая проверяет, не заполнена ли очередь. while ( queue . isFull ()) { // Освобождаем "queueLock", ставим этот поток в очередь "queueFullCV" и переводим его в спящий режим. wait ( queueLock , queueFullCV ); // Когда этот поток просыпается, повторно захватываем "queueLock" для следующей проверки предиката. }         // Критическая секция, которая добавляет задачу в очередь (обратите внимание, что мы удерживаем "queueLock"). queue . enqueue ( myTask );  // Разбудить один или все потоки-потребители, ожидающие, когда очередь станет непустой , // теперь, когда это гарантировано, поток-потребитель примет задачу. signal ( queueEmptyCV ); // Или: broadcast(queueEmptyCV); // Конец критических секций.     // Освобождаем "queueLock" до тех пор, пока он нам снова не понадобится для добавления следующей задачи. queueLock . release (); } }  // Метод, представляющий поведение каждого потока-потребителя: public method consumer () { while ( true ) { // Получить "queueLock" для начальной проверки предиката. queueLock . acquire ();         // Критическая секция, которая проверяет, не пуста ли очередь. while ( queue . isEmpty ()) { // Освобождаем "queueLock", ставим этот поток в очередь "queueEmptyCV" и переводим его в спящий режим. wait ( queueLock , queueEmptyCV ); // Когда этот поток просыпается, повторно захватываем "queueLock" для следующей проверки предиката. }         // Критическая секция, которая снимает задачу с очереди (обратите внимание, что мы удерживаем «queueLock»). myTask = queue . dequeue ();    // Разбудить один или все потоки-производители, ожидающие, пока очередь не будет заполнена , // теперь, когда это гарантировано, чтобы поток-производитель добавил задачу. signal ( queueFullCV ); // Или: broadcast(queueFullCV); // Конец критических секций.     // Освобождаем "queueLock" до тех пор, пока он нам снова не понадобится для выполнения следующей задачи. queueLock . release ();  // Идите и сделайте что-нибудь с задачей. doStuff ( myTask ); } }  

Это обеспечивает параллелизм между потоками производителя и потребителя, совместно использующими очередь задач, и блокирует потоки, которым нечего делать, а не заставляет их находиться в состоянии ожидания, как показано в вышеупомянутом подходе с использованием спин-блокировок.

Вариант этого решения может использовать одну переменную условия как для производителей, так и для потребителей, возможно, с именем "queueFullOrEmptyCV" или "queueSizeChangedCV". В этом случае с переменной условия связано более одного условия, так что переменная условия представляет более слабое условие, чем условия, проверяемые отдельными потоками. Переменная условия представляет потоки, которые ждут, чтобы очередь не была полной, и потоки, которые ждут, чтобы она не была пустой. Однако для этого потребуется использовать широковещательную рассылку во всех потоках, использующих переменную условия, и нельзя использовать обычный сигнал . Это связано с тем, что обычный сигнал может разбудить поток неправильного типа, условие которого еще не было выполнено, и этот поток вернется в спящий режим без получения сигнала потоком правильного типа. Например, производитель может сделать очередь полной и разбудить другого производителя вместо потребителя, и разбуженный производитель вернется в спящий режим. В дополнительном случае потребитель может сделать очередь пустой и разбудить другого потребителя вместо производителя, и потребитель вернется в спящий режим. Использование широковещательной передачи гарантирует, что некоторый поток нужного типа будет выполняться так, как ожидается в соответствии с условием задачи.

Вот вариант, использующий только одну условную переменную и трансляцию:

global volatile RingBuffer queue ; // Потоконебезопасный кольцевой буфер задач. global Lock queueLock ; // Мьютекс для кольцевого буфера задач. (Не спин-блокировка.) global CV queueFullOrEmptyCV ; // Единственная переменная условия для случая, когда очередь не готова ни к одному потоку , // т. е. для потоков-производителей, ожидающих, пока очередь не станет полной , // и потоков-потребителей, ожидающих, пока очередь не станет непустой. // Связанная с ней блокировка — «queueLock». // Небезопасно использовать обычный «сигнал», поскольку он связан с // несколькими предикатными условиями (утверждениями).               // Метод, представляющий поведение каждого потока-производителя: public method producer () { while ( true ) { // Производитель создает некоторую новую задачу для добавления. task myTask = ...;            // Получить "queueLock" для начальной проверки предиката. queueLock . acquire ();  // Критическая секция, которая проверяет, не заполнена ли очередь. while ( queue . isFull ()) { // Освобождаем "queueLock", ставим этот поток в очередь "queueFullOrEmptyCV" и переводим его в спящий режим. wait ( queueLock , queueFullOrEmptyCV ); // Когда этот поток просыпается, повторно захватываем "queueLock" для следующей проверки предиката. }         // Критическая секция, которая добавляет задачу в очередь (обратите внимание, что мы удерживаем "queueLock"). queue . enqueue ( myTask );  // Разбудить все потоки-производители и потоки-потребители, ожидающие, пока очередь не будет // соответственно незаполненной и непустой, теперь, когда последнее гарантировано, так что поток-потребитель примет задачу. broadcast ( queueFullOrEmptyCV ); // Не использовать «сигнал» (так как он может разбудить только другой поток-производитель). // Конец критических разделов.     // Освобождаем "queueLock" до тех пор, пока он нам снова не понадобится для добавления следующей задачи. queueLock . release (); } }  // Метод, представляющий поведение каждого потока-потребителя: public method consumer () { while ( true ) { // Получить "queueLock" для начальной проверки предиката. queueLock . acquire ();         // Критическая секция, которая проверяет, не пуста ли очередь. while ( queue . isEmpty ()) { // Освобождаем "queueLock", ставим этот поток в очередь "queueFullOrEmptyCV" и переводим этот поток в спящий режим. wait ( queueLock , queueFullOrEmptyCV ); // Когда этот поток просыпается, повторно захватываем "queueLock" для следующей проверки предиката. }         // Критическая секция, которая снимает задачу с очереди (обратите внимание, что мы удерживаем «queueLock»). myTask = queue . dequeue ();    // Разбудить все потоки-производители и потоки-потребители, ожидающие, пока очередь не будет // соответственно незаполненной и непустой, теперь, когда первое гарантировано, так что поток-производитель добавит задачу. broadcast ( queueFullOrEmptyCV ); // Не использовать «сигнал» (так как он может разбудить только другой поток-потребитель). // Конец критических разделов.     // Освобождаем "queueLock" до тех пор, пока он нам снова не понадобится для выполнения следующей задачи. queueLock . release ();  // Идите и сделайте что-нибудь с задачей. doStuff ( myTask ); } }  

Примитивы синхронизации

Мониторы реализованы с использованием атомарного примитива чтения-изменения-записи и примитива ожидания. Примитив чтения-изменения-записи (обычно test-and-set или compare-and-swap) обычно имеет форму инструкции блокировки памяти, предоставляемой ISA , но также может состоять из неблокирующих инструкций на однопроцессорных устройствах, когда прерывания отключены. Примитив ожидания может быть циклом занятости-ожидания или примитивом, предоставляемым ОС, который предотвращает планирование потока, пока он не будет готов к продолжению.

Ниже приведен пример реализации на псевдокоде частей потоковой системы, мьютексов и условных переменных в стиле Mesa с использованием метода «тест-и-установить» и политики «первым пришел — первым обслужен»:

Пример реализации Mesa-monitor с Test-and-Set

// Основные части системы потоков: // Предположим, что "ThreadQueue" поддерживает произвольный доступ. public volatile ThreadQueue readyQueue ; // Потокобезопасная очередь готовых потоков. Элементы: (Thread*). public volatile global Thread * currentThread ; // Предположим, что эта переменная является переменной ядра. (Другие являются общими.)         // Реализует спин-блокировку только для синхронизированного состояния самой потоковой системы. // Используется с test-and-set в качестве примитива синхронизации. public volatile global bool threadingSystemBusy = false ;      // Процедура обработки прерываний переключения контекста (ISR): // На текущем ядре ЦП упреждающе переключиться на другой поток. public method contextSwitchISR () { if ( testAndSet ( threadingSystemBusy )) { return ; // Сейчас невозможно переключить контекст. }          // Гарантируем, что это прерывание не возникнет снова, что может нарушить переключение контекста: systemCall_disableInterrupts ();  // Получить все регистры текущего процесса. // Для счетчика программ (PC) нам понадобится расположение инструкции // метки "resume" ниже. Получение значений регистров зависит от платформы и может включать // чтение текущего стекового кадра, инструкции JMP/CALL и т. д. (Подробности выходят за рамки этого описания.) currentThread -> registers = getAllRegisters (); // Сохранение регистров в объекте "currentThread" в памяти. currentThread -> registers . PC = resume ; // Установка следующего PC на метку "resume" ниже в этом методе.            readyQueue . enqueue ( currentThread ); // Помещаем этот поток обратно в очередь готовых потоков для последующего выполнения. Thread * otherThread = readyQueue . dequeue (); // Удаляем и запускаем следующий поток из очереди готовых потоков. currentThread = otherThread ; // Заменяем значение глобального указателя текущего потока, чтобы он был готов к следующему потоку.             // Восстановить регистры из currentThread/otherThread, включая переход к сохраненному PC другого потока // (в "resume" ниже). Опять же, подробности того, как это делается, выходят за рамки этой темы. restoreRegisters ( otherThread . registers );   // *** Сейчас запущен "otherThread" (который теперь "currentThread")! Исходный поток теперь "спит". *** resume : // Это то место, куда должен быть установлен другой вызов contextSwitch() при переключении контекста обратно сюда.  // Возвращаемся к месту остановки otherThread. threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включаем упреждающее переключение обратно на этом ядре. }     // Метод сна потока: // На текущем ядре ЦП синхронное переключение контекста на другой поток без помещения текущего потока в очередь готовности. // Необходимо удерживать "threadingSystemBusy" и отключить прерывания, чтобы этот метод // не прерывался таймером переключения потоков, который вызовет contextSwitchISR(). // После возврата из этого метода необходимо очистить "threadingSystemBusy". public method threadSleep () { // Получить все регистры текущего процесса. // Для счетчика программ (PC) нам понадобится расположение инструкции // метки "resume" ниже. Получение значений регистров зависит от платформы и может включать // чтение текущего стекового кадра, инструкции JMP/CALL и т. д. (Подробности выходят за рамки этой темы.) currentThread -> registers = getAllRegisters (); // Сохранение регистров в объекте "currentThread" в памяти. currentThread -> registers . PC = resume ; // Установить следующий PC на метку "resume" ниже в этом методе.                // В отличие от contextSwitchISR(), мы не будем помещать currentThread обратно в readyQueue. // Вместо этого он уже помещен в очередь мьютекса или условной переменной. Thread * otherThread = readyQueue . dequeue (); // Удалить и запустить следующий поток из очереди готовых потоков. currentThread = otherThread ; // Заменить глобальное значение указателя текущего потока, чтобы он был готов к следующему потоку.             // Восстановить регистры из currentThread/otherThread, включая переход к сохраненному PC другого потока // (в "resume" ниже). Опять же, подробности того, как это делается, выходят за рамки этой темы. restoreRegisters ( otherThread . registers );   // *** Сейчас запущен "otherThread" (который теперь "currentThread")! Исходный поток теперь "спит". *** resume : // Это то место, куда должен быть установлен другой вызов contextSwitch() при переключении контекста обратно сюда.  // Возвращаемся к месту, где остановился otherThread. }public method wait ( Mutex m , ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "held" и "threadQueue" или "readyQueue" этого объекта. while ( testAndSet ( threadingSystemBusy )) {} // Примечание: "threadingSystemBusy" теперь имеет значение true. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызвал бы contextSwitchISR(). // Выполняется вне threadSleep() для большей эффективности, чтобы этот поток был переведен в спящий режим // сразу после перехода в очередь переменных условия. systemCall_disableInterrupts (); assert m . hold ; // (В частности, этот поток должен быть тем, кто его удерживает.) m . release (); c . waitingThreads . enqueue ( currentThread ); threadSleep (); // Поток спит... Поток пробуждается от сигнала/трансляции. threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включаем упреждающее переключение обратно на этом ядре. // Стиль Mesa: // Теперь здесь могут происходить переключения контекста, делая предикат вызывающей стороны клиента ложным. m . acquire (); }                                         public method signal ( ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "held" и "threadQueue" или "readyQueue" этого объекта. while ( testAndSet ( threadingSystemBusy )) {} // Примечание: "threadingSystemBusy" теперь имеет значение true. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызвал бы contextSwitchISR(). // Выполняется вне threadSleep() для большей эффективности, чтобы этот поток был переведен в режим сна // сразу после перехода в очередь переменных условия. systemCall_disableInterrupts (); if ( ! c . waitingThreads . isEmpty ()) { wakenThread = c . waitingThreads . dequeue (); readyQueue . enqueue ( wakenThread ); } threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включаем упреждающее переключение обратно на этом ядре. // Стиль Mesa: // Разбуженному потоку не дается никакого приоритета. }                                   public method broadcast ( ConditionVariable c ) { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "held" и "threadQueue" или "readyQueue" этого объекта. while ( testAndSet ( threadingSystemBusy )) {} // Примечание: "threadingSystemBusy" теперь имеет значение true. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызвал бы contextSwitchISR(). // Выполняется вне threadSleep() для большей эффективности, чтобы этот поток был переведен в режим сна // сразу после перехода в очередь переменных условия. systemCall_disableInterrupts (); while ( ! c . waitingThreads . isEmpty ()) { wakenThread = c . waitingThreads . dequeue (); readyQueue . enqueue ( wakenThread ); } threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включаем упреждающее переключение обратно на этом ядре. // Стиль Mesa: // Разбуженным потокам не дается никакого приоритета. }                                   class Mutex { protected volatile bool hold = false ; private volatile ThreadQueue blockingThreads ; // Потоконно-небезопасная очередь заблокированных потоков. Элементы: (Thread*). public method acquire () { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к // "held" и "threadQueue" или "readyQueue" этого объекта . while ( testAndSet ( threadingSystemBusy )) {} // Примечание: "threadingSystemBusy" теперь имеет значение true. // Системный вызов для отключения прерываний на этом ядре, чтобы threadSleep() не прерывался // таймером переключения потоков на этом ядре, который вызвал бы contextSwitchISR(). // Выполняется вне threadSleep() для большей эффективности, чтобы этот поток был переведен в режим сна // сразу после перехода в очередь блокировки. systemCall_disableInterrupts ();                               assert ! blockingThreads . содержит ( currentThread );  if ( hold ) { // Помещаем "currentThread" в очередь этой блокировки, чтобы он // считался "спящим" для этой блокировки. // Обратите внимание, что "currentThread" все еще должен обрабатываться функцией threadSleep(). readyQueue . remove ( currentThread ); blockingThreads . enqueue ( currentThread ); threadSleep (); // Теперь мы проснулись, что должно быть связано с тем, что "hold" стало false. assert ! hold ; assert ! blockingThreads . contains ( currentThread ); } hold = true ; threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включаем упреждающее переключение обратно на этом ядре. } public method release () { // Внутренняя спин-блокировка, пока другие потоки на любом ядре обращаются к "hold" и "threadQueue" или "readyQueue" этого объекта . while ( testAndSet ( threadingSystemBusy )) {} // Примечание: "threadingSystemBusy" теперь имеет значение true. // Системный вызов для отключения прерываний на этом ядре для повышения эффективности. systemCall_disableInterrupts (); assert hold ; // (Освобождение следует выполнять только во время удержания блокировки.)                                               hold = false ; if ( ! blockingThreads . isEmpty ()) { Thread * unblockedThread = blockingThreads . dequeue (); readyQueue . enqueue ( unblockedThread ); } threadingSystemBusy = false ; // Должно быть атомарное назначение. systemCall_enableInterrupts (); // Включить упреждающее переключение обратно на этом ядре. } }                    struct ConditionVariable { volatile ThreadQueue waitingThreads ; }     

Блокировка переменных условий

Первоначальные предложения CAR Hoare и Per Brinch Hansen были для блокирующих переменных условий . С блокирующей переменной условий сигнальный поток должен ждать вне монитора (по крайней мере), пока сигнализируемый поток не освободит монитор, либо вернувшись, либо снова ожидая переменную условий. Мониторы, использующие блокирующие переменные условий, часто называют мониторами в стиле Хоара или мониторами сигнала и срочного ожидания .

Монитор в стиле Хоара с двумя переменными состояния aи b. По Буру и др.

Мы предполагаем, что с каждым объектом монитора связаны две очереди потоков.

Кроме того, мы предполагаем, что для каждой переменной условия c существует очередь

Обычно гарантируется, что все очереди будут справедливыми , а в некоторых реализациях может быть гарантировано, что они будут работать по принципу « первым пришел — первым обслужен» .

Реализация каждой операции выглядит следующим образом. (Мы предполагаем, что каждая операция выполняется во взаимном исключении по отношению к другим; таким образом, перезапущенные потоки не начинают выполняться до тех пор, пока операция не будет завершена.)

введите монитор: введите метод если монитор заблокирован добавить эту ветку в е заблокировать эту ветку еще заблокировать монитороставьте монитор: расписание возврат из методаподожди  с : добавить эту ветку в c .q расписание заблокировать эту веткусигнал  c : если есть поток, ожидающий c .q выберите и удалите одну такую ​​нить t из c .q (t называется «сигнальная нить») добавить эту ветку в s перезапустить т (так что t займет монитор следующим) заблокировать эту веткурасписание: если есть поток на s выберите и удалите один поток из s и перезапустите его (эта тема будет занимать монитор дальше) иначе если есть поток на e выберите и удалите один поток из e и перезапустите его (эта тема будет занимать монитор дальше) еще разблокировать монитор (монитор станет свободным)

Процедура scheduleвыбирает следующий поток для занятия монитора или, в случае отсутствия потоков-кандидатов, разблокирует монитор.

Результирующая дисциплина сигнализации известна как "сигнал и срочное ожидание", поскольку сигнализатор должен ждать, но имеет приоритет над потоками в очереди на входе. Альтернативой является "сигнал и ожидание", в котором нет sочереди, а сигнализатор вместо этого ждет в eочереди.

Некоторые реализации предусматривают операцию сигнала и возврата , которая объединяет сигнализацию с возвратом из процедуры.

подать сигнал  c  и вернуть : если есть поток, ожидающий c .q выберите и удалите одну такую ​​нить t из c .q (t называется «сигнальная нить») перезапустить т (так что t займет монитор следующим) еще расписание возврат из метода

В любом случае («сигнал и срочное ожидание» или «сигнал и ожидание»), когда переменная условия сигнализирована и есть по крайней мере один поток, ожидающий переменную условия, сигнализирующий поток плавно передает занятость сигнализированному потоку, так что никакой другой поток не может получить занятость между ними. Если P c является истинным в начале каждой операции сигнала c , он будет истинным в конце каждой операции ожидания c . Это суммируется следующими контрактами . В этих контрактах I является инвариантом монитора .

введите монитор: постусловие  Iоставьте монитор: предварительное условие  Iwait  c : предварительное условие  I  изменяет состояние монитора постусловия  P c  и  Iсигнал  c : предварительное условие  P c  и  I  изменяет состояние монитора после условия  Iсигнал  c  и возврат : предварительное условие  P c  и  I

В этих контрактах предполагается, что I и P c не зависят от содержимого или длины каких-либо очередей.

(Когда переменная условия может быть запрошена относительно количества потоков, ожидающих в своей очереди, могут быть заданы более сложные контракты. Например, полезная пара контрактов, позволяющая передавать занятость без установления инварианта, выглядит следующим образом:

wait  c : предварительное условие  I  изменяет состояние постусловия монитора  P cсигнал  c  предварительное условие ( непустое ( c ) и  P c ) или (пустое( c ) и  I ) изменяет состояние монитора постусловия  I

( Более подробную информацию см. в работах Говарда [4] и Бура и др. [5] .)

Здесь важно отметить, что утверждение P c полностью зависит от программиста; ему или ей просто нужно быть последовательным в отношении того, что это такое.

Завершим этот раздел примером потокобезопасного класса, использующего блокирующий монитор, реализующий ограниченный потокобезопасный стек .

класс монитора  SharedStack { private const capacity := 10 private  int [capacity] A private  int size := 0 invariant 0 <= size и size <= capacity private  BlockingCondition theStackIsNotEmpty /* связано с 0 < size и size <= capacity */  private  BlockingCondition theStackIsNotFull /* связано с 0 <= size и size < capacity */ публичный метод push( целое значение) { если размер = емкость , то  ждите, пока StackIsNotFull не выскажет утверждение 0 <= размер и размер < емкость A[размер] := значение ; размер := размер + 1 утверждать 0 < размер и размер <= емкость сигнализировать theStackIsNotEmpty и возвращать } публичный метод  int pop() { если размер = 0 , то  ждите, пока StackIsNotEmpty не установит 0 < размер и размер <= емкость размер := размер - 1 ; утверждать 0 <= размер и размер < емкость сигнализировать theStackIsNotFull и возвращать A[размер] }}

Обратите внимание, что в этом примере потокобезопасный стек внутренне предоставляет мьютекс, который, как и в предыдущем примере производителя/потребителя, совместно используется обеими переменными условия, которые проверяют различные условия для одних и тех же параллельных данных. Единственное отличие заключается в том, что пример производителя/потребителя предполагал обычную непотокобезопасную очередь и использовал автономный мьютекс и переменные условия, без абстрагирования этих деталей монитора, как в данном случае. В этом примере, когда вызывается операция "wait", она должна каким-то образом быть снабжена мьютексом потокобезопасного стека, например, если операция "wait" является интегрированной частью "класса монитора". Помимо этого вида абстрактной функциональности, когда используется "сырой" монитор, он всегда должен будет включать мьютекс и переменную условия, с уникальным мьютексом для каждой переменной условия.

Неблокирующие переменные условия

При использовании неблокируемых условных переменных (также называемых условными переменными "Mesa style" или условными переменными "signal and continue" ) сигнализация не приводит к тому, что сигнальный поток теряет занятость монитора. Вместо этого сигнализированные потоки перемещаются в eочередь. Очередь не нужна s.

Монитор в стиле Mesa с двумя переменными состояния aиb

С неблокируемыми условными переменными операция сигнала часто называется уведомлением — терминология, которой мы будем следовать здесь. Также распространено предоставление операции уведомления всех , которая перемещает все потоки, ожидающие условную переменную, в eочередь.

Здесь дается значение различных операций. (Мы предполагаем, что каждая операция выполняется во взаимном исключении по отношению к другим; таким образом, перезапущенные потоки не начинают выполняться до тех пор, пока операция не будет завершена.)

введите монитор: введите метод если монитор заблокирован добавить эту ветку в е заблокировать эту ветку еще заблокировать монитороставьте монитор: расписание возврат из методаподожди  с : добавить эту ветку в c .q расписание заблокировать эту веткууведомить  c : если есть поток, ожидающий c .q выбрать и удалить одну нить t из c .q (t называется «уведомленная ветка») переместить t в eуведомить всех  c : переместить все потоки, ожидающие c .q, в eрасписание : если есть тема на е выберите и удалите один поток из e и перезапустите его еще разблокировать монитор

В качестве вариации этой схемы уведомленный поток может быть перемещен в очередь, называемую w, которая имеет приоритет над e. См. Howard [4] и Buhr et al. [5] для дальнейшего обсуждения.

Можно связать утверждение P c с каждой условной переменной c таким образом, что P c обязательно будет истинным при возврате из . Однако необходимо гарантировать, что P c сохраняется с момента, когда уведомляющий поток прекращает занятие, до тех пор, пока уведомленный поток не будет выбран для повторного входа в монитор. Между этими моментами может быть активность других занимающих. Таким образом, P c обычно просто является истинным .wait c

По этой причине обычно необходимо заключать каждую операцию ожидания в цикл, подобный этому:

пока  нет ( P ) делать  ждать c

где P — некоторое условие, более сильное, чем P c . Операции и рассматриваются как «подсказки» о том, что P может быть истинным для некоторого ожидающего потока. Каждая итерация такого цикла после первой представляет собой потерянное уведомление; таким образом, с неблокирующими мониторами нужно быть осторожным, чтобы гарантировать, что слишком много уведомлений не будет потеряно.notify cnotify all c

В качестве примера «намека» рассмотрим банковский счет, на котором поток снятия средств будет ждать, пока на счете не будет достаточно средств, прежде чем продолжить.

класс монитора  Account { private  int balance := 0 инвариантный balance >= 0 private  NonblockingCondition balanceMayBeBigEnough публичный метод withdraw( int amount) предварительное условие amount >= 0 { пока баланс < сумма делать  ждать balanceMayBeBigEnough утверждать баланс >= сумма баланс := баланс - сумма } публичный метод deposit( int amount) предварительное условие amount >= 0 { баланс := баланс + сумма уведомить всех balanceMayBeBigEnough }}

В этом примере ожидаемое условие является функцией суммы, которая должна быть снята, поэтому поток, делающий депозит, не может знать , что он сделал такое условие истинным. В этом случае имеет смысл разрешить каждому ожидающему потоку входить в монитор (по одному за раз) для проверки истинности его утверждения.

Неявные мониторы переменных условий

Монитор в стиле Java

В языке Java каждый объект может использоваться как монитор. Методы, требующие взаимного исключения, должны быть явно помечены ключевым словом synchronized . Блоки кода также могут быть помечены synchronized . [6]

Вместо явных переменных условий каждый монитор (т. е. объект) оснащен одной очередью ожидания в дополнение к своей входной очереди. Все ожидание выполняется в этой единственной очереди ожидания, и все операции notify и notifyAll применяются к этой очереди. [7] Этот подход был принят в других языках, например, C# .

Неявная сигнализация

Другой подход к сигнализации — это пропуск операции сигнала . Всякий раз, когда поток покидает монитор (возвращаясь или ожидая), утверждения всех ожидающих потоков оцениваются до тех пор, пока одно из них не окажется истинным. В такой системе переменные условия не нужны, но утверждения должны быть явно закодированы. Контракт на ожидание — это

ожидание  P : предварительное условие  I  изменяет состояние монитора после условия  P  и  I

История

Бринч Хансен и Хоар разработали концепцию монитора в начале 1970-х годов, основываясь на своих собственных идеях и идеях Эдсгера Дейкстры . [8] Бринч Хансен опубликовал первую нотацию монитора, приняв концепцию класса Simula 67 , [1] и изобрел механизм очередей. [9] Хоар усовершенствовал правила возобновления процесса. [2] Бринч Хансен создал первую реализацию мониторов на Concurrent Pascal . [8] Хоар продемонстрировал их эквивалентность семафорам .

Мониторы (и Concurrent Pascal) вскоре стали использоваться для структурной синхронизации процессов в операционной системе Solo. [10] [11]

Языки программирования, поддерживающие мониторы, включают:

Было написано несколько библиотек, которые позволяют создавать мониторы на языках, которые не поддерживают их изначально. При использовании библиотечных вызовов программист должен явно помечать начало и конец кода, выполняемого с взаимным исключением. Pthreads — одна из таких библиотек.

Смотрите также

Примечания

  1. ^ ab Brinch Hansen, Per (1973). "7.2 Class Concept" (PDF) . Принципы операционной системы . Prentice Hall. ISBN 978-0-13-637843-3.
  2. ^ ab Hoare, CAR (октябрь 1974 г.). «Мониторы: концепция структурирования операционной системы». Comm. ACM . 17 (10): 549–557. CiteSeerX 10.1.1.24.6394 . doi :10.1145/355620.361161. S2CID  1005769. 
  3. ^ Хансен, ПБ (июнь 1975 г.). «Язык программирования Concurrent Pascal» (PDF) . IEEE Транс. Программное обеспечение англ. СЭ-1 (2): 199–207. дои : 10.1109/TSE.1975.6312840. S2CID  2000388.
  4. ^ ab Howard, John H. (1976). «Сигнализация в мониторах». ICSE '76 Труды 2-й международной конференции по программной инженерии . Международная конференция по программной инженерии. Лос-Аламитос, Калифорния, США: IEEE Computer Society Press. стр. 47–52.
  5. ^ ab Buhr, Peter A.; Fortier, Michel; Coffin, Michael H. (март 1995 г.). «Классификация мониторов». ACM Computing Surveys . 27 (1): 63–107. doi : 10.1145/214037.214100 . S2CID  207193134.
  6. ^ Bloch 2018, стр. 311-316, §Пункт 11: Синхронизация доступа к общим изменяемым данным.
  7. ^ Bloch 2018, стр. 325-329, §Глава 11, пункт 81: Предпочитать утилиты параллельного выполнения для ожидания и уведомления.
  8. ^ ab Hansen, Per Brinch (1993). "Мониторы и параллельный Pascal: личная история". HOPL-II: Вторая конференция ACM SIGPLAN по истории языков программирования . История языков программирования. Нью-Йорк, США: ACM . стр. 1–35. doi :10.1145/155360.155361. ISBN 0-89791-570-4.
  9. ^ Brinch Hansen, Per (июль 1972). «Структурированное мультипрограммирование (приглашенный доклад)». Communications of the ACM . 15 (7): 574–578. doi : 10.1145/361454.361473 . S2CID  14125530.
  10. ^ Бринч Хансен, Пер (апрель 1976 г.). "Операционная система Solo: параллельная программа на языке Pascal" (PDF) . Программное обеспечение: практика и опыт .
  11. ^ Brinch Hansen, Per (1977). Архитектура параллельных программ . Prentice Hall. ISBN 978-0-13-044628-2.
  12. ^ "sync - Язык программирования Go". golang.org . Получено 2021-06-17 .
  13. ^ "Что такое "sync.Cond" | dtyler.io". dtyler.io . Архивировано из оригинала 2021-10-01 . Получено 2021-06-17 .

Дальнейшее чтение

Внешние ссылки