Боремся в deadlock-ами: паттерн unlocked callbacks

в 8:05, , рубрики: deadlock, Mutex, pattern, thread, Блог компании Нордавинд, Проектирование и рефакторинг, метки: , , ,

Ситуации взаимной блокировки

В Википедии дается следующее определение взаимной блокировки: «Взаимная блокировка (англ. deadlock) — ситуация в многозадачной среде или СУБД, при которой несколько процессов находятся в состоянии бесконечного ожидания ресурсов, занятых самими этими процессами».

Взаимные блокировки носят, как правило, динамический характер: их проявление зависит от таких факторов, как действия пользователя, доступность сетевых сервисов, позиционирование головки жесткого диска, переключение задач в системе с вытесняющей многозадачностью и т.п.

Классический пример взаимной блокировки: первый поток (A) захватывает мьютекс M1 и следом мьютекс M2. Второй поток (B) захватывает мьютекс M2, а уже после этого – мьютекс M1. Взаимная блокировка этих двух потоков может произойти следующим образом: поток A захватывает M1, поток B захватывает M2, после этого оба потока «обречены»: ни поток A не может захватить M2, ни поток B не может захватить M1; попытки захвата мьютексов заблокируют оба потока.

Описанная взаимная блокировка произойдет только в том случае, если оба потока успеют захватить ровно по одному мьютексу. В противном случае потоки продолжат свое выполнение.

Данная ситуация очень распространена в сложных многопоточных системах. Как правило, мьютексы-участники расположены далеко друг от друга (в различных компонентах системы), и выявить участников взаимной блокировки оказывается достаточно сложно.

Распространенная ситуация в реальной системе

Один из частных случаев возникновения описанной выше взаимной блокировки выглядит следующим образом:

  • некоторый объект (Worker) осуществляет некоторую деятельность в отдельном потоке; объект Worker в начале своего существования создает этот поток, и сам же его завершает перед своим уничтожением;
  • деятельность потока заключается в циклическом повторении некоторых действий (например, получение данных из сети);
  • результаты этих отдельных действий выдаются потребителям через функции или интерфейсы обратного вызова (callback);
  • в любой момент жизни объекта Worker может появиться новый потребитель или исчезнуть существующий; объект Worker ведет список потребителей и предоставляет функции для регистрации и удаления потребителей (registerCallback и unregisterCallback соответственно);
  • список потребителей является разделяемым ресурсом: к нему имеет доступ внутренний поток объекта Worker, а также потребители, которые делают вызовы registerCallback и unregisterCallback из контекста своего потока;
  • так как список потребителей является разделяемым ресурсом, он защищается с помощью мьютекса, которым владеет объект Worker; при захваченном мьютексе выполняется модификация списка потребителей; при захваченном мьютексе производится передача данных потребителям из объекта Worker.

Чем опасна описанная выше ситуация? Тем, что на этапе разработки объекта Worker разработчик еще не знал, какие именно функции системы будут вызваны через интерфейс обратного вызова. Он только предъявил требования к интерфейсу: функция должна иметь такие-то параметры, через которые будут передаваться такие-то данные. И этот вызов «в неизвестном направлении» делается при захваченном мьютексе.

Достаточно к описанной картине добавить несколько штрихов (в сложных системах так и происходит):

  • внутри одного из потребителей объекта Worker делается захват мьютекса M (с последующим его освобождением, разумеется);
  • еще один объект системы при захваченном мьютексе M регистрирует себя в качестве потребителя объекта Worker.

Всё. Получилась ситуация, при которой возможна взаимная блокировка. Она не будет происходить в 100% случаев (необходима определенная динамика, чтобы каждый из потоков-участников успел захватить только один мьютекс), и это существенно осложняет поиск таких ошибок.

Далее предлагаются два способа решения этой проблемы.

Способ 1: изменить порядок блокировки

Объект Worker предоставит отдельные функции для блокирования и разблокирования своего внутреннего мьютекса, а потребитель будет регистрироваться следующим образом:

  1. Сначала (заблоговременно) будет блокироваться внутренний мьютекс объекта Worker.
  2. Уже после этого потребитель будет выполнять действия, требующие захвата мьютекса M.
  3. Потребитель зарегистрируется у объекта Worker; захват внутреннего мьютекса внутри объекта Worker не производится.

Недостатки этого способа очевидны:

  • на каждого потребителя ложится бремя предварительных действий перед работой с объектом Worker; вероятность ошибки со стороны программиста увеличивается;
  • не всегда такое возможно вообще: а что, если с помощью мьютекса M обеспечивался безопасный доступ к указателю на объект Worker?
  • «кишки наружу» — это просто некрасиво.

Способ 2: не блокировать мьютекс при передаче данных потребителям

Этот способ звучит многообещающе: если передача данных из внутреннего потока объекта Worker потребителям будет происходить при не заблокированных мьютексах, это исправит все возможные в дальнейшем взаимные блокировки при работе с объектом Worker.

Почему бы просто не делать обратный вызов при не захваченном мьютексе? Потому что поток объекта Worker должен пройтись по списку зарегистрированных потребителей и вызвать функцию интерфейса каждого потребителя. Если список не будет защищен мьютексом, и содержимое списка изменится в ходе этого цикла, вероятно зацикливание или даже аварийное завершение программы из-за некорректного обращения к памяти.

Почему бы не сделать копию списка потребителей (при создании копии захватывать мьютекс), а дальше пройти циклом по копии? Потому что нужно гарантировать потребителю, что после вызова unregisterCallback ему не будут переданы данные. Если потребитель вызывает unregisterCallback из своего деструктора, последующая передача данных в интерфейс обратного вызова этого потребителя приведет к аварийному завершению программы.

Таким образом, мы почти пришли к решению:

  • поток объекта Worker перед вызовом интерфейсов зарегистрированных потребителей должен создать копию списка потребителей; копия создается при захваченном мьютексе, для передачи данных потребителям поток делает цикл по копии списка;
  • нужно обеспечить, чтобы после завершения вызова unregisterCallback никакие данные потребителю не передавались; так как копия списка потребителей создается при каждом вызове из потока потребителям, риск «запоздалого вызова» существует только один раз, если unregisterCallback вызван в ходе прохода по копии списка потребителей;
  • и, таким образом, необходимо как-то идентифицировать состояние, когда поток объекта Worker выполняет передачу данных потребителям и работает с копией списка потребителей; если unregisterCallback вызван, когда поток находится в этом состоянии, возврат из unregisterCallback должен быть задержан до того момента, когда завершится передача данных потребителям.

Вот и готовое решение. Для его реализации потребуется еще один объект синхронизации — «условная переменная» (англ. condition variable):

  • объект Worker содержит:
    • список потребителей;
    • флаг, означающий, что поток в настоящий момент выполняет передачу данных потребителям;
    • мьютекс для разграничения доступа к списку потребителей и флагу;
    • условную переменную для организации ожидания и пробуждения потребителя, который вызвал unregisterCallback «не вовремя»;
  • изначально флаг инициализируется значением false;
  • registerCallback захватывает мьютекс, добавляет потребителя в список и освобождает мьютекс;
  • поток перед циклом передачи данных потребителям выполняет следующие действия:
    • захватывает мьютекс;
    • создает копию списка потребителей; копия создается в стеке потока;
    • устанавливает флаг в значение true;
    • освобождает мьютекс;
  • поток передает данные потребителям, проходя в цикле по копии списка потребителей; мьютекс при этом не захвачен, а флаг установлен;
  • после передачи данных потребителям поток выполняет следующие действия:
    • захватывает мьютекс;
    • сбрасывает флаг в значение false;
    • с использованием условной переменной пробуждает все ожидающие потоки;
    • освобождает мьютекс;
  • и — самое интересное — unregisterCallback:
    • захватывает мьютекс;
    • удаляет потребителя из списка;
    • до тех пор, пока флаг установлен, ожидает на условной переменной; атомарно с переходом в состояние ожидания освобождается мьютекс, и атомарно с пробуждением мьютекс захватывается;
    • освобождает мьютекс.

Важное замечание: если unregisterCallback может быть вызван из интерфейса обратного вызова, реализованного потребителем, то описанный алгоритм приведет к 100-процентному зависанию внутри unregisterCallback. Это легко решается: если unregisterCallback вызван в контексте внутреннего потока объекта Worker, проверять флаг и ожидать изменения условной переменной не нужно.

Реализация с использованием средств синхронизации библиотеки Qt

Заголовочный файл:

class ICallback
{
public:
    virtual void dataReady(QByteArray data) = 0;
};

class Worker : public QThread
{
public:
    Worker();

    void registerCallback(ICallback *callback);
    void unregisterCallback(ICallback *callback);

protected:
    virtual void run();

private:
    QMutex                    _mutex;
    QWaitCondition            _wait;
    bool                      _callingNow;
    QLinkedList<ICallback *>  _callbacks;
};

Реализация:

Worker::Worker() :
    QThread(),
    _mutex(QMutex::NonRecursive),
    _callingNow(false)
{
    ...
}

void Worker::registerCallback(ICallback *callback)
{
    QMutexLocker locker(&_mutex);
    _callbacks.append(callback);
}

void Worker::unregisterCallback(ICallback *callback)
{
    QMutexLocker locker(&_mutex);
    _callbacks.removeOne(callback);
    if(QThread::currentThread()!=this)
    {
        while(_callingNow)
            _wait.wait(&_mutex);
    }
}

void Worker::run()
{
    while(...)
    {
        QByteArray data;
        ...
        QLinkedList<ICallback *> callbacksCopy;

        _mutex.lock();
        _callingNow=true;
        callbacksCopy=_callbacks;
        _mutex.unlock();

        for(QLinkedList<Callback *>::const_iterator it=callbacksCopy.begin();
            it!=callbacksCopy.end();
            ++it)
        {
            (*it)->dataReady(data);
        }

        _mutex.lock();
        _callingNow=false;
        _wait.wakeAll();
        _mutex.unlock();
    }
}

Автор: Nordavind

Источник

Поделиться

* - обязательные к заполнению поля