- PVSM.RU - https://www.pvsm.ru -
Для более полного понимания этой статьи, рекомендуется прочитать ее первую часть [1], где основное внимание было уделено потокам и блокировкам, в ней объяснено много моментов (терминов, функций и т.д.), которые без пояснения будут использованы здесь.
В данной статье будут рассмотрены условные переменные…
Помимо описанных ранее [1] способов синхронизации, C++11 предоставляет поддержку условных переменных, которые позволяют блокировать один или более потоков, пока либо не будет получено уведомление от другого потока, либо не произойдет мифическое spurious wakeup [2] («ложное/случайное пробуждение»).
Есть две реализации условных переменных, доступных в заголовке <condition_variable>
:
std::unique_lock
Опишу, как работают условные переменные:
unique_lock
. Эта блокировка передается методу wait()
, который освобождает мьютекс и приостанавливает поток, пока не будет получен сигнал от условной переменной. Когда это произойдет, поток пробудится и снова выполнится lock
.Код ниже демонстрирует пример использования условной переменной, для синхронизации потоков: во время работы некоторых потоков потоков (назовем их «рабочими») могут произойти ошибку, при этом они помещаются в очередь. Поток «регистратора» обрабатывает эти ошибки (получая их из очереди) и печатает их. «Рабочие» сигнализируют «регистратору», когда происходит ошибка. Регистратор ожидает сигнала условной переменной. Чтобы избежать ложных пробуждений, ожидание происходит в цикле, где проверяется булевское условие.
#include <condition_variable>
#include <iostream>
#include <random>
#include <thread>
#include <mutex>
#include <queue>
std::mutex g_lockprint;
std::mutex g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int> g_codes;
bool g_done;
bool g_notified;
void workerFunc(int id, std::mt19937 &generator)
{
// стартовое сообщение
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]trunning..." << std::endl;
}
// симуляция работы
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
// симуляция ошибки
int errorcode = id*100+1;
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]tan error occurred: " << errorcode << std::endl;
}
// сообщаем об ошибке
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_codes.push(errorcode);
g_notified = true;
g_queuecheck.notify_one();
}
}
void loggerFunc()
{
// стартовое сообщение
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]trunning..." << std::endl;
}
// до тех пор, пока не будет получен сигнал
while(!g_done)
{
std::unique_lock<std::mutex> locker(g_lockqueue);
while(!g_notified) // от ложных пробуждений
g_queuecheck.wait(locker);
// если есть ошибки в очереди, обрабатывать их
while(!g_codes.empty())
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]tprocessing error: " << g_codes.front() << std::endl;
g_codes.pop();
}
g_notified = false;
}
}
int main()
{
// инициализация генератора псевдо-случайных чисел
std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
// запуск регистратора
std::thread loggerThread(loggerFunc);
// запуск рабочих
std::vector<std::thread> threads;
for(int i = 0; i < 5; ++i)
threads.push_back(std::thread(workerFunc, i+1, std::ref(generator)));
for(auto &t: threads)
t.join();
// сообщаем регистратору о завершении и ожидаем его
g_done = true;
loggerthread.join();
return 0;
}
Выполнение этого кода даст примерно следующий результат (результат каждый раз будет разным, т.к. рабочие потоки работают (точнее спят) случайные интервалы времени):
[logger] running...
[worker 1] running...
[worker 2] running...
[worker 3] running...
[worker 4] running...
[worker 5] running...
[worker 1] an error occurred: 101
[worker 2] an error occurred: 201
[logger] processing error: 101
[logger] processing error: 201
[worker 5] an error occurred: 501
[logger] processing error: 501
[worker 3] an error occurred: 301
[worker 4] an error occurred: 401
[logger] processing error: 301
[logger] processing error: 401
У метода wait
, обозначенного выше, есть две перегрузки:
unique_lock
; он (метод) блокирует поток и добавляет его в очередь потоков, ожидающих сигнала от этой условной переменной; поток пробуждается, когда будет получен сигнал от условной переменной или в случае ложного пробуждения.unique_lock
, принимает предикат, используемый в цикле до тех пор, пока он не вернет false
; эта перегрузка может использоваться, чтобы избежать ложных пробуждений. В общем случае это эквивалентно такому циклу:
while(!predicate())
wait(lock);
Таким образом, используя вторую перегрузку, можно избежать использования булевского флага g_notified
в примере выше:
void workerFunc(int id, std::mt19937 &generator)
{
// стартовое сообщение
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]trunning..." << std::endl;
}
// симуляция работы
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
// симуляция ошибки
int errorcode = id*100+1;
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]tan error occurred: " << errorcode << std::endl;
}
// сообщаем об ошибке
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_codes.push(errorcode);
g_queuecheck.notify_one();
}
}
void loggerFunc()
{
// стартовое сообщение
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]trunning..." << std::endl;
}
// до тех пор, пока не будет получен сигнал
while(!g_done)
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_queuecheck.wait(locker, [&](){return !g_codes.empty();});
// если есть ошибки в очереди, обрабатывать их
while(!g_codes.empty())
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]tprocessing error: " << g_codes.front() << std::endl;
g_codes.pop();
}
}
}
В дополнении к перегруженному методу wait()
, есть еще два похожих метода с такой же перегрузкой для предиката:
Перегрузка этих методов без предиката возвращает cv_status [9], показывающий, произошел ли таймаут, или пробуждение произошло из-за сигнала условной переменной, или это ложное пробуждение.
Std также предоставляет функцию notify_all_at_thread_exit [10], которая реализует механизм уведомления других потоков о том, что данных поток завершил свою работу, включая уничтожение всех объектов thread_local
. Ожидание потоков механизмом, отличным от join
, может привести к неправильному поведению, когда thread_locals
уже были использованы, а их деструкторы могли вызываться после того, как поток был пробужден или после того, как уже завершился (см. N3070 [11] и N2880 [12]. Как правило, вызов этой функции должен произойти непосредственно до того, как поток начнет свое существование. Ниже приведен пример, как notify_all_at_thread_exit
может использоваться с условными переменными для синхронизации двух потоков:
std::mutex g_lockprint;
std::mutex g_lock;
std::condition_variable g_signal;
bool g_done;
void workerFunc(std::mt19937 &generator)
{
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "worker running..." << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "worker finished..." << std::endl;
}
std::unique_lock<std::mutex> lock(g_lock);
g_done = true;
std::notify_all_at_thread_exit(g_signal, std::move(lock));
}
int main()
{
std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
std::cout << "main running..." << std::endl;
std::thread worker(workerFunc, std::ref(generator));
worker.detach();
std::cout << "main crunching..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "main waiting for worker..." << std::endl;
}
std::unique_lock<std::mutex> lock(g_lock);
while(!g_done) // против ложных пробуждений
g_signal.wait(lock);
std::cout << "main finished..." << std::endl;
return 0;
}
Если worker заканчивает свою работу перед потоком main, то результат будет таким:
main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...
Если поток main заканчивает свою работу перед потоком worker, то результат будет таким:
main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...
Стандарт C++11 позволяет разработчикам C++ писать многопоточный код стандартным, платформонезависимым способом. Эта статья — всего лишь «пробежка» по потокам и механизмам синхронизации от std. Заголовок <thread>
предоставляет класс с тем же именем (и много дополнительных функций), представляющий потоки. Заголовок <mutex>
обеспечивает реализацию нескольких мьютексов и «оберток» для синхронизации доступа к потокам. Заголовок <condition_variable>
предоставляет две реализации условных переменных, которые позволяют блокировать один или более потоков, до получение уведомления от другого потока или до ложного пробуждения. Для более подробной информации и понимания сути дела, конечно же, рекомендуется прочитать дополнительную литературу :)
Автор: Renzo
Источник [13]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/programmirovanie/36125
Ссылки в тексте:
[1] первую часть: http://habrahabr.ru/post/182610/
[2] spurious wakeup: http://en.wikipedia.org/wiki/Spurious_wakeup
[3] condition_variable: http://en.cppreference.com/w/cpp/thread/condition_variable
[4] condition_variable_any: http://en.cppreference.com/w/cpp/thread/condition_variable_any
[5] notify_one(): http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one
[6] notify_all(): http://en.cppreference.com/w/cpp/thread/condition_variable/notify_all
[7] wait_for: http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for
[8] wait_until: http://en.cppreference.com/w/cpp/thread/condition_variable/wait_until
[9] cv_status: http://en.cppreference.com/w/cpp/thread/cv_status
[10] notify_all_at_thread_exit: http://en.cppreference.com/w/cpp/thread/notify_all_at_thread_exit
[11] N3070: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2010/n3070.html
[12] N2880: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2009/n2880.html
[13] Источник: http://habrahabr.ru/post/182626/
Нажмите здесь для печати.