Реализация Kotlin Flow на C#

в 20:32, , рубрики: .net, async/await, C#, flow, kotlin, Reactive Streams, rx, Программирование

image

Всем привет!
Последние годы я занимаюсь разработкой под Андроид на Котлине. Не так давно, за неимением RxJava на Kotlin multiplatform, мы начали использовать корутины и flow – холодные стримы для Котлина из коробки. До Андроида я много лет провёл с C#, и там свои корутины есть уже очень давно, только их там так называть не принято. Но вот про аналог flow на async/await я не слышал. Основной инструмент для реактивного программирования – Rx.Net (собственно, здесь rx и родился). Вот я и решил поностальгировать и попробовать напилить велосипед.

Далее подразумевается, что читатель имеет представление о штуках, про которые говорилось в предыдущем абзаце. Для нетерпеливых — сразу ссылка на репозиторий:
https://github.com/ILAgent/flowsharp

Дисклеймер: данный код не претендует на использование в продакшене. Это — концепт, не более того. Что-то может работать не совсем так, как задумывалось.

IFlow и IFlowCollector

Что ж, начнём с того, что перепишем в лоб интерфейсы Flow и FlowCollector на C#.
Было:

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

Стало:

    public interface IFlow<out T>
    {
        Task Collect(IFlowCollector<T> collector);
    }
    public interface IFlowCollector<in T>
    {
        Task Emit(T item);
    }

Полагаю, отличия понятны и объясняются разной реализацией асинхронности.
Чтобы воспользоваться этими интерфейсами, их надо реализовать. Вот что получилось:

    internal class Flow<T> : IFlow<T>
    {
        private readonly Func<IFlowCollector<T>, Task> _emitter;

        public Flow(Func<IFlowCollector<T>, Task> emitter)
        {
            _emitter = emitter;
        }

        public Task Collect(IFlowCollector<T> collector)
        {
            return _emitter(collector);
        }

    }

    internal class FlowCollector<T> : IFlowCollector<T>
    {
        private readonly Func<T, Task> _handler;

        public FlowCollector(Func<T, Task> handler)
        {
            _handler = handler;
        }

        public Task Emit(T item)
        {
            return _handler(item);
        }

    }

В конструктор flow передаём функцию, которая будет эмитить значения. А в конструктор коллектора – функцию, которая будет обрабатывать каждое эмитированное значение. Использовать это можно так

var flow = new Flow<int>(async collector =>
            {
                await collector.Emit(1);
                await Task.Delay(1000);
                await collector.Emit(2);
                await Task.Delay(1000);
                await collector.Emit(3);
            });
            var collector = new FlowCollector<int>(async item => Console.WriteLine(item));
            await flow.Collect(collector);

Думаю, в коде выше всё понятно. Сначала мы создаём Flow, затем создаём коллектор (обработчик каждого элемента). Затем запускаем Flow, «подписав» на него коллектор. Если добавить немного сахара (см. гитхаб), то получим что-то вроде этого:

await Flow<int>(async collector =>
            {
                await collector.Emit(1);
                await Task.Delay(1000);
                await collector.Emit(2);
                await Task.Delay(1000);
                await collector.Emit(3);
            })
            .Collect(Console.WriteLine);

На Котлине это выглядит вот так:

scope.launch{
   flow{
        emit(1)
    delay(1000)
    …
   }.collect{ printl(it) }
}

Лично мне в варианте на Шарпе больше всего не нравится необходимость явно указывать тип элемента при создании флоу. Но дело тут не в том, что вывод типов в Котлине сильно круче. Функция flow выглядит так:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

Как мы видим, параметр block помечен аннотацией BuilderInference, которая и подсказывает компилятору, что тип надо взять из этого параметра. Кто-нибудь знает, можно ли напилить подобное для C# на Roslyn?

CancellationToken

В rx есть подписка, от которой можно отписаться. В Kotlin Flow за отмену отвечает Job, которую возвращает билдер, либо Coroutine Scope. Нам тоже определённо необходим инструмент, позволяющий Flow завершиться досрочно. В C# для отмены асинхронных операций используется, не побоюсь этого слова, паттерн Cancellation Token. CancellationToken – это класс, объект которого предоставляет асинхронной операции информацию о том, что она отменена. Он прокидывается в асинхронную операцию при старте, и эта операция сама смотрит за его состоянием. А меняется состояние извне.
Короче, нам надо прокинуть CancellationToken в наши Flow и FlowCollector.

   public interface IFlow<out T>
    {
        Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
    }

    public interface IFlowCollector<in T>
    {
        Task Emit(T item, CancellationToken cancellationToken = default);
    }

Реализацию пастить сюда не буду – см. гитхаб.
Тест теперь будет выглядеть вот так:

            var cts = new CancellationTokenSource();

            var flowTask = Flow<int>(async (collector, cancellationToken) =>
            {
                await collector.Emit(1);
                await Task.Delay(2000, cancellationToken);
                await collector.Emit(2);
                await Task.Delay(2000, cancellationToken);
                await collector.Emit(3);
            })
            .Collect(item => Log(item), cts.Token);

            var cancelationTask = Task.Run(async () =>
            {
                await Task.Delay(3000);
                cts.Cancel();
            });

            await flowTask;

Суть такова. Параллельно Flow запускаем операцию, которая через 3 секунды его отменит. В результате Flow не успевает эмитировать третий элемент и завершается с TaskCanceledException, что и является требуемым поведением.

Немного практики

Давайте попробуем использовать то, что получилось, на практике. Например, обернём какой-нибудь event в наш Flow. В Rx.Net для этого даже существует библиотечный метод FromEventPattern.
Чтобы не связываться с реальным UI, я написал класс ClicksEmulator, который генерирует условные нажатия на кнопку мыши через случайные интервалы времени.

    public class ClicksEmulator
    {
        public enum Button { Left, Right }

        public class ClickEventArgs : EventArgs
        {
//…
            public int X { get; }
            public int Y { get; }
            public Button Button { get; }
        }

        public event EventHandler<ClickEventArgs> ButtonClick;

        public async Task Start(CancellationToken cancellationToken = default)         {…  }

    }

Я опустил реализацию, т.к. она здесь не очень важна. Главное – это event ButtonClick, который мы хотим превратить во Flow. Для это напишем метод-расширение

public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator)
        {
            return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) =>
            {
                void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args);

                emulator.ButtonClick += clickHandler;
                cancellationToken.Register(() =>
                {
                    emulator.ButtonClick -= clickHandler;
                });

                await Task.Delay(-1, cancellationToken);
            });
        }

Сначала мы объявляем обработчик события, который ничего не делает, кроме передачи аргумента события в коллектор. Затем подписываемся на события и регистрируем отписку в случае отмены (завершения) flow. Ну и далее бесконечно ждём и слушаем события ButtonClick, пока cancellationToken не выстрелит.
Если вы использовали callbackFlow или channelFlow в Котлине или создавали холодные Observable из listener’ов в Rx, то вы отметите, что структура кода во всех случаях очень схожа. Это прекрасно, но возникает вопрос – чем Flow в данном случае лучше, чем сырой event? Вся сила реактивных стримов – в операторах, которые выполняют разные преобразования над ними: фильтрацию, маппинг и многие другие, более сложные. Но у нас их пока нет. Давайте попробуем что-нибудь с этим сделать.

Filter, Map, OnNext

Начнем с одного из самых простых операторов — Filter. Он, как это очевидно из названия, будет фильтровать элементы flow в соответствии с заданным предикатом. Это будет extension-метод, применяемый к оригинальному flow и возвращающий flow только с отфильтрованными элементами. Получается, нам надо брать каждый элемент из оригинального flow, проверять, и эмитить дальше, если предикат возвращает true. Так и сделаем:

  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) =>
            FlowFactory.Flow<T>((collector, cancellationToken) =>
                source.Collect(item =>
                {
                    if (predicate(item))
                        collector.Emit(item);
                }, cancellationToken)
            );

Теперь, если нам нужны нажатия только на левую кнопку мыши, можно написать так:

emulator
                .Clicks()
                .Filter(click => click.Button == ClicksEmulator.Button.Left)
                .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);

По аналогии напишем операторы Map и OnNext. Первый преобразует каждый элемент исходного flow в другой с помощью переданной функции-маппера. Второй будет возвращать flow с теми же элементами, что и оригинальный, но выполняя на каждом какое-то действие Action (обычно логирование).

        public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) =>
           FlowFactory.Flow<R>((collector, cancellationToken) =>
               source.Collect(
                        item => collector.Emit(mapper(item)),
                        cancellationToken
                   )
           );

        public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) =>
           FlowFactory.Flow<T>((collector, cancellationToken) =>
               source.Collect(item =>
               {
                   action(item);
                   collector.Emit(item);
               }, cancellationToken)
           );

И пример использования:

emulator
                .Clicks()
                .OnNext(click => Log($"{click.Button} {click.X} {click.Y}"))
                .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1)                
                .Collect(item => Log($"{item}"), cts.Token);

Вообще для реактивных стримов придумано очень много операторов, их можно найти, например, здесь: http://reactivex.io/documentation/operators.html
И ничего не мешает реализовать любые из них для IFlow.
Те, кто знаком с Rx.Net, знают, что там, помимо новых и специфичных операторов для IObservable, используются методы-расширения из Linq-to-objects, и это позволяет рассматривать стримы как “коллекции событий” и манипулировать ими с помощью привычных Linq-методов. Почему бы вместо того, чтобы писать операторы самим, не попробовать поставить IFlow на рельсы Linq?

IAsyncEnumerable

В C# 8 завезли асинхронную версию IEnumerable — IAsyncEnumerable — интерфейс коллекции, по которой можно итерироваться асинхронно. Принципиальная разница между IAsyncEnumerable и реактивными стримами (IObservable и IFlow ) вот в чём. IAsyncEnumerable, как и IEnumerable — это pull-модель. Мы итерируемся по коллекции сколько и когда нам надо и сами тянем из неё элементы. Стримы — это push. Мы подписываемся на события и “реагируем” на них, когда они приходят — на то они и реактивные. Однако от pull-модели можно добиться push-like поведения. Это называется long polling https://en.wikipedia.org/wiki/Push_technology#Long_polling. Суть такая: мы, итерируясь по коллекции, запрашиваем очередной её элемент и ждём сколь угодно долго, пока коллекция нам его не вернёт, т.е. пока очередное событие не наступит. IAsyncEnumerable, в отличие от IEnumerable, позволит нам ждать асинхронно. Короче, нам надо как-то натянуть IAsyncEnumerable на IFlow.
Как известно, за возврат текущего элемента коллекции IAsyncEnumerable и переход к следующему элементу отвечает интерфейс IAsyncEnumerator. При этом нам надо брать элементы из IFlow, а этим занимается IFlowCollector. Получается вот такой объект, реализующий эти интерфейсы:

internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T>
    {
        private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1);
        private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1);

        private bool _isFinished;

        public T Current { get; private set; }

        public async ValueTask DisposeAsync() { }

        public async Task Emit(T item, CancellationToken cancellationToken)
        {
            await _backpressureSemaphore.WaitAsync(cancellationToken);
            Current = item;
            _longPollingSemaphore.Release();
        }

        public async Task Finish()
        {
            await _backpressureSemaphore.WaitAsync();
            _isFinished = true;
            _longPollingSemaphore.Release();
        }

        public async ValueTask<bool> MoveNextAsync()
        {
            _backpressureSemaphore.Release();
            await _longPollingSemaphore.WaitAsync();
            return !_isFinished;
        }
    }

Основное здесь методы — Emit, Finish и MoveNextAsync.
Emit в начале ждёт момента, когда очередной элемент из коллекции будет запрошен. Т.е. не эмитит элемент, пока он не потребуется. Это называется backpressure, отсюда и имя семофора. Затем выставляется текущий item и сообщается, что long polling запрос может получить результат.
MoveNextAsync вызывается, когда из коллекции тянут очередной элемент. Он отпускает _backpressureSemaphore и ждёт, когда Flow запушит очередной элемент. Затем он возвращает признак того, закончилась ли коллекция. Этот флаг выставляет метод Finish.
Finish работает по тому же принципу, что и Emit, только вместо очередного элемента выставляет признак конца коллекции.
Теперь надо этот класс заиспользовать.

public static class AsyncEnumerableExtensions
    {
        public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default)
        {
            var collector = new FlowCollectorEnumerator<T>();
            flow
                .Collect(collector, cancellationToken)
                .ContinueWith(_ => collector.Finish(), cancellationToken);
            return new FlowEnumerableAdapter<T>(collector);
        }
    }

    internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T>
    {
        private readonly IAsyncEnumerator<T> _enumerator;

        public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator)
        {
            _enumerator = enumerator;
        }

        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            return _enumerator;
        }
    }

Extension-метод CollectEnumerable для IFlow создаёт FlowCollectorEnumerator и подписывает на него flow, по завершению которого вызывается метод Finish(). И возвращает FlowEnumerableAdapter, который является простейшей реализацией IAsyncEnumerable, использующей FlowCollectorEnumerator в качестве IEnumerator.
Пробуем, что получилось.

var clicks = emulator
   .Clicks()
   .OnNext(item => Log($"{item.Button} {item.X} {item.Y}"))
   .CollectEnumerable(cts.Token)
   .Where(click => click.Button == ClicksEmulator.Button.Right)
   .Select(click => click.Y < 540 ? "TOP" : "LEFT");

await foreach (var click in clicks)
{
   Log($"Clicked at: {click}");
}

Здесь мы получаем Flow clicks(), каждый клик логируем, затем превращаем IFlow в IAsyncEnumerable. Далее применяет известные Linq-операторы: оставляем только клики правой кнопкой и получаем, в какой части экрана они сделаны.
Далее рассмотрим пример посложнее. Будем заменять правый клик на двойной левый. Т.е. нам надо будет мапить каждый элемент не в какой-то другой, а в коллекцию. Либо во Flow, преобразуемый в коллекцию.

var clicks = emulator
   .Clicks()
   .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}"))
   .CollectEnumerable(cts.Token)
   .Select(click => click.Button == ClicksEmulator.Button.Left
       ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click))
       : Flow<ClicksEmulator.ClickEventArgs>(async collector =>
       {
           var changedClick =
               new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left);
           await collector.Emit(changedClick);
           await Task.Delay(200);
           await collector.Emit(changedClick);
       })
   )
   .SelectMany(flow => flow.CollectEnumerable());

await foreach (var click in clicks)
{
   Log($"Changed: {click.Button} {click.X} {click.Y}");
}

Для этого в Linq существует оператор SelectMany. Его аналог в реактивных стримах — FlatMap. Сначала мапим каждый клик в IFlow: для левого клика — Flow с одним этим кликом, для правого — Flow из двух левых кликов с задержкой между ними. А затем в SelectMany превращаем IFlow в IAyncEnumerable.
И это работает! Т.е. многие операторы не обязательно реализовывать для IFlow — можно использовать Linq.

Заключение

Rx.Net — был и остаётся главным инструментом при работе с асинхронными последовательностями событий на C#. Но это довольно большая библиотека по объёмы кода. Как мы увидели, похожую функциональность можно получить значительно проще — всего лишь два интерфейса плюс некоторая обвязка. Это возможно благодаря использованию возможностей языка — async/await. Когда зарождался Rx, эту фичу в C# ещё не завезли.
Спасибо за внимание!

Автор: Иван

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js