SObjectizer: от простого к сложному. Часть III

в 4:58, , рубрики: actor model, c++, c++11, C++14, concurrency, multithreading, open source, Программирование

В очередной статье про SObjectizer продолжим следить за эволюцией простого поначалу агента, который все более и более усложняется по мере своего развития. Рассмотрим, как быть с отложенными сообщениями, в которых мы больше не заинтересованы. И воспользуемся некоторой функциональностью иерархических конечных автоматов.

В предыдущей статье мы остановились на том, что у нас появился агент email_analyzer, который можно считать более-менее надежно решающим свою задачу. Однако, он сам, последовательно, выполняет три стадии проверки email-а: сперва проверяет заголовки, затем содержимое, затем аттачи.

Скорее всего, каждая из этих операций не будет исключительно CPU-bound. Намного вероятнее, что вычленив какие-то значения из проверяемого фрагмента (например, из заголовков письма), потребуется сделать куда-то запрос для проверки допустимости этого значения. Например, запрос в БД дабы проверить, нет ли имени хоста-отправителя в черном списке. Пока будет выполняться данный запрос можно было бы выполнить еще какую-то операцию, например, разобрать содержимое текста письма на отдельные ключевые фразы, дабы их можно было проверить по какому-то словарю спам-маркеров. Или проверить, есть ли в аттачах архивы, и инициировать их проверку антивирусом. В общем, имеет смысл распараллелить операции анализа email-а.

Давайте попробуем задействовать отдельных агентов на каждую операцию. Т.е. можно написать агентов вида:

class email_headers_checker : public agent_t {
public :
  struct result { check_status status_ }; /* Сообщение с результатом */
  email_headers_checker( context_t ctx, ... /* Какие-то параметры */ ) {...}
  virtual void so_evt_start() override {
    ... /* Иницирование операций по проверке заголовков */
  }
... /* Какие-то детали реализации */
};
class email_body_checker : public agent_t {...};
class email_attachment_checker : public agent_t {...};

Каждый такой агент будет выполнять специфические для своей операции действия, а затем отошлет результат email_analyzer в виде сообщения. Нашему email_analyzer потребуется создать экземпляры этих агентов у себя и дождаться от них сообщений с результатами анализа:

void on_load_succeed( const load_email_succeed & msg ) {
  try {
    auto parsed_data = parse_email( msg.content_ );
    introduce_child_coop( *this,
      // Агенты-checker-ы будут работать на своем собственном
      // thread-pool-диспетчере, который был создан заранее
      // под специальным именем.
      disp::thread_pool::create_disp_binder(
          "checkers", disp::thread_pool::bind_params_t{} ),
      [&]( coop_t & coop ) {
        coop.make_agent< email_headers_checker >(
            so_direct_mbox(), parsed_data->headers() );
        coop.make_agent< email_body_checker >(
            so_direct_mbox(), parsed_data->body() );
        coop.make_agent< email_attach_checker >(
            so_direct_mbox(), parsed_data->attachments() );
      } );
  }
  catch( const exception & ) {...}
}

Тех, кто внимательно читал предыдущие статьи, фраза «дождаться от них сообщений» должна была бы насторожить. Ждать без ограничения времени не есть хорошо, это прямой путь получить зря болтающегося в системе и ничего не делающего агента. Поэтому при ожидании ответов от checker-ов нам имеет смысл поступить так же, как и при ожидании результата IO-операции: отослать самим себе какой-то отложенный сигнал, получив который мы поймем, что дальше ждать бессмысленно. Т.е. нам надо было бы написать что-то вроде:

// Попытка представить агента email_analyzer с двумя отложенными сигналами.
class email_analyzer : public agent_t {
  // Этот сигнал потребуется для того, чтобы отслеживать отсутствие
  // ответа от IO-агента в течении разумного времени.
  struct io_agent_response_timeout : public signal_t {};
  // Этот сигнал потребуется для того, чтобы отслеживать отсутствие
  // результатов проверки отдельных частей email-а.
  struct checkers_responses_timeout : public signal_t {};
...
  virtual void so_evt_start() override {
    ... /* Отсылка запроса IO-агенту */
    // И сразу же начинаем отсчет тайм-аута для ответа от IO-агента.
    send_delayed< io_agent_response_timeout >( *this, 1500ms );
  }
...
  void on_load_succeed( const load_succeed & msg ) {
    ... /* Создание коопераций с агентами checker-ами */
    // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
    send_delayed< checkers_responses_timeout >( *this, 750ms );
  }
...
  void on_checkers_responses_timeout() {
    ... /* Отсылка отрицательного ответа. */
  }
};

Однако, пойдя по этому пути мы наступим на грабли: ожидая ответа от checker-ов мы запросто можем получить отложенный сигнал io_agent_response_timeout. Ведь его же никто не отменял. И когда это сигнал придет, мы сгенерируем отрицательный ответ из-за якобы имеющегося тайм-аута ввода-вывода, которого-то и нет. Давайте попробуем обойти эти грабли.

Зачастую разработчики, не привыкшие к асинхронному обмену сообщениями, пытаются отменить отложенный сигнал. Это можно сделать, если сохранить идентификатор таймера при обращении к send_periodic:

// Попытка представить агент email_analyzer с отменой отложенного
// сигнала io_agent_response_timeout.
class email_analyzer : public agent_t {
  struct io_agent_response_timeout : public signal_t {};
...
  virtual void so_evt_start() override {
    ... /* Отсылка запроса IO-агенту */
    // Для того, чтобы получить идентификатор таймера используем
    // send_periodic вместо send_delayed, но параметр period
    // выставляем в 0, что делает отсылаемый сигнал отложенным,
    // но не периодическим.
    io_response_timer_ = send_periodic< io_agent_response_timeout >(
      *this, 1500ms, 0ms );
  }
...
  void on_load_succeed( const load_succeed & msg ) {
    // Отменяем отложенный сигнал.
    io_response_timer_.reset();
    ... /* Создание коопераций с агентами checker-ами */
    // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов.
    send_delayed< checkers_responses_timeout >( *this, 750ms );
  }
...
  // Идентификатор таймера для отложенного сигнала о тайм-ауте для IO-операции.
  timer_id_t io_response_timer_;
};

К сожалению, этот простой способ не всегда работает. Проблема в том, что отложенный сигнал может быть отослан агенту email_analyzer буквально за мгновение до того, как агент email_analyzer выполнит сброс таймера для этого отложенного сигнала. Тут уж ничего не поделать – чудеса многопоточности, они такие.

Агент email_analyzer может зайти в on_load_succeed на контексте своей рабочей нити, может даже успеть войти в вызов reset() для таймера… Но тут его нить вытеснят, управление получит нить таймера SObjectizer-а, на которой произойдет отсылка отложенного сигнала. После чего управление опять получит рабочая нить агента email_analyzer() и метод reset() для таймера сделает отмену уже отосланного сигнала. Однако, сигнал уже находится в очереди сообщений агента, откуда его уже никто не выбросит – раз уж сообщение попало в очередь к агенту, то изъять его оттуда нельзя.

Самое плохое в этой ситуации то, что подобная ошибка будет возникать эпизодически. Из-за чего понять, что именно происходит и в чем именно ошибка, будет сложно. Так что нужно помнить, что отмена отложенного сообщения – это вовсе не гарантия того, что оно не будет отослано.

Итак, если просто отменять отложенное сообщение неправильно, то что же делать?

Например, можно использовать состояния агента. Когда email_analyzer ждет ответа от IO-агента, он находится в одном состоянии. Когда ответ от IO-агента приходит, агент email_analyzer переходит в другое состояние, в котором он будет ждать ответов от checker-ов. Т.к. во втором состоянии email_analyzer на сигнал io_agent_response_timeout не подписан, то этот сигнал будет просто проигнорирован.

С введением состояний в агент email_analyzer мы могли бы получить что-то вроде:

// Попытка представить агент email_analyzer с использованием
// нескольких состояний.
class email_analyzer : public agent_t {
  struct io_agent_response_timeout : public signal_t {};
  struct checkers_responses_timeout : public signal_t {};

  // Состояние, в котором агент будет ждать результата IO-операции.
  state_t st_wait_io{ this };
  // Состояние, в котором агент будет ждать ответов от checker-ов.
  state_t st_wait_checkers{ this };
  ...
  virtual void so_define_agent() override {
    // Подписываем агента на разные события в разных состояниях.
    // Для того, чтобы это было наглядно, используем вторую способ
    // подписки агентов – через методы класса state_t.
    st_wait_io
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed )
      .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
    st_wait_checkers
      .event( &email_analyzer::on_header_check_result )
      .event( &email_analyzer::on_body_check_result )
      .event( &email_analyzer::on_attach_check_result )
      .event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout );
  }
  ...
};

Однако, в SObjectizer можно поступить еще проще: можно назначить временной лимит на пребывание агента в конкретном состоянии. Когда этот лимит истечет, агент будет принудительно переведен в другое состояние. Т.е. мы можем написать что-то вроде:

// Попытка представить агента email_analyzer с использованием ограничения времени
// на пребывание агента в конкретном состоянии.
class email_analyzer : public agent_t {
  state_t st_wait_io{ this };
  state_t st_io_timeout{ this };

  state_t st_wait_checkers{ this };
  state_t st_checkers_timeout{ this };
...
  virtual void so_define_agent() override {
    st_wait_io
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed )
      // Ограничиваем время ожидания.
      .time_limit( 1500ms, st_io_timeout );
    st_wait_checkers
      .event( &email_analyzer::on_header_check_result )
      .event( &email_analyzer::on_body_check_result )
      .event( &email_analyzer::on_attach_check_result )
      .time_limit( 750ms, st_checkers_timeout );
  }
};

Но просто ограничить время пребывания в некотором состоянии недостаточно. Нужно еще предпринять какие-то действия, когда это время истечет. Как это сделать?

Использовать такую вещь, как обработчик входа в состояние. Когда агент входит в конкретное состояние, SObjectizer вызывает функцию-обработчик входа в это состояние, если пользователь такую функцию назначил. Это означает, что на вход в st_io_timeout мы можем повесить обработчик, который отсылает check_result с отрицательным результатом и завершает работу агента:

st_io_timeout.on_enter( [this]{
  send< check_result >( reply_to_, email_file_, check_status::check_failure );
  so_deregister_agent_coop_normally();
} );

Точно такой же обработчик мы повесим и на вход в st_checkers_timeout. А т.к. действия внутри этих обработчиков будут одинаковыми, то мы можем вынести их в отдельный метод агента email_analyzer и указать этот метод в качестве обработчика входа и для состояния st_io_timeout, и для состояния st_checkers_timeout:

class email_analyzer : public agent_t {
  state_t st_wait_io{ this };
  state_t st_io_timeout{ this };

  state_t st_wait_checkers{ this };
  state_t st_checkers_timeout{ this };
...
  virtual void so_define_agent() override {
    ...
    st_io_timeout
      .on_enter( &email_analyzer::on_enter_timeout_state );
    ...
    st_checkers_timeout
      .on_enter( &email_analyzer::on_enter_timeout_state );
  };
...
  void on_enter_timeout_state() {
    send< check_result >( reply_to_, email_file_, check_status::check_failure );
    so_deregister_agent_coop_normally();
  }
};

Но и это еще не все. Раз уж мы затронули тему состояний агентов и их возможностей, то можно развить ее дальше и провести рефакторинг кода email_analyzer.

Нетрудно заметить, что в коде очень часто дублируется парочка действий: отсылка сообщения check_result и дерегистрация кооперации агента. Такое дублирование не есть хорошо, cледует от него избавиться.

По сути, работа агента email_analyzer сводится к тому, чтобы в итоге агент оказался в одном из двух состояний: либо все завершилось нормально и следует отослать положительный результат, после чего завершить свою работу, либо же все завершилось ошибкой, нужно отослать отрицательный результат и, опять таки, завершить работу агента. Так давайте это и выразим прямо в коде с помощью двух состояний агента: st_success и st_failure.

// Попытка представить агента email_analyzer со специальными финальными
// состояниями st_success и st_failure.
class email_analyzer : public agent_t {
  state_t st_wait_io{ this };
  state_t st_wait_checkers{ this };

  state_t st_failure{ this };
  state_t st_success{ this };
...
  virtual void so_define_agent() override {
    st_wait_io
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed )
      // Ограничиваем время ожидания.
      .time_limit( 1500ms, st_failure );
    st_wait_checkers
      .event( &email_analyzer::on_header_check_result )
      .event( &email_analyzer::on_body_check_result )
      .event( &email_analyzer::on_attach_check_result )
      .time_limit( 750ms, st_failure );
    st_failure
      .on_enter( [this]{
        send< check_result >( reply_to_, email_file_, status_ );
        so_deregister_agent_coop_normally();
      } );
    st_success
      .on_enter( [this]{
        send< check_result >( reply_to_, email_file_, check_status::safe );
        so_deregister_agent_coop_normally();
      } );
  };
...
  // Новый атрибут нужен для сохранения актуального отрицательного результата.
  check_status status_{ check_status::check_failure };
};

Это позволит нам в коде агента просто менять состояние для завершения работы агента тем или иным образом:

void on_load_failed( const load_email_failed & ) {
  st_failure.activate();
}

void on_checker_result( check_status status ) {
  // На первом же неудачном результате прерываем свою работу.
  if( check_status::safe != status ) {
    status_ = status;
    st_failure.activate();
  }
  else {
    ++checks_passed_;
    if( 3 == checks_passed_ )
      // Все результаты получены. Можно завершать проверку с
      // положительным результатом.
      st_success.activate();
  }
}

Но можно пойти и еще дальше. Для состояний st_failure и st_success есть одно общее действие, которое нужно выполнить при входе в любое их этих состояний – обращение к so_deregister_agent_coop_normally(). И это не случайно, ведь оба этих состояния отвечают за завершение работы агента. А раз так, то мы можем воспользоваться вложенными состояниями. Т.е. мы введем состояние st_finishing, для которого st_failure и st_success будут подсостояниями. При входе в st_finishing будет вызываться so_deregister_agent_coop_normally(). А при входе в st_failure и st_success – будет только отсылаться соответствующее сообщение.

Т.к. состояния st_failure и st_success вложены в st_finishing, то при входе в любое из них сначала будет вызваться обработчик входа в st_finishing, а уже затем – обработчик входа в st_failure или st_success. Получится, что мы при входе в st_finishing мы дерегистрируем агента, а следом, при входе в st_failure или st_success, отсылаем сообщение check_result.

Если кто-то из читателей чувствует себя не комфортно при упоминании вложенных состояний, обработчиков входа в состояния, ограничений на время пребывания в состоянии, то имеет смысл ознакомится с одной из основополагающих статей на тему иерархических конечных автоматов: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming. Состояния агентов в SObjectizer реализуют изрядную часть описанных там возможностей.

В итоге всех этих преобразований агент email_analyzer примет показанный ниже вид.

// Седьмая версия агента email_analyzer, с распараллеливанием работы по проверке
// содержимого email-а и использованием вложенных состояний.

class email_analyzer : public agent_t {
  state_t st_wait_io{ this };
  state_t st_wait_checkers{ this };

  state_t st_finishing{ this };
  state_t st_failure{  initial_substate_of{ st_finishing } };
  state_t st_success{ substate_of{ st_finishing } };

public :
  email_analyzer( context_t ctx,
    string email_file,
    mbox_t reply_to )
    : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
  {}

  virtual void so_define_agent() override {
    st_wait_io
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed )
      // Назначаем тайм-аут для ожидания ответа.
      .time_limit( 1500ms, st_failure );

    st_wait_checkers
      .event( [this]( const email_headers_checker::result & msg ) {
          on_checker_result( msg.status_ );
        } )
      .event( [this]( const email_body_checker::result & msg ) {
          on_checker_result( msg.status_ );
        } )
      .event( [this]( const email_attach_checker::result & msg ) {
          on_checker_result( msg.status_ );
        } )
      // Еще один тайм-аут для ответов.
      .time_limit( 750ms, st_failure );

    // Для состояний, которые отвечают за завершение работы,
    // нужно определить только обработчики входа.
    st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } );
    st_failure.on_enter( [this]{
        send< check_result >( reply_to_, email_file_, status_ );
      } );
    st_success.on_enter( [this]{
        send< check_result >( reply_to_, email_file_, check_status::safe );
      } );
  }

  virtual void so_evt_start() override {
    // Начинаем работать в состоянии по умолчанию, поэтому
    // нужно принудительно перейти в нужное состояние.
    st_wait_io.activate();

    // При старте сразу же отправляем запрос IO-агенту для загрузки
    // содержимого email файла.
    send< load_email_request >(
        so_environment().create_mbox( "io_agent" ),
        email_file_,
        so_direct_mbox() );
  }

private :
  const string email_file_;
  const mbox_t reply_to_;

  // Храним последний отрицательный результат для того, чтобы отослать
  // его при входе в состояние st_failure.
  check_status status_{ check_status::check_failure };

  int checks_passed_{};

  void on_load_succeed( const load_email_succeed & msg ) {
    // Меняем состояние т.к. переходим к следующей операции.
    st_wait_checkers.activate();

    try {
      auto parsed_data = parse_email( msg.content_ );
      introduce_child_coop( *this,
        // Агенты-checker-ы будут работать на своем собственном
        // thread-pool-диспетчере, который был создан заранее
        // под специальным именем.
        disp::thread_pool::create_disp_binder(
            "checkers", disp::thread_pool::bind_params_t{} ),
        [&]( coop_t & coop ) {
          coop.make_agent< email_headers_checker >(
              so_direct_mbox(), parsed_data->headers() );
          coop.make_agent< email_body_checker >(
              so_direct_mbox(), parsed_data->body() );
          coop.make_agent< email_attach_checker >(
              so_direct_mbox(), parsed_data->attachments() );
        } );
    }
    catch( const exception & ) {
      st_failure.activate();
    }
  }

  void on_load_failed( const load_email_failed & ) {
    st_failure.activate();
  }

  void on_checker_result( check_status status ) {
    // На первом же неудачном результате прерываем свою работу.
    if( check_status::safe != status ) {
      status_ = status;
      st_failure.activate();
    }
    else {
      ++checks_passed_;
      if( 3 == checks_passed_ )
        // Все результаты получены. Можно завершать проверку с
        // положительным результатом.
        st_success.activate();
    }
  }
};

Ну а теперь имеет смысл посмотреть на код получившегося агента email_analyzer и задать себе простой, но важный вопрос: а оно того стоило?

Очевидно, что с ответом на этот вопрос все не так однозначно. Но поговорить об этом мы попробуем уже в следующей статье. В которой затронем тему уроков, которые мы извлекли после более чем десяти лет использования SObjectizer в разработке программных систем.

Исходные коды к показанным в статье примерам можно найти в этом репозитории.

Автор: eao197

Источник

Поделиться новостью

* - обязательные к заполнению поля