В параллельных вычислениях барьер — это тип метода синхронизации . [1] Барьер для группы потоков или процессов в исходном коде означает, что любой поток/процесс должен остановиться в этой точке и не может продолжаться, пока все остальные потоки/процессы не достигнут этого барьера. [2]
Многие коллективные процедуры и параллельные языки на основе директив накладывают неявные барьеры. Например, параллельный цикл do в Fortran с OpenMP не будет разрешен для продолжения в любом потоке, пока не завершится последняя итерация. [ необходима цитата ] Это в случае, если программа полагается на результат цикла сразу после его завершения. При передаче сообщений любая глобальная коммуникация (например, сокращение или рассеивание) может подразумевать барьер.
В параллельных вычислениях барьер может находиться в поднятом или опущенном состоянии . Термин защелка иногда используется для обозначения барьера, который запускается в поднятом состоянии и не может быть повторно поднят после того, как он находится в опущенном состоянии. Термин защелка обратного отсчета иногда используется для обозначения защелки, которая автоматически опускается после прибытия предопределенного числа потоков/процессов.
Возьмем пример для потока, известного как барьер потока . Барьеру потока нужна переменная для отслеживания общего числа потоков, которые вошли в барьер . [3] Всякий раз, когда достаточно потоков входят в барьер, он будет снят. При реализации барьера потока также необходим примитив синхронизации, такой как мьютекс .
Этот метод барьера нитей также известен как централизованный барьер , поскольку нити должны ждать перед «центральным барьером», пока ожидаемое количество нитей не достигнет барьера, прежде чем он будет поднят.
Следующий код на языке C, реализующий барьер потока с использованием потоков POSIX, продемонстрирует эту процедуру: [1]
#include <stdio.h> #include <pthread.h> #define ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ 2#define THREAD_BARRIERS_NUMBER 3#define PTHREAD_BARRIER_ATTR NULL // атрибут барьера pthreadtypedef struct _thread_barrier { int номер_барьера_потока ; pthread_mutex_t блокировка ; int total_thread ; } thread_barrier ; thread_barrier барьер ; void thread_barrier_init ( thread_barrier * barrier , pthread_mutexattr_t * mutex_attr , int thread_barrier_number ) { pthread_mutex_init ( & ( барьер -> блокировка ), mutex_attr ); барьер -> номер_барьера_потока = номер_барьера_потока ; barrier -> total_thread = 0 ; // Устанавливаем общее количество потоков равным 0 }void thread_barrier_wait ( thread_barrier * barrier ){ если ( ! pthread_mutex_lock ( & ( барьер -> блокировка ))){ барьер -> total_thread += 1 ; pthread_mutex_unlock ( & ( барьер -> блокировка )); } пока ( барьер -> общее_число_потоков < барьер -> номер_барьера_потоков ); если ( ! pthread_mutex_lock ( & ( барьер -> блокировка ))){ barrier -> total_thread -= 1 ; // Уменьшаем один поток, поскольку он прошел барьер потока pthread_mutex_unlock ( & ( барьер -> блокировка )); }}void thread_barrier_destroy ( thread_barrier * barrier ){ pthread_mutex_destroy ( & ( барьер -> блокировка ));}void * thread_func ( void * ptr ){ printf ( "поток с идентификатором %ld ожидает у барьера, так как запущено недостаточно %d потоков ... \n " , pthread_self (), THREAD_BARRIERS_NUMBER ); thread_barrier_wait ( & барьер ); printf ( "Барьер снят, поток с идентификатором %ld сейчас запущен \n " , pthread_self ()); }целочисленный основной () { pthread_t thread_id [ ВСЕГО_ПОТОКОВ ]; thread_barrier_init ( & барьер , PTHREAD_BARRIER_ATTR , THREAD_BARRIERS_NUMBER ); для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_create ( & thread_id [ i ], NULL , thread_func , NULL ); } // Поскольку pthread_join() заблокирует процесс до тех пор, пока все указанные потоки не будут завершены, // и не хватает потока для ожидания у барьера, поэтому этот процесс блокируется для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_join ( thread_id [ i ], NULL ); } thread_barrier_destroy ( & барьер ); printf ( "Барьер потока снят \n " ); // Эта строка не будет вызываться как TOTAL_THREADS < THREAD_BARRIERS_NUMBER }
В этой программе барьер потока определяется как структура struct _thread_barrier, которая включает в себя:
Исходя из определения барьера, нам необходимо реализовать в этой программе функцию типа thread_barrier_wait() , которая будет «контролировать» общее количество потоков в программе для преодоления барьера.
В этой программе каждый поток, вызывающий thread_barrier_wait(), будет заблокирован до тех пор, пока THREAD_BARRIERS_NUMBER потоков не достигнут барьера потока.
Результат этой программы:
поток с идентификатором <thread_id, например 139997337872128> ожидает у барьера, так как запущено недостаточно 3 потоков... поток с идентификатором <thread_id, например 139997329479424> ожидает у барьера, так как запущено недостаточно 3 потоков... // (основной процесс заблокирован, так как не имеет достаточно 3 потоков) // Строка printf("Барьер потока снят\n") не будет достигнута
Как мы видим из программы, создано всего 2 потока. Оба этих потока имеют , как обработчик функции потока, который вызывает , в то время как барьер потока ожидал 3 потока, чтобы вызвать ( ), чтобы быть снятым. Измените TOTAL_THREADS на 3, и барьер потока будет снят:thread_func()
thread_barrier_wait(&barrier)
thread_barrier_wait
THREAD_BARRIERS_NUMBER = 3
поток с идентификатором <ID потока, например 140453108946688> ожидает у барьера, так как недостаточно 3 потоков запущены... поток с идентификатором <ID потока, например 140453117339392> ожидает у барьера, так как недостаточно 3 потоков запущены... поток с идентификатором <ID потока, например 140453100553984> ожидает у барьера, так как недостаточно 3 потоков запущены... Барьер снят, поток с идентификатором <ID потока, например 140453108946688> запущен сейчас Барьер снят, поток с идентификатором <ID потока, например 140453117339392> запущен сейчас Барьер снят, поток с идентификатором <ID потока, например 140453100553984> запущен сейчас Барьер потока снят
Помимо уменьшения общего числа потоков на единицу для каждого потока, успешно проходящего барьер потока, барьер потока может использовать противоположные значения для обозначения каждого состояния потока как проходящего или останавливающегося. [4] Например, поток 1 со значением состояния 0 означает, что он останавливается у барьера, поток 2 со значением состояния 1 означает, что он прошел барьер, значение состояния потока 3 = 0 означает, что он останавливается у барьера и т. д. [5] Это известно как Sense-Reversal. [1]
Следующий код на языке C демонстрирует это: [3] [6]
#include <stdio.h> #include <stdbool.h> #include <pthread.h> #define ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ 2#define THREAD_BARRIERS_NUMBER 3#define PTHREAD_BARRIER_ATTR NULL // атрибут барьера pthreadtypedef struct _thread_barrier { int номер_барьера_потока ; int total_thread ; pthread_mutex_t блокировка ; логический флаг ; } thread_barrier ; thread_barrier барьер ; void thread_barrier_init ( thread_barrier * barrier , pthread_mutexattr_t * mutex_attr , int thread_barrier_number ) { pthread_mutex_init ( & ( барьер -> блокировка ), mutex_attr ); барьер -> total_thread = 0 ; барьер -> номер_барьера_потока = номер_барьера_потока ; барьер -> флаг = ложь ; }void thread_barrier_wait ( thread_barrier * barrier ){ bool local_sense = барьер -> флаг ; если ( ! pthread_mutex_lock ( & ( барьер -> блокировка ))){ барьер -> total_thread += 1 ; local_sense = ! local_sense ; если ( барьер -> общий_поток == барьер -> номер_барьера_потоков ){ барьер -> total_thread = 0 ; барьер -> флаг = local_sense ; pthread_mutex_unlock ( & ( барьер -> блокировка )); } еще { pthread_mutex_unlock ( & ( барьер -> блокировка )); while ( barrier -> flag != local_sense ); // ждем флаг } }}void thread_barrier_destroy ( thread_barrier * barrier ){ pthread_mutex_destroy ( & ( барьер -> блокировка ));}void * thread_func ( void * ptr ){ printf ( "поток с идентификатором %ld ожидает у барьера, так как запущено недостаточно %d потоков ... \n " , pthread_self (), THREAD_BARRIERS_NUMBER ); thread_barrier_wait ( & барьер ); printf ( "Барьер снят, поток с идентификатором %ld сейчас запущен \n " , pthread_self ()); }целочисленный основной () { pthread_t thread_id [ ВСЕГО_ПОТОКОВ ]; thread_barrier_init ( & барьер , PTHREAD_BARRIER_ATTR , THREAD_BARRIERS_NUMBER ); для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_create ( & thread_id [ i ], NULL , thread_func , NULL ); } // Поскольку pthread_join() заблокирует процесс до тех пор, пока все указанные потоки не будут завершены, // и не хватает потока для ожидания у барьера, поэтому этот процесс блокируется для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_join ( thread_id [ i ], NULL ); } thread_barrier_destroy ( & барьер ); printf ( "Барьер потока снят \n " ); // Эта строка не будет вызываться как TOTAL_THREADS < THREAD_BARRIERS_NUMBER }
Эта программа имеет все функции, аналогичные предыдущему исходному коду Centralized Barrier . Она просто реализуется другим способом, используя 2 новые переменные: [1]
Когда поток останавливается на барьере, значение local_sense переключается. [1] Когда на барьере потока останавливается менее THREAD_BARRIERS_NUMBER потоков, эти потоки будут продолжать ждать при условии, что флаг- член структуры _thread_barrier не равен закрытой local_sense
переменной.
Когда число потоков, остановившихся на барьере потока, равно THREAD_BARRIERS_NUMBER , общее количество потоков сбрасывается до 0, а флаг устанавливается в значение local_sense
.
Потенциальная проблема с централизованным барьером заключается в том, что из-за того, что все потоки многократно обращаются к глобальной переменной для передачи/остановки, трафик связи становится довольно высоким, что снижает масштабируемость .
Эту проблему можно решить, перегруппировав потоки и используя многоуровневый барьер, например, Combining Tree Barrier. Также аппаратные реализации могут иметь преимущество более высокой масштабируемости .
Комбинированный барьер дерева — это иерархический способ реализации барьера для решения проблемы масштабируемости путем избежания ситуации, когда все потоки вращаются в одном и том же месте. [4]
В k-Tree Barrier все потоки равномерно разделены на подгруппы по k потоков, и внутри этих подгрупп выполняется первый раунд синхронизации. После того, как все подгруппы выполнили свою синхронизацию, первый поток в каждой подгруппе переходит на второй уровень для дальнейшей синхронизации. На втором уровне, как и на первом уровне, потоки формируют новые подгруппы по k потоков и синхронизируются внутри групп, отправляя один поток в каждой подгруппе на следующий уровень и так далее. В конце концов, на последнем уровне остается только одна подгруппа для синхронизации. После синхронизации на последнем уровне сигнал освобождения передается на верхние уровни, и все потоки преодолевают барьер. [6] [7]
Аппаратный барьер использует аппаратное обеспечение для реализации вышеуказанной базовой модели барьера. [3]
Простейшая аппаратная реализация использует выделенные провода для передачи сигнала для реализации барьера. Этот выделенный провод выполняет операцию ИЛИ/И, чтобы действовать как флаги пропуска/блокировки и счетчик потоков. Для небольших систем такая модель работает, и скорость связи не является основной проблемой. В больших многопроцессорных системах эта аппаратная конструкция может сделать реализацию барьера с высокой задержкой. Сетевое соединение между процессорами является одной из реализаций для снижения задержки, что аналогично объединению барьера дерева. [8]
Стандарт потоков POSIX напрямую поддерживает функции барьера потоков , которые можно использовать для блокировки указанных потоков или всего процесса на барьере до тех пор, пока другие потоки не достигнут этого барьера . [2] POSIX поддерживает 3 основных API для реализации барьеров потоков:
pthread_barrier_init()
pthread_barrier_destroy()
pthread_barrier_wait()
pthread_barrier_init()
вызове, pthread_barrier_wait()
не снимет барьер. [10]В следующем примере (реализованном на языке C с API pthread) будет использоваться барьер потока для блокировки всех потоков основного процесса и, следовательно, блокировки всего процесса:
#include <stdio.h> #include <pthread.h> #define ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ 2#define THREAD_BARRIERS_NUMBER 3#define PTHREAD_BARRIER_ATTR NULL // атрибут барьера pthreadpthread_barrier_t барьер ; void * thread_func ( void * ptr ){ printf ( "Ожидание у барьера, так как запущено недостаточно %d потоков ... \n " , THREAD_BARRIERS_NUMBER ); pthread_barrier_wait ( & барьер ); printf ( "Барьер снят, поток с идентификатором %ld сейчас запущен \n " , pthread_self ()); }целочисленный основной () { pthread_t thread_id [ ВСЕГО_ПОТОКОВ ]; pthread_barrier_init ( & барьер , PTHREAD_BARRIER_ATTR , THREAD_BARRIERS_NUMBER ); для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_create ( & thread_id [ i ], NULL , thread_func , NULL ); } // Поскольку pthread_join() заблокирует процесс до тех пор, пока все указанные им потоки не будут завершены, // и не хватает потока для ожидания у барьера, поэтому этот процесс блокируется для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_join ( thread_id [ i ], NULL ); } pthread_barrier_destroy ( & барьер ); printf ( "Барьер потока снят \n " ); // Эта строка не будет вызываться как TOTAL_THREADS < THREAD_BARRIERS_NUMBER }
Результат этого исходного кода:
Ожидание у барьера, так как запущено недостаточно 3 потоков... Ожидание у барьера, так как запущено недостаточно 3 потоков... // (основной процесс заблокирован, так как не запущено достаточно 3 потоков) // Строка printf("Барьер потока снят\n") не будет достигнута
Как мы видим из исходного кода, создано всего два потока. Оба этих потока имеют thread_func() в качестве обработчика функции потока, который вызывает , в то время как барьер потока ожидал 3 потока для вызова ( ) для снятия. Измените TOTAL_THREADS на 3, и барьер потока будет снят:pthread_barrier_wait(&barrier)
pthread_barrier_wait
THREAD_BARRIERS_NUMBER = 3
Ожидание у барьера, так как недостаточно 3 потока запущены... Ожидание у барьера, так как недостаточно 3 потока запущены... Ожидание у барьера, так как недостаточно 3 потока запущены... Барьер снят, идентификатор потока 140643372406528 запущен сейчас Барьер снят , идентификатор потока 140643380799232 запущен сейчас Барьер снят, идентификатор потока 140643389191936 запущен сейчас Барьер потока снят
Поскольку main() рассматривается как поток , т. е. "главный" поток процесса, [11] вызов pthread_barrier_wait()
внутри main()
заблокирует весь процесс, пока другие потоки не достигнут барьера. В следующем примере будет использоваться барьер потока с pthread_barrier_wait()
внутри main()
для блокировки процесса/главного потока на 5 секунд в ожидании того, что 2 "ново созданных" потока достигнут барьера потока:
#define ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ 2#define THREAD_BARRIERS_NUMBER 3#define PTHREAD_BARRIER_ATTR NULL // атрибут барьера pthreadpthread_barrier_t барьер ; void * thread_func ( void * ptr ){ printf ( "Ожидание у барьера, так как запущено недостаточно %d потоков ... \n " , THREAD_BARRIERS_NUMBER ); сон ( 5 ); pthread_barrier_wait ( & барьер ); printf ( "Барьер снят, поток с идентификатором %ld сейчас запущен \n " , pthread_self ()); }целочисленный основной () { pthread_t thread_id [ ВСЕГО_ПОТОКОВ ]; pthread_barrier_init ( & барьер , PTHREAD_BARRIER_ATTR , THREAD_BARRIERS_NUMBER ); для ( int i = 0 ; i < ОБЩЕЕ_КОЛИЧЕСТВО_ПОТОКОВ ; i ++ ){ pthread_create ( & thread_id [ i ], NULL , thread_func , NULL ); }pthread_barrier_wait ( & барьер ); printf ( "Барьер потока снят \n " ); // Эта строка не будет вызываться как TOTAL_THREADS < THREAD_BARRIERS_NUMBER pthread_barrier_destroy ( & барьер );}
В этом примере не используется pthread_join()
ожидание завершения 2 "ново созданных" потоков. Он вызывает pthread_barrier_wait()
внутри main()
, чтобы заблокировать основной поток, так что процесс будет заблокирован до тех пор, пока 2 потока не завершат свою работу после 5 секунд ожидания (строка 9 - sleep(5)
).
«Параллельное программирование с барьерной синхронизацией». sourceallies.com . Март 2012 г.