Создание барьера синхронизации с использованием C++11

в 15:17, , рубрики: c++, c++11, threads, барьеры, параллельное программирование, потоки с++, Программирование

Введение

Сравнивая две различные технологии параллельного программирования: потоки POSIX и потоки C++11, можно заметить, что в последних отсутствует аналог типа barrier_t из библиотеки pthread.

Довольно странно, что такой важный примитив синхронизации отсутствует в стандартной библиотеке. В этой статье пойдёт речь о том, как сделать барьер с использованием только библиотек, входящих в набор стандарта C++11.

Определение
Барьер — один из примитивов синхронизации. Он создаётся на некоторое количество потоков. Когда первый поток завершает свою работу, то он остаётся ждать у барьера и ждёт, пока не завершат работу остальные потоки.
Как только у барьера накапливается ровно столько потоков, на сколько был создан барьер, все потоки, которые ожидают у барьера, продолжают свою работу.

Начнём создавать свой барьер, с блэкджеком и ...

Прежде всего, нам понадобится подключить следующие библиотеки, входящие в стандарт C++11:

#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>

Сейчас вы, наверное, спросите, а почему нам понадобятся все эти библиотеки? Ну, первая для барьера не нужна, но я не думаю, что вы сможете проверить свой код без подключения этой библиотеки.

Но обо всём по порядку!

Какое поле самое важное для барьера? Очевидно, число потоков.
Что ещё необходимо знать барьеру? Количество потоков, которое сейчас на нём ожидает.

Рука так и тянется написать

class barrier {
 const unsigned int threadCount;
 unsigned int threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
};

Однако давайте немного подумаем. Барьер уже замедляет работу приложения, поскольку для синхронизации требуется время. Таким образом, нам бы хотелось снизить затраты на создание и обработку самого барьера.
Поэтому для изменения числа потоков, которые ожидают на барьере, более подойдут атомарные операции.

Итак, наш класс теперь выглядит следующим образом:

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
};

Ну, скелет класса мы написали. Можем создавать объект этого класса, есть конструктор, есть конструктор копирования…
Простите, что я сказал? Вообще, при сочетании объектно-ориентированного и параллельного программирования лучше избавляться от конструкторов копирования, чтобы оградить себя от неприятных последствий.

Ну что ж, C++11 позволяет явно запретить этот конструктор

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
};

Так, с этим разобрались. Осталось лишь написать метод, ради которого мы всё это и затеяли. Ожидание на барьере.

В голову приходит следующая идея: сделаем логическую переменную, которая будет отвечать за ожидание у барьера или его прохождение, а поведение реализуем с помощью условной переменной по этому самому условию.

Так что подправим наш класс новыми полями:

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
};

Теперь разберёмся с методом. Если прошли ещё не все потоки, то дошедшие до барьера потоки должны спать на этой условной переменной, т. е. должен выполняться следующий код

std::unique_lock<std::mutex> lock(mutex);
waitVariable.wait(lock, [&]{ return noWait; });

Если прошли все потоки, то мы должны уведомить остальные потоки о том, что на барьере больше ждать не надо. Это сделает следующий код:

isNotWaiting = true;
waitVariable.notify_all();
threadsWaiting.store(0);

Последний метод осуществляет атомарную запись числа 0 в переменную threadsWaiting.

Теперь осталось решить один простой вопрос: а как эти два случая объединить. Как нам узнать, сколько потоков ожидает на барьере?

Теперь вспоминаем, как устроен барьер. Для ожидания потока на барьере все потоки должны вызвать функцию барьера. Таким образом, как только вызван метод wait, мы должны сразу же увеличить нашу переменную threadsWaiting на 1.
Для этого используем такую функцию как fetch_add. Это одна из так называемых RMW-операций (read-modify-write). Она читает значение атомарной переменной, складывает её атомарно с аргументом и записывает в неё новое значение, при этом возвращая старое.

Таким образом, описанные выше два случая объединяются условным оператором, и наш класс выглядит следующим образом:

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
 }
 barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   isNotWaiting = true;
   waitVariable.notify_all();
   threadCount.store(0);
 }
 else {
  std::unique_lock<std::mutex> lock(mutex);
  waitVariable.wait(lock,[&]{ return isNoWaiting;});
 }
};

Теперь осталось лишь задать начальное значение переменной isNotWaiting, которое, очевидно, равно false.

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int> threadsWaiting;
 bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
  isNotWaiting = false;
 }
 barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   isNotWaiting = true;
   waitVariable.notify_all();
   threadCount.store(0);
 }
 else {
  std::unique_lock<std::mutex> lock(mutex);
  waitVariable.wait(lock,[&]{ return isNotWaiting;});
 }
};

Итак, мы написали класс для барьера с использованием стандарта С++11 без подключения сторонних библиотек.

Теперь вы можете мне возразить: ну, написал я какой-то код? А где доказательство того, что оно работает?

Итак, самая важная часть: демонстрация барьера

#include <iostream>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>

class barrier {
 const unsigned int threadCount;
 std::atomic<unsigned int>threadsWaiting;
  bool isNotWaiting;
 std::condition_variable waitVariable;
 std::mutex mutex;
public:
 barrier(unsigned int n) : threadCount(n) {
  threadsWaiting = 0;
  isNotWaiting = false;
}
barrier(const barrier &) = delete;
 void wait() {
  if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
   isNotWaiting = true;
   waitVariable.notify_all();
   threadsWaiting.store(0);
 }
 else {
  std::unique_lock<std::mutex> lock(mutex);
  waitVariable.wait(lock,[&]{ return isNotWaiting;});
 }
}
};


barrier *myBarrier;

class Thread {
private:
	std::thread* cppthread;
	static void threadFunction(Thread* arg) {
		arg->run();
	}
public:
	Thread() {}
	Thread(const Thread&) = delete;
	virtual ~Thread() {delete cppthread;}
	virtual void run() = 0;
	void start() {
		cppthread = new std::thread(Thread::threadFunction, this);
	}
	void wait() {
		cppthread->join();
	}
};

class BarrierDemo: public Thread {
  int id;
public:
 BarrierDemo(int i) {
 	id = i;
 }
 void run() {
 	std::cout << "Thread " << id << "runs before barrier" << std::endl;
 	myBarrier->wait();
 	std::cout << "Thread " << id << "runs after barrier" << std::endl;
 }
};

int main() {
	// your code goes here
	int threads;
    std::cin >> threads;
    myBarrier = new barrier(threads);

   BarrierDemo* bardemos = static_cast<BarrierDemo*>(::operator new(sizeof(BarrierDemo)*threads));
    
    for (int i = 0; i < threads; i++) {
		new (&bardemos[i])BarrierDemo(i);
		bardemos[i].start();
	}
	for (int i = 0; i < threads; i++) {
		bardemos[i].wait();
	}
   
  
   ::operator delete(bardemos);
    delete myBarrier;
	return 0;
}

Вы можете скопировать приведённый выше код в компилятор с поддержкой C++11 для проверки его работоспособности. На этом данная статья и заканчивается.

P. S. Несложно из приведённого кода догадаться, что это «одноразовый» барьер: как только через него пройдут все потоки, вы не сможете повторно использовать этот же экземпляр класса в качестве барьера.

Автор: ProPupil

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js