- PVSM.RU - https://www.pvsm.ru -
Tokio [1] — это фреймворк для разработки [2] сетевых масштабируемых приложений на Rust [3], использующий [4] компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.
И хотя Tokio критикуют [5] за излишнюю сложность в освоении, он уже используется [6] в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper [7], tower-grpc [8] и сonduit [9]. Мы тоже обратились [10] к этому решению при разработке нашей платформы Exonum [11].
Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.
Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.
Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.
[12]
/ изображение Kevin Dooley [13] CC [14]
По своей сути Tokio представляет [15] собой «обертку» над Mio. Mio [16] — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll [17] в Linux, kqueue [18] в Mac OS или IOCP [19] в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:
Как видно из схемы выше, главным функциональным компонентом Tokio, является futures [20] — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.
По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять [21] разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.
Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.
Разработчик Дэвид Симмонс (David Simmons [22]), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит [4] обмен сообщениями с HTTP-сервером.
Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k [23]).
В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.
В крэйте futures определен типаж [24] Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
fn wait(self) -> Result<Self::Item, Self::Error> { ... }
fn map<F, U>(self, f: F) -> Map<Self, F>
where F: FnOnce(Self::Item) -> U { ... }
}
«Сердцем» типажа Future является метод poll() [25]. Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task [26]). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() [27] или and_then() [28]. Например:
let client_to_server = copy(client_reader, server_writer)
.and_then(|(n, _, server_writer)| {
shutdown(server_writer).map(move |_| n)
});
За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.
В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify() [29]. Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен [30] в официальном репозитории Rust на GitHub:
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
ThreadNotify::with_current(|notify| {
loop {
match self.poll_future_notify(notify, 0)? {
Async::NotReady => notify.park(),
Async::Ready(e) => return Ok(e),
}
}
})
}
Кроме futures, Tokio работает [10] и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.
Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.
Типаж stream может выглядеть [31] следующим образом:
trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.
Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.
Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.
Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:
Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator [32] объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.
При коммуникации между узлами Exonum выполняет [33] следующие связанные операции на каждом из них:
Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла
let connect_handle = Retry::spawn(handle.clone(), strategy, action)
.map_err(into_other)
// Configure socket
.and_then(move |sock| {
sock.set_nodelay(network_config.tcp_nodelay)?;
let duration =
network_config.tcp_keep_alive.map(Duration::from_millis);
sock.set_keepalive(duration)?;
Ok(sock)
})
// Connect socket with the outgoing channel
.and_then(move |sock| {
trace!("Established connection with peer={}", peer);
let stream = sock.framed(MessagesCodec::new(max_message_len));
let (sink, stream) = stream.split();
let writer = conn_rx
.map_err(|_| other_error("Can't send data into socket"))
.forward(sink);
let reader = stream.for_each(result_ok);
reader
.select2(writer)
.map_err(|_| other_error("Socket error"))
.and_then(|res| match res {
Either::A((_, _reader)) => Ok("by reader"),
Either::B((_, _writer)) => Ok("by writer"),
})
})
Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.
Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва [34]:
.map_err(into_other)
Помимо этого, необходимо произвести настройку сокета [35]:
.and_then(move |sock| {
sock.set_nodelay(network_config.tcp_nodelay)?;
let duration =
network_config.tcp_keep_alive.map(Duration::from_millis);
sock.set_keepalive(duration)?;
Ok(sock)
})
И парсить [36] входящие байты как сообщения:
let (sink, stream) = stream.split();
Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.
В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron [37] на базе библиотеки hyper [38]. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.
Автор: alinatestova
Источник [42]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/detsentralizovanny-e-seti/275978
Ссылки в тексте:
[1] Tokio: https://github.com/tokio-rs/tokio
[2] разработки: https://medium.com/@carllerche/announcing-tokio-df6bb4ddb34
[3] Rust: https://habrahabr.ru/company/bitfury/blog/349786/
[4] использующий: https://cafbit.com/post/tokio_internals/
[5] критикуют: https://www.reddit.com/r/rust/comments/7klghl/tokio_internals_understanding_rusts_asynchronous/
[6] используется: https://news.ycombinator.com/item?id=15972593&ref=hvper.com
[7] hyper: http://hyper.rs/
[8] tower-grpc: https://github.com/tower-rs/tower-grpc
[9] сonduit: https://github.com/runconduit/conduit
[10] обратились: https://habrahabr.ru/company/bitfury/blog/342970/
[11] Exonum: https://exonum.com/
[12] Image: https://habrahabr.ru/company/bitfury/blog/351824/
[13] Kevin Dooley: https://www.flickr.com/photos/pagedooley/7011008137/
[14] CC: https://creativecommons.org/licenses/by/2.0/
[15] представляет: https://manishearth.github.io/blog/2018/01/10/whats-tokio-and-async-io-all-about/
[16] Mio: https://docs.rs/mio/0.6.10/mio/
[17] epoll: https://ru.wikipedia.org/wiki/Epoll
[18] kqueue: https://developer.apple.com/library/content/documentation/Darwin/Conceptual/FSEvents_ProgGuide/KernelQueues/KernelQueues.html
[19] IOCP: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
[20] futures: https://en.wikipedia.org/wiki/Futures_and_promises
[21] представлять: https://tokio.rs/docs/getting-started/futures/
[22] David Simmons: https://twitter.com/simmons
[23] C10k: https://ru.wikipedia.org/wiki/C10k
[24] типаж: https://ru.wikipedia.org/wiki/%D0%A2%D0%B8%D0%BF%D0%B0%D0%B6_(%D0%B0%D0%B1%D1%81%D1%82%D1%80%D0%B0%D0%BA%D1%82%D0%BD%D1%8B%D0%B9_%D1%82%D0%B8%D0%BF)
[25] poll(): https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#tymethod.poll
[26] task: https://docs.rs/futures/0.1.17/futures/task/index.html
[27] join_all(): https://docs.rs/futures/0.1.17/futures/future/fn.join_all.html
[28] and_then(): https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.and_then
[29] notify(): https://docs.rs/futures/0.1.17/futures/executor/trait.Notify.html#tymethod.notify
[30] представлен: https://github.com/rust-lang-nursery/futures-rs/blob/0.1.17/src/task_impl/std/mod.rs#L233
[31] выглядеть: https://docs.rs/futures/*/futures/stream/trait.Stream.html
[32] Event Agregator: https://github.com/exonum/exonum/blob/master/exonum/src/events/mod.rs#L164-L218
[33] выполняет: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L145-L175
[34] разрыва: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L146
[35] настройку сокета: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L149-L154
[36] парсить: https://github.com/exonum/exonum/blob/master/exonum/src/events/network.rs#L160
[37] iron: https://crates.io/crates/iron
[38] hyper: https://github.com/hyperium/hyper
[39] Как создать блокчейн-проект на Exonum: краткое руководство: https://habrahabr.ru/company/bitfury/blog/342208/
[40] Vk: https://vk.com/bitfuryrussia
[41] Fb: https://www.facebook.com/BitfuryRussia/
[42] Источник: https://habrahabr.ru/post/351824/?utm_campaign=351824
Нажмите здесь для печати.