- PVSM.RU - https://www.pvsm.ru -
В мире существует множество библиотек, реализующих сигналы в C++. К сожалению, у всех реализаций, с которыми я сталкивался, есть несколько проблем, которые не позволяют писать простой многопоточный код с использованием этих библиотек. Здесь я расскажу об этих проблемах, и о том, как их можно решить.
Думаю, многие уже знакомы с этой концепцией, но на всякий случай всё же напишу.
Сигнал — это способ отправить уведомление о произвольном событии получателям, которые могут регистрироваться независимо друг от друга. Если угодно, callback с множеством получателей. Или, для тех, кто работал с .NET, multicast delegate.
struct Button
{
boost::signals2::signal<void()> OnClick;
};
Подключение к сигналу и отключение от него:
void ClickHandler()
{ cout << “Button clicked” << endl; }
// ...
boost::signals2::connection c = button->OnClick.connect(&ClickHandler);
// ...
c.disconnect();
Вызов сигнала:
struct Button
{
boost::signals2::signal<void()> OnClick;
private:
void MouseDownHandler()
{
OnClick();
}
};
В однопоточном коде всё выглядит отлично, и работает неплохо, но что насчёт многопоточного?
Тут, к сожалению, есть три общих для разных реализаций проблемы:
Рассмотрим каждую из них подробно. Для этого напишем часть прошивки воображаемой медиа-приставки, а именно три класса:
Сразу скажу, что код, который вы тут увидите, предельно упрощён, и не содержит ничего лишнего, чтобы мы могли сконцентрироваться на этих проблемах. Также вы встретите типы вида TypePtr. Это всего лишь std::shared_ptr<Type>, не пугайтесь.
Итак, StorageManager. Нужен геттер для тех носителей, которые уже вставлены в приставку, и сигнал для уведомления о появлении новых.
class StorageManager
{
public:
std::vector<StoragePtr> GetStorages() const;
boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
// ...
};
Увы, таким интерфейсом невозможно воспользоваться без того, чтобы получить race condition.
Не работает в таком порядке…
storageManager->OnStorageAdded.connect(&StorageHandler);
// Если пользователь вставляет флэшку до цикла, она будет обработана дважды
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
… и не работает в таком порядке.
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
// Если пользователь вставляет флэшку до подключения к сигналу, она не будет обработана совсем
storageManager->OnStorageAdded.connect(&StorageHandler);
Очевидно, раз мы получили race condition, нам нужен мьютекс.
class StorageManager
{
mutable std::recursive_mutex _mutex;
std::vector<StoragePtr> _storages;
public:
StorageManager()
{ /* ... */ }
boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
std::recursive_mutex& GetMutex() const
{ return _mutex; }
std::vector<StoragePtr> GetStorages() const
{
std::lock_guard<std::recursive_mutex> l(_mutex);
return _storages;
}
private:
void ReportNewStorage(const StoragePtr& storage)
{
std::lock_guard<std::recursive_mutex> l(_mutex);
_storages.push_back(storage);
OnStorageAdded(storage);
}
};
// ...
{
std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
storageManager->OnStorageAdded.connect(&StorageHandler);
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
}
Этот код будет работать, но у него есть несколько недостатков:
Давайте перенесём всё то, что мы делаем вокруг вызова connect (захват мьютекса и обход коллекции) внутрь.
Тут важно понимать, что алгоритм получения текущего состояния зависит от природы этого самого состояния. Если это коллекция, нужно вызвать обработчик для каждого элемента, если же это, например, enum, то нужно вызвать обработчик ровно один раз. Соответственно, нам нужна некая абстракция.
Добавим в сигнал популятор — функцию, принимающую обработчик, который сейчас подключается, и пусть владелец сигнала (StorageManager, в нашем случае) определяет, каким образом текущее состояние будет отправлено в этот обработчик.
template < typename Signature >
class signal
{
using populator_type = std::function<void(const std::function<Signature>&)>;
mutable std::mutex _mutex;
std::list<std::function<Signature> > _handlers;
populator_type _populator;
public:
signal(populator_type populator)
: _populator(std::move(populator))
{ }
std::mutex& get_mutex() const { return _mutex; }
signal_connection connect(std::function<Signature> handler)
{
std::lock_guard<std::mutex> l(_mutex);
_populator(handler); // Владелец сигнала определяет конкретный алгоритм получения состояния
_handlers.push_back(std::move(handler));
return signal_connection([&]() { /* удаляем обработчик из _handlers */ } );
}
// ...
};
Класс signal_connection пока принимает лямбда-функцию, которая удалит обработчик из списка в сигнале. Чуть дополненный код я приведу позже.
Перепишем StorageManager с использованием этой новой концепции:
class StorageManager
{
std::vector<StoragePtr> _storages;
public:
StorageManager()
: _storages([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); })
{ /* ... */ }
signal<void(const StoragePtr&)> OnStorageAdded;
private:
void ReportNewStorage(const StoragePtr& storage)
{
// Мы должны захватить мьютекс именно тут, а не внутри вызова сигнала,
// потому что он защищает в том числе и коллекцию _storages
std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex());
_storages.push_back(storage);
OnStorageAdded(storage);
}
};
Если вы используете C++14, популятор может быть совсем коротким:
StorageManager()
: _storages([&](auto&& h) { for (auto&& s : _storages) h(s); })
{ }
Обратите внимание, что при вызове популятора мьютекс захватывается в методе signal::connect, поэтому в теле самого популятора делать этого не нужно.
Клиентский код становится совсем коротким:
storageManager->OnStorageAdded.connect(&StorageHandler);
Одной строчкой мы одновременно подключаемся к сигналу и получаем текущее состояние объекта. Отлично!
Теперь пора писать MediaScanner. В конструкторе подключимся к сигналу StorageManager::OnStorageAdded, а в деструкторе отключимся.
class MediaScanner
{
private:
boost::signals2::connection _connection;
public:
MediaScanner(const StorageManagerPtr& storageManager)
{ _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }
~MediaScanner()
{
_connection.disconnect();
// Обработчик сигнала может всё ещё исполняться в потоке, вызвавшем сигнал.
// В этом случае, далее он будет обращаться к разрушенному объекту MediaScanner.
}
private:
void StorageHandler(const StoragePtr& storage)
{ /* Здесь что-то долгое */ }
};
Увы, этот код время от времени будет падать. Причина в том, как работает метод disconnect во всех известных мне реализациях. Он гарантирует, что когда сигнал будет вызван в следующий раз, соответствующий обработчик не сработает. При этом, если обработчик в это время исполняется в другом потоке, то он не будет прерван, и продолжит работать с разрушенным объектом MediaScanner.
В Qt каждый объект принадлежит определённому потоку, и его обработчики вызываются исключительно в этом потоке. Для безопасного отключения от сигнала используется пара disconnect/deleteLater.
mediaScanner->disconnect();
mediaScanner->deleteLater();
Тут мы сначала отключаем объект MediaScanner от всех сигналов, к которым он был подключён, а после кладём удаление объекта в очередь его потока после всех обработчиков, которые там уже могли быть. Таким образом, на момент вызова деструктора MediaScanner ни один обработчик сигнала уже точно не будет доступаться к объекту.
Это относительно неплохой вариант, который всё же имеет и проблемы:
Буст для решения этой проблемы предлагает использовать методы track/track_foreign в слоте (т. е. обработчике). Эти методы принимают weak_ptr на произвольный объект, и соединение обработчика с сигналом существует, пока жив каждый из объектов, за которым «следит» слот.
Работает это довольно просто: в каждом слоте есть коллекция weak_ptr'ов на отслеживаемые объекты, которые «лочатся» (простите) на время выполнения обработчика. Таким образом, эти объекты гарантированно не разрушаются, пока код обработчика имеет к ним доступ. Если же какой-либо из объектов уже был уничтожен, соединение разрывается.
Проблема в том, что нам для этого требуется иметь weak_ptr на подписываемый объект. На мой взгляд, самый адекватный способ этого достичь — сделать фабричный метод в классе MediaScanner, где подписать создаваемый объект на все интересные ему сигналы:
class MediaScanner
{
public:
static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager)
{
std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
MediaScannerPtr result(new MediaScanner);
boost::signals2::signal<void(const StoragePtr&)>::slot_type
slot(bind(&MediaScanner::StorageHandler, result.get(), _1));
slot.track_foreign(result);
storageManager->OnStorageAdded.connect(slot);
for (auto&& storage : storageManager->GetStorages())
result->StorageHandler(storage);
return result;
}
private:
MediaScanner() // приватный конструктор!
{ /* Проинициализировать всё, кроме обработчиков сигналов */ }
void StorageHandler(const StoragePtr& storage);
{ /* Здесь что-то долгое */ }
};
Итак, недостатки:
Давайте сделаем метод disconnect блокирующим, чтобы он гарантировал нам, что после того, как он вернёт управление, можно уничтожать всё, к чему имел доступ обработчик сигнала. Что-то вроде метода std::thread::join.
Забегая вперёд, скажу, что нам для этого понадобятся три класса:
Код класса signal_connection:
class signal_connection
{
life_token _token;
std::function<void()> _eraseHandlerFunc;
public:
signal_connection(life_token token, std::function<void()> eraseHandlerFunc)
: _token(token), _eraseHandlerFunc(eraseHandlerFunc)
{ }
~signal_connection();
{ disconnect(); }
void disconnect()
{
if (_token.released())
return;
_token.release(); // Тут мы ждём, если обработчик сейчас заблокирован (т. е. исполняется)
_eraseHandler(); // Та самая лямбда-функция, которая удалит обработчик из списка
}
};
Тут нужно сказать, что я сторонник RAII-шного объекта соединения. Останавливаться подробно на этом не буду, скажу только, что это в данном контексте несущественно.
Класс signal у нас тоже немного поменяется:
template < typename Signature >
class signal
{
using populator_type = std::function<void(const std::function<Signature>&)>;
struct handler
{
std::function<Signature> handler_func;
life_token::checker life_checker;
};
mutable std::mutex _mutex;
std::list<handler> _handlers;
populator_type _populator;
public:
// ...
signal_connection connect(std::function<Signature> handler)
{
std::lock_guard<std::mutex> l(_mutex);
life_token token;
_populator(handler);
_handlers.push_back(Handler{std::move(handler), life_token::checker(token)});
return signal_connection(token, [&]() { /* удаляем обработчик из _handlers */ } );
}
template < typename... Args >
void operator() (Args&&... args) const
{
for (auto&& handler : _handlers)
{
life_token::checker::execution_guard g(handler.life_checker);
if (g.is_alive())
handler.handler_func(forward<Args>(args)...);
}
}
};
Теперь у нас рядом с каждым обработчиком лежит объект life_token::checker, который ссылается на life_token, лежащий в signal_connection. Его мы захватываем на время выполнения обработчика при помощи объекта life_token::checker::execution_guard
class life_token
{
struct impl
{
std::mutex mutex;
bool alive = true;
};
std::shared_ptr<impl> _impl;
public:
life_token() : _impl(std::make_shared<impl>()) { }
~life_token() { release(); }
bool released() const { return !_impl; }
void release()
{
if (released())
return;
std::lock_guard<std::mutex> l(_impl->mutex);
_impl->alive = false;
_impl.reset();
}
class checker
{
shared_ptr<impl> _impl;
public:
checker(const life_token& t) : _impl(t._impl) { }
class execution_guard
{
shared_ptr<Impl> _impl;
public:
execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
~execution_guard() { _impl->mutex.unlock(); }
bool is_alive() const { return _impl->alive; }
};
};
};
Мьютекс захватывается на время жизни execution_guard. Соответственно, если в другом потоке в это время будет вызван метод life_token::release, он заблокируется на захвате того же мьютекса и дождётся окончания выполнения обработчика сигнала. После этого он сбросит флаг alive, и все последующие вызовы сигнала не приведут к вызову обработчика.
Как теперь выглядит код MediaScanner? Ровно так, как нам и хотелось его написать в самом начале:
class MediaScanner
{
private:
signals_connection _connection;
public:
MediaScanner(const StorageManagerPtr& storageManager)
{ _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }
~MediaScanner()
{ _connection.disconnect(); }
private:
void StorageHandler(const StoragePtr& storage)
{ /* Здесь что-то долгое */ }
};
Пишем MediaUiModel, которая будет реагировать на найденные медиа-файлы и добавлять в себя строки для их отображения.
Для этого добавим в MediaScanner следующий сигнал:
signal<void(const MediaPtr&)> OnMediaFound;
Тут есть две важные вещи:
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
boost::io_service& _uiThread;
boost::signals2::connection _connection;
public:
MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
: _uiThread(uiThread)
{
std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
scanner.OnMediaFound.connect([&](const MediaPtr& m) { this->MediaHandler(m); });
for (auto&& m : scanner.GetMedia())
AppendRow(MediaUiModelRow(m))
}
~MediaUiModel()
{ _connection.disconnect(); }
private:
// Этот метод выполняется в потоке MediaScanner'а, и всю реальную работу перебрасывает в поток UI.
void MediaHandler(const MediaPtr& m)
{ _uiThread.post([&]() { this->AppendRow(MediaUiModelRow(m)); }); }
};
Помимо предыдущей проблемы, тут есть ещё одна. Каждый раз, когда срабатывает сигнал, мы перекладываем обработчик в поток UI. Если в какой-то момент мы удаляем модель (например, вышли из приложения «Галерея»), все эти обработчики приходят позже в мёртвый объект. И опять падение.
Всё те же disconnect/deleteLater, с теми же достоинствами и недостатками.
Если вам повезло, и ваш UI-фреймворк позволяет сказать модели deleteLater, вы спасены. Вам достаточно сделать публичный метод disconnect, и жить примерно так же, как в Qt. Правда, предыдущую проблему вам всё же придётся решить. Для этого вы, скорее всего, сделаете внутри модели shared_ptr на некий класс, который и будете подписывать на сигналы. Кода не очень мало, но это дело техники.
Если же вам не повезло, и ваш UI-фреймворк требует удаления модели ровно тогда, когда ему захотелось, вы будете изобретать свой life_token.
template < typename Signature_ >
class AsyncToUiHandlerWrapper
{
private:
boost::io_service& _uiThread;
std::function<Signature_> _realHandler;
bool _released;
mutable std::mutex _mutex;
public:
AsyncToUiHandlerWrapper(boost::io_service& uiThread, std::function<Signature_> realHandler)
: _uiThread(uiThread), _realHandler(realHandler), _released(false)
{ }
void Release()
{
std::lock_guard<std::mutex> l(_mutex);
_released = true;
}
template < typename... Args_ >
static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
{
auto self = selfWeak.lock();
std::lock_guard<std::mutex> l(self->_mutex);
if (!self->_released) // AsyncToUiHandlerWrapper не был освобождён, значит _uiThread всё ещё ссылается на живой объект
self->_uiThread.post(std::bind(&AsyncToUiHandlerWrapper::UiThreadHandler<Args_&...>, selfWeak, std::forward<Args_>(args)...)));
}
private:
template < typename... Args_ >
static void UiThreadHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
{
auto self = selfWeak.lock();
if (!self)
return;
if (!self->_released) // AsyncToUiHandlerWrapper не был освобождён, значит, объекты, доступные _realHandler, ещё живы
self->_realHandler(std::forward<Args_>(args)...);
}
};
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
using AsyncMediaHandler = AsyncToUiHandlerWrapper<void(const MediaPtr&)>;
private:
std::shared_ptr<AsyncMediaHandler> _asyncHandler;
public:
MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
{
try
{
_asyncHandler = std::make_shared<AsyncMediaHandler>(std::ref(uiThread), [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); });
std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
boost::signals2::signal<void(const MediaPtr&)>::slot_type
slot(std::bind(&AsyncMediaHandler::AsyncHandler<const MediaPtr&>, std::weak_ptr<AsyncMediaHandler>(_asyncHandler), std::placeholders::_1));
slot.track_foreign(_asyncHandler);
scanner.OnMediaFound.connect(slot);
for (auto&& m : scanner.GetMedia())
AppendRow(MediaUiModelRow(m));
}
catch (...)
{
Destroy();
throw;
}
}
~MediaUiModel()
{ Destroy(); }
private:
void Destroy()
{
if (_asyncHandler)
_asyncHandler->Release(); // Асинхронный код не обращается к MediaUiModel после этой строки, так что можно окончательно разрушать объект
_asyncHandler.reset();
}
};
Я даже не стану комментировать этот код, давайте просто немного погрустим.
Очень просто. Во-первых, сделать интерфейс для потока, как очереди задач:
struct task_executor
{
virtual ~task_executor() { }
virtual void add_task(const std::function<void()>& task) = 0;
};
Во-вторых, сделать в сигнале перегруженный метод connect, который принимает поток:
signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler);
В этом методе в коллекцию _handlers положить обёртку над обработчиком, которая при вызове перекладывает в нужный поток пару из обработчика и соответствующего life_token::checker. Для вызова реального обработчика в конечном потоке мы будем использовать execution_guard точно так же, как и раньше.
Таким образом, метод disconnect нам будет гарантировать в том числе и то, что асинхронные обработчики тоже не будут вызваны после того, как мы отключились от сигнала.
Код обёртки и перегруженного метода connect я здесь приводить не буду. Думаю, идея ясна и так.
Код модели же становится совсем простым:
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
signal_connection _connection;
public:
MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner)
{ _connection = scanner.OnMediaFound.connect(uiThread, [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }); }
~MediaUiModel()
{ _connection.reset(); }
};
Здесь метод AppendRow будет вызываться строго в потоке UI, и лишь до тех пор, пока мы не отключимся.
Итак, есть три ключевые вещи, которые позволяют писать намного более простой код с использованием сигналов:
Конечно, код сигналов, который я тут привёл, очень прост и примитивен, и работает не очень быстро. Моей целью было рассказать об альтернативном подходе, который мне кажется более привлекательным, чем доминирующие сегодня. В реальности все эти вещи можно написать гораздо эффективнее.
Этот подход мы используем в нашем проекте около пяти лет, и очень счастливы.
Я переписал с использованием C++11 с нуля те сигналы, что у нас были, улучшил те части реализации, которые давно стоило улучшить.
Пользуйтесь на здоровье: https://github.com/koplyarov/wigwag [1].
Судя по реакции людей на реддите и в твиттере, в основном всех волнуют три вопроса:
Q: Тут же нужно блокировать life_token на вызов каждого обработчика. Не будет ли это медленно?
A: Как ни странно, нет. Можно вместо мьютекса использовать атомарные переменные, а если мы таки попали вызовом disconnect в тот момент, когда обработчик исполнялся, ждать на std::condition_variable. Тогда результат абсолютно противоположен: из-за отсутствующего оверхеда в виде track/track_foreign (которые требуют работы с коллекциями weak_ptr), эта реализация и по памяти и по скорости оставляет далеко позади boost::signals2, и даже опережает Qt.
Бенчмарки можно посмотреть тут [2].
Q: Не будет ли deadlock'ов из-за блокирующего метода disconnect?
A: Да, тут deadlock'и получить действительно чуть проще, чем в бусте и Qt. На мой взгляд, это окупается более простым кодом использования сигналов и более высокой скоростью их работы. К тому же, если аккуратно следить за тем, кто на кого подписан, то такие ситуации — скорее исключение.
Ну и, естественно deadlock'и нужно ловить и чинить. В Linux для этого рекомендую Helgrind [3]. Для Windows двухминутный поиск в гугле даёт Intel Inspector [4] и CHESS [5].
Если же по какой-то причине вы не можете себе позволить ничего из вышеперечисленного (например, на вашей платформе недостаточно памяти для запуска helgrind или вообще какая-нибудь маргинальная операционная система), есть костылерешение в виде вот такого (опять же, упрощённо) класса мьютекса:
class mutex
{
private:
std::timed_mutex _m;
public:
void lock()
{
if (_m.try_lock())
return;
while (!_m.try_lock_for(std::chrono::seconds(10)))
Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:n" << get_backtrace_string();
}
// ...
};
И в Visual Studio и в GCC есть средства для получения бэктрейса в коде. Кроме того, есть неплохой libunwind.
С этим подходом большую часть ваших deadlock'ов поймают QA, а вы при одном взгляде на логи поймёте, где всё заблокировалось. Останется только починить.
Q: Можно ли использовать один мьютекс на несколько сигналов? Можно ли обрабатывать исключения так, как я хочу? Можно ли не использовать синхронизацию, и получить быстрые однопоточные сигналы?
A: Можно, можно, можно. Для этого всего есть шаблонные стратегии. Подробнее — в документации.
Автор: koplyarov_da
Источник [6]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/programmirovanie/115749
Ссылки в тексте:
[1] https://github.com/koplyarov/wigwag: https://github.com/koplyarov/wigwag
[2] тут: https://github.com/koplyarov/wigwag/wiki/Benchmarks
[3] Helgrind: http://valgrind.org/docs/manual/hg-manual.html
[4] Intel Inspector: https://software.intel.com/en-us/intel-inspector-xe
[5] CHESS: http://research.microsoft.com/en-us/projects/chess/
[6] Источник: https://habrahabr.ru/post/279851/
Нажмите здесь для печати.