- PVSM.RU - https://www.pvsm.ru -

Почтовые ящики, которые и не ящики вовсе…

Когда летом 2016-го года создавалась первая статья про SObjectizer [1] мы говорили, что со временем будем рассказывать и о деталях его реализации, дабы заинтересованные читатели могли заглянуть «под капот». Сегодняшняя статья будет как раз про потроха SObjectizer-а. Про механизм mbox-ов («почтовых ящиков»), который используется для организации взаимодействия акторов (агентов в нашей терминологии).

Почему речь именно про mbox-ы?

Потому, что мы сами удивлены, насколько много очень похожих вопросов вызывает этот механизм у тех, кто берется изучать SObjectizer. Оказалось, что вещь, хорошо знакомая, понятная и привычная нам, разработчикам SObjectizer, отнюдь не является таковой для новичков. Ну а раз так, то давайте попробуем разобраться, что же из себя представляют mbox-ы и как же они работают. А заодно и попробуем сделать свой собственный mbox.

Зачем нужны mbox-ы?

Почтовые ящики в SObjectizer нужны для того, чтобы организовывать взаимодействие между агентами. Общение между агентами строится посредством асинхронных сообщений и эти самые сообщения нужно куда-то отсылать. Возникает вопрос: «Куда именно?»

В классической Модели Акторов адресатом сообщения является сам актор-получатель. Т.е. для того, чтобы актор A мог отослать сообщение актору B, у актора A должна быть ссылка на актора B. Нет ссылки на актора-получателя — нет и возможности отослать ему сообщение. Если нужно выполнить рассылку 1:N, то у отправителя должны быть ссылки на всех получателей. Это если говорить про классическую Модель Акторов.

У нас, однако, была другая специфика (как обычно, битиё определяет сознание и мы отталкивались от потребностей задач, которые нам доводилось решать и инструментов, которые были в нашем распоряжении).

Во-первых, у нас C++. Просто так ссылку на агента B агенту A не передашь. Если это обычная ссылка (или обычный голый указатель), то при разрушении агента B у агента A останется «повисшая» ссылка на B. Соответственно, вместо обычных ссылок/указателей нужно использовать умные ссылки/указатели. Но простой умный указатель — это не есть хорошо, т.к. агент B не будет удален (а значит и не будут освобождены ресурсы, которыми он владеет) пока у агента A есть умный указатель на агента B.

Следовательно, нам в C++ пришлось бы использовать не просто умные указатели, а некие специальные умные прокси-ссылки. Агент А может иметь прокси-ссылку на B, но при этом B может быть безопасно удален не смотря на то, что прокси-ссылка у A продолжает оставаться. Причем A может попытаться отослать сообщение уже несуществующему агенту B и эта попытка не должна приводить к катастрофическим последствиям (вроде порчи «чужой» памяти или краха всего приложения, как это бывает при обращении по «повисшим» ссылкам в C++).

Во-вторых, у нас взаимодействие 1:N было весьма распространенным. Более того, по началу это вообще был единственный доступный агентам способ взаимодействия. Поэтому мы очень не хотели, чтобы агенты B и C, которым требовалось получать информацию от агента A, были бы вынуждены сперва присылать ссылки на самих себя агенту A. И чтобы агенту A приходилось самостоятельно вести списки агентов, которые хотят получать сообщения от A в режиме 1:N.

В итоге у нас родилась концепция «почтового ящика», который создавался как раз для того, чтобы a) быть той самой умной прокси-ссылкой, которую агенты могут использовать для общения друг с другом, и b) быть механизмом, упрощающим взаимодействие в режиме 1:N.

При наличии mbox-ов агенты отсылают сообщения не напрямую друг другу, а в почтовые ящики (mbox-ы). Сообщение, отосланное в mbox, доставляется тем, агентам, которые подписались на сообщения из этого mbox-а.

Таким образом, для того, чтобы агент A мог отослать сообщение агенту B, нужно иметь mbox, про который знают оба агента. Агент A отсылает сообщение в этот mbox, а агент B подписывается на сообщения из этого mbox-а. Что и можно увидеть в этом небольшом примере:

#include <so_5/all.hpp>

class A final : public so_5::agent_t {
   const so_5::mbox_t to_;
public:
   A(context_t ctx, so_5::mbox_t to)
      : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {}

   virtual void so_evt_start() override {
      // Отсылаем сообщение агенту B.
      so_5::send<std::string>(to_, "Hello!");
   }
};

class B final : public so_5::agent_t {
public:
   B(context_t ctx, const so_5::mbox_t & from)
      : so_5::agent_t{std::move(ctx)}
   {
      // Подписываемся на входящие сообщения из ящика from.
      so_subscribe(from).event(&B::on_string);
   }

private:
   void on_string(mhood_t<std::string> cmd) {
      std::cout << "Message: " << *cmd << std::endl;
      
      // Работу можно прекращать.
      so_deregister_agent_coop_normally();
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
      // В примере будет работать два агента, которым нужен
      // общий mbox для взаимодействия.
      env.introduce_coop([&](so_5::coop_t & coop) {
         // Создаем mbox, посредством которого будут общаться
         // агенты A и B.
         const auto mbox = env.create_mbox();
         // Теперь создаем двух агентов, которые будут использовать
         // этот mbox для общения.
         coop.make_agent<A>(mbox);
         coop.make_agent<B>(mbox);
      });
   });

   return 0;
}

При этом что отсылка, что прием сообщений в режиме 1:N, не отличаются от отсылки/приема сообщений в режиме 1:1. Вот как пример выше будет выглядеть для случая, когда агент A отсылает сообщение одновременно агентам B и C:

#include <so_5/all.hpp>

class A final : public so_5::agent_t {
   const so_5::mbox_t to_;
public:
   A(context_t ctx, so_5::mbox_t to)
      : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {}

   virtual void so_evt_start() override {
      // Отсылаем сообщение агенту B.
      so_5::send<std::string>(to_, "Hello!");
   }
};

class B final : public so_5::agent_t {
public:
   B(context_t ctx, const so_5::mbox_t & from)
      : so_5::agent_t{std::move(ctx)}
   {
      // Подписываемся на входящие сообщения из ящика from.
      so_subscribe(from).event(&B::on_string);
   }

private:
   void on_string(mhood_t<std::string> cmd) {
      std::cout << "(B) Message: " << *cmd << std::endl;
      
      // Работу можно прекращать.
      so_deregister_agent_coop_normally();
   }
};

class C final : public so_5::agent_t {
public:
   C(context_t ctx, const so_5::mbox_t & from)
      : so_5::agent_t{std::move(ctx)}
   {
      // Подписываемся на входящие сообщения из ящика from.
      so_subscribe(from).event([](mhood_t<std::string> cmd) {
         // Просто печатаем содержимое, но работу не прекращаем,
         // за нас это сделает агент B.
         std::cout << "(C) Message: " << *cmd << std::endl;
      });
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
      // В примере будет работать несколько агентов, которым нужен
      // общий mbox для взаимодействия.
      env.introduce_coop([&](so_5::coop_t & coop) {
         // Создаем mbox, посредством которого будут общаться
         // агенты A, B, C.
         const auto mbox = env.create_mbox();
         // Теперь создаем агентов, которые будут использовать
         // этот mbox для общения.
         coop.make_agent<A>(mbox);
         coop.make_agent<B>(mbox);
         coop.make_agent<C>(mbox);
      });
   });

   return 0;
}

Как работают mbox-ы?

Разные mbox-ы работают по-разному :) Поэтому для того, чтобы рассказать, как работают самые широкоиспользуемые типы mbox-ов, сперва нужно рассказать о том, какими вообще mbox-ы бывают.

Какие бывают mbox-ы?

Multi-Producer/Multi-Consumer

Исторически это первый тип mbox-а, который появился в SObjectizer-5. Кто угодно может отослать сообщение в такой mbox. Кто угодно может подписаться на сообщения из этого mbox-а.

Multi-Producer/Single-Consumer

Для случая взаимодействия 1:1 может использоваться MPSC-mbox, в который кто угодно может отослать сообщение, но подписаться на сообщения из MPSC-mbox-а может один-единственный агент, который и владеет MPSC-mbox-ом.

MPSC-mbox-ы появились в SObjectizer-5 спустя некоторое время после начала активного использования SObjectizer-5. Когда опыт показал, что в случаях, когда сообщения адресуются одному конкретному агенту применение MPMC-mbox-а неэффективно. Так что можно сказать, что MPSC-mbox — это скорее способ оптимизации кода, нежели какой-то принципиально другой подход к организации взаимодействия. Кроме того, пользователь не может создавать MPSC-mbox-ы. MPSC-mbox для каждого агента автоматически создается SObjectizer-ом.

Дополнительные mbox-ы из библиотеки so_5_extra

Над SObjectizer-ом построена дополнительная библиотека so_5_extra [2] содержащая компоненты, которые нам показалось неразумным добавлять в ядро SObjectizer-а. В ее состав входят несколько дополнительных типов mbox-ов. Например:

  • round_robin mbox [3], задачей которого является отсылка сообщений получателям по-очереди;
  • retained_msg mbox [4], который хранит экземпляр последнего отосланного сообщения и сразу же отсылает этот экземпляр каждому новому подписчику (тем самым подписчик автоматически получает последнее отосланное в mbox значение).

Еще одним интересным примером использования mbox-ов является компонент shutdowner [5] из so_5_extra, в котором mbox применяется для того, чтобы определить момент, когда можно корректно завершить работу большого SObjectizer-приложения.

Однако, детально mbox-ы из so_5_extra в рамках данной статьи мы рассматривать не будем.

Как работает Multi-Producer/Single-Consumer mbox?

Итак, mbox-ы разные, поэтому и работают по-разному. И рассматривать детали работы мы начнем с самого простого из них — MPSC-mbox.

Если не принимать в расчет такие специфические вещи, как message_limits [6] (это механизм защиты агентов от перегрузки) и msg_tracing [7] (это способ посмотреть за тем, как происходит доставка сообщения до получателя), то MPSC-mbox работает вообще как простейший «полупроводник»: он берет отсылаемое сообщение и отдает его агенту-получателю для того, чтобы получатель разместил сообщение в своей очереди ждущих обработки сообщений.

Ну т.е. здесь все очень тупо: взял сообщение у отправителя и отдал получателю. Ничего больше.

Как работает Multi-Producer/Multi-Consumer mbox?

А вот с MPMC-mbox-ом ситуация несколько сложнее (опять же, мы не берем в расчет такие вещи, как message_limits и msg_tracing). Поскольку получателей сообщений может быть множество, то MPMC-mbox хранит у себя ассоциативный контейнер с подписчиками. Ключем в этом контейнере является идентификатор типа сообщения, а элементом — собственно список подписчиков для сообщений этого типа.

Когда кто-то отсылает сообщение типа M, MPMC-mbox ищет в своем ассоциативном контейнере список подписчиков для сообщений типа M. Если такой список есть, то MPMC-mbox проходит по этому списку и пытается отдать сообщение каждому из подписчиков.

Было специально сказано «пытается отдать», т.к. еще есть такая штука как delivery_filters [8] (т.е. фильтры, которые разрешают или запрещают доставку сообщения подписчику в зависимости от содержимого сообщения). Перед доставкой сообщения агенту-подписчику MPMC-mbox проверяет, есть ли delivery_filter у подписчика. Если есть, то сообщение сперва отдается фильтру. И только если фильтр разрешает доставку сообщения агенту, это сообщение агенту будет отдано.

В общем, MPMC-mbox идет по списку подписчиков на конкретный тип сообщения и, если доставка конкретного экземпляра сообщения подписчику разрешена, сообщение отдается агенту-подписчику для того, чтобы подписчик поставил его свою очередь ожидающих обработки сообщений.

Что общего у штатных MPMC- и MPSC-mbox-ов?

У штатных MPMC- и MPSC-mbox-ов есть одна важная объединяющая их черта: у mbox-ов нет собственного хранилища отосланных в mbox-ы сообщений. Т.е. mbox-ы, по крайней мере, штатные, не хранят сообщения. Вообще.

Поэтому такие вопросы как "Сколько сообщений может хранить mbox пока не переполнится и что будет когда он переполнится?" или "Получит ли агент B сообщение M, если он подпишется на сообщение M уже после того, как сообщение M было отослано?" применительно к штатным MPMC- и MPSC-mbox-ам не имеют смысла. Ибо эти mbox-ы тупо не хранят сообщения у себя внутри: сообщения сразу же передаются тем агентам, которые в сообщениях заинтересованы. Либо игнорируют сообщения, если получателей для этого типа сообщений в данный момент нет.

Да и для других типов mbox-ов внутреннее хранилище для отосланных сообщений — это скорее исключение из правил, чем норма. Дело в том, что работа с mbox-ами строится по push-принципу: отсылаемое сообщение «запихивается» в mbox. И это, пожалуй, единственная возможность для mbox-а кому-то доставить сообщение. Поскольку никто не дергает периодически mbox с целью проверить, а не появилось ли в mbox-е что-нибудь новенькое. Т.е. никто, ну вот вообще никто, не работает с mbox-ом в pull-режиме.

Итак, сухой остаток: в общем случае mbox-ы не хранят сообщения внутри себя.

Усложняем картину мира: агенты не имеют собственных очередей сообщений

Столкнувшись с SObjectizer-ом разработчики довольно быстро начинают понимать, что mbox-ы не хранят сообщений, поэтому нет смысла задавать вопросы о вместимости mbox-ов. Но раз сообщения хранятся не в mbox-ах, а у агентов, то начинаются вопросы о вместимости очереди сообщений агента…

И тут новичков ожидает еще одно откровение и, возможно, разочарование: в SObjectizer у агентов, в общем случае, нет собственных очередей сообщений.

Вот так. Просто нет и все :)

Дело в том, что очередями сообщений для агентов в SObjectizer-е управляют диспетчеры [9]. Именно диспетчер обеспечивает агенту рабочий контекст, на котором агент будет обрабатывать свои сообщения. И, как следствие, именно диспетчер организует хранение ожидающих обработки сообщений.

Например, есть диспетчер типа one_thread (один из самых часто используемых). В нем все агенты, привязанные к этому диспетчеру, работают на одной единственной общей рабочей нити. И все сообщения для всех агентов хранятся в одной общей очереди сообщений. Рабочая нить достает из этой очереди следующее сообщение, отдает агенту-получателю, затем берет следующее и т.д.

Похожим образом действует диспетчер типа active_group, в нем группа агентов может быть привязана к одной общей рабочей нити. И все агенты на этой рабочей нити будут использовать общую очередь сообщений.

Хитрее ситуация с thread_pool и adv_thread_pool диспетчерами. Там для агентов можно задавать параметры FIFO-очередей. Один из них — это какую именно очередь будет использовать агент. Можно сделать так, что у агента будет собственная очередь, в которой будут только сообщения, которые адресованы именно этому агенту. А можно сделать и так, что агенты из одной кооперации будут совместно использовать общую очередь сообщений.

Еще веселее с диспетчерами, которые поддерживают приоритеты агентов. Например, диспетчер prio_one_thread::strictly_ordered. Там у всех агентов с одинаковым приоритетом будет одна общая очередь сообщений. Но для агентов с разными приоритетами очереди сообщений будут разными.

Короче говоря, в сухом остатке: в общем случае mbox-ы отдают сообщения агентам, а агенты передают сообщения диспетчерам, которые уже и сохраняют сообщения в соответствующих очередях. Поэтому, опять же, в общем случае, никаких хранилищ для сообщений нет ни у mbox-ов, ни у агентов.

А насколько сложно сделать собственный mbox?

Если делать «по всем правилам», с поддержкой message_limits, delivery_filters, msg_tracing и других нюансов, то довольно-таки сложно. Интересующиеся могут заглянуть, например, в потроха реализации retained_msg mbox-а из so_5_extra [10], дабы посмотреть как страшно все это может выглядеть :)

Однако, если собственный mbox делается под конкретную задачу, то все может быть далеко не так уж и страшно. Давайте в качестве небольшого примера сделаем MPSC-mbox, который будет предохранять агента от слишком частого поступления сообщений. Ну, скажем, если сообщение M2 прилетает менее чем через 250ms после сообщения M1, то оно выбрасывается. Если же после M1 прошло 250ms или более, то M2 доставляется получателю.

Необходимые пояснения

Итак, попробуем сделать собственный mbox под условным названием anti-jitter-mbox. Это будет MPSC-mbox, который должен быть связан с каким-то конкретным агентом.

Для того, чтобы упростить себе жизнь, мы не будем создавать полную собственную реализацию MPSC-mbox-а. Вместо этого мы будем использовать готовый MPSC-mbox, который уже есть для каждого агента. Мы просто потребуем, чтобы в конструктор нашего anti-jitter-mbox-а передавали MPSC-mbox того агента, которому anti-jitter-mbox должен принадлежать.

Нам нужно определить собственный класс anti_jitter_mbox, который должен быть наследником специального класса so_5::absctract_message_mbox_t [11]. В своем классе нам придется переопределить чистые виртуальные методы, присутствующие в absctract_message_mbox_t. В SObjectizer версий 5.5.* таковыми являются следующие методы:

id(). Он должен возвращать уникальный ID mbox-а. Поскольку доставку сообщения реально будет выполнять MPSC-mbox агента, который нам передадут в конструкторе, то мы будем возвращать ID именно этого MPSC-mbox-а. Т.е. тут мы просто будем делегировать работу актуальному MPSC-mbox-у.

subscribe_event_handler(). Этот метод вызывается, когда агент хочет подписаться на сообщения типа T. Мы в этом методе будем регистрировать тип T. Нам это нужно для того, чтобы когда в mbox придет сообщение некоторого типа M мы могли проверить, подписан ли агент на него. Если подписан, то сообщение можно попытаться доставить (и, соответственно, нужно зафиксировать время последней доставки). А если не подписан — то сообщение нужно проигнорировать.

unsubscribe_event_handlers(). Этот метод, в противоположность subscribe_event_handler(), вызывается когда агент хочет отписаться от сообщений типа Т. Мы в этом методе будем отменять регистрацию типа T.

query_name(). Этот метод должен возвращать строковое имя mbox-а. Данный метод служит для отладочных и диагностических целей. Например, SObjectizer может дергать этот метод при формировании сообщений о возникающих ошибках.

type(). Этот метод должен возвращать тип mbox-а: является ли mbox Multi-Producer/Multi-Consumer или же это Multi-Producer/Single-Consumer. Этот метод вызывается SObjectizer-ом для проверки возможности выполнения тех или иных действий. Например, mutable-сообщения [12] можно отсылать только в MPSC-mbox-ы.

do_deliver_message(). Этот метод отвечает за передачу сообщения агенту-получателю. Мы должны в этом методе проверить, зарегистрирован ли у нас тип отсылаемого сообщения. Если нет — то сообщение игнорируется. Если зарегистрирован и с момента прошлой доставки сообщения прошло достаточно времени, то сообщение должно доставляться получателю (при этом мы фиксируем время доставки). Сама же доставка делегируется актуальному MPSC-mbox-у агента.

do_deliver_service_request(). Этот метод похож на do_deliver_message(), но он вызывается в случае, когда агент A делает синхронный запрос к агенту B (т.е. вместо send_message используется request_future или request_value). Для простоты примера мы не будем поддерживать функциональность синхронных запросов для нашего anti-jitter-mbox-а.

set_delivery_filter() и drop_delivery_filter(). Эти методы используются для установки и снятия фильтров доставки сообщений. Поскольку фильтры доставки для MPSC-mbox-ов не предназначены, то мы не будем поддерживать эту функциональность в своем примере.

Пояснение по поводу константности некоторых методов abstract_message_mbox_t

В реализации примера можно будет увидеть, что методы do_deliver_message() и do_deliver_service_request() объявлены как константные. Но, т.к. в do_deliver_message() нам приходится модифицировать внутреннее состояние нашего anti-jitter-mbox-а, то приходится это самое состояние помечать как mutable в описании класса mbox-а.

Это является следствием древнего архитектурного просчета при формировании интерфейса класса abstract_message_mbox_t. Когда этот класс сформировался много лет назад, мы не думали, что кому-то когда-нибудь потребуется создавать собственные типы mbox-ов.

Когда же года полтора или два назад выяснилось, что это не только нужно, но иногда и очень нужно, мы оказались перед выбором: сломать совместимость внутри ветки SObjectizer-5.5 или же оставить все как есть и поменять интерфейс abstract_message_mbox_t в каком-нибудь будущем мажорном релизе (вроде SObjectizer-5.6). Поскольку у нас есть бзик по поводу сохранения совместимости между релизами внутри одной ветки, то мы в SObjectizer-5.5 решили оставить все как есть. Поэтому сейчас при реализации собственных mbox-ов нужно считаться с константностью ряда методов abstract_message_mbox_t и использовать ключевое слово mutable.

Реализация собственного anti-jitter-mbox-а

Ну а сейчас мы уже можем посмотреть на то, что же из себя будет представлять наш собственный mbox.

Начнем с данных, которыми наш mbox будет оперировать:

using namespace std::chrono;
using clock_type = steady_clock;

class anti_jitter_mbox : public so_5::abstract_message_box_t {
   // Вспомогательный тип для хранения информации о типах сообщений, на которые
   // есть подписки и для которых нужно хранить время поступления последнего
   // сообщения.
   struct data {
      // Элемент данных для одного типа сообщения.
      struct item {
         // Количество подписок на этот тип сообщения. Значение 0 означает,
         // что подписок нет и больше это сообщение можно не контролировать.
         std::size_t subscribers_{0};
         // Время поступления последнего сообщения.
         // Пустое значение указывает, что сообщение еще ни разу не поступало.
         std::optional<clock_type::time_point> last_received_{};
      };

      // Тип ассоциативного контейнера, который потребуется для хранения
      // информации о сообщениях.
      using message_table = std::map<std::type_index, item>;

      // Нам потребуется mutex для защиты содержимого mbox-а в многопоточном
      // окружении.
      std::mutex lock_;
      // Таблица сообщений, на которые есть подписки.
      message_table messages_;
   };

   // Содержимое mbox-а.

   // Актуальный mbox, через который доставка сообщений и будет выполняться.
   const so_5::mbox_t mbox_;
   // Минимальный порог для отсечения "лишних" сообщений.
   const clock_type::duration timeout_;
   // Собственные данные mbox-а. В SObjectizer-5.5 должны быть помечены
   // как mutable, т.к. их придется модифицировать в том числе и в
   // const-методах.
   mutable data data_;

Нам потребуется актуальный mbox, через который пойдет доставка сообщений агенту-получателю, временной порог для отсечения «лишних» сообщений и собственно информация о типах сообщений и временах их последнего получения. Плюс к тому, нам нужен mutex, поскольку методы mbox-а могут вызываться на разных рабочих нитях и нам придется обеспечивать thread-safety для нашего mbox-а.

Кстати говоря, как раз из-за обеспечения thread-safety в большинстве методов нам придется захватывать внутренний mutex нашего mbox-а. Для того, чтобы упростить себе жизнь, сделаем вспомогательный шаблонный метод, который как раз и будет отвечать за захват mutex-а и выполнения нужных нам действий под захваченным mutex-ом:

   template<typename Lambda>
   decltype(auto) lock_and_perform(Lambda l) const noexcept {
      std::lock_guard<std::mutex> lock{data_.lock_};
      return l();
   }

В принципе, его наличие вовсе не обязательно. Но я решил задействовать его еще вот по какой причине: для простоты реализации мы не будем заморачиваться на такую вещь, как exception safety. Если при выполнении каких-то действий у нас возникнет исключение, то нам нужно просто прервать работу всего приложения. Как раз то, что lock_and_perform помечен как noexcept и обеспечит нам такое поведение — если лямбда бросит исключение, то сам C++ный run-time вызовет std::terminate.

Ну и теперь можно посмотреть на собственно всю реализацию mbox-а:

public:
   // Конструктор. Конструктору требуется реальный MPSC-mbox, которому
   // и будет делегироваться актуальная доставка сообщения.
   anti_jitter_mbox(
      so_5::mbox_t actual_mbox,
      clock_type::duration timeout)
      : mbox_{std::move(actual_mbox)}
      , timeout_{timeout}
   {}

   // Уникальный ID mbox-а. Используем для этих целей ID актуального mbox-а.
   so_5::mbox_id_t id() const override { return mbox_->id(); }

   // Обработка регистрации очередного подписчика.
   void subscribe_event_handler(
         const std::type_index & msg_type,
         const so_5::message_limit::control_block_t * limit,
         so_5::agent_t * subscriber ) override {
      lock_and_perform([&]{
         // Достаем информацию о сообщениях этого типа. Если такой информации
         // еще не было, то она будет создана автоматически.
         auto & msg_data = data_.messages_[msg_type];
         msg_data.subscribers_ += 1;

         // Дальнейшую работу делегируем актуальному mbox-у.
         mbox_->subscribe_event_handler(msg_type, limit, subscriber);
      });
   }

   // Обработка дерегистрации подписчика.
   void unsubscribe_event_handlers(
         const std::type_index & msg_type,
         so_5::agent_t * subscriber ) override {
      lock_and_perform([&]{
         // Достаем информацию о сообщениях данного типа.
         // Если таковой не окажется, то ничего делать не нужно.
         auto it = data_.messages_.find(msg_type);
         if(it != data_.messages_.end()) {
            auto & msg_data = it->second;
            --msg_data.subscribers_;
            if(!msg_data.subscribers_)
               // Подписчиков больше не осталось, поэтому информацию о
               // сообщениях этого типа хранить больше не нужно.
               data_.messages_.erase(it);

            // Актуальный mbox так же должен выполнить свою работу.
            mbox_->unsubscribe_event_handlers(msg_type, subscriber);
         }
      });
   }

   // Уникальное имя mbox-а.
   std::string query_name() const override {
      return "<mbox:type=anti-jitter-mpsc:id=" + std::to_string(id()) + ">";
   }

   // Тип нашего mbox-а. Такой же, как и у актуального.
   so_5::mbox_type_t type() const override {
      return mbox_->type();
   }

   // Обработка попытки доставки обычного сообщения.
   void do_deliver_message(
         const std::type_index & msg_type,
         const so_5::message_ref_t & message,
         unsigned int overlimit_reaction_deep ) const override {
      lock_and_perform([&]{
         // Нужно найти информацию об этом типе сообщений.
         // Если тип нам неизвестен, значит подписчиков нет и сообщение
         // доставлять никуда не нужно.
         auto it = data_.messages_.find(msg_type);
         if(it != data_.messages_.end()) {
            auto & msg_data = it->second;
            const auto now = clock_type::now();

            // Проверяем, нужно ли доставлять сообщение.
            // Оно приходит к нам впервые (т.е. значения last_received_
            // еще нет), то доставлять нужно.
            bool should_be_delivered = true;
            if(msg_data.last_received_) {
               should_be_delivered = (now - *(msg_data.last_received_)) >= timeout_;
            }

            // Если все-таки нужно, то доставляем через актуальный mbox и
            // обновляем информацию о времени последней доставки этого
            // сообщения.
            if(should_be_delivered) {
               msg_data.last_received_ = now;
               mbox_->do_deliver_message(msg_type, message, overlimit_reaction_deep);
            }
         }
      });
   }

   // Доставку синхронных запросов запрещаем.
   void do_deliver_service_request(
         const std::type_index & /*msg_type*/,
         const so_5::message_ref_t & /*message*/,
         unsigned int /*overlimit_reaction_deep*/ ) const override {
      // Для того, чтобы выбростить исключение so_5::exception_t и сделать
      // это просто, используем соответствующий макрос из SObjectizer-а.
      SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
            "anti-jitter-mbox doesn't support service requests");
   }

   // Фильтры доставки для MPSC-mbox-ов не работают. Поэтому сразу
   // порождаем соответствующее исключение.
   void set_delivery_filter(
         const std::type_index & /*msg_type*/,
         const so_5::delivery_filter_t & /*filter*/,
         so_5::agent_t & /*subscriber*/ ) override {
      SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
            "anti-jitter-mbox doesn't support delivery filters");
   }

   void drop_delivery_filter(
         const std::type_index & /*msg_type*/,
         so_5::agent_t & /*subscriber*/ ) noexcept override {
      SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
            "anti-jitter-mbox doesn't support delivery filters");
   }
};

Ну а для того, чтобы проверить работу своего mbox-а, мы должны будем создать двух похожих друг на друга тестовых агента. Только первый агент должен быть получать сообщения из своего обычного MPSC-mbox-а:

class ordinary_subscriber final : public so_5::agent_t {
   const std::string name_;
public:
   ordinary_subscriber(context_t ctx,
      // Уникальное имя, которое будет использовать агент.
      std::string name)
      : so_5::agent_t{std::move(ctx)}
      , name_{std::move(name)}
   {
      so_subscribe_self().event([&](mhood_t<std::string> cmd) {
         std::cout << name_ << ": signal received -> " << *cmd << std::endl;
      });
   }

   // Mbox, который должен использоваться для отсылки сообщений.
   auto target_mbox() const { return so_direct_mbox(); }
};

А второй агент для этих же целей будет использовать anti-jitter-mbox:

class anti_jitter_subscriber final : public so_5::agent_t {
   const std::string name_;
   const so_5::mbox_t anti_jitter_mbox_;
public:
   anti_jitter_subscriber(context_t ctx,
      // Уникальное имя, которое будет использовать агент.
      std::string name,
      // Значение порога, которое будет использоваться для
      // отсечения "лишних" сообщений.
      clock_type::duration jitter_threshold)
      : so_5::agent_t{std::move(ctx)}
      , name_{std::move(name)}
      , anti_jitter_mbox_{
         new anti_jitter_mbox{so_direct_mbox(), jitter_threshold}}
   {
      // Подписываться нужно на новый mbox.
      so_subscribe(anti_jitter_mbox_).event([&](mhood_t<std::string> cmd) {
         std::cout << name_ << ": signal received -> " << *cmd << std::endl;
      });
   }

   // Mbox, который должен использоваться для отсылки сообщений.
   auto target_mbox() const { return anti_jitter_mbox_; }
};

Ну и вот так это все будет запускаться для тестового прогона:

// Вспомогательная функция для генерации последовательности отложенных сообщений.
void generate_msg_sequence(
      so_5::environment_t & env,
      const so_5::mbox_t & ordinary_mbox,
      const so_5::mbox_t & anti_jitter_mbox) {

   std::vector<milliseconds> delays{ 125ms, 250ms, 400ms, 500ms, 700ms, 750ms, 800ms };

   for(const auto d : delays) {
      const std::string msg = std::to_string(d.count()) + "ms";
      so_5::send_delayed<std::string>(env, ordinary_mbox, d, msg);
      so_5::send_delayed<std::string>(env, anti_jitter_mbox, d, msg);
   }
}

int main() {
   // Запускаем SObjectizer и выполняем нужные действия.
   so_5::launch([](so_5::environment_t & env) {
      // Нам нужно два mbox-а. Свои актуальные значения эти переменные
      // получат в процессе создания агентов.
      so_5::mbox_t ordinary, anti_jitter;

      // Теперь создадим двух агентов, каждый из которых будет слушать
      // собственный mbox.
      env.introduce_coop([&](so_5::coop_t & coop) {
         ordinary = coop.make_agent<ordinary_subscriber>(
               "ordinary-mbox")->target_mbox();
         anti_jitter = coop.make_agent<anti_jitter_subscriber>(
               "anti-jitter-mbox", 250ms)->target_mbox();
      });

      // Теперь нужно сгенерировать последовательность сообщений.
      generate_msg_sequence(env, ordinary, anti_jitter);
      // И дать достаточно времени для работы примера.
      std::this_thread::sleep_for(1250ms);

      // Теперь пример можно завершить.
      env.stop();
   });

   return 0;
}

В результате запуска примера можно увидеть, что агент с anti-jitter-mbox-ом обрабатывает меньше сообщений, чем агент с обычным mbox-ом:

ordinary-mbox: signal received -> 125ms
anti-jitter-mbox: signal received -> 125ms
ordinary-mbox: signal received -> 250ms
ordinary-mbox: signal received -> 400ms
anti-jitter-mbox: signal received -> 400ms
ordinary-mbox: signal received -> 500ms
ordinary-mbox: signal received -> 700ms
anti-jitter-mbox: signal received -> 700ms
ordinary-mbox: signal received -> 750ms
ordinary-mbox: signal received -> 800ms

Репозиторий с примерами

Полные исходные тексты приведенных в статье примеров можно найти в этом репозитории [13].

Эпилог

В завершении статьи хочется поблагодарить читателей, у которых хватило терпения дочитать до конца. Это было непросто, спасибо, что сделали это :)

Мы так же хотим поделиться небольшой новостью: SObjectizer и so_5_extra обновились [14]. SObjectizer до версии 5.5.20, so_5_extra до версии 1.0.3. Так же SObjectizer уже доступен через систему управления зависимостями vcpkg [15]. Так что установить себе SObjectizer можно посредством vcpkg install sobjectizer.

Также мы планируем в будущем году приступить к работам над следующей мажорной версией SObjectizer: версией 5.6, в которой мы избавимся от старого груза, который тянем по соображениям совместимости, и, местами, нарушим совместимость с версией 5.5. Некоторые предварительные соображения по SObjectizer-5.6 изложены здесь [16]. Было бы здорово услышать мнение тех, кто интересуется SObjectizer-ом о том, чтобы вам хотелось видеть в будущих версиях SObjectizer и какое направление развития SObjectizer было бы интересно именно вам.

Ну и если кто-то хочет узнать еще о каких-то деталях работы SObjectizer, то задавайте вопросы в комментариях. Постараемся ответить. А если ответ будет требовать пространного текста, то может быть и очередная статья получится.

Автор: eao197

Источник [17]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/programmirovanie/270640

Ссылки в тексте:

[1] первая статья про SObjectizer: https://habrahabr.ru/post/304386/

[2] библиотека so_5_extra: https://sourceforge.net/p/sobjectizer/wiki/so5extra%20Documentation/

[3] round_robin mbox: https://sourceforge.net/p/sobjectizer/wiki/so5extra%201.0%20Round-Robin%20Mbox/

[4] retained_msg mbox: https://sourceforge.net/p/sobjectizer/wiki/so5extra%201.0%20Retained%20Message%20Mbox

[5] компонент shutdowner: https://sourceforge.net/p/sobjectizer/wiki/so5extra%201.0%20Shutdowner/

[6] message_limits: https://sourceforge.net/p/sobjectizer/wiki/so-5.5%20In-depth%20-%20Message%20Limits/

[7] msg_tracing: https://sourceforge.net/p/sobjectizer/wiki/so-5.5%20In-depth%20-%20Message%20Delivery%20Tracing/

[8] delivery_filters: https://sourceforge.net/p/sobjectizer/wiki/so-5.5%20In-depth%20-%20Message%20Delivery%20Filters/

[9] диспетчеры: https://sourceforge.net/p/sobjectizer/wiki/so-5.5%20In-depth%20-%20Dispatchers/

[10] в потроха реализации retained_msg mbox-а из so_5_extra: https://sourceforge.net/p/sobjectizer/repo/HEAD/tree/tags/so_5_extra/1.0.3/dev/so_5_extra/mboxes/retained_msg.hpp#l349

[11] so_5::absctract_message_mbox_t: https://sourceforge.net/p/sobjectizer/repo/HEAD/tree/tags/so_5/5.5.20/dev/so_5/rt/h/mbox.hpp#l643

[12] mutable-сообщения: https://sourceforge.net/p/sobjectizer/wiki/so-5.5%20In-depth%20-%20Mutable%20Messages/

[13] в этом репозитории: https://bitbucket.org/sobjectizerteam/so5_mboxes_demo

[14] SObjectizer и so_5_extra обновились: https://sourceforge.net/p/sobjectizer/news/2017/12/sobjectizer-5520-and-so5extra-103/

[15] vcpkg: https://github.com/Microsoft/vcpkg

[16] здесь: http://eao197.blogspot.com/2017/11/progc-sobjectizer-56.html

[17] Источник: https://habrahabr.ru/post/344580/?utm_campaign=344580