- PVSM.RU - https://www.pvsm.ru -

Kino: communication frawemork на NetMQ. Краткое описание

Лет 8 назад я начал работать в команде, которая разрабатывала один сервис. Интерфейс сервиса был достаточно прост, всего 4 метода, и выполнял он одну единственную задачу. В течение всего этого времени код постоянно изменялся: реализовались новые бизнес-правила и ограничения, добавлялась версионность. В один прекрасный момент, front-end‘у понадобился очень небольшой функционал, который был «зарыт» глубоко в сервисе. Реализация необходимой функции была разработана в виде компоненты и не представляло никаких проблем дать к ней доступ из сервиса через дополнительный метод… Кроме одной: нарушалась логическая связанность методов сервиса, то есть его «внутренности» начали становиться «внешностями».

Проблему можно было бы решить, если преобразовать все эти небольшие внутренние компоненты, к которым потребовался доступ извне, в отдельные сервисы. В таком случае, front-end мог бы получить доступ к их функционалу; основной же сервис стал бы более компактным и его роль сводилась к оркестровке вызовов.

Мы использовали WCF для построения сервисов. Разворачивать сервис в 50 строчек кода на WCF, как минимум на 3-4 серверах, с load-balancer‘ом, новыми URL‘ами и прочими наворотами, казалось не очень хорошей идеей. А хотелось какой-то легкости, перспективы…

Несколько лет спустя я принимал участие в другом проекте на Workflow Foundation. Глядя на то, что получалось в XAML-редакторе, я подумал: «А почему-бы не представить весь workflow, как последовательность сообщений»?

Kinoпробы

Поиск по имеющимся решениям, честно говоря, я не делал. На тот момент (4-5 лет назад) об „Orleans“ было мало что известно, а об Akka я узнал уже после начала велосипедостроения. С одной стороны, это плохо, недостойно профессионального разработчика и все такое. С другой стороны, могло получиться что-то новенькое… Насколько хорошо или плохо все получилось, может судить уважаемый читатель.

Итак, я занялся созданием kino: actor-like communication framework на NetMQ. Суффикс «-like» потому, что классические актеры имеют иерархическую организацию, супервизоров, они stateful и, вообще, целая математическая модель там у них… Тут все проще, но, тем не менее, актеры будут и у нас.

Вкратце, что здесь к чему

Основным средством общения в kino является сообщение. Каждое сообщение имеет версию и тип, которые используются для поиска соответствующего обработчика. Есть небольшое отклонение от правила, но пока не будем об этом.

Актеры (Actors) — основные потребители и производители сообщений. Actor объявляет свой интерфейс, указывая тип и версию сообщения, которые он может получать. Есть еще один участник массовки, MessageHub, который также может получать и отправлять сообщения. Однако, между ними есть определенные различия. Actor нужно рассматривать, как сервис: он может ответить только при получении входящего сообщения. MessageHub – это клиент, который может отправить сообщение и (попытаться) получить ответное сообщения, если необходимо. Итак, чаще всего, начальное сообщение отправляется через MessageHub и обрабатывается одним или несколькими Actors.

Для поиска адресатов сообщений необходим MessageRouter. Он хранит таблицу маршрутизации — соответствие версии (Version) и типа сообщения (Identity) со списком Actors, которые его могут обработать. Для одного процесса достаточно одного MessageRouter‘а.

Для выхода за рамки одного процесса/хоста нам необходимо получить знание о внешнем мире, то есть о других MessageRouter‘ах и их таблицах маршрутизации. Источником для получения такого знания является Rendezvous сервис. Это – единственный well-known адрес, который должен быть сконфигурирован для приложения на базе kino. Rendezvous принимает от всех и раздает всем подключенным MessageRouter‘ам информацию о добавлении новых и удалении несуществующих маршрутов, ping‘ует установленные подключения. Rendezvous сервис формирует единую сеть компонентов kino.

Тоже, но более детально

1. Message

Так выглядит типичное сообщение, которое можно отправить гулять по сети kino:

public class MyMessage : Payload
{
    private static readonly byte[] MessageIdentity = "NAMESPACE.MYMESSAGE".GetBytes();
    private static readonly byte[] MessageVersion = "1.0".GetBytes();

    // Здесь идут свойства сообщения, т.е. то, что мы в итоге хотим передать

    public override byte[] Version => MessageVersion;
    public override byte[] Identity => MessageIdentity;
}

Поддерживается 3 способа распределения (Distribution Pattern) сообщений: unicast, broadcast и direct. В первом случае сообщение отправляется только одному обработчику, зарегистрированному в сети. Во втором – всем.

IPayload payload = new MyMessage();
IMessage message = Message.Create(payload, DistributionPattern.Broadcast);

В случае direct distribution, который может быть особо полезен при тестировании, сообщение отправляется конкретному MessageRouter‘у:

IMessage message = Message.CreateFlowStartMessage(new MyMessage());
message.SetReceiverNode(receiverIdentity);
// Теперь можно отправлять сообщение

Добраться до данных в полученном сообщении можно следующим образом:

MyMessage payload = message.GetPayload<MyMessage>();

2. Actors

Для создания своего актера необходимо унаследовать класс от Actor и реализовать в нем хотя бы один метод-обработчик сообщения:

public class MyMessageProcessor : Actor
{
    [MessageHandlerDefinition(typeof (MyMessage))]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        // тело метода
    }

    [MessageHandlerDefinition(typeof (OtherMessage))]
    public Task<IActorResult> OtherMessageHandler(IMessage message)
    {
        // тело метода
    }
}
```cs

Все актеры по умолчанию регистрируются глобально, то есть доступны во всей сети *kino*. Если вы хотите обрабатывать сообщения только в локальном процессе, можно объявить регистрацию обработчиков локальной:

```cs
public class LocalMessageProcessor : Actor
{
    // Данная регистрация доступна только локально
    [MessageHandlerDefinition(typeof (LocalMessage), true)]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        // тело метода
    }
}

Фреймворк гарантирует, что метод-обработчик актера получит сообщения только того типа, который задекларирован. Возвращаемый же результат может представлять из себя одно или несколько сообщений любых типов, причем, с разными distribution pattern. То есть, мы можем послать ответ начальному отправителю и одновременно сообщить всем остальным еще о чем-нибудь:

public class MyMessageProcessor : Actor
{
    [MessageHandlerDefinition(typeof (MyMessage))]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        MyMessage payload = message.GetPayload<MyMessage>();

        var result = await UpdateDb(payload);

        IMessage response = Message.Create(new ResponseMessage(result));
        IMessage notifyRequest = Message.Create(new NotifyMessage(result), DistributionPattern.Broadcast);

        return new ActionResult(response, notifyRequest);
    }    
}

3. ActorHost

Об ActorHost мы еще не говорили. Это компонент, который выполняет несколько функций:

  • хранит ссылки на методы-обработчики всех зарегистрированных у него Actors
  • регистрирует обработчики в MessageRouter
  • принимает сообщения на обработку от MessageRouter к Actors и отправляет ответы обратно в MessageRouter.

Вызов методов-обработчиков в ActorHost происходит в одном потоке (исключение составляют асинхронные методы). Поэтому, ActorHost не поддерживает множественные регистрации обработчиков одного и того же сообщения. Если же необходимо масштабирование одного и того же типа Actor в рамках одного процесса, требуется создание нового экземпляра ActorHost для каждого из них. Все эти сложности по выбору и созданию ActorHosts берет на себя ActorHostManager:

// Создаем первый экземпляр актера MyActor
IActor actor = new MyActor();
// По умолчанию, выбирается первый доступный ActorHost
actorHostManager.AssignActor(actor);
// Нам показалось, что одного мало
actor = new MyActor();
// Указываем, что нам обязательно нужен второй экземпляр MyActor
actorHostManager.AssignActor(actor, ActorHostInstancePolicy.AlwaysCreateNew);

4. MessageHub

Вернемся немного назад, с чего все началось. А началось с того, что появилась необходимость разнести код из одного WCF-сервиса в несколько доступных по сети компонент. В результате, вместо вызовов сотни методов в одном процессе у нас получился некий поток сообщений (message flow), которые, вдобавок, путешествуют по разным серверам. Тем не менее, и функционал и поведение для конечного пользователя сервиса должны, в идеале, оставаться прежними. То есть, если раньше клиент вызывал метод сервиса синхронно и ожидал получение ответа, то со всем этим kino паттерн работы клиента не должен поменяться кардинальным образом. Необходимо из всего этого потока сообщений определить, что является ответом клиенту и доставить его обратно.

MessageHub как раз призван решить эту задачу. С его помощью можно отправить сообщение в сеть kino, не дожидаясь ответа:

IPayload payload = message.GetPayload<MyMessage>();
IMessage message = Message.CreateFlowStartMessage(payload);
messageHub.SendOneWay(message);

А можно так же указать, что отправитель ожидает определенный ответ:

// Создаем сообщение, представляющее запрос
IMessage request = Message.CreateFlowStartMessage(new StartMessage());
// Говорим, что мы заинтересованы в получении ответа определенного типа
ICallbackPoint callbackPoint = CallbackPoint.Create<ResultMessage>();

// Теперь отправляем сообщение и ожидаем результат
using(IPromise promise = messageHub.EnqueueRequest(request, callbackPoint))
{
    if(promise.GetResponse().Wait(timeout))
    {
    // Обрабатываем полученный результат
        ResultMessage result = promise.GetResponse().Result.GetPayload<ResultMessage>();       
    }
    else
    {
    // Попробуем снова…
    }
}

5. MessageRouter

MessageRouter представляет собой узел в сети kino. К нему подключаются другие компоненты, ActorHosts и MessageHubs, для обмена сообщениями. В свою очередь, MessageRouters находят подобных себе и подключаются друг к другу с помощью Rendezvous сервиса, формируя таким образом сеть kino.

В качестве транспорта в kino используется библиотека NetMQ [1]. Она, практически, вбита во фреймворк гвоздями и использовать другой транспорт не планировалось.

Итак, маршрутизация сообщений. Она осуществляется по следующим алгоритмам:

Unicast-сообщение:

НАЙТИ локально зарегистрированный ActorHost или MessageHub, которые могут обработать сообщение
ЕСЛИ найден
    Отправить сообщение на обработку
ИНАЧЕ
    НАЙТИ MessageRouter в сети, который может обработать сообщение
    ЕСЛИ найден
        Отправить сообщение на обработку
    ИНАЧЕ
        Сообщение не обработано!

Broadcast-сообщение:

НАЙТИ все локально зарегистрированные ActorHosts и MessageHubs, которые могут обработать сообщение
ЕСЛИ найдены
    Отправить сообщение на обработку
ЕСЛИ broadcast-сообщение отправлено из локально зарегистрированного Actor
    НАЙТИ все MessageRouter в сети, которые могут обработать сообщение
    ЕСЛИ найдены
        Отправить сообщение на обработку
ЕСЛИ не найдено ни одного обработчика локально или удаленно
    Сообщение не обработано!

Direct-сообщение:

ЕСЛИ unicast-сообщение
    НАЙТИ MessageRouter в сети, указанный в сообщении, как получатель (ReceiverNode), который может обработать сообщение
    ЕСЛИ найден
        Отправить сообщение на обработку
    ИНАЧЕ
        Сообщение не обработано!
ИНАЧЕ
    <маршрутизация broadcast-сообщения>

6. Rendezvous

Rendezvous сервис – единственный well-known сервис, адрес которого должен быть сконфигурирован для всех узлов одной сети kino. Он выполняет следующие функции:

  • broadcast-рассылка сообщений об изменениях в маршрутизации: добавление новых и удаление недействительных маршрутов,
  • broadcast-рассылка PING сообщений для мониторинга подключенных узлов,
  • broadcast-рассылка ответных PONG сообщений от подключенных узлов.

При необходимости, Rendezvous сервис можно установить на кластер серверов. Выбранный на основании консенсуса лидер отвечает за все вышеперечисленные функции. В случае «падения» кластера, сеть kino продолжит работу. Однако, информация об изменениях в маршрутизации будет недоступна. Когда работа Rendezvous сервиса будет восстановлена, узлы получат обновление конфигурации сети.

Открытые вопросы

  • Ну, собственно говоря, увидеть что-то в продакшине. Пока до этого не дошло…
  • Как работать с сообщениями разного wire-формата в одной сети
  • Возможные проблемы при большом количестве подключений к Rendezvous сервису, пакетная обработка PONG-сообщений
  • Объединение нескольких сетей kino, то есть маршрутизация между узлами, подключенными к разным Rendezvous серверам/кластерам

Проект kino на Github'е: https://github.com/iiwaasnet/kino [2]
Wiki: https://github.com/iiwaasnet/kino/wiki [3]

Автор: aosja

Источник [4]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/programmirovanie/121386

Ссылки в тексте:

[1] NetMQ: https://github.com/zeromq/netmq

[2] https://github.com/iiwaasnet/kino: https://github.com/iiwaasnet/kino

[3] https://github.com/iiwaasnet/kino/wiki: https://github.com/iiwaasnet/kino/wiki

[4] Источник: https://habrahabr.ru/post/301454/?utm_source=habrahabr&utm_medium=rss&utm_campaign=sandbox