Решаем проблемы с RAII у std::thread: cancellation_token как альтернатива pthread_cancel и boost::thread::interrupt

в 9:36, , рубрики: boost, c++, pthread_cancel, raii, многопоточность

Статья рассматривает проблемы в std::thread, попутно разрешая древний спор на тему "что использовать: pthread_cancel, булев флаг или boost::thread::interrupt?"

Проблема

У класса std::thread, который добавили в C++11, есть одна неприятная особенность — он не соответствует с идиоме RAII (Resource Acquisition Is Initialization). Выдержка из стандарта:

30.3.1.3 thread destructor
~thread();
If joinable() then terminate(), otherwise no effects.

Чем нам грозит такой деструктор? Программист должен быть очень аккуратен, когда речь идёт об разрушении объекта std::thread:

void dangerous_thread()
{
  std::thread t([] { do_something(); });
  do_another_thing(); // may throw - can cause termination!
  t.join();
}

Если из функции do_another_thing вылетит исключение, то деструктор std::thread завершит всю программу, вызвав std::terminate. Что с этим можно сделать? Давайте попробуем написать RAII-обёртку вокруг std::thread и посмотрим, куда нас приведёт эта попытка.

Добавляем RAII в std::thread

class thread_wrapper
{
public:
  // Constructors

  ~thread_wrapper()
  { reset(); }

  void reset()
  {
    if (joinable())
    {
      // ???
    }
  }

  // Other methods

private:
  std::thread _impl;
};

thread_wrapper копирует интерфейс std::thread и реализует ещё одну дополнительную функцию — reset. Эта функция должна перевести поток в non-joinable состояние. Деструктор вызывает эту функцию, так что после этого _impl разрушится, не вызывая std::terminate.

Для того, чтобы перевести _impl в non-joinable состояние, у reset есть два варианта: detach или join. Проблема с detach в том, что поток продолжит выполняться, сея хаос и нарушая идиому RAII. Так что наш выбор — это join:

thread_wrapper::reset()
{
  if (joinable())
    join();
}

Серьёзная проблема

К сожалению, такая реализация thread_wrapper ничем не лучше, чем обычный std::thread. Почему? Давайте рассмотрим следующий пример использования:

void use_thread()
{
  std::atomic<bool> alive{true};
  thread_wrapper t([&alive] { while(alive) do_something(); });
  do_another_thing();
  alive = false;
}

Если из do_another_thing вылетит исключение, то аварийного завершения не произойдёт. Однако, вызов join из деструктора thread_wrapper зависнет навечно, потому что alive никогда не примет значение false, и поток никогда не завершится.

Всё дело в том, что у объекта thread_wrapper нет способа повлиять на выполняемую функцию, для того чтобы "попросить" её завершиться. Ситуация усложняется ещё и тем, что в функции do_something поток выполнения вполне может "уснуть" на условной переменной или в блокирующем вызове операционной системы.

Таким образом, для решения проблемы с деструктором std::thread необходимо решить более серьёзную проблему:

Как прервать выполнение длительной функции, особенно если в этой функции поток выполнения может "уснуть" на условной переменной или в блокирующем вызове ОС?

Частный случай этой проблемы — это прерывание потока выполнения целиком. Давайте рассмотрим три существующих способа для прерывания потока выполнения: pthread_cancel, boost::thread::interrupt и булев флаг.

Существующие решения

pthread_cancel

Отправляет выбранному потоку запрос на прерывание. Спецификация POSIX содержит особый список прерываемых функций (read, write и т.д.). После вызова pthread_cancel для какого-нибудь потока эти функции в данном потоке начинают кидать исключение особого типа. Это исключение нельзя проигнорировать — catch-блок, поймавший такое исключение, обязан кинуть его дальше, поэтому это исключение полностью разматывает стек потока и завершает его. Поток может на время запретить прерывание своих вызовов с помощью функции pthread_setcancelstate (одно из возможных применений: чтобы избежать исключений из деструкторов, функций логгирования и т.п.).

Плюсы:

  • Можно прервать ожидание на условных переменных
  • Можно прервать блокирующие вызовы ОС
  • Сложно проигнорировать запрос на прерывание

Минусы:

  • Большие проблемы с переносимостью: кроме очевидного отсутствия pthread_cancel в Windows, он также отсутствует в некоторых реализациях libc (например, в bionic, который используется в Android)
  • Проблемы с std::condition_variable::wait в C++14 и более поздних стандартах
  • Может вызвать проблемы в C коде, который использует прерываемые функции (вероятный список спецэффектов: утечки ресурсов, не разблокированные вовремя мьютексы и т.д.)
  • Прерываемые функции в деструкторе требуют особых предосторожностей (например, close является прерываемой функцией)
  • Нельзя использовать в среде без исключений
  • Нельзя применить для прерывания отдельных функций или задач

Проблемы с std::condition_variable::wait появляются из-за того, что в C++14 std::condition_variable::wait получил спецификацию noexcept. Если разрешить прерывания с помощью pthread_setcancelstate, то мы теряем возможность прерывать ожидание на условых переменных, а если прерывания будут разрешены, то у нас нет возможности соответствовать спецификации noexcept, потому что мы не можем "проглотить" это особое исключение.

boost::thread::interrupt

Библиотека Boost.Thread предоставляет опциональный механизм прерывания потоков, чем-то похожий на pthread_cancel. Для того, чтобы прервать поток выполнения, достаточно позвать у соответствующего ему объекта boost::thread метод interrupt. Проверить состояния текущего потока можно с помощью функции boost::this_thread::interruption_point: в прерванном потоке эта функция кидает исключение типа boost::thread_interrupted. В случае, если использование исключений запрещено с помощью BOOST_NO_EXCEPTIONS, то для проверки состояния можно использовать boost::this_thread::interruption_requested. Boost.Thread также позволяет прерывать ожидание в boost::condition_variable::wait. Для реализации этого используется thread-local storage и дополнительный мьютекс внутри условной переменной.

Плюсы:

  • Переносимость
  • Можно прервать boost::condition_variable::wait
  • Можно использовать в среде без исключений

Минусы:

  • Привязка к Boost.Thread — данный механизм прерывания нельзя использовать со стандартными условными переменными или потоками
  • Требует дополнительного мьютекса внутри condition_variable
  • Накладные расходы: добавляет две дополнительных блокировки/разблокировки мьютексов в каждый condition_variable::wait
  • Нельзя прервать блокирующие вызовы ОС
  • Проблематично применить для прерывания отдельных функций или задач (судя по коду, это можно сделать только при использовании исключений)
  • Незначительное нарушение философии исключений — прерывание потока не является исключительной ситуацией в жизненном цикле программы

Булев флаг

Если почитать на StackOverflow вопросы про pthread_cancel (1, 2, 3, 4), то один из самых популярных ответов: "Используйте вместо pthread_cancel булев флаг".

Атомарная переменная alive в нашем примере с исключениями — это и есть булев флаг:

void use_thread()
{
  std::atomic<bool> alive{true};
  thread_wrapper t([&alive] { while(alive) do_something(); });
  do_another_thing(); // may throw
  alive = false;
}

Плюсы:

  • Платформно-независимый
  • Очевидны точки прерывания выполнения потока

Минусы:

  • Дупликация кода
  • Мешает декомпозиции — нет простого и эффективного способа написать блокирующую функцию
  • Нельзя прервать ожидание на условных переменных (особенно если они находятся вне класса с булевым флагом)
  • Нельзя прервать блокирующие вызовы ОС

Cancellation token

Что делать? Давайте возьмём за основу булев флаг и начнём решать связанные с ним проблемы. Дупликация кода? Отлично — давайте завернём булев флаг в отдельный класс. Назовём его cancellation_token.

class cancellation_token
{
public:
  explicit operator bool() const
  { return !_cancelled; }

  void cancel()
  { _cancelled = true; }

private:
  std::atomic<bool> _cancelled;
};

Теперь можно положить cancellation_token в наш thread_wrapper:

class thread_wrapper
{
public:
  // Constructors

  ~thread_wrapper()
  { reset(); }

  void reset()
  {
    if (joinable())
    {
      _token.cancel();
      _impl.join();
    }
  }

  // Other methods

private:
  std::thread        _impl;
  cancellation_token _token;
};

Отлично, теперь осталось только передать ссылку на токен в ту функцию, которая исполняется в отдельном потоке:

template<class Function, class... Args>
thread_wrapper(Function&& f, Args&&... args)
{ _impl = std::thread(f, args..., std::ref(_token)); }

Так как thread_wrapper мы пишем для иллюстративных целей, то можно пока не использовать std::forward и, заодно, проигнорировать те проблемы, которые возникнут в с move-конструктором и функцией swap.

Настало время вспомнить пример с use_thread и исключениями:

void use_thread()
{
  std::atomic<bool> alive{true};
  thread_wrapper t([&alive] { while(alive) do_something(); });
  do_another_thing();
  alive = false;
}

Для того, чтобы добавить поддержку cancellation_token, нам достаточно добавить правильный аргумент в лямбду и убрать alive:

void use_thread()
{
  thread_wrapper t([] (cancellation_token& token) { while(token) do_something(); });
  do_another_thing();
}

Замечательно! Даже если из do_another_thing вылетит исключение — деструктор thread_wrapper всё равно вызовёт cancellation_token::cancel и поток завершит своё выполнение. Кроме того, убрав код булева флага в cancellation_token, мы значительно сократили количество кода в нашем примере.

Прерывание ожидания

Настало время научить наши токены прерывать блокирующие вызова, например — ожидание на условных переменных. Чтобы абстрагироваться от конкретных механизмов прерывания, нам понадобится интерфейс cancellation_handler:

struct cancellation_handler
{
  virtual void cancel() = 0;
};

Хэндлер для прервания ожидания на условной переменной выглядит примерно так:

class cv_handler : public cancellation_handler
{
public:
  cv_handler(std::condition_variable& condition, std::unique_lock<mutex>& lock) :
    _condition(condition), _lock(lock)
  { }

  virtual void cancel()
  {
    unique_lock l(_lock.get_mutex());
    _condition.notify_all();
  }

private:
  std::condition_variable& _condition;
  std::unique_lock<mutex>& _lock;
};

Теперь достаточно положить указатель на cancellation_handler в наш cancellation_handler и вызвать cancellation_handler::cancel из cancellation_token::cancel:

class cancellation_token
{
  std::mutex            _mutex;
  std::atomic<bool>     _cancelled;
  cancellation_handler* _handler;

public:
  explicit operator bool() const
  { return !_cancelled; }

  void cancel()
  {
    std::unique_lock<mutex> l(_mutex);
    if (_handler)
      _handler->cancel();
    _cancelled = true;
  }

  void set_handler(cancellation_handler* handler)
  {
    std::unique_lock<mutex> l(_mutex);
    _handler = handler;
  }
};

Прерываемая версия ожидания на условной переменной выглядит примерно так:

void cancellable_wait(std::condition_variable& cv, std::unique_lock<mutex>& l, cancellation_token& t)
{
  cv_handler handler(cv, l); // implements cancel()
  t.set_handler(&handler);
  cv.wait(l);
  t.set_handler(nullptr);
}

Внимание! Приведённая реализация небезопасна как с точки зрения исключений и потокобезопасности. Она здесь только для того, чтобы проиллюстрировать механизм работы cancellation_handler. Ссылки на правильную реализацию можно найти в конце статьи.

Реализовав соответствующий cancellation_handler, можно научить токен прерывать блокирующие вызовы ОС и блокирующие функции из других библиотек (если у этих функций есть хотя бы какой-нибудь механизм для прерывания ожидания).

Библиотека rethread

Описанные токены, хэндлеры и потоки реализованы в виде open-source библиотеки: https://github.com/bo-on-software/rethread, с документацией (на английском), тестами и бенчмарками.

Вот список главных отличий приведённого кода от того, что реализовано в библиотеке:

  • cancellation_token — это интерфейс с несколькими реализациями. Прерываемые функции получают cancellation_token по константной ссылке.
  • Токен использует атомики вместо мьютексов для часто используемых операций
  • Обёртка над потоком называется rethread::thread

Что есть в библиотеке:

  • Токены
  • RAII-совместимые потоки
  • Прерываемое ожидание на любых условных переменных, совместимых по интерфейсу с std::condition_variable
  • Прерываемое ожидание в poll — это позволяет реализовать прерываемые версии многих блокирующих POSIX вызовов (read, write, и т.д.)

Производительность

Измерения проводились на ноутбуке с процессором Intel Core i7-3630QM @ 2.4GHz.

Ниже приведены результаты бенчмарков токенов из rethread.
Измерялась производительность следующих операций:

  • Проверка состояния — это цена вызова функции cancellation_token::is_cancelled() (или эквивалентное этому контекстное приведение к булеву типу)
  • Вызов прерываемой функции — это накладные расходы на одну прерываемую блокирующую функцию: регистрация хэндлера в токене перед вызовом и "разрегистрация" после завершения вызова
  • Создание одного standalone_cancellation_token

Ubuntu 16.04

Процессорное время, нс
Проверка состояния токена 1.7
Вызов прерываемой функции 15.0
Создание токена 21.3

Windows 10

Процессорное время, нс
Проверка состояния токена 2.8
Вызов прерываемой функции 17.0
Создание токена 33.0

Отрицательный оверхэд

Столь низкие накладные расходы на прерываемость создают интересный эффект:
В некоторых ситуациях прерываемая функция работает быстрее, чем "обычный" подход.
В коде без использования токенов блокирующие функции не могут блокироваться навечно — тогда не получится достичь "нормального" завершения приложения (извращения вроде exit(1); нельзя считать нормой). Для того, чтобы избежать вечной блокировки и регулярно проверять состояние, нам нужен таймаут. Например, такой:

while (alive)
{
  _condition.wait_for(lock, std::chrono::milliseconds(100));
  // ...
}

Во-первых, такой код будет просыпаться каждые 100 миллисекунд только для того, чтобы проверить флаг (значение таймаута можно увеличить, но оно ограниченно сверху "разумным" временем завершения приложения).

Во-вторых, этот код неоптимален даже без таких бессмысленных пробуждений. Дело в том, что вызов condition_variable::wait_for(...) менее эффективен, чем condition_variable::wait(...): как минимум, ему нужно получить текущее время, посчитать время пробуждения, и т.д.

Для доказательства этого утверждения в rethread_testing были написаны два синтетических бенчмарка, в которых сравнивались две примитивных реализации многопоточной очереди: "обычная" (с таймаутом) и прерываемая (с токенами). Измерялось процессорное время, затраченное на то, чтобы дождаться появления в очереди одного объекта.

Процессорное время, нс
Ubuntu 16.04 & g++ 5.3.1 ("обычная" очередь) 5913
Ubuntu 16.04 & g++ 5.3.1 (прерываемая очередь) 5824
Windows 10 & MSVS 2015 ("обычная" очередь) 2467
Windows 10 & MSVS 2015 (прерываемая очередь) 1729

Итак, на MSVS 2015 прерываемая версия работает в 1.4 быстрее, чем "обычная" версия с таймаутами. На Ubuntu 16.04 разница не столь заметна, но даже там прерываемая версия явно выигрывает у "обычной".

Заключение

Это не единственное возможное решение изложенной проблемы. Наиболее заманчивая альтернатива — положить токен в thread-local storage и кидать исключение при прерывании. Поведение будет похоже на boost::thread::interrupt, но без дополнительного мьютекса в каждой условной переменной и со значительно меньшими накладными расходами. Основной недостаток такого подхода — уже упомянутое нарушение философии исключений и неочевидность точек прерывания.

Важное достоинство подхода с токенами состоит в том, что можно прерывать не потоки целиком, а отдельные задачи, а если использовать реализованный в библиотеке cancellation_token_source — то и несколько задач одновременно.

Почти весь свои "хотелки" в библиотеке я реализовал. На мой взгляд — не хватает интеграции с блокирующими вызовами системы вроде работы с файлами или сокетами. Написать прерываемые версии для read, write, connect, accept и т.д. не составит особого труда, основные проблемы — нежелание совать токены в стандартные iostream'ы и отсутствие общепринятой альтернативы.

Автор: bo-on-software

Источник

Поделиться новостью

* - обязательные к заполнению поля