- PVSM.RU - https://www.pvsm.ru -

Взгляд на Tokio: как устроен этот асинхронный обработчик событий

И для чего он используется в фреймворке для приватных блокчейнов Exonum


Tokio [1] — это фреймворк для разработки [2] сетевых масштабируемых приложений на Rust [3], использующий [4] компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.

И хотя Tokio критикуют [5] за излишнюю сложность в освоении, он уже используется [6] в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper [7], tower-grpc [8] и сonduit [9]. Мы тоже обратились [10] к этому решению при разработке нашей платформы Exonum [11].

Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.

Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.

Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.

Взгляд на Tokio: как устроен этот асинхронный обработчик событий - 1 [12]
/ изображение Kevin Dooley [13] CC [14]

Архитектура Tokio

По своей сути Tokio представляет [15] собой «обертку» над Mio. Mio [16] — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll [17] в Linux, kqueue [18] в Mac OS или IOCP [19] в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:

Взгляд на Tokio: как устроен этот асинхронный обработчик событий - 2
Futures

Как видно из схемы выше, главным функциональным компонентом Tokio, является futures [20] — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.

По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять [21] разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.

Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.  

Разработчик Дэвид Симмонс (David Simmons [22]), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит [4] обмен сообщениями с HTTP-сервером.

Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k [23]).

В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.

В крэйте futures определен типаж [24] Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
    fn wait(self) -> Result<Self::Item, Self::Error> { ... }

    fn map<F, U>(self, f: F) -> Map<Self, F>
        where F: FnOnce(Self::Item) -> U { ... }
}

«Сердцем» типажа Future является метод poll() [25]. Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task [26]). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() [27] или and_then() [28]. Например:

let client_to_server = copy(client_reader, server_writer)
                    .and_then(|(n, _, server_writer)| {
                        shutdown(server_writer).map(move |_| n)
                    });

За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.

В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify() [29]. Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен [30] в официальном репозитории Rust на GitHub:

    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        ThreadNotify::with_current(|notify| {

            loop {
                match self.poll_future_notify(notify, 0)? {
                    Async::NotReady => notify.park(),
                    Async::Ready(e) => return Ok(e),
                }
            }
        })
    }

Streams

Кроме futures, Tokio работает [10] и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.

Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.

Типаж stream может выглядеть [31] следующим образом:

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.

Tokio в Exonum

Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.

Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:

Взгляд на Tokio: как устроен этот асинхронный обработчик событий - 3

Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator [32] объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.

При коммуникации между узлами Exonum выполняет [33] следующие связанные операции на каждом из них:
 

Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла

let connect_handle = Retry::spawn(handle.clone(), strategy, action)
            .map_err(into_other)
            // Configure socket
            .and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })
            // Connect socket with the outgoing channel
            .and_then(move |sock| {
                trace!("Established connection with peer={}", peer);

                let stream = sock.framed(MessagesCodec::new(max_message_len));
                let (sink, stream) = stream.split();

                let writer = conn_rx
                    .map_err(|_| other_error("Can't send data into socket"))
                    .forward(sink);
                let reader = stream.for_each(result_ok);

                reader
                    .select2(writer)
                    .map_err(|_| other_error("Socket error"))
                    .and_then(|res| match res {
                        Either::A((_, _reader)) => Ok("by reader"),
                        Either::B((_, _writer)) => Ok("by writer"),
                    })
            })

Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.

Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва [34]:

.map_err(into_other)

Помимо этого, необходимо произвести настройку сокета [35]:

.and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })

И парсить [36] входящие байты как сообщения:

let (sink, stream) = stream.split();

Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.

В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron [37] на базе библиотеки hyper [38]. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.


Предлагаем вам еще несколько материалов по теме из нашего блога на Хабре:

Автор: alinatestova

Источник [42]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/detsentralizovanny-e-seti/275978

Ссылки в тексте:

[1] Tokio: https://github.com/tokio-rs/tokio

[2] разработки: https://medium.com/@carllerche/announcing-tokio-df6bb4ddb34

[3] Rust: https://habrahabr.ru/company/bitfury/blog/349786/

[4] использующий: https://cafbit.com/post/tokio_internals/

[5] критикуют: https://www.reddit.com/r/rust/comments/7klghl/tokio_internals_understanding_rusts_asynchronous/

[6] используется: https://news.ycombinator.com/item?id=15972593&ref=hvper.com

[7] hyper: http://hyper.rs/

[8] tower-grpc: https://github.com/tower-rs/tower-grpc

[9] сonduit: https://github.com/runconduit/conduit

[10] обратились: https://habrahabr.ru/company/bitfury/blog/342970/

[11] Exonum: https://exonum.com/

[12] Image: https://habrahabr.ru/company/bitfury/blog/351824/

[13] Kevin Dooley: https://www.flickr.com/photos/pagedooley/7011008137/

[14] CC: https://creativecommons.org/licenses/by/2.0/

[15] представляет: https://manishearth.github.io/blog/2018/01/10/whats-tokio-and-async-io-all-about/

[16] Mio: https://docs.rs/mio/0.6.10/mio/

[17] epoll: https://ru.wikipedia.org/wiki/Epoll

[18] kqueue: https://developer.apple.com/library/content/documentation/Darwin/Conceptual/FSEvents_ProgGuide/KernelQueues/KernelQueues.html

[19] IOCP: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx

[20] futures: https://en.wikipedia.org/wiki/Futures_and_promises

[21] представлять: https://tokio.rs/docs/getting-started/futures/

[22] David Simmons: https://twitter.com/simmons

[23] C10k: https://ru.wikipedia.org/wiki/C10k

[24] типаж: https://ru.wikipedia.org/wiki/%D0%A2%D0%B8%D0%BF%D0%B0%D0%B6_(%D0%B0%D0%B1%D1%81%D1%82%D1%80%D0%B0%D0%BA%D1%82%D0%BD%D1%8B%D0%B9_%D1%82%D0%B8%D0%BF)

[25] poll(): https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#tymethod.poll

[26] task: https://docs.rs/futures/0.1.17/futures/task/index.html

[27] join_all(): https://docs.rs/futures/0.1.17/futures/future/fn.join_all.html

[28] and_then(): https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.and_then

[29] notify(): https://docs.rs/futures/0.1.17/futures/executor/trait.Notify.html#tymethod.notify

[30] представлен: https://github.com/rust-lang-nursery/futures-rs/blob/0.1.17/src/task_impl/std/mod.rs#L233

[31] выглядеть: https://docs.rs/futures/*/futures/stream/trait.Stream.html

[32] Event Agregator: https://github.com/exonum/exonum/blob/master/exonum/src/events/mod.rs#L164-L218

[33] выполняет: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L145-L175

[34] разрыва: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L146

[35] настройку сокета: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L149-L154

[36] парсить: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L160

[37] iron: https://crates.io/crates/iron

[38] hyper: https://github.com/hyperium/hyper

[39] Как создать блокчейн-проект на Exonum: краткое руководство: https://habrahabr.ru/company/bitfury/blog/342208/

[40] Vk: https://vk.com/bitfuryrussia

[41] Fb: https://www.facebook.com/BitfuryRussia/

[42] Источник: https://habrahabr.ru/post/351824/?utm_campaign=351824