Событийная модель на основе async и await

в 10:04, , рубрики: .net, async, C#, параллельное программирование

В далеком 2012, когда цена на нефть еще была трехзначной, а трава зеленее, майкрософтом был выпущен .NET 4.5, а с ним и конструкция async/await. Про неё написано уже довольно много статей (Async в C#), а большинство разработчиков C# хорошо её изучили. Но все ли варианты использования были рассмотрены, можно ли выжать из await немного больше?

Самым очевидным вариантом использованием этой конструкции является ожидание завершения некой асинхронной операции. Первое, что приходит на ум — это ожидание ввода-вывода. Например, мы послали запрос клиенту и ожидаем ответа, тогда используя await мы сможем продолжить выполнение кода после получения ответа, а сам код при этом будет выглядеть синхронным. Но что если во время ожидания возникнет необходимость прервать выполнение этой операции? Тогда нам придется использовать CancellationToken, причем если таких операций несколько, то токены необходимо будет линковать или использовать один общий токен. При этом причина отмены будет скрыта от кода, использующего этот CancellationToken. Кроме отмены, код должен поддерживать обработку потери соединения, таймаута, возвращаемых ошибок и т.д.

В классическом варианте это выльется в использование CancellationToken для обработки отмены, try catch для обработки разрыва соединения и код анализа возвращенных данных, для оценки результата запроса. Но можно ли уместить всё это в единой парадигме? В этот статье я предлагаю рассмотреть альтернативный подход, основанный на событийной модели с использованием синтаксического сахара async/await.

Библиотека Eventing.

Всё необходимое для событийной модели на async/await было оформлено в виде библиотеки Eventing и выложено на GitHub под лицензией MIT.

Библиотека протестирована и успешно используется на боевой системе более двух лет.

Использование

Описанный в начале пример с использованием Eventing будет выглядеть так:

var @event = await this.EventManager.WaitFor<MessageReceived, CancelRequested>(TimeSpan.FromSeconds(50));

if (@event == null)
    Log.Info("timeout");
else if (@event is CancelRequested)
    Log.Info("Cancelled, reason: {0}", ((CancelRequested) @event).Reason);
else
    Log.Info("Message received");

Здесь мы используем EventManager — менеджер событий реализующий интерфейс IEventManager, для ожидания событий MessageReceived и CancelRequested с таймаутом в 50 секунд. С помощью вызова WaitFor мы формируем подписку на указанные события, а вызов await блокирует дальнейшее исполнение кода(но не потока). Оно останется заблокированным до тех пор, пока не произойдет одно из указанных событий или истечет время таймаута, после чего выполнение продолжится в текущем контексте синхронизации. Но что если связь с клиентом будет потеряна во время формирования подписки? В этом случае код зависнет на 50 секунд, так как событие отключения клиента будет упущено. Исправим это:

// Создание подписки
var eventAwait = this.EventManager.WaitFor<MessageReceived, ClientDisconnected, CancelRequested>(TimeSpan.FromSeconds(50), 
            e => !(e is ClientDisconnected) || ((ClientDisconnected)e).id == client.Id); // Фильтр события

if (!client.Connected || cancelRequested) {
    // Случай отключения клиента или запроса на отмену во время создания подписки
    Log.Info("Client disconnected or cancel requested");
    return;
}

 //  Прерывание кода до наступления события
 var @event = await eventAwait;
 ...

Здесь мы добавили событие ClientDisconnected и разделили создание awaitable переменной eventAwait и непосредственно ожидание события. Если бы мы не разделили их, то клиент мог бы отключиться после проверки client.Connected и ожиданием события, что привело бы к потери события. Также был добавлен фильтр событий, который исключает события ClientDisconnected не относящиеся к текущему клиенту.

Как создать событие?

Для этого надо создать класс, имплементирующий IEvent:

class CancelRequested : IEvent {
    public string Reason { get; set; }
}

А затем вызвать IEventManager.RaiseEvent, например:

this.EventManager.RaiseEvent(new CancelRequested()). 

Наследование от IEvent отделяет события от остальных классов и предотвращает использование неподходящих экземпляров в методе RaiseEvent. Также поддерживается наследование:

class UserCancelRequested : CancelRequested {
}

class SystemCancelRequested : CancelRequested {
}

var @event = await this.EventManager.WaitFor<CancelRequested>();
if (@event is UserCancelRequested)
    ...

Если у вас сложная система в которой множество одновременно ожидаемых событий, использование события CancelRequested вместо токенов отмены, позволит избежать прокидывания и линкование глобального и локального CancellationToken. Это важно, так как сложное линкование повышает вероятность пропустить утечку памяти из-за удержания токенов.

Как подписаться на событие?

Некоторые события носят периодический характер, такие события можно получать методом IEventManager.StartReceiving:

void StartReceiving<T>(Action<T> handler, object listener, Func<T, bool> filter = null, SynchronizationContext context = null) 
                       where T : IEvent;

Обработчик handler будет вызван в контексте синхронизации context при каждом событии T, которое удовлетворяет фильтру filter, если он задан. Если контекст синхронизации не задан, то будет использован SynchronizationContext.Current.

Как это работает?

Используется всё тот-же механизм тасков, на котором основан async/await. При вызове WaitFor менеджер событий создает таск используя TaskCompletionSource и формирует подписку по выбранным типам событий в шине сообщений.

// EventManager.cs, создание подписки
var taskCompletionSource = new TaskCompletionSource<IEvent>();

var subscription = new MessageSubscription(
            subscriptionId,
            message => {
                var @event = message as IEvent;
                if (filter != null && !filter(@event))
                    return;

                // Устанавливаем результат исполнения задачи
                if (taskCompletionSource.TrySetResult(@event))
                    this.trace.TraceEvent(TraceEventType.Information, 0, "Wait ended: '{0}' - '{1}'",
                        subscriptionId, message.GetType());
            },
            this, UnsubscribePolicy.Auto, this.defaultContext, eventTypes);
            
this.messageBus.Subscribe(subscription);
...
return taskCompletionSource.Task;

При генерации события вызывается метод RaiseEvent, который передает событие в шину, а она в соответствии с типом события выбирает подписки, в которых eventTypes включает в себя этот тип. Далее вызывается обработчик подписки и если он удовлетворяет фильтру, то устанавливается результат исполнения задачи и разблокирует вызов await.

// EventManager.cs, генерация события
public void RaiseEvent(IEvent @event) {
    this.trace.TraceEvent(TraceEventType.Information, 0, "Event: {0}", @event);

    this.messageBus.Send(@event);
}

// MessageBus.cs, отправка сообщения
public void Send(object message) {
var messageType = message.GetType();
IMessageSubscription[] subscriptionsForMessage;

lock (this.subscriptions) {
    subscriptionsForMessage = this.subscriptions
        .Where(s => s.MessagesTypes.Any(type => messageType == type || type.IsAssignableFrom(messageType)))
        .ToArray();
}

...

foreach (var subscription in subscriptionsForMessage)
    subscription.ProccessMessage(message);

this.UnsubscribeAutoSubscriptions(subscriptionsForMessage);
...

// MessageSubscription.cs
public void ProccessMessage(object message) {
    var messageHandler = this.handler;
    this.SynchronizationContext.Post(o => messageHandler(message), null);
}

В MessageSubscription.ProccessMessage сообщение передается в заданный пользователем контекст синхронизации, что позволяет избежать задержок при отправке сообщения.

Избавь мой класс от многопоточности!

Каждый, кто работал с async/await знает, что после завершения await код продолжает свое исполнения не в текущем потоке, а в текущем контексте синхронизации. Это может быть проблемой, если вы подпишетесь на событие с помощью StartReceiving, а затем вызовите WaitFor, что приведет к тому, что код класса будет исполняться одновременно в разных потоках(обработчик событий из StartReceiving и код после await // как страшно жить!). Это легко исправить однопоточным контектстом синхронизации, входящим в библиотеку:

this.serverSynchronizationContext = new SingleThreadSynchronizationContext("Server thread");
this.clientSynchronizationContext = new SingleThreadSynchronizationContext("Client thread");

this.serverSynchronizationContext.Post(async o => await this.RunServer(), null);
this.clientSynchronizationContext.Post(async o => await this.RunClient(), null);

Таким образом у нас клиент всегда будет выполняться в потоке «Client thread», а сервер в «Server thread». Вы сможете писать многопоточный код не задумываясь о race condition. В качестве бонуса это позволит максимально утилизировать отдельно взятый поток.

В чем преимущество?

Главным преимуществом является простота и тестируемость кода. Если насчет первого можно спорить, простоту каждый понимает по своему, то со вторым пунктом всё очевидно. Многопоточное приложение можно протестировать в одном потоке, эмулируя любую последовательность событий, причем для этого не требуется создавать mock объекты, любое взаимодействие можно свести к событиям, а их проверку к вызову RaiseEvent. Пример NUnit:

/// <summary>
///     This test demonstrates how to test application that uses Eventing
///     All code executes sequently in one thread
/// </summary>
[TestFixture]
public class TestCase : TestBase {
    [Test]
    public async Task ClientDoesWork() {
        var client = new Client(this.EventManager);
        var doWorkAwaitable = client.DoWork();

        this.EventManager.RaiseEvent(new Connected());

        // We can debug and step into 
        this.React();

        await doWorkAwaitable;

        Assert.AreEqual(true, client.Connected);
    }
}

Как это можно использовать?

Чтобы не переполнять статью листингами, приведу лишь краткое текстовое описание одной из системы, где используется Eventing. Это горизонтально масштабируемая распределенная система, состоящая из четырех типов узлов, один из которых является мастером. Мастер непрерывно общается со всеми узлами и управляет выполнением на них различных операций. Каждую операцию можно представить в виде конечного автомата, где переход это наступление события(в том числе таймаут или отмена). Хотя для каждой операции и можно было автомат реализовать в его классическом виде(что мы изначально и сделали), намного проще оказалось представить его используя Eventing, где текущее состояние определялось точкой выполнения кода, а не отдельной переменной. При это на каждом шаге были явно перечислены все ожидаемые события, что упрощало тестирование белого ящика.

Заключение

В статье рассмотрены ключевые возможности и варианты использования библиотеки Eventing. Библиотека не претендует на универсальность и поддержку высоконагруженных систем, но призывает немного по другому взглянуть на привычные вещи, позволяет писать безопасный и легко тестируемый с точки зрения многопоточности код.

Автор: Therg

Источник

Поделиться новостью

* - обязательные к заполнению поля