- PVSM.RU - https://www.pvsm.ru -
Отказоустойчивость, отзывчивость, ориентированность на события и масштабируемость — четыре принципа нынче популярного реактивного программирования [1]. Именно следуя им создаётся backend больших систем с одновременной поддержкой десятков тысяч соединений.
Отзывчивость, простота, гибкость и расширяемость кода — принципы, которые можно закрепить за реактивным UI.
Наверняка, если совместить реактивные backend и UI, то можно получить качественный продукт. Именно его мы и попытались сделать, разрабатывая 2GIS Dialer [2] — звонилки, которая работает через API и при этом должна оставаться быстрой и удобной.

Рассмотрим пример:
requestDataTask = new AsyncTask<Void, Void, JSONObject>() {
@Override
protected JSONObject doInBackground(Void... params) {
final String requestResult = apiService.getData();
final JSONObject json = JsonUtils.parse(requestResult);
lruCache.cacheJson(json);
return json;
}
};
Тут всё просто, мы создаем AsyncTask, в котором:
<code>JSONObject на основе результата запроса.JSONObject.JSONObject.Подобный код встречается во многих проектах, он понятен, а миллионы леммингов не могут ошибаться. Но давайте копнём чуть глубже:
Exception?doInBackground(Void...) выполняется в отдельном потоке, как нам сказать пользователю об ошибке в UI? Заводить поля для Exception?И ведь это не самый сложный пример. Представьте, что вам надо сделать ещё один запрос на основе результатов предыдущего. На AsyncTask’ах это будет callback-hell, который, как минимум, будет неустойчив к падениям, ошибкам и т.д.
Вопросов больше, чем ответов. О недостатках AsyncTask’ов можно написать целую статью, серьезно. Но есть ли варианты лучше?
Оглядываясь на принципы реактивного программирования мы начали искать решение, которое обеспечит:
Таковым стала RxJava [4]от ребят из Netflix — reactive extension, идея (но не реализация) которого перекочевала из reactive extension for c#.
В RxJava всё крутится вокруг Observable. Observable — это как потоки данных (ещё их можно рассматривать как монады), которые могут каким-либо образом получать и отдавать эти самые данные. Над Observable’ами можно применять операции, такие как flatmap, filter, zip, merge, cast и т.д.
Простой пример:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
.subscribeOn(Schedulers.newThread()) //отдаем новый тред для работы в background
.observeOn(AndroidSchedulers.mainThread()) //говорим, что обсервить хотим в main thread
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with result
}
});
Мы создаем Observable, который поочередно отдает нам числа из Iterable. Указываем, что генерация и передача данных будет происходить в отдельном треде, а обработка результата — в main thread. Подписываемся на него, и в методе подписчика производим любые манипуляции с каждым следующим результатом.
Можно сделать этот пример более интересным:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)).
//оператор фильтрации для отсеивания ненужных результатов
filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0; //выражение верно только для четных чисел
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with ONLY EVEN result
}
});
Теперь, указав оператор filter, мы можем обрабатывать только чётные значения.
Вернёмся к нашему первому AsyncTask и посмотрим, как бы мы решили задачу с помощью реактивного программирования.
Для начала создадим Observable с запросом:
//Observable, действия которого основанны на переданной ему Observable.OnSubscribe<String>
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//сообщить сабскрайберу о том, что есть новые данные
subscriber.onNext(apiService.getData());
//А теперь сообщаем о том, что мы закончили и данных больше нет
subscriber.onCompleted();
}
});
Тут мы создаем Observable и специфицируем его поведение. Делаем запрос и отдаем результат в onNext(...), после чего говорим Subscriber’у, что мы закончили, вызвав onCompleted().
С этим понятно: мы создали Observalble, который отвечает только за получение объекта String с API. SRP [5] в чистом виде.
Что, если запрос не прошёл по каким-то причинам? Тогда мы можем позвать у Observable метод retry(...), который будет повторять этот самый Observable n раз, пока он не завершится успешно (читай, без Exception). Кроме того, мы можем отдать Observable’у другой Observable, если даже retry() не помог. Если backend написан криво, то лучше бы нам закрывать соединение по таймауту. И у нас есть метод timeout(...) на этот случай. Всё вместе это выглядело бы так:
final Subscription subscription =
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(apiService.getData());
subscriber.onCompleted();
}
})
.timeout(5, TimeUnit.SECONDS) //указываем таймаут операции в секундах
.retry(3) // делаем 3 попытки запроса
//назначаем обработчик в случае, если все таки мы не спасли положение
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
//return new observable here, that can rescure us from error
}
});
И немного рефакторинга:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()); // назначили обработчик ошибки
Теперь займемся созданием json. Для этого результат нашего первого Observable (а там String) надо преобразовать. Используем map(...), и, если что-то вдруг пойдет не так, вернем другой, нужный нам в случае неудачи, json с помощью onErrorReturn(...).
Вот так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
});
Ок, с json разобрались. Осталось кэширование. Кэширование: это не преобразование результата, а действие над ним. Для этого у Observable есть методы doOnNext(...), doOnEach(...) и т.д. Получается как-то так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
})
//процедура, вызывающаяся при каждом onNext(..) от Observable
.doOnNext(new Action1<JSONObject>() {
@Override
public void call(JSONObject jsonObject) {
lruCache.cacheJson(jsonObject);
}
});
Снова немного отрефакторим код:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
Мы почти закончили. Как в самом первом примере с RxJava, добавим обработчик результата и укажем треды, в которых надо исполняться.
Финальная версия:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
.subscribeOn(Schedulers.newThread()) //делаем запрос, преобразование, кэширование в отдельном потоке
.observeOn(AndroidSchedulers.mainThread()) // обработка результата - в main thread
.subscribe(subscriber); //обработчик результата
Давайте посмотрим, чего мы тут добились:
doOnNext выполнится параллельно обработке результата.Observable и держать в консистентном состоянии всю систему.doOnError(Throwable thr). Хотите отфильтровать результаты — добавьте оператор filter и реализуйте его поведение.Как и недостатки AsyncTask’ов, преимущества этого подхода, на мой взгляд, можно перечислять очень долго. Последний из принципов реактивного программирования, принцип масштабируемости, продемонстрируем ниже.
Живой пример:
//создаем новый Observable путем комбинирования четырех других
final Observable<AggregatedContactData> obs = Observable.combineLatest(
createContactsObservable(context), //Observable для получения контактов из базы
createPhonesObservable(context), //Observable для получения всех телефонов контактов
createAccountsObservable(context), //Observable для полуения аккаунтов и контактов по ним
createJobsObservable(context), //Observable для получения мест работы контактов
contactsInfoCombinator() //функция комбинироваия результатов всех Observable выше
).onErrorReturn(createEmptyObservable()).cache() //обработчик ошибки и оператор кэширования
.subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) //для выполнения такой задачи потребуется тред пул
.observeOn(AndroidSchedulers.mainThread()); // обработка данных как всегда - в main thread
Observable.combineLatest(...). Этот оператор ждет onNext(...) от всех переданных ему Observable’ов и применяет функцию комбинирования сразу ко всем результатам. Может показаться сложным, но картинка [6] из вики RxJava сделает всё понятнее. Самое важное тут, что каждый из Observable, переданных в Observable.combineLatest(...) — это CursorObservable, который передает в свой onNext(...) новый курсор, как только он меняется в базе данных. Таким образом, на любое обновление любого из четырех курсоров выполняется функция комбинирования, что позволяет всегда поставлять самые свежие данные. Это и есть принцип ориентированности на события.Collections.emptyList(); cache() очень полезен, если на этот Observable могут быть подписаны сразу несколько Subscribers. Если этот оператор применен к Observable, то новый его подписчик мгновенно получает данные, при условии, что эти данные уже были посчитаны для подписавшихся ранее. Таким образом, все желающие имеет актуальные одинаковые данные.subscribeOn(...) я отдаю тред пул на 4 треда, чтобы каждый из 4х моих Observable выполнялся в отдельном треде с целью максимизации скорости, всю остальную заботу берет на себя RxJava. То есть задействованы будут все 4 процессора, при наличие оных.Как видите, потенциал у реактивного программирования огромный, а фукнционал RxJava реализует его в достаточной мере.
Всё, продемонстрированное выше и намного больше в том или ином виде используется у нас в дайлере. И вот с какими проблемами мы столкнулись:
OutOfMemoryError. Решение было простым. Ввести CachedThreadPool для этих дел и проблема решена.cache() из примера выше. Хотелось бы, чтобы следующий запрос на тот же самый url сразу брался из кэша. В RxJava такого нет. В принципе это правильно, потому что реактивность и кэш — две разные вещи. Поэтому мы написали свой кэш.
Мы увидели, как классно реактивно работать с многопоточностью и запросами в Android. Но это далеко не всё. Например, можно подписываться на изменение Checkable или EditText (это из коробки идет в RxJava для Android). Тут всё просто, но ужасно удобно.
Кстати, одной RxJava реактивное программирование под Java не ограничивается. Существуют и другие библиотеки, например, Bolts-Android [7].
Кроме этого, сейчас активно разрабатывается Reactive-Streams [8], который призван унифицировать работу с разными реактивными провайдерами в java.
Понравилось ли нам? Однозначно. Реактивные приложения действительно гораздо устойчивее к багам и падениям, код становится понятным и гибким (были бы лямбды — был бы еще и красивым). Много рутинной работы перекладывается на библиотеку, которая выполняет свою работу лучше, чем нативные Android-компоненты. Это позволяет сосредоточиться на реализации вещей, которые действительно стоит обдумать.
Реактивное программирование — это немного другое по сравнению с традиционной разработкой под Android. Потоки данных, функциональные операторы — эти сложные, на первый взгляд, вещи оказываются намного проще, если разобраться. Стоит немного поработать с реактивным программированием, и
Ссылки, которые помогут вам разобраться с реактивным программированием или просто могут оказаться интересными:
Небольшое отступление. Если вы разделяете наши взгляды на программирование и создание продуктов, то приходите [12] — будем рады вас видеть в команде 2GIS Dialer.
Автор: lNevermore
Источник [13]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/news/63803
Ссылки в тексте:
[1] реактивного программирования: http://www.reactivemanifesto.org/
[2] 2GIS Dialer: http://apps.2gis.ru/?utm_source=news&utm_medium=habr&utm_campaign=post_reactive#dialer
[3] API 2ГИС: http://api.2gis.ru/?utm_source=news&utm_medium=habr&utm_campaign=post_reactive</a>
[4] RxJava : https://github.com/Netflix/RxJava
[5] SRP: http://en.wikipedia.org/wiki/Single_responsibility_principle
[6] картинка: https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png
[7] Bolts-Android: https://github.com/BoltsFramework/Bolts-Android
[8] Reactive-Streams: http://www.reactive-streams.org/
[9] мышление: http://www.braintools.ru
[10] Википедия RxJava.: https://github.com/Netflix/RxJava/wiki
[11] Очень толковая статья: http://mttkay.github.io/blog/2013/08/25/functional-reactive-programming-on-android-with-rxjava/
[12] приходите: http://job.2gis.ru/vacancy/novosibirsk/id/695/
[13] Источник: http://habrahabr.ru/post/228125/
Нажмите здесь для печати.