- PVSM.RU - https://www.pvsm.ru -
Java позволяет писать последовательный, параллельный и асинхронный код. Асинхронный — это когда регистрируется callback, который запустится после какого-либо события (например, файл прочитан). Это позволяет избежать блокировки потока, но ломает последовательность выполнения, так что на java пишут такой код скорее когда нет других вариантов. Kotlin даёт решение — корутины, с ними асинхронный код выглядит почти так же, как последовательный.
По корутинам мало статей. Конкретных примеров, показывающих их преимущества — ещё меньше.
Что нашёл:
Последнее интересно — большинство enterprise приложений всё время что-нибудь ждут: БД, другие приложения, изредка и файл нужно прочесть. И всё это может быть полностью асинхронным, а значит всё приложение можно перевести на асинхронную обработку запросов.
Итак, посмотрим как ведут себя корутины под нагрузкой.
Под NIO есть готовая обвязка для корутин [6]. Пишем код:
suspend fun readFileAsync(): String {
val channel = AsynchronousFileChannel.open(filePath)
val bytes = ByteArray(size)
val byteBuffer = ByteBuffer.wrap(bytes)
channel.aRead(byteBuffer, 0L) /*(1)*/
/*(2)*/ channel.close()
return bytes.toString(Charset.forName("UTF-8"))
}
fun readFileSync() = file.inputStream().use {
it.readBytes(size).toString(Charset.forName("UTF-8"))
}
Не буду заострять внимание на синтаксисе, для этого есть короткий guide [7]. Суть в том, что в строчке (1) вызывается метод из NIO, в котором регистрируется callback, который продолжит исполнение в точке (2). Добрый компилятор сохраняет локальные переменные метода и восстанавливает их для продолжения программы. Между (1) и (2), пока файл читается с диска, поток свободен и может, к примеру, начать читать второй файл. В случае с блокировкой для второго чтения файла, пока не окончилось первое чтение, пришлось бы создавать ещё один поток и он бы тоже заблокировался.
Берем JMH и измеряем [8] единичный вызов. Итог: для HDD разница в пределах погрешности, для SSD NIO в корутине на 7.5% ± 0.01% быстрее. Разница небольшая, но это и не удивительно — всё упирается в скорость диска. Но в случае корутин потоки на время чтения не блокируются и могут делать другую работу.
Посмотрим, сколько ещё работы можно сделать, пока мы читаем заданное количество данных с диска. Для этого будем в ForkJoinPool подбрасывать IO и CPU задачи в определенном соотношении. Когда у нас выполнится 400 IO задач, подсчитаем, сколько чисто CPU задач было отработано. Бенчмарк [9]
Затраченное время (ms) | Сколько CPU задач успели выполнить | |||
---|---|---|---|---|
Доля IO задач | Sync | Async | Sync | Async |
3/4 | 117 | 116 | 497 | 584 (+17%) |
1/2 | 128 | 127 | 1522 | 1652 (+8%) |
1/4 | 163 | 164 | 4958 | 4960 |
1/8 | 230 | 238 (+3%) | 11381 | 11495 (+1%) |
Разница есть. Замерял на HDD, на котором единичное чтение почти не отличалось. Отдельно хочу отметить последнюю строку: await генерит относительно большое количество объектов, что дополнительно нагружает GC, это заметно на фоне нашей CPU таски, которая создает 50 объектов. Замерял отдельно: чем больше таска создает объектов, тем меньше разница между Future и await вплоть до равенства.
Нашлась одна библиотека [10], которая умеет работать с базой без блокировок. Она написана на scala и умеет работать только с MySql и Postgres. Если кто знает другие библиотеки — напишите в комментариях.
Await для Future из scala:
suspend fun <T> Future<T>.await(): T = suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
onComplete({
if (it.isSuccess) {
cont.resume(it.get())
} else {
cont.resumeWithException(it.failed().get())
}
}, ExecutionContext.fromExecutor(ForkJoinPool.commonPool()))
}
Сделал пару табличек в Postgres, специально без индексов, чтобы тайминг был заметен, базу запустил в докере, выдав 4 логических процессора. СonnectionPool ограничил в 4. Каждый запрос к приложению делал три последовательных обращения к базе.
Spring позволяет довольно просто сделать асинхронный http сервер, для этого достаточно из метода контроллера возвращать DeferredResult вместо MyClasss, а уже потом заполнять DeferredResult (в другом потоке). Для удобства написал маленькую обертку (цифрами обозначил реальный порядок выполнения):
@GetMapping("/async")
fun async(): DeferredResult<Response> = asyncResponse {
(4) //code that produce Response
}
fun <R> asyncResponse(body: suspend CoroutineScope.() -> R): DeferredResult<R> {
(1) val result = DeferredResult<R>() //пока пустой
(2) launch(CommonPool) { //запускаем корутину
try {
(5) result.setResult(body.invoke(this))
} catch (e: Exception) {
(5') result.setErrorResult(e)
}
}
(3) return result //возвращаем DeferredResult, всё ещё пустой
}
Отдельной проблемой было решить, какое ожидание соединения из пула выставить для sync и async вариантов. Для sync выставляется в мс, а для async — в штуках. Решил, что средний запрос в базу ~30мс, поэтому время поделил на 30 — получил штуки (оказалось, ошибся где-то на треть).
Приложение запустил, выдав один логический процессор. На другой машине поставил яндекс танк [11] и расстрелял приложение. К моему удивлению разницы не было… до 50 rps (слева async [12], справа sync [13]).
После 50 rps переставало хватать 4 соединений (средний timing к этому моменту — 80 rps) и синхронная версия к 66rps доходила до 11 секунд и умирала — начинала отвечать только timeout на любой запрос (даже если убрать нагрузку вообще), а асинхронная к 54rps переходила на 730мс и начинала обрабатывать ровно столько запросов, сколько позволяла база, по всем остальным — 500, при этом ошибки практически всегда отбрасывались моментально.
При запуске приложения с восемью логическими процессорами картина немного изменилась (слева async [14], справа sync [15])
Синхронная версия от 60 до 80rps отвечала за 3 секунды, отбрасывая лишние запросы, и полностью переставала отвечать только к 91rps.
Отчего так вышло? Tomcat создает до 200 (по умолчанию) потоков на обработку входящих запросов. Когда запросов приходит больше, чем можно обработать, они создаются все и через некоторое время оказываются все заблокированными. При этом каждый запрос должен получить коннект 3 раза и каждый раз ждет по секунде. В случае с асинхронным вариантом, запрос не ждет какое-либо время, а смотрит, сколько ещё желающих на этот ресурс и отсылает ошибку, если желающих слишком много. В моём случае ограничение было в 33 при 4 коннектах, что, наверное, многовато. При уменьшении этого числа получим более приемлемую скорость ответа при перегрузках сервера.
Плавный отказ — это хорошо, но интересно, можно ли получить прирост производительности в штатных ситуациях.
На этот раз приложение ходило по http на заглушку, заглушка отвечала с задержкой (от 1.5 мс). Делал два варианта: 100 последовательных и 100 параллельных (batch) запросов на заглушку. Замерял с помощью JMH в 1 и 6 потоков (имитировал разную нагрузку).
Последовательно (avg ms) | Параллельно (avg ms) | |||||
---|---|---|---|---|---|---|
Sync | Async | Δ | Sync | Async | Δ | |
1core / 1 jmh thread | 160.3 ± 1.8 | 154.1 ± 1.0 | 4.0% ± 1.7% | 163.9 ± 2.4 | 10.7 ± 0.3 | 1438.3% ± 4.6% |
2core / 1 jmh thread | 159.3 ± 1.0 | 156.3 ± 0.7 | 1.9% ± 1.1% | 57.6 ± 0.5 | 15.4 ± 0.2 | 274.0% ± 1.9% |
4core / 1 jmh thread | 159.0 ± 1.1 | 157.4 ± 1.3 | 1.0% ± 1.5% | 25.7 ± 0.2 | 14.8 ± 0.3 | 74.3% ± 2.8% |
1core / 6 jmh thread | 146.8 ± 2.5 | 146.3 ± 2.5 | 0.4% ± 3.4% | 984.8 ± 34.2 | 79.3 ± 3.7 | 1141.6% ± 5.1% |
2core / 6 jmh thread | 151.3 ± 1.6 | 143.8 ± 1.9 | 5.2% ± 2.3% | 343.9 ± 17.2 | 86.7 ± 3.7 | 296.5% ± 6.3% |
4core / 6 jmh thread | 152.3 ± 1.5 | 144.7 ± 1.2 | 5.2% ± 1.8% | 135.0 ± 3.0 | 81.7 ± 4.8 | 65.2% ± 8.1% |
Даже при последовательных запросах мы получаем прирост, ну а при батче… Конечно, если мы добавим процессоров, картина для синхронного варианта станет значительно лучше. Так, увеличив ресурсы в 4 раза, мы получим прирост синхронного варианта в 6.5 раз, но так и не достигнем скорости async. При этом скорость работы async не зависит от количества процессоров.
Корутины можно и нужно применять. На них можно писать приложения, которым потребуется меньше процессоров, чтобы работать с той же скоростью. Моё впечатление — в большинстве случаев достаточно будет 1-2 ядер.
Да, ещё и подарок в виде отказоустойчивости.
Я надеюсь, постепенно появятся best practices, так как сейчас паттерны по работе с channels лучше искать в Go, async/await — в C#, yeald — C# и python.
P.S.:
исходники [17]
Послевкусие от Kotlin, часть 1 [18]
Послевкусие от Kotlin, часть 2 [19]
Автор: gnefedev
Источник [1]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/265962
Ссылки в тексте:
[1] Image: https://habrahabr.ru/post/339618/
[2] Избавление от callback hell: https://habrahabr.ru/post/314656/
[3] channels: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#channels
[4] actors: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#actors
[5] Совет от Романа Елизарова: https://habrahabr.ru/company/alfa/blog/336228/
[6] готовая обвязка для корутин: https://github.com/Kotlin/kotlinx.coroutines/tree/master/integration/kotlinx-coroutines-nio
[7] guide: https://kotlinlang.ru/docs/reference/coroutines.html
[8] измеряем: https://github.com/gnefedev/experiments/blob/master/coroutine/src/main/kotlin/com/gnefedev.coroutine/benchmark/BenchmarkOfIO.kt
[9] Бенчмарк: https://github.com/gnefedev/experiments/blob/master/coroutine/src/main/kotlin/com/gnefedev.coroutine/benchmark/BenchmarkOfBlocking.kt
[10] библиотека: https://github.com/mauricio/postgresql-async
[11] яндекс танк: https://tech.yandex.ru/tank/
[12] async: https://overload.yandex.net/50485#tab=test_data&tags=&plot_groups=main&machines=&metrics=&slider_start=1507399813&slider_end=1507400410
[13] sync: https://overload.yandex.net/50480#tab=test_data&tags=&plot_groups=main&machines=&metrics=&slider_start=1507390008&slider_end=1507390617
[14] async: https://overload.yandex.net/50488#tab=test_data&tags=&plot_groups=main&machines=&metrics=&slider_start=1507402172&slider_end=1507402769
[15] sync: https://overload.yandex.net/50492#tab=test_data&tags=&plot_groups=main&machines=&metrics=&slider_start=1507402907&slider_end=1507403517
[16] CoroutineContext: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#coroutine-context-and-dispatchers
[17] исходники: https://github.com/gnefedev/experiments/tree/master/coroutine
[18] Послевкусие от Kotlin, часть 1: https://habrahabr.ru/post/331280/
[19] Послевкусие от Kotlin, часть 2: https://habrahabr.ru/post/337002/
Нажмите здесь для печати.