ReactiveX 2.0 с примерами, или грокаем реактивное программирование 2.0. Часть 1: Observable vs Flowable, Backpressure

в 13:41, , рубрики: backpressure, java, reactiveX, rxandroid, rxjava, rxjava2, rxkotlin, Разработка под android

image

Привет, меня зовут Алексей. Я пишу Backend'ы на Kotlin, а также занимаюсь разработкой Android приложений. Продолжительное время я страдал: мучался с CallBack Hell'ом, императивным стилем, синхронизацией потоков и прочими классическими проблемами Java на Android. Это была огромная боль. И я начал искать какое-то решение, чтобы как-то избавится от этой боли. И приходит счастливый случай — я встречаю нарастающий хайп по RxJava. Попробовав, не могу остановиться по сей день. На момент написания данной статьи, релизнулась RxJava 2.0 и появилось стойкое желание разобраться в нововведениях. На официальном источнике, в Github Wiki, появилась глава RxJava 2.0: What's different in 2.0. Но, к сожалению, я не обладатель "беглого" английского, и чтение настолько важной доки заняло время. Накопились некоторые заметки, появился концепт, которым я хочу поделиться. Но чтобы не стать "Арт-директором Арт-пространства" и не принести банальный перевод, а какой-то профит, данная статья будет смесь туториала и перевода вики, приправленное реальными примерами юзкейсов на RxKotlin.

Так как подходы к разработке Web и Android приложений отличаются, как соответственно и контекст использования Rx, разговор буду вести в контексте Android разработки. Кого заинтересовал, прошу под кат.

Краткое введение

Прежде чем начинать, я бы посоветовал прочитать шикарный цикл статей

И еще мне понравилась вот эта статья, которую также рекомендую к прочтению.

И предлагаю ознакомится с такими терминами, как:

Producer(Производитель) — источник создаваемых значений для ваших Observable/Flowable. Это может быть веб-сокет, это могут быть XML/JSON-данные, полученные например из сети. Итерируемая коллекция также может быть производителем. В сущности это все, что находится в data layer концепции Clean Architecture(Чистая архитектура), все что порождает какие-то данные и может передаться в метод observer.next(value)

Hot Observable(Горячий поток) — в том кейсе, когда производитель находится снаружи вашего Observable/Flowable, поток является горячим

@inject Repository<Model> repository;

var hot = Observable.create(emmiter -> emitter.onNext(repository.requestModel()));

Cold Observable(Холодный поток) — в том кейсе, когда производитель находится внутри вашего Observable/Flowable, поток является холодным

var cold = Observable.create(emmiter -> {
    Repository<Model> repository = new Repository();
    emitter.onNext(repository.requestModel())
});

На заметку: я называю Hot/Cold Observable Горячими/Холодными потоками именно так потому, что любой Observable изнутри представляет с собой поток, который при помощи операторов оборачивается другими потоками. И принципиальной разницы для выражения и перевода я не вижу. Возможно я не прав, но вижу это так. Напишите в комментариях свою точку зрения, возможно я исправлю на вариант получше. Заранее, спасибо!

Для наиболее детального разбора, чем отличаются горячие Observable/Flowable от холодных, я готовлю отдельную статью. Пожалуйста потерпите)

RxJava 2.0 был полностью переписан с нуля поверх спецификации Reactive Streams. Сама спецификация эволюционировала из RxJava 1.x и обеспечивает общую базовую линию для реактивных систем и библиотек.

Поскольку Reactive-Streams имеет другую архитектуру, он предусматривает изменения некоторых известных типов RxJava. За несколько статей я попытаюсь обобщить то, что изменилось, и рассказать, как переписать код 1.x в код 2.x.

NullPointerException

Да-да, RxJava 2.x больше не приемлет значения NULL, и следующий код приведет к немедленному получению NullPointerException, или Emitter(порождающий поток) выбросит onError-событие.

class NullPointerActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        var obsJust: TextView? = null
        var singleJust: TextView? = null
        var callable: TextView? = null
        var nullMaping: TextView? = null

        verticalLayout {
            obsJust = textView()
            singleJust = textView()
            callable = textView()
            nullMaping = textView()
        }

        try {
            Observable.just(null).subscribe()
        } catch (e: Exception) {
            obsJust?.text = e.localizedMessage
            e.printStackTrace()
        }

        try {
            Single.just(null).subscribe()
        } catch (e: Exception) {
            singleJust?.text = e.localizedMessage
            e.printStackTrace()
        }

        Observable.fromCallable{null}.subscribe({Log.d("NullPointerActivity", it)}, {
            callable?.text = it.localizedMessage
            it.printStackTrace()
        })

        Observable.just(1).map{null}.subscribe({Log.d("NullPointerActivity", it)}, {
            nullMaping?.text = it.localizedMessage
            it.printStackTrace()
        })

    }

}

Это означает, что Observable<Void> больше не может порождать какие-либо события, а поток, горячий ли, холодный, может только завершиться событием onComplete или onError. Ну а в некоторых случаях, он может просто выкинуть Exception еще до оборачивания значения в поток, как это происходит с Single.just(null) и Observable.just(null)

При проектировании/реализации кастомных Observable вам не обязательно определять конкретные типы для ObservableEmitter<Any>, который пришел на смену Observable.OnSubscribe<Any>. Например, если вам нужен источник, похожий на signaller, можно определить Enum и прокидывать его синглтон на onNext:

enum class Irrelevant { INSTANCE; }

val source = Observable.create<Any> {emitter ->
            Log.d(TAG, "Side-effect 1")
            emitter.onNext(Irrelevant.INSTANCE)

            Log.d(TAG, "Side-effect 2")
            emitter.onNext(Irrelevant.INSTANCE)

            Log.d(TAG, "Side-effect 3")
            emitter.onNext(Irrelevant.INSTANCE)
        }

source.subscribe({Log.d(TAG, it.toString())}, Throwable::printStackTrace)

И в результате, в лог запишется то, что и ожидалось:

D/NullPointerActivity: Side-effect 1
D/NullPointerActivity: INSTANCE
D/NullPointerActivity: Side-effect 2
D/NullPointerActivity: INSTANCE
D/NullPointerActivity: Side-effect 3
D/NullPointerActivity: INSTANCE

Observable vs. Flowable

К сожалению, до Rx 2.0 внедрение Backpressure было произведено непосредственно в Observable, вместо того, чтобы выделить отдельный класс с его поддержкой. Основная проблема с Backpressure заключается в том, что многие горячие Observable, не могут быть достаточно надежными, и при определенных обстоятельствах могут вызвать неожиданный MissingBackpressureException. А без определенного наработанного опыта очень сложно предсказать такое исключение.

В Rx 2.0 исправлена эта ситуация. Observable — класс без покрытия backpressure, а новый Flowable был наделен backpressure из коробки. Далее мы рассмотрим вариации, где и в каком кейсе использовать Flowable, а где Observable.

Так какой когда юзать?

Этот вопрос, пожалуй, самый логичный. При реализации ваших классов-репозиториев, классов бизнес-логики, при принятии решения о том, какой тип, Observable или Flowable, следует принимать и возвращать, вам стоит рассмотреть несколько факторов, которые помогут вам избежать проблем в получении исключений таких, как MissingBackpressureException или OutOfMemoryError, т.к. неаккуратное использование неправильного типа ведет к падению fps на перфомансе.

Когда юзать Observable

В тех кейсах, когда у вас в итерации, условно говоря, не более 1000 элементов, при этом это самый худший случай и не предполагается масштабирования, вам не требуется backpressure. В целом, можно сказать, что, если вы чувствуете, что в каком-то определенном кейсе нет шансов на OutOfMemoryException, то это ровно тот случай, когда можно и нужно использовать Observable. В основном, это случаи использования UI, самые разнообразные события onClick, Touch, Pointer-movement, и.т.п. По сути, это любые события, частота которых не превышает 1000 Hz. Согласитесь, врядли вы сможете нажимать на тачскрин чаще 1000 раз в секунду. Но все равно не забывайте по оператор debounce.

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        RxView.clicks(backpressure)
                .debounce(200, TimeUnit.MILLISECONDS)
                .subscribe ({ openNewScreen(BackpressureExampleActivity::class.java) }, Throwable::printStackTrace)

        RxView.clicks(nullpointer)
                .debounce(200, TimeUnit.MILLISECONDS)
                .subscribe ({ openNewScreen(NullPointerActivity::class.java) }, Throwable::printStackTrace)
    }
}

RxBinding я думаю всем известно, что библиотека Jake Wharton'а возвращает Observable при биндинге View, и это полностью оправдывает концепцию Rx2.x

Когда юзать Flowable?

Когда вы имеете дело уже с большими или непредсказуемыми объемами данных, скажем свыше 10000 элементов, или в ситуациях с непрерывной генерацией данных. Также поводом для использования Flowable является парсинг, чтение данных с разнообразных носителей информации(Internal/External storage).

Чтение из базы данных при помощи SQLiteOpenHelper тоже является предлогом для использования Flowable. Поэтому, если вы используете библиотеки https://github.com/square/sqlbrite и https://github.com/pushtorefresh/storio, не будет лишним приводить Observable к Flowable. Выполнение запросов к бэкенду — еще одна причина для использования Flowable.

Если присмотреться, можно заметить общую деталь у всех перечисленных кейсов — они блокируют UI-поток. Соответственно, если вы выполняете какие-то операции блокирующие MainThread — это повод использовать Flowable.

Ну и напоследок. Многие блокирующие и/или pull-based источники данных, от которых в конечном итоге можно получить неблокирующее реактивное API(Retrofit) также являются основанием для использования Flowable.

Поподробнее о Backpressure

Backpressure(Обратное давление) — явление, которое можно встретить в порождающем потоке, где некоторые асинхронные операции не могут обрабатывать значения достаточно быстро и нуждаются в замедлении работы производителя.

Классический кейс необходимости использования backpressure, когда порождающий поток является горячим:

class BackpressureExampleActivity : AppCompatActivity() {

    private val n = 10000000

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_backpressure)

        val source = PublishProcessor.create<Int>()

        source
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

        for(i in 0..n) {
            source.onNext(i)
            if(i == n) {
                source.onComplete()
            }
        }

    }

    private fun addToIntListAdapter(number: Int?) {
        Log.d("number", number.toString())

        // do something
    }

    private fun onComplete() {
        textView?.text = "completed"
    }

}

PublishProcessor — это несколько иная форма привычных нам Observable. Как вы можете видеть, он обладает более императивной формой.

В этом примере основной поток будет выдавать N элементов. Представим, что метод addToIntListAdapter(int number) добавляет следующий приходящий элемент в адаптер, который прикреплен к RecyclerView. Это все займет некоторое время, а Overhead на текущий стек запросов может быть выше времени исполнения. Тем не менее, производящий поток с циклом for не может этого знать и продолжает вызывать onNext.

Внутри асинхронных операторов имеются буферы для хранения таких элементов до тех пор, пока они не будут обработаны. В RxJava 1.x эти буферы были неограниченными, а это означает, что они, вероятно, будут содержать все n элементов из примера. Проблема начинается тогда, когда n = 1000000. В классическом представлении это привело бы к OutOfMemoryError или, как правило, к фризам перфоманса из-за чрезмерной ресурсоемкой работы GC и, как следствие, к частому его вызову.

Подобно тому, как обработка ошибок стала обязательной частью Rx и получила операторов для работы с ней (через операторы onErrorReturn, onErrorResumeNext, doOnError), Backpressure — еще одно свойство потоков данных, о которых разработчик должен думать и обрабатывать, непосредственно через операторы onBackpressureBuffer, onBackpressureDrop, onBackpressureLast.

При случае n = 1000000 мы получим:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requests
W/System.err: at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:322)
W/System.err: at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:198)
W/System.err: at mobile.geekbit.rx20habrahabrproject.backpressure.BackpressureExampleActivity.onCreate(BackpressureExampleActivity.kt:30)
W/System.err: at android.app.Activity.performCreate(Activity.java:6679)
W/System.err: at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1118)
W/System.err: at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2618)
W/System.err: at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2726)
W/System.err: at android.app.ActivityThread.-wrap12(ActivityThread.java)
W/System.err: at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1477)
W/System.err: at android.os.Handler.dispatchMessage(Handler.java:102)
W/System.err: at android.os.Looper.loop(Looper.java:154)
W/System.err: at android.app.ActivityThread.main(ActivityThread.java:6119)
W/System.err: at java.lang.reflect.Method.invoke(Native Method)
W/System.err: at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
W/System.err: at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)

А если мы еще вызовем Thread.sleep(10000) на UI-потоке, то Activity запустится, естественно, с жуткими фризами, вызвав GC 7-8 раз!

Помимо вышеупомянутого PublishProcessor существуют другие операторы, которые не поддерживают backpressure. Например, Observable.interval периодически порождает значения быстрее, чем они могут быть обработаны.

В RxJava 2.x большинство асинхронных операторов теперь имеют ограниченный внутренний буфер, и любая попытка переполнения этого буфера завершает всю последовательность с помощью MissingBackpressureException. В документации есть раздел по Backpressure и операторам, которые его поддеживают http://reactivex.io/documentation/operators/backpressure.html

Однако backpressure встречается и в более неожиданных местах: в обычных холодных потоках, которые не дают и не должны давать MissingBackpressureException. Если взять наш пример выше, и отрефакторить:

class BackpressureExampleActivity : AppCompatActivity() {

    private val n = 10000000

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_backpressure)

        Flowable.range(1, n)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
    }

    private fun addToIntListAdapter(number: Int?) {
        Log.d("number", number.toString())

        // do something
    }

    private fun onComplete() {
        textView?.text = "completed"
    }

}

Все запустится без MissingBackpressureException, perfomance будет работать в стабильные fps, плавно и с адекватным использованием памяти, кол-во которой будет выделено значительно меньше. Причиной этого является то, что многие операторы-производители могут «генерировать» значения по требованию и, таким образом, наблюдать за оператором. Можно сказать, что Flowable.range(0, n) генерирует столько значений, сколько может удержать буфер без переполнения. Давайте все рассмотрим на более конкретном примере:

   Flowable.range(1, n)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : DisposableSubscriber<Int>() {
                    public override fun onStart() {
                        request(1)
                    }

                    override fun onNext(v: Int?) {
                        addToIntListAdapter(v)

                        request(1)
                    }

                    override fun onError(ex: Throwable) {
                        ex.printStackTrace()
                    }

                    override fun onComplete() {
                        onComplete()
                    }
                })

Здесь реализация onStart указывает диапазон для создания своего первого значения, которое затем принимается в onNext. По завершении операции другое значение запрашивается из диапазона. В обычной реализации диапазона такой вызов рекурсивно вызывает onNext, что приводит к StackOverflowError, что, конечно, нежелательно.

Чтобы предотвратить StackOverflowError, операторы используют trampolining(в переводе можно определить термин как батут). Это условно называемая логика, которая предотвращает повторные вызовы. Ее очень часто можно встретить в функциональных языках, таких как Clojure, Scala, и, в частности, в рекурсивных вызовах. В терминах диапазона батут будет помнить, что был вызов request(1), в то время как он вызывал onNext (). И как только onNext () возвращается, он вызовет следующий onNext () со следующим значением. Поэтому, если они меняются местами, пример выше по-прежнему будет работает идентично:

override fun onNext(v: Int?) {
    addToIntListAdapter(v)

    request(1)
}

Однако это не работает для onStart. Хотя Flowable-инфраструктура гарантирует, что она будет вызываться не более одного раза для каждого Подписчика, вызов request (1) может сразу вызвать порождение нового элемента. Если у вас есть логика инициализации после вызова request (1), которая необходима для onNext, у вас могут быть исключения:


class IntMapper {

    private val KOFF = 2

    fun map(int: Int?): Int = int ?: 0 * KOFF

}

Flowable.range(1, n)
.subscribe(object : DisposableSubscriber<Int>() {
    lateinit var mapper: IntMapper

    public override fun onStart() {
        request(1)

        mapper = IntMapper()
    }

    override fun onNext(v: Int?) {
        addToIntListAdapter(mapper.map(v))

        request(1)
    }

    override fun onError(ex: Throwable) {
        ex.printStackTrace()
    }

    override fun onComplete() {
        onComplete()
    }
})

В этом кейсе, у вас возможен выброс NullPointerException. А учитывая особенность Null Safety в Kotlin, мне пришлось постараться, чтобы добиться этого исключения. И прошу учесть одну особенность — я убрал операторы

.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())

потому что с ними, вы не получите NullPointerException, ибо природа этих операторов асинхронна.
И именно в этом синхронном кейсе вы получите незамедлительный NullPointerException сразу, как только произойдет вызов onStart. Более того, эту ошибку сложнее отловить, если вызов request(1) сопровождается асинхронный вызовом onNext в другом потоке, из-за чего происходит Race Condition.

Таким образом, нужно выполнить все инициализирующие операции в onStart или, еще лучше, до него, и запросить request() последним.

Операторы onBackpressureBuffer, onBackpressureDrop, onBackpressureLast

Большинство разработчиков сталкиваются с обратным давлением, когда их приложение крашится с MissingBackpressureException, и логи обычно указывают на оператор observeOn. Фактической же причиной является, как правило, использование PublishProcessor, Observable, а также операторы timer (), interval () или кастомные lift() операторы, а также операции, происходящие в create().

Существует несколько способов решения таких ситуаций, и сейчас мы их рассмотрим.

  1. Увеличение размеров буфера

Иногда такие переполнения происходят из-за потенциально опасных источников. Например, внезапно пользователь слишком быстро тапает на экран и observeOn буфер очень быстро и активно переполняется.

Хотелось бы остановиться на самом буфере. Может возникнуть соответствующий вопрос, что это вообще такое и как он задействован в механизме Rx? Все, на самом деле, довольно просто. Начнем с того, что размерность буфера по умолчанию рассчитывается как Math.max(1, Integer.getInteger("rx2.buffer-size", 128)), и по моим наблюдениям, этот метод всегда возвращает 128 (капитан очевидность не дремлет). В итоге, мы получаем буфер с размерностью 128… мм… чего? Бит? Байт? Попугаев? Ответ на этот вопрос мы можем получить здесь:

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Параметр bufferSize проверяется по модулю и отправляется в инстанс FlowableObserveOn.

    final int prefetch; // это и есть bufferSize

    public FlowableObserveOn(
            Flowable<T> source,
            Scheduler scheduler,
            boolean delayError,
            int prefetch) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.prefetch = prefetch;
    }

Далее в onSubscribe уже видим что-то знакомое.


    @Override
    public void onSubscribe(Subscription s) {
        // какой-то код

        s.request(prefetch);

        // какой-то код      
    }

И оказывается, что мы на самом деле уже использовали буфер, только не подозревали об этом.
request(1) помните? Так вот, вот так на самом деле выглядит интерфейс Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Где request(long n) ничто иное, как количество элементов для запросов к восходящему потоку.
Итак, мы можем сделать вывод, что буфер — это тот самый request(n). Но как сообщить Flowable, размер его буфера? Об этом ниже.

Большинство backpressure-чувствительных операторов в последних версиях RxJava теперь позволяют разработчикам указывать размер своих внутренних буферов. Соответствующие параметры называются bufferSize, prefetch или capacityHint. Учитывая переполненный пример во введении, мы можем просто увеличить размер буфера для PublishProcessor, чтобы иметь достаточно места для всех значений.

val source = PublishProcessor.create<Int>()

source
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread(), false, 1024)
.subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

for(i in 0..n) {
    source.onNext(i)
    if(i == n) {
        source.onComplete()
    }
}

Thread.sleep(10000)

Но обратите внимание, что, в принципе, это является костылем, поскольку переполнение может все же произойти, если производитель порождает больше, чем размер предсказанного буфера. В этом случае можно использовать один из операторов следующего раздела.

  1. Группировка/пропускание/буферизация значений со стандартными операторами

В случае, если исходные данные могут быть обработаны более эффективно в группированом виде, можно уменьшить вероятность исключения MissingBackpressureException с использованием одного из стандартных операторов пакетной обработки (по размеру или по времени).

val source = PublishProcessor.create<Int>()

source
.buffer(1024*1024)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread(), false, 1024)
.flatMap { PublishProcessor.fromIterable(it) }
.subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

for(i in 0..n) {
    source.onNext(i)
    if(i == n) {
        source.onComplete()
    }
}

Thread.sleep(10000)

В том случае, когда некоторые из значений можно безопасно игнорировать, можно использовать сэмплинг(оператор sample) и операторы тротлинга (throttleFirst, throttleLast, throttleWithTimeout).

val source = PublishProcessor.create<Int>()

source
.sample(1, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread(), false, 1024)
.subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

for(i in 0..n) {
    source.onNext(i)
    if(i == n) {
        source.onComplete()
    }
}

Однако, эти операторы только снижают скорость приема стоимости по нисходящему потоку и, следовательно, могут по-прежнему приводить к исключению MissingBackpressureException.

  1. Оператор onBackpressureBuffer()

Этот оператор в своей базовой форме повторно вводит неограниченный буфер между исходным источником и оператором нисходящего потока. Это означает, что пока не исчерпается память, он может обрабатывать почти любую сумму, поступающую из непрерывно порождающего источника.

Flowable.range(1, n)
             .onBackpressureBuffer()
             .subscribeOn(Schedulers.computation())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

В этом примере функция observeOn имеет очень низкий размер буфера (напомню, что он 128 по умолчанию), но отсутствует исключение MissingBackpressureException, так как onBackpressureBuffer впитывает все n значений(которых val n = 1000000) и передает на
исполнение пакеты(batch) значений небольшими партиями.
Стоит заметить, что оператор onBackpressureBuffer потребляет порождающий источник неограниченным образом, то есть без применения к нему обратного давления. Это приводит к тому, что даже такие производители, как range() будет полностью исполнены.

Есть также несколько перегруженных форм у оператора onBackpressureBuffer. Я приведу самые необходимые.

onBackpressureBuffer(int capacity)

Это ограниченная версия, которая будет уведомлять о возникновении BufferOverflowException, при случае достижении буфером заданной емкости.

Flowable.range(1, n)
             .onBackpressureBuffer(16)
             .subscribeOn(Schedulers.computation())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

Адекватность этого оператора уменьшается, поскольку все больше и больше операторов теперь позволяют устанавливать размеры своих буферов. В остальном это дает возможность расширить свой внутренний буфер.

onBackpressureBuffer(int capacity, Action onOverflow)

Этот перегруженный метод вызывает callback в случае переполнения. Сам Action не представляет
из себя ничего необычного

public interface Action {
    void run() throws Exception;
}

Но стоит отметить, что его полезность довольно ограничена, поскольку нет другой информации о переполнении, чем текущий стек вызовов.

onBackpressureBuffer(int capacity, Action0 onOverflow, BackpressureOverflowStrategy strategy)

А вот этот вариант уже действительно кое-что интересное. Его полезность несколько выше, так как позволяет определить, что делать, если емкость была достигнута. BackpressureOverflowStrategy — это enum, который предлагает 3 статических поля с реализациями, представляющими типичные действия при:

ERROR — это дефолтное поведение при всех предыдущих перегрузках, выбрасывающее BufferOverflowException.

DROP_OLDES — по сути, это означает обновление буфера, при его переполнении. Все старые значения выбрасываются и буфер наполняется новым стеком

DROP_LATEST — немного иное поведение, отличающееся тем, что, если произойдет переполнение, текущее значение будет просто проигнорировано, и только старые значения будут доставлены после запросов нисходящего потока.

Flowable.range(1, n)
    .onBackpressureBuffer(1024, { Toast.makeText(baseContext, BufferOverflowException::class.simpleName, Toast.LENGTH_SHORT).show() }, BackpressureOverflowStrategy.DROP_LATEST)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

Каждая стратегия ситуативна, но необходимо помнить, что последние две стратегии вызывают прерывание потока по мере удаления элементов. Кроме того, они не будут уведомлять вас об исключении BufferOverflowException.

Есть одна особенность в использовании BackpressureOverflowStrategy. Вы можете использовать специальные операторы onBackpressureDrop() и onBackpressureLatest(). Это сделает ваш код более лаконичным.

Flowable.range(1, n)
    .onBackpressureDrop()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

Flowable.range(1, n)
    .onBackpressureLatest()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)

Подводя итоги, я хотел бы отметить, что к backpressure стоит относиться как к непредсказуемым результатам от определенных действий. Вы никогда не сможете объективно и адекватно предсказать, сколько данных придет с бэкенда, как быстро будет масштабироваться ваше приложение, и.т.д и.т.п. Нет ничего сложного в том, чтобы добавить оператор onBackpressureи быть уверенным в том, что с вашим приложением не случится переполнения буфера.

Я не рассказал в этой статье о backpressured datasources, но на это есть объективная причина. О ней расскажу в следующей части.

Все исходники доступны на гитхабе https://github.com/scrobot/Rx2.0habrahabrproject

Во второй части статьи мы поговорим о

  • Single
  • Completable-типах
  • Maybe-типах
  • Основных реактивных интерфейсах
  • Subjects и Processors
  • Функциональные interfaces
  • Subscriber
  • Subscription
  • Backpressured datasources

Автор: Scrobot

Источник

Поделиться

* - обязательные к заполнению поля