- PVSM.RU - https://www.pvsm.ru -

RxJava. Убираем магию

Я долго боялся использовать RxJava в production. Её назначение и принцип работы оставались для меня загадкой. Чтение исходного кода не добавляло ясности, а статьи только путали. Под катом попытка ответить на вопросы: «Какие задачи эта технология решает лучше аналогов?» и «Как это работает?» с помощью аналогий с классической Java и простых метафор.

image

Применение

RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.

Emitter

Всем нам знаком паттерн Iterator.

interface Iterator<T> {
    T next();
    boolean hasNext();
}

За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:

next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике

У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.

Проведём небольшой рефакторинг.

interface Iterator<T> {
    T getNext();
    boolean isComplete();
}

Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):

interface Emitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.

Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.

Аналогия с Iterator взята из [1] [1]

Sources

Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).

image

Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)

Давайте завернём в источник ответ на тот самый вопрос.

Observable.just(42)

Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:

Observable.just(42)
                .subscribeOn(computation())

А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())

И, наконец, запускаем:

Observable.just(42)
                .subscribeOn(computation())
                .observeOn(mainThread())
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer answer) {
                        System.out.print(answer);
                    }
                    @Override public void onComplete() {}
                    @Override public void onError(Throwable e) {}
                });

В консоль вывелся ответ, но что же произошло?

Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.

В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.

В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.

И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.

Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).

github.com/a-dminator/rx-products-and-prices [2]

Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.

В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.

Операции

Все операции над источниками делятся на 2 типа:

Не терминальные возвращают новый источник, который завернул исходный
Терминальные исполняют цепочку и получают данные (subscribe, map...)

И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).

По аналогии со Streams API из [2] [3]

Dispose (Unsubscribe в RxJava 1)

Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).

После этого RxJava прекратит исполнение цепочек, отпишет всех Observer'ов и вызовет iterrupt() у потоков, которые больше не нужны.

Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).

У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.

Заключение

RxJava:

— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.

— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.

— (Для меня) Решает больше проблем, чем создаёт!

Всем спасибо.

[1] Видео [1]
[2] Видео [3]

Автор: adev_one

Источник [4]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/java/222787

Ссылки в тексте:

[1] [1]: https://www.youtube.com/watch?v=sTSQlYX5DU0

[2] github.com/a-dminator/rx-products-and-prices: https://github.com/a-dminator/rx-products-and-prices

[3] [2]: https://www.youtube.com/watch?v=O8oN4KSZEXE

[4] Источник: https://habrahabr.ru/post/317928/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best