SObjectizer: от простого к сложному. Часть II

в 5:40, , рубрики: actor model, c++, c++11, C++14, concurrency, multithreading, open source, Программирование

В первой статье речь шла о том, что такое SObjectizer. Во второй статье мы начали рассказывать как могут выглядеть агенты, почему, как и куда они эволюционируют. Сегодня мы продолжим этот рассказ, ещё более усложняя реализацию демонстрационных агентов. Заодно проверим надежность асинхронного обмена сообщениями.

В прошлый раз мы остановились на том, что операцию чтения содержимого файла с email-ом следует отдать на откуп отдельному IO-агенту. Давайте сделаем это и посмотрим, что получится.

Во-первых, нам потребуется набор сообщений, которыми между собой будут обмениваться IO-агент и email_analyzer:

// Запрос на загрузку содержимого файла.
struct load_email_request
{
  // Имя файла для загрузки.
  string email_file_;
  // Куда нужно прислать результат.
  mbox_t reply_to_;
};

// Успешный результат загрузки файла.
struct load_email_succeed
{
  // Содержимое файла.
  string content_;
};

// Неудачный результат загрузки файла.
struct load_email_failed
{
  // Описание причины неудачи.
  string what_;
};

Во-вторых, нам нужно определить, куда именно агент email_analyzer будет отсылать сообщение-запрос load_email_request. Мы могли бы пойти уже привычным путем: при регистрации IO-агента сохранить его direct_mbox, затем этот mbox передать параметром в конструктор агента analyzer_manager, затем параметром в конструктор каждого из агентов email_analyzer… В принципе, если бы нам нужно было бы иметь несколько разных IO-агентов, то так и следовало бы сделать. Но в нашей задачке вполне достаточно одного IO-агента. Что позволяет нам продемонстрировать именованные mbox-ы.

Именованный mbox создается обращением к so_5::environment_t::create_mbox(name). Если вызывать create_mbox несколько раз с одним и тем же именем, то возвращаться будет всегда один и тот же mbox, созданный при первом вызове create_mbox с этим именем.

IO-агент создает себе именованный mbox и подписывается на него. Агенты email_analyzer-ы получают этот же mbox когда им нужно отослать сообщение load_email_request. Тем самым мы избавляемся от необходимости «протаскивать» mbox IO-агента через analyzer_manager.

Теперь, когда мы определились с интерфейсом взаимодействия IO-агента и email_manager-а, мы можем сделать новый вариант агента email_analyzer:

// Пятая версия. С передачей IO-операции специальному IO-агенту.

class email_analyzer : public agent_t {
public :
  email_analyzer( context_t ctx,
    string email_file,
    mbox_t reply_to )
    : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
  {}

  // Агент усложнился, у него появилось несколько обработчиков событий.
  // Поэтому подписки агента лучше определять в специально предназначенном
  // для этого виртуальном методе.
  virtual void so_define_agent() override {
    // Нам нужно получить два сообщения от IO-агента. Каждое
    // из эти сообщений будет обрабатываться своим событием.
    so_subscribe_self()
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed );
  }

  virtual void so_evt_start() override {
    // При старте сразу же отправляем запрос IO-агенту для загрузки
    // содержимого email файла.
    send< load_email_request >(
        // mbox IO-агента будет получен по имени.
        so_environment().create_mbox( "io_agent" ),
        email_file_,
        // Ответ должен прийти на наш собственный mbox.
        so_direct_mbox() );
  }

private :
  const string email_file_;
  const mbox_t reply_to_;

  void on_load_succeed( const load_email_succeed & msg ) {
    try {
      // Стадии обработки обозначаем лишь схематично.
      auto parsed_data = parse_email( msg.content_ );
      auto status = check_headers( parsed_data->headers() );
      if( check_status::safe == status )
        status = check_body( parsed_data->body() );
      if( check_status::safe == status )
        status = check_attachments( parsed_data->attachments() );
      send< check_result >( reply_to_, email_file_, status );
    }
    catch( const exception & ) {
      // В случае какой-либо ошибки отсылаем статус о невозможности
      // проверки файла с email-ом по техническим причинам.
      send< check_result >(
          reply_to_, email_file_, check_status::check_failure );
    }
    // Больше мы не нужны, поэтому дерегистрируем кооперацию,
    // в которой находимся.
    so_deregister_agent_coop_normally();
  }

  void on_load_failed( const load_email_failed & ) {
    // Загрузить файл не удалось. Возвращаем инициатору запроса
    // отрицательный результат и завершаем свою работу.
    send< check_result >(
        reply_to_, email_file_, check_status::check_failure );
    so_deregister_agent_coop_normally();
  }
};

Теперь агенты email_analyzer делегируют IO-операции другому агенту, который знает, как делать это эффективно. Соответственно, агенты email_analyzer-ы на своих рабочих нитях будут заниматься либо раздачей заданий IO-агенту, либо же обработкой ответов email_analyzer-ов. Это дает нам возможность изменить взгляд на то, сколько агентов email_analyzer мы можем создавать и сколько рабочих нитей им нужно.

Когда каждый агент email_analyzer сам выполнял синхронную IO-операцию нам нужно было иметь столько рабочих нитей в пуле, сколько параллельных IO-операций мы хотели разрешить. При этом не было смысла создавать намного больше агентов email_analyzer, чем количество рабочих нитей в пуле. Если в пуле 16 нитей, а мы позволяем одновременно существовать 32-м агентам, то это приведет к тому, что половина этих агентов будет просто ждать, когда же для них освободится какая-нибудь из рабочих нитей.

Теперь, после выноса IO-операций на другой рабочий контекст, можно, во-первых, сократить количество рабочих нитей в пуле. Агенты email_analyzer в своих событиях будут выполнять, в основном, нагружающие процессор операции. Поэтому нет смысла создавать больше рабочих потоков, чем есть доступных вычислительных ядер. Значит, если у нас 4-х ядерный процессор, то нам потребуется не 16 нитей в пуле, а не более 4-х.

Во-вторых, если IO-операции занимают больше времени, чем обработка содержимого email, то мы получаем возможность создать больше агентов email_analyzer, чем нитей в пуле. Просто большинство из этих агентов будут ждать результата своей IO-операции. Хотя, если время загрузки email-а сравнимо или меньше времени анализа его содержимого, то этот пункт потеряет свою актуальность и мы сможем создавать всего на 1-2-3 агента email_analyzer больше, чем количество нитей в пуле. Все эти настройки легко делаются в одном месте – в агенте analyzer_manager. Достаточно поменять буквально пару констант в его коде и увидеть, как изменения сказываются на производительности нашего решения. Однако, тюнинг производительности – это отдельная большая тема, углубляться в которую сейчас преждевременно...

Итак, у нас появилась очередная версия агента email_analyzer, которая устраняет проблемы предыдущих версий. Можем ли мы считать ее приемлемой?

Нет.

Проблема в том, что получившуюся реализацию нельзя считать надёжной.

Эта реализация рассчитана на оптимистичный сценарий, в котором отсылаемые нами сообщения никогда не теряются и всегда доходят до адресата, а дойдя до адресата, всегда обрабатываются. После чего мы всегда получаем нужный нам ответ.

Суровая правда жизни, однако, состоит в том, что когда система строится на асинхронном обмене сообщениями между отдельными акторами/агентами, то этот самый асинхронный обмен нельзя считать абсолютно надёжной штукой. Сообщения могут теряться. И это нормально.

Потеря сообщений может происходить по разным причинам. Например, агент-получатель еще просто не успел подписаться на сообщение. Или агента-получателя вообще в данный момент нет. Либо он есть, но у него сработал механизм защиты от перегрузки (подробнее об этом в одной из последующих статей). Либо агент есть и сообщение до него даже дошло, но агент находится в состоянии, в котором это сообщение не обрабатывается. Либо агент есть, сообщение до него дошло, он даже начал его обрабатывать, но в процессе обработки случилась какая-то прикладная ошибка и сбойнувший агент не отослал ничего в ответ.

В общем, общение между агентами посредством асинхронных сообщений – это как взаимодействие хостов через UDP протокол. В большинстве случаев датаграммы доходят до получателей. Но иногда теряются по дороге или даже при обработке.

Вышесказанное означает, что load_email_request может не дойти до IO-агента. Или, до агента email_analyzer могут не дойти ответные сообщения load_email_successed/load_email_failed. И что у нас получится в этом случае?

Мы получим агента email_analyzer, который присутствует в системе, но ничего не делает. Не работает. Не собирается умирать. И не дает стартовать какому-то другому агенту email_analyzer. Если нам не повезет, то мы можем столкнуться с ситуацией, когда все созданные analyzer_manager-ом агенты email_analyzer-ы превратятся в этаких ничего не делающих полутрупов. После чего analyzer_manager будет просто накапливать заявки в своей очереди, а потом выбрасывать их оттуда после истечения тайм-аута. Но никакой полезной работы выполняться не будет.

Как выйти из этой ситуации?

Например, за счёт контроля тайм-аутов. Мы можем либо ввести контроль времени выполнения IO-операции агентом email_analyzer (т.е., если нет ответа слишком долго, то считать, что IO-операция завершилась неудачно). Либо же ввести контроль времени выполнения всей операции анализа email-а в агенте analyzer_manager. Либо сделать и то, и другое.

Для простоты ограничимся отсчётом тайм-аута IO-операции в агенте email_analyzer:

// Шестая версия. С контролем тайм-аута для ответа IO-агента.

class email_analyzer : public agent_t {
  // Этот сигнал потребуется для того, чтобы отслеживать отсутствие
  // ответа от IO-агента в течении разумного времени.
  struct io_agent_response_timeout : public signal_t {};

public :
  email_analyzer( context_t ctx,
    string email_file,
    mbox_t reply_to )
    : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
  {}

  virtual void so_define_agent() override {
    so_subscribe_self()
      .event( &email_analyzer::on_load_succeed )
      .event( &email_analyzer::on_load_failed )
      // Добавляем еще обработку тайм-аута на ответ IO-агента.
      .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
  }

  virtual void so_evt_start() override {
    // При старте сразу же отправляем запрос IO-агенту для загрузки
    // содержимого email файла.
    send< load_email_request >(
        so_environment().create_mbox( "io_agent" ),
        email_file_,
        so_direct_mbox() );

    // И сразу же начинам отсчет тайм-аута для ответа от IO-агента.
    send_delayed< io_agent_response_timeout >( *this, 1500ms );
  }

private :
  const string email_file_;
  const mbox_t reply_to_;

  void on_load_succeed( const load_email_succeed & msg ) {
    try {
      auto parsed_data = parse_email( msg.content_ );
      auto status = check_headers( parsed_data->headers() );
      if( check_status::safe == status )
        status = check_body( parsed_data->body() );
      if( check_status::safe == status )
        status = check_attachments( parsed_data->attachments() );
      send< check_result >( reply_to_, email_file_, status );
    }
    catch( const exception & ) {
      send< check_result >(
          reply_to_, email_file_, check_status::check_failure );
    }
    so_deregister_agent_coop_normally();
  }

  void on_load_failed( const load_email_failed & ) {
    send< check_result >(
        reply_to_, email_file_, check_status::check_failure );
    so_deregister_agent_coop_normally();
  }

  void on_io_timeout() {
    // Ведем себя точно так же, как и при ошибке ввода-вывода.
    send< check_result >(
        reply_to_, email_file_, check_status::check_failure );
    so_deregister_agent_coop_normally();
  }
};

Вот этот вариант email_analyzer можно считать уже вполне приемлемым. В его коде напрашивается рефакторинг с вынесением парочки операций (send и so_deregister_agent_coop_normally) в отдельный вспомогательный метод. Но это не было сделано специально, дабы код каждой последующей версии агента email_analyzer минимально отличался от кода предыдущей версии.

И как раз если сравнить две показанные выше версии агента email_analyzer, то станет заметна одна особенность, которую очень ценят программисты, давно использующие SObjectizer в повседневной работе: простота и понятность процедуры расширения агентов. Потребовалось агенту реагировать на еще какое-то событие? Значит нужно добавить еще одну подписку и еще один обработчик события. А поскольку подписки, как правило, делаются в одних и тех же местах, то сразу понятно, куда именно идти и что именно править.

SObjectizer не накладывает каких-то ограничений на то, где и как агент пописывает свои события, но следование простому соглашению – подписки делаются в so_define_agent(), либо, в совсем простых случаях, для final-классов агентов, в конструкторе – что сильно упрощает жизнь. Заглядываешь в код чужого агента или даже в код своего агента, но написанного несколько лет назад, и сразу знаешь, что именно нужно смотреть, чтобы разобраться в поведении агента. Удобно, хотя для понимания этого удобства нужно, пожалуй, написать и отладить не одного реального агента, и даже не двух...

Однако, вернемся к теме надёжности агентов, которая была затронута выше и из-за которой появилась очередная, шестая по счету, версия агента email_analyzer: механизм асинхронного обмена сообщениями между агентами не надежен и с этим нужно как то жить.

Здесь нужно сделать важную ремарку: неправильно говорить, что механизм доставки сообщений в SObjectizer ну совсем уж «дырявый» и позволяет себе терять любые сообщения когда ему вздумается.

Сообщения в SObjectizer просто так не теряются, у каждой потери есть своя причина. Если агент отсылает сообщение самому себе, и функция send завершилась успешно, то сообщение до агента дойдет. Если только сам разработчик не предпримет явным образом каких-то действий, предписывающих SObjectizer-у выбросить это сообщение в определённом случае (например, разработчик не подписывает агента на сообщение в каком-то из состояний или задействует limit_then_drop, для защиты от перегрузки).

Итак, если разработчик сам не разрешает SObjectizer-у в определенных ситуациях выбрасывать определенные сообщения, то сообщение, которое агент отправил самому себе, до агента должно дойти. Поэтому в показанном выше коде мы совершенно спокойно отсылали сами себе отложенные сообщения не опасаясь того, что эти сообщения будут потеряны где-то по дороге.

Однако, когда сообщение отсылается другому агенту, то ситуация несколько меняется. Бывают случаи, когда мы уверены в успешности доставки. Например, если мы сами реализовали агента-получателя, да ещё и включили его в ту же кооперацию, в которой живет агент-отправитель.

Но если агент-получатель написан не нами, создается и уничтожается в составе чужой кооперации, если его поведение мы не контролируем, если мы не знаем, как именно агент защищается от перегрузок, как он себя ведет в той или иной ситуации, то уверенность у нас такая же, как при отсылке датаграммы по UDP-протоколу: если всё нормально, то скорее всего датаграмма до отправителя дойдет, а мы затем получим ответ. Если всё нормально. А вот если нет?

Мы подошли к интересному моменту: разработка софта на акторах/агентах из-за относительной ненадёжности асинхронного обмена сообщениями может выглядеть более трудоёмкой, чем при использовании подходов на основе синхронного взаимодействия объектов в программе.

Временами так оно и есть. Но зато в итоге, по нашему мнению, софт получается более надёжным, т.к. в коде поневоле приходится обрабатывать множество нештатных ситуаций, связанных как с потерей сообщений, так и с вариациями времен доставки и обработки сообщений.

Предположим, что email_analyzer-ы обращаются к io_agent посредством синхронного запроса, а не асинхронного сообщения, а о сбоях при выполнении IO-операции io_agent информирует посредством выбрасывания исключений. Долгое время всё будет работать нормально: email_analyzer синхронно запрашивает io_agent и получает в ответ либо содержимое email-а, либо исключение. Но в один прекрасный момент где-то внутри io_agent проявляется скрытый баг, и синхронный вызов просто подвисает. Ни ответа, ни исключения, просто зависание. Соответственно, подвисает сначала один email_analyzer, затем еще один, затем еще один и т.д. В итоге подвисшим оказывается все приложение.

Тогда как при асинхронном обмене сообщениями агент io_agent может повиснуть где-то у себя в потрохах. Но это не скажется на агентах email_analyzer, которые имеют возможность легко отследить истечение тайм-аута запроса и отослать отрицательный результат. Т.е., даже при сбоях в одной части приложения другие части приложения смогут продолжать свою работу, пусть эта работа будет состоять в генерировании потока отрицательных ответов. Ведь сам факт этого потока может стать важным симптомом и подсказать наблюдателю, что в приложении что-то пошло не так.

Кстати говоря, на тему наблюдения за работой написанного на агентах приложения.

За годы работы с SObjectizer у нас сложилось убеждение, что возможность увидеть что происходит внутри построенного на акторах/агентах приложения очень важна. В принципе, это было показано даже в данной статье. Если взять пятую версию email_analyzer без контроля тайм-аутов и попробовать ее запустить, то можно увидеть, как обработка запросов замедляется до тех пор, пока не останавливается совсем. Но как именно понять, в чем дело?

Хорошую подсказку могла бы дать информация о том, сколько агентов email_analyzer создано в данный момент и чем занимается каждый из них. Для этого нужна возможность мониторинга происходящего внутри приложения. Как раз то, за что так ценят Erlang и его платформу: там можно подключиться к работающей Erlang VM, посмотреть список Erlang-овых процессов, их параметры и т.д. Но в Erlang это возможно за счет того, что Erlang-овое приложение работает под управлением Erlang VM.

В случае с нативным C++приложением ситуация сложнее. В SObjectizer были добавлены средства для мониторинга происходящего внутри SObjectizer Environment (хотя эти средства пока обеспечивают лишь самый базовый функционал). Так, с их помощью в процессе работы нашего демонстрационного приложения можно получить следующую информацию:

mbox_repository/named_mbox.count -> 1
coop_repository/coop.reg.count -> 20
coop_repository/coop.dereg.count -> 0
coop_repository/agent.count -> 20
coop_repository/coop.final.dereg.count -> 0
timer_thread/single_shot.count -> 0
timer_thread/periodic.count -> 1
disp/ot/DEFAULT/agent.count -> 3
disp/ot/DEFAULT/wt-0/demands.count -> 8
disp/tp/analyzers/threads.count -> 4
disp/tp/analyzers/agent.count -> 16
disp/tp/analyzers/cq/__so5_au...109__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au...109__/demands.count -> 0
disp/tp/analyzers/cq/__so5_au...124__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au...124__/demands.count -> 0
...
disp/tp/analyzers/cq/__so5_au..._94__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au..._94__/demands.count -> 0
disp/ot/req_initiator/agent.count -> 1
disp/ot/req_initiator/wt-0/demands.count -> 0

Этот выхлоп мониторинговой информации позволяет понять, что есть диспетчер с пулом потоков с именем «analyzers», в котором работает 4 рабочих потока. Именно на этом диспетчере в примере работают агенты email_analyzer. К диспетчеру привязаны 16 агентов, каждый из которых составляет отдельную кооперацию. И у этих агентов нет заявок. Т.е., агенты есть, а работы для них нет. И это уже повод разобраться почему так произошло.

Очевидно, что далеко не всегда низкоуровневая информация, которой располагает SObjectizer Environment, будет полезна прикладному программисту. Скажем, в обсуждаемом примере гораздо больше пользы разработчику мог бы дать счётчик количества агентов email_analyzer и размер длины списка заявок в агенте analyzer_manager. Но это прикладные данные, SObjectizer не имеет о них никакого понятия. Поэтому при разработке приложения на агентах программисту нужно позаботится о том, чтобы извне приложения была доступна информация, максимально полезная для оценки работоспособности и жизнеспособности приложения. Хотя это уже большая тема для отдельного разговора.

Пожалуй, на этом очередную статью можно закончить. В следующей статье попробуем показать, что можно сделать, если попробовать распараллелить операции анализа email-а ещё больше. Скажем, если выполнять параллельно операции анализа заголовков, тела письма и аттачей. И во что тогда превратится код агента email_analyzer.

Исходные коды к показанным в статье примерам можно найти в этом репозитории.

Автор: eao197

Источник

Поделиться новостью

* - обязательные к заполнению поля