Корни RxJava — о чем мы не подозревали

в 14:40, , рубрики: android, android development, Android sdk, java, kotlin, observable, Observer, retrofit2, room, rxjava, разработка мобильных приложений, Разработка под android

Всем привет, меня зовут Руслан, я Head of mobile development в одной международной компании. В нашей производственной практике достаточно много проектов используют для упрощенной работы с асинхронщиной фреймворк RxJava.

Обычно изучение RxJava в большинстве статей или онлайн-школ начинается со слов «Жил был Observable/Single/Flowable и мы решили на него подписаться».

После всего этого, как правило идёт пару слов про операторы, усиленный разбор отличий map от flatMap, concatMap, switchMap (мне сразу вспоминается среднестатистическое собеседование в какой-нибудь компании). Дальше идет что-то не очень внятное и совсем теоретическое про горячие источники и на этом всё. 

В реальности, начинающий Android разработчик либо начал с coroutines и flow, либо шлёпает RxJava цепочки по одному и тому же алгоритму:

auth(credentials)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ response ->
        Log.d("RESPONSE", response.toString())           
    }, { throwable ->
        Log.d("ERROR", throwable.localizedMessage)
    })

Красота да? У нас есть цепочка, которая что-то получает от бэкэнда, даже работает! Но, в действительности мы даже не представляем как она работает.

Начитавшись умных статей о том, что RxJava построена на основе паттерну Observer мы думаем - Ну вот метод auth(), это издатель, а subscribe это подписчик, subscribeOn - устанавливает стратегию на каком пуле потоков будет работать издатель, а observeOn - определяет на каком пуле потоков будет получать данные наш подписчик, которого мы бережно поместили внутрь метода subscribe.

На этом можно было бы заканчивать статью, но увы, не всё так, как кажется на самом деле. Нет, метод auth(), это действительно издатель, а subscribe - подписчик, с одной лишь оговоркой, ПОДПИСЧИК ЗАМЫКАЮЩИЙ ЦЕПОЧКУ (ну т.е. Вызов метода subscribe вернет некий Disposable). Отсюда назревает резонный вопрос, а что бывают какие-то ещё подписчики? Представляете, бывают!

Вот, век живи, век учись, каждый раз работая на проектах компании, где есть RxJava, я открываю для себя её по новому, бесконечный ящик пандоры. Окей, давайте ближе к сути.

Из курсов нам говорят, каждый оператор возвращает нам новый экземпляр источника с видоизмененными данными (если мы применяем какие-то операторы трансформации, комбинации, сортировки и т.д.), но нам забыли упомянуть одну важную вещь…

Каждый оператор это источник, внутри которого есть свой подписчик! Прикиньте? Чтоб в этом убедиться, давайте рассмотрим реализацию функции take под капотом:

//Original source from RxJava3 library
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;
    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new TakeObserver<>(observer, limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;

        boolean done;

        Disposable upstream;

        long remaining;
        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                upstream = d;
                if (remaining == 0) {
                    done = true;
                    d.dispose();
                    EmptyDisposable.complete(downstream);
                } else {
                    downstream.onSubscribe(this);
                }
            }
        }

        @Override
        public void onNext(T t) {
            if (!done && remaining-- > 0) {
                boolean stop = remaining == 0;
                downstream.onNext(t);
                if (stop) {
                    onComplete();
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }

            done = true;
            upstream.dispose();
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            if (!done) {
                done = true;
                upstream.dispose();
                downstream.onComplete();
            }
        }

        @Override
        public void dispose() {
            upstream.dispose();
        }

        @Override
        public boolean isDisposed() {
            return upstream.isDisposed();
        }
    }
}

Шок, правда? Т.е. У нас каждый оператор подписывается друг на друга в цепочке и к примеру наличие doOnTerminate{  exitProcess(0) }  будет давать разный результат в зависимости от его местоположения в цепочке:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("First Single on: "+Thread.currentThread().name) }
    .observeOn(Schedulers.io())
    .doOnTerminate { exitProcess(0) }
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxCachedThreadScheduler-1

Вопрос - а где лог с Root subscribe(): "+Thread.currentThread().name - Это нормальное поведение, у нас ведь выполняется метод

doOnTerminate { exitProcess(0) }

который завершает программу, просто он выполняется по очереди со всеми операторами, а не когда корневая цепочка завершит своё выполнение. Убедиться в этом можно, если переставить его в самое начало Rx - цепочки, после выполнения такого алгоритма вы не увидите никаких логов, программа завершится до их появления.

Теперь, держите эту информацию в уме, потому что дальше начнутся странные странности, которые без понимания вот этого материала не объяснить.

Всё было бы так просто, если бы не было так сложно. Я приведу пример:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) }
    .observeOn(Schedulers.computation())
    .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) }
    .subscribeOn(Schedulers.io())
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxComputationThreadPool-1

Вполне реальная ситуация, которая может вызвать ступор после радужного subscribeOn.observeOn. Благо, в документации на гите RxJava об этом написано. Пишут, что нельзя больше одного раза в корневой цепочке вызвать subscribeOn, а вот observeOn можно вызывать сколько угодно. Правило да правило, вот и живи теперь с этим. Ладно, на самом деле subscribeOn можно вызвать сколько угодно раз, но во второстепенных цепочках, которые к примеру вызываются внутри оператора flatMap, но поведение в корневой цепочке будет максимально неожиданным:

Single.just(1)
    .subscribeOn(Schedulers.newThread())
    .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) }
    .observeOn(Schedulers.computation())
    .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) }
    .flatMap {
        Single.just(2).subscribeOn(Schedulers.io())
    }
    .doOnError { throwable -> logger.warning(throwable.localizedMessage) }
    .subscribe(
        { logger.warning("Root subscribe(): "+Thread.currentThread().name) },
        { throwable -> logger.warning(throwable.localizedMessage) }
    )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxCachedThreadScheduler-1

Оказывается, в RxJava есть два ключевых понятия, характеризующих порядок работы цепочки - upstream и downstream.

Ниже на скрине я нарисую что такое upstream и downstream:

Корни RxJava — о чем мы не подозревали - 1

Смысл этих двух терминов в том, что подписка происходит вверх по течению upstream, а выброс данных вниз по течению downstream. Давайте заглянем под капот функции subscribeOn, интересно же, почему в случае без flatMap у нас поток не переключился второй раз через subscribeOn на IO пулл потоков:

Корни RxJava — о чем мы не подозревали - 2

Вот это поворот! Оказывается внутри функции subscribeOn мы делаем replace передаваемого экземпляра пула потоков и этот replace работает снизу вверх, проходя по КОРНЕВОЙ цепочке, тот вызов subscribeOn который будет самым первым сверху, тот и установит реальный последний примененный пул потоков выполнения, не зря же он называется subscribeOn, при подписке, upstream. Интересно, а что же тогда происходит с observeOn, почему его можно вызвать много раз? Всё просто, у observeOn под капотом тот же replace, но только сверху вниз (downstream), именно по этому он сменится столько раз, сколько мы захотим.

Вы ещё держите в уме, что операторы друг на друга подписываются? Теперь сможете ответить на вопрос, почему subscribeOn во второстепенной цепи меняет поведение корневой? Думаю очевидно.

Если вам понравилась моя статья, подписывайтесь на мой телеграм-канал.

Автор: Руслан Иваныкин

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js