Современный подход к конкурентности в Android: корутины в Kotlin

в 5:35, , рубрики: android, java, kotlin, Retrofit. акторы, Блог компании Издательский дом «Питер», высокая производительность, многопоточность, Программирование

Привет!

Напоминаем, что у нас уже открыт предзаказ на долгожданную книгу о языке Kotlin из знаменитой серии Big Nerd Ranch Guides. Сегодня мы решили предложить вашему вниманию перевод статьи, рассказывающей о корутинах Kotlin и о правильной работе с потоками в Android. Тема обсуждается очень активно, поэтому для полноты картины также рекомендуем посмотреть эту статью с Хабра и этот подробный пост из блога компании Axmor Software.

Современный фреймворк для обеспечения конкурентности в Java/Android учиняет ад обратных вызовов и приводит к блокирующим состояниям, так как в Android нет достаточно простого способа гарантировать потокобезопасность.

Корутины Kotlin – это очень эффективный и полный инструментарий, позволяющий гораздо проще и производительнее управлять конкурентностью.

Приостановка и блокирование: в чем разница

Корутины не заменяют потоков, а скорее дают фреймворк для управления ими. Философия корутин заключается в определении контекста, позволяющего ожидать, пока завершатся фоновые операции, не блокируя при этом основного потока.

Цель корутин в данном случае – обойтись без обратных вызовов и упростить конкуренцию.

Простейший пример

Для начала возьмем самый простой пример: запустим корутину в контексте Main (главный поток). В нем мы извлечем изображение из потока IO и отправим это изображение на обработку обратно в Main.

launch(Dispatchers.Main) {
    val image = withContext(Dispatchers.IO) { getImage() } // получить из контекста IO
    imageView.setImageBitmap(image) // Возвращаемся в главный поток
}

Код прост как однопоточная функция. Причем, пока getImage выполняется в выделенном пуле потоков IO, главный поток свободен и может взяться за любую другую задачу! Функция withContext приостанавливает текущую корутину, пока работает ее действие (getImage()). Как только getImage() возвратится и looper из главного потока станет доступен, корутина возобновит работу в главном потоке и вызовет imageView.setImageBitmap(image).

Второй пример: теперь нам требуется, чтобы были выполнены 2 фоновые задачи, чтобы ими можно было воспользоваться. Мы применим дуэт async/await, чтобы две эти задачи выполнялись параллельно, и воспользуемся их результатом в главном потоке, как только обе задачи будут готовы:

val job = launch(Dispatchers.Main) {
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}
job.join() // приостанавливает выполнение актуальной корутины, пока задача не будет выполнена

async подобен launch, но возвращает deferred (сущность Kotlin, эквивалентная Future), поэтому ее результат можно получить при помощи await(). При вызове без параметров она работает в контексте, задаваемом по умолчанию для текущей области видимости.

Опять же, главный поток остается свободен, пока мы дожидаемся наших 2 значений.
Как видите, функция launch возвращает Job, который можно использовать для ожидания, пока операция завершится – это делается при помощи функции join(). Она работает как и в любом другом языке, с той оговоркой, что просто приостанавливает корутину, а не блокирует поток.

Диспетчеризация

Диспетчеризация – ключевая концепция при работе с корутинами. Это действие, позволяющее «перепрыгнуть» от одного потока к другому.

Рассмотрим, как в java выглядит эквивалент для диспетчеризации в Main, то есть,

runOnUiThread:
public final void runOnUiThread(Runnable action) {
    if (Thread.currentThread() != mUiThread) {
        mHandler.post(action); // Диспетчеризация
    } else {
        action.run(); // Немедленное выполнение
    }
}

Реализация контекста Main для Android – это диспетчер на основе Handler. Итак, это действительно очень подходящая реализация:

launch(Dispatchers.Main) { ... }
        vs
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) { ... }


// Начиная с kotlinx 0.26:
launch(Dispatchers.Main.immediate) { ... }

launch(Dispatchers.Main) посылает Runnable в Handler, так что его код выполняется не сразу.

launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) немедленно выполнит свое лямбда-выражение в текущем потоке.

Dispatchers.Main гарантирует, что когда корутина возобновит работу, она будет направлена в главный поток; кроме того, Handler используется здесь как нативная реализация Android для отправки в цикл событий приложения.

Точная реализация выглядит так:

val Main: HandlerDispatcher = HandlerContext(mainHandler, "Main")

Вот хорошая статья помогающая разобраться в тонкостях диспетчеризации в Android:
Understanding Android Core: Looper, Handler, and HandlerThread.

Контекст корутины

Контекст корутины (он же – диспетчер корутины) определяет, в каком потоке будет выполняться ее код, что делать, если будет выброшено исключение, и обращается к родительскому контексту для распространения отмены.

val job = Job()
val exceptionHandler = CoroutineExceptionHandler {
    coroutineContext, throwable -> whatever(throwable)
}

launch(Disaptchers.Default+exceptionHandler+job) { ... }

job.cancel() отменит все корутины, родителем которых является job. A exceptionHandler получит все исключения, выброшенные в этих корутинах.

Область видимости

Интерфейс coroutineScope упрощает обработку ошибок:
Если откажет какая-либо из его дочерних корутин, то откажет и вся область видимости, и все дочерние корутины будут отменены.

В примере async, если извлечь значение не удалось, а другая задача при этом продолжила работу – у нас возникает поврежденное состояние, и с этим надо что-то делать.

При работе с coroutineScope функция useValues будет вызываться лишь в случае, если извлечение обоих значений прошло успешно. Также, если deferred2 откажет, deferred1 будет отменена.

coroutineScope { 
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}

Также можно “поместить в область видимости” целый класс, чтобы задать для него контекст CoroutineContext по умолчанию и использовать его.

Пример класса, реализующего интерфейс CoroutineScope:

open class ScopedViewModel : ViewModel(), CoroutineScope {
    protected val job = Job()
    override val coroutineContext = Dispatchers.Main+job

    override fun onCleared() {
        super.onCleared()
        job.cancel()
    }
}

Запуск корутин в CoroutineScope:

Диспетчер launch или async, задаваемый по умолчанию, теперь становится диспетчером актуальной области видимости.

launch {
    val foo = withContext(Dispatchers.IO) { … }
    // лямбда-выражение выполняется в контексте CoroutineContext области видимости 
    …
}

launch(Dispatchers.Default) {
    // лямбда-выражение выполняется в задаваемом по умолчанию пуле потоков
    …
}

Автономный запуск корутины (вне какого-либо CoroutineScope):

GlobalScope.launch(Dispatchers.Main) {
    // лямбда-выражение выполняется в главном потоке.
    …
}

Можно даже определить область видимости для приложения, задав диспетчер Main по умолчанию:

object AppScope : CoroutineScope by GlobalScope {
    override val coroutineContext = Dispatchers.Main.immediate
}

Замечания

  • Корутины ограничивают интероперабельность с Java
  • Ограничивают изменяемость во избежание блокировок
  • Корутины предназначены для ожидания, а не для организации потоков
  • Избегайте I/O в Dispatchers.DefaultMain…) — для этого предназначен Dispatchers.IO
  • Потоки ресурсозатратны, поэтому используются однопоточные контексты
  • Dispatchers.Default основан на ForkJoinPool, появившемся в Android 5+
  • Корутины можно использовать посредством каналов

Избавляемся от блокировок и обратных вызовов при помощи каналов

Определение канала из документации JetBrains:

Канал Channel концептуально очень похож на BlockingQueue. Ключевое отличие заключается в том, что он не блокирует операцию put, он предусматривает приостанавливающий send (или неблокирующий offer), а вместо блокирования операции take предусматривает приостанавливающий receive.

Акторы

Рассмотрим простой инструмент для работы с каналами: Actor.

Actor, опять же, очень похож на Handler: мы определяем контекст корутины (то, есть, поток, в котором собираемся выполнять действия) и работаем с ним в последовательном порядке.

Разница, конечно же, заключается в том, что здесь используются корутины; можно указать мощность, а выполняемый код – приостановить.

В принципе, actor будет переадресовывать любую команду каналу корутины. Он гарантирует выполнение команды и ограничивает операции в ее контексте. Такой подход отлично помогает избавиться от вызовов synchronize и держать все потоки свободными!

protected val updateActor by lazy {
    actor<Update>(capacity = Channel.UNLIMITED) {
        for (update in channel) when (update) {
            Refresh -> updateList()
            is Filter -> filter.filter(update.query)
            is MediaUpdate -> updateItems(update.mediaList as List<T>)
            is MediaAddition -> addMedia(update.media as T)
            is MediaListAddition -> addMedia(update.mediaList as List<T>)
            is MediaRemoval -> removeMedia(update.media as T)
        }
    }
}
// используем
fun filter(query: String?) = updateActor.offer(Filter(query))
// или
suspend fun filter(query: String?) = updateActor.send(Filter(query))

В данном примере мы пользуемся запечатанными классами Kotlin, выбирая, какое именно действие выполнить.

sealed class Update
object Refresh : Update()
class Filter(val query: String?) : Update()
class MediaAddition(val media: Media) : Update()

Причем, все эти действия будут поставлены в очередь, параллельно выполняться они никогда не будут. Это удобный способ добиться ограничения изменяемости.

Жизненный цикл Android + корутины

Акторы могут очень пригодиться и для управления пользовательским интерфейсом Android, упрощают отмену задач и предотвращают перегрузку главного потока.
Давайте это реализуем и вызовем job.cancel() при уничтожении активности.

class MyActivity : AppCompatActivity(), CoroutineScope {
    protected val job = SupervisorJob() // экземпляр Job для данной активности
    override val coroutineContext = Dispatchers.Main.immediate+job


    override fun onDestroy() {
        super.onDestroy()
        job.cancel() // отмена задачи при уничтожении активности
    }
}

Класс SupervisorJob похож на обычный Job с тем единственным исключением, что отмена распространяется только в нисходящем направлении.

Поэтому мы не отменяем всех корутин в Activity, когда одна из них отказывает.

Чуть лучше дела обстоят с функцией расширения, позволяющей открыть доступ к этому CoroutineContext из любого View в CoroutineScope.

val View.coroutineContext: CoroutineContext?
    get() = (context as? CoroutineScope)?.coroutineContext

Теперь мы можем все это скомбинировать, функция setOnClick создает объединенный actor для управления ее действиями onClick. В случае множественных нажатий промежуточные действия будут игнорироваться, исключая таким образом ошибки ANR (приложение не отвечает), и эти действия будут выполняться в области видимости Activity. Поэтому при уничтожении активности все это будет отменено.

fun View.setOnClick(action: suspend () -> Unit) {
    // запускаем один актор в качестве родителя задачи контекста
    val scope = (context as? CoroutineScope)?: AppScope
    val eventActor = scope.actor<Unit>(capacity = Channel.CONFLATED) {
        for (event in channel) action()
    }
    // устанавливаем слушатель для активации этого актора
    setOnClickListener { eventActor.offer(Unit) }
}

В данном примере мы задаем для Channel значение Conflated, чтобы он игнорировал часть событий, если их будет слишком много. Можно заменить его на Channel.UNLIMITED, если вы предпочитаете ставить события в очередь, не теряя ни одного из них, но все равно хотите защитить приложение от ошибки ANR.

Также можно комбинировать корутины и фреймворки Lifecycle, чтобы автоматизировать отмену задач, связанных с пользовательским интерфейсом:

val LifecycleOwner.untilDestroy: Job get() {
    val job = Job()

    lifecycle.addObserver(object: LifecycleObserver {
        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun onDestroy() { job.cancel() }
    })

    return job
}
// использование
GlobalScope.launch(Dispatchers.Main, parent = untilDestroy) {
    /* здесь происходят удивительные вещи! */
}

Упрощаем ситуацию с обратными вызовами (часть 1)

Вот как можно преобразить использование API, основанного на обратных вызовах, благодаря Channel.

API работает вот так:

  1. requestBrowsing(url, listener) инициирует синтаксический разбор папки, расположенной по адресу url.
  2. Слушатель listener получает onMediaAdded(media: Media) для любого медиа-файла, обнаруженного в этой папке.
  3. listener.onBrowseEnd() вызывается по завершении синтаксического разбора папки

Вот старая функция refresh в поставщике контента для обозревателя VLC:

private val refreshList = mutableListOf<Media>()

fun refresh() = requestBrowsing(url, refreshListener)

private val refreshListener = object : EventListener{
    override fun onMediaAdded(media: Media) {
        refreshList.add(media))
    }
    override fun onBrowseEnd() {
        val list = refreshList.toMutableList()
        refreshList.clear()
        launch {
            dataset.value = list
            parseSubDirectories()
        }
    }
}

Как это улучшить?

Создаем канал, который будет запускаться в refresh. Теперь обратные вызовы обозревателя будут лишь направлять медиа в этот канал, а затем закрывать его.

Теперь функция refresh стала понятнее. Она создает канал, вызывает обозреватель VLC, затем формирует список медиа-файлов и обрабатывает его.

Вместо функций select или consumeEach можно использовать for для ожидания медиа, и этот цикл будет разрываться, как только канал browserChannel закроется.

private lateinit var browserChannel : Channel<Media>

override fun onMediaAdded(media: Media) {
    browserChannel.offer(media)
}

override fun onBrowseEnd() {
    browserChannel.close()
}

suspend fun refresh() {
    browserChannel = Channel(Channel.UNLIMITED)
    val refreshList = mutableListOf<Media>()
    requestBrowsing(url)
    // Приостанавливается на каждой итерации в ожидании медиа
    for (media in browserChannel) refreshList.add(media)
    // Канал закрыт
    dataset.value = refreshList
    parseSubDirectories()
}

Упрощаем ситуацию с обратными вызовами (часть 2): Retrofit

Второй подход: мы вообще не используем корутины kotlinx, зато применяем корутинный core-фреймворк.

Смотрите, как на самом деле работают корутины!

Функция retrofitSuspendCall оборачивает запрос на вызов Retrofit Call, чтобы сделать из него функцию suspend.

При помощи suspendCoroutine мы вызываем метод Call.enqueue и приостанавливаем корутину. Предоставленный таким образом обратный вызов обратится к continuation.resume(response), чтобы возобновить корутину откликом от сервера, как только тот будет получен.

Далее нам остается просто объединить наши функции Retrofit в retrofitSuspendCall, чтобы с их помощью возвращать результаты запросов.

suspend inline fun  <reified T> retrofitSuspendCall(request: () -> Call <T>
) : Response <T> = suspendCoroutine { continuation ->
    request.invoke().enqueue(object : Callback<T> {
        override fun onResponse(call: Call<T>, response: Response<T>) {
            continuation.resume(response)
        }
        override fun onFailure(call: Call<T>, t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}

suspend fun browse(path: String?) = retrofitSuspendCall {
    ApiClient.browse(path)
}

// использование (в контексте корутины Main)
livedata.value = Repo.browse(path)

Таким образом, вызов, блокирующий сеть, делается в выделенном потоке Retrofit, корутина находится здесь, ожидая отклика от сервера, а использовать ее в приложении – проще некуда!

Такая реализация вдохновлена библиотекой gildor/kotlin-coroutines-retrofit.

Также имеется JakeWharton/retrofit2-kotlin-coroutines-adapter с другой реализацией, дающей аналогичный результат.

Эпилог

Channel можно использовать и многими другими способами; посмотрите в BroadcastChannel более мощные реализации, которые могут вам пригодиться.
Также можно создавать каналы при помощи функции Produce.

Наконец, при помощи каналов удобно организовать коммуникацию между компонентами UI: адаптер может передавать события нажатий в свой фрагмент/активность через Channel или, например, через Actor.

Автор: ph_piter

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js