- PVSM.RU - https://www.pvsm.ru -
Отказоустойчивость, отзывчивость, ориентированность на события и масштабируемость — четыре принципа нынче популярного реактивного программирования [1]. Именно следуя им создаётся backend больших систем с одновременной поддержкой десятков тысяч соединений.
Отзывчивость, простота, гибкость и расширяемость кода — принципы, которые можно закрепить за реактивным UI.
Наверняка, если совместить реактивные backend и UI, то можно получить качественный продукт. Именно его мы и попытались сделать, разрабатывая 2GIS Dialer [2] — звонилки, которая работает через API и при этом должна оставаться быстрой и удобной.
Рассмотрим пример:
requestDataTask = new AsyncTask<Void, Void, JSONObject>() {
@Override
protected JSONObject doInBackground(Void... params) {
final String requestResult = apiService.getData();
final JSONObject json = JsonUtils.parse(requestResult);
lruCache.cacheJson(json);
return json;
}
};
Тут всё просто, мы создаем AsyncTask, в котором:
<code>JSONObject
на основе результата запроса.JSONObject
.JSONObject
.Подобный код встречается во многих проектах, он понятен, а миллионы леммингов не могут ошибаться. Но давайте копнём чуть глубже:
Exception
?doInBackground(Void...)
выполняется в отдельном потоке, как нам сказать пользователю об ошибке в UI? Заводить поля для Exception
?И ведь это не самый сложный пример. Представьте, что вам надо сделать ещё один запрос на основе результатов предыдущего. На AsyncTask’ах это будет callback-hell, который, как минимум, будет неустойчив к падениям, ошибкам и т.д.
Вопросов больше, чем ответов. О недостатках AsyncTask’ов можно написать целую статью, серьезно. Но есть ли варианты лучше?
Оглядываясь на принципы реактивного программирования мы начали искать решение, которое обеспечит:
Таковым стала RxJava [4]от ребят из Netflix — reactive extension, идея (но не реализация) которого перекочевала из reactive extension for c#.
В RxJava всё крутится вокруг Observable
. Observable
— это как потоки данных (ещё их можно рассматривать как монады), которые могут каким-либо образом получать и отдавать эти самые данные. Над Observable
’ами можно применять операции, такие как flatmap
, filter
, zip
, merge
, cast
и т.д.
Простой пример:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
.subscribeOn(Schedulers.newThread()) //отдаем новый тред для работы в background
.observeOn(AndroidSchedulers.mainThread()) //говорим, что обсервить хотим в main thread
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with result
}
});
Мы создаем Observable
, который поочередно отдает нам числа из Iterable
. Указываем, что генерация и передача данных будет происходить в отдельном треде, а обработка результата — в main thread. Подписываемся на него, и в методе подписчика производим любые манипуляции с каждым следующим результатом.
Можно сделать этот пример более интересным:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)).
//оператор фильтрации для отсеивания ненужных результатов
filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0; //выражение верно только для четных чисел
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with ONLY EVEN result
}
});
Теперь, указав оператор filter
, мы можем обрабатывать только чётные значения.
Вернёмся к нашему первому AsyncTask и посмотрим, как бы мы решили задачу с помощью реактивного программирования.
Для начала создадим Observable с запросом:
//Observable, действия которого основанны на переданной ему Observable.OnSubscribe<String>
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//сообщить сабскрайберу о том, что есть новые данные
subscriber.onNext(apiService.getData());
//А теперь сообщаем о том, что мы закончили и данных больше нет
subscriber.onCompleted();
}
});
Тут мы создаем Observable
и специфицируем его поведение. Делаем запрос и отдаем результат в onNext(...)
, после чего говорим Subscriber
’у, что мы закончили, вызвав onCompleted()
.
С этим понятно: мы создали Observalble
, который отвечает только за получение объекта String
с API. SRP [5] в чистом виде.
Что, если запрос не прошёл по каким-то причинам? Тогда мы можем позвать у Observable
метод retry(...)
, который будет повторять этот самый Observable
n раз, пока он не завершится успешно (читай, без Exception
). Кроме того, мы можем отдать Observable
’у другой Observable
, если даже retry()
не помог. Если backend написан криво, то лучше бы нам закрывать соединение по таймауту. И у нас есть метод timeout(...)
на этот случай. Всё вместе это выглядело бы так:
final Subscription subscription =
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(apiService.getData());
subscriber.onCompleted();
}
})
.timeout(5, TimeUnit.SECONDS) //указываем таймаут операции в секундах
.retry(3) // делаем 3 попытки запроса
//назначаем обработчик в случае, если все таки мы не спасли положение
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
//return new observable here, that can rescure us from error
}
});
И немного рефакторинга:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()); // назначили обработчик ошибки
Теперь займемся созданием json. Для этого результат нашего первого Observable
(а там String
) надо преобразовать. Используем map(...)
, и, если что-то вдруг пойдет не так, вернем другой, нужный нам в случае неудачи, json с помощью onErrorReturn(...)
.
Вот так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
});
Ок, с json разобрались. Осталось кэширование. Кэширование: это не преобразование результата, а действие над ним. Для этого у Observable
есть методы doOnNext(...)
, doOnEach(...)
и т.д. Получается как-то так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
})
//процедура, вызывающаяся при каждом onNext(..) от Observable
.doOnNext(new Action1<JSONObject>() {
@Override
public void call(JSONObject jsonObject) {
lruCache.cacheJson(jsonObject);
}
});
Снова немного отрефакторим код:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
Мы почти закончили. Как в самом первом примере с RxJava, добавим обработчик результата и укажем треды, в которых надо исполняться.
Финальная версия:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
.subscribeOn(Schedulers.newThread()) //делаем запрос, преобразование, кэширование в отдельном потоке
.observeOn(AndroidSchedulers.mainThread()) // обработка результата - в main thread
.subscribe(subscriber); //обработчик результата
Давайте посмотрим, чего мы тут добились:
doOnNext
выполнится параллельно обработке результата.Observable
и держать в консистентном состоянии всю систему.doOnError(Throwable thr)
. Хотите отфильтровать результаты — добавьте оператор filter
и реализуйте его поведение.Как и недостатки AsyncTask’ов, преимущества этого подхода, на мой взгляд, можно перечислять очень долго. Последний из принципов реактивного программирования, принцип масштабируемости, продемонстрируем ниже.
Живой пример:
//создаем новый Observable путем комбинирования четырех других
final Observable<AggregatedContactData> obs = Observable.combineLatest(
createContactsObservable(context), //Observable для получения контактов из базы
createPhonesObservable(context), //Observable для получения всех телефонов контактов
createAccountsObservable(context), //Observable для полуения аккаунтов и контактов по ним
createJobsObservable(context), //Observable для получения мест работы контактов
contactsInfoCombinator() //функция комбинироваия результатов всех Observable выше
).onErrorReturn(createEmptyObservable()).cache() //обработчик ошибки и оператор кэширования
.subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) //для выполнения такой задачи потребуется тред пул
.observeOn(AndroidSchedulers.mainThread()); // обработка данных как всегда - в main thread
Observable.combineLatest(...)
. Этот оператор ждет onNext(...)
от всех переданных ему Observable
’ов и применяет функцию комбинирования сразу ко всем результатам. Может показаться сложным, но картинка [6] из вики RxJava сделает всё понятнее. Самое важное тут, что каждый из Observable
, переданных в Observable.combineLatest(...)
— это CursorObservable
, который передает в свой onNext(...)
новый курсор, как только он меняется в базе данных. Таким образом, на любое обновление любого из четырех курсоров выполняется функция комбинирования, что позволяет всегда поставлять самые свежие данные. Это и есть принцип ориентированности на события.Collections.emptyList();
cache()
очень полезен, если на этот Observable
могут быть подписаны сразу несколько Subscribers
. Если этот оператор применен к Observable
, то новый его подписчик мгновенно получает данные, при условии, что эти данные уже были посчитаны для подписавшихся ранее. Таким образом, все желающие имеет актуальные одинаковые данные.subscribeOn(...)
я отдаю тред пул на 4 треда, чтобы каждый из 4х моих Observable
выполнялся в отдельном треде с целью максимизации скорости, всю остальную заботу берет на себя RxJava. То есть задействованы будут все 4 процессора, при наличие оных.Как видите, потенциал у реактивного программирования огромный, а фукнционал RxJava реализует его в достаточной мере.
Всё, продемонстрированное выше и намного больше в том или ином виде используется у нас в дайлере. И вот с какими проблемами мы столкнулись:
OutOfMemoryError
. Решение было простым. Ввести CachedThreadPool
для этих дел и проблема решена.cache()
из примера выше. Хотелось бы, чтобы следующий запрос на тот же самый url сразу брался из кэша. В RxJava такого нет. В принципе это правильно, потому что реактивность и кэш — две разные вещи. Поэтому мы написали свой кэш.
Мы увидели, как классно реактивно работать с многопоточностью и запросами в Android. Но это далеко не всё. Например, можно подписываться на изменение Checkable
или EditText
(это из коробки идет в RxJava для Android). Тут всё просто, но ужасно удобно.
Кстати, одной RxJava реактивное программирование под Java не ограничивается. Существуют и другие библиотеки, например, Bolts-Android [7].
Кроме этого, сейчас активно разрабатывается Reactive-Streams [8], который призван унифицировать работу с разными реактивными провайдерами в java.
Понравилось ли нам? Однозначно. Реактивные приложения действительно гораздо устойчивее к багам и падениям, код становится понятным и гибким (были бы лямбды — был бы еще и красивым). Много рутинной работы перекладывается на библиотеку, которая выполняет свою работу лучше, чем нативные Android-компоненты. Это позволяет сосредоточиться на реализации вещей, которые действительно стоит обдумать.
Реактивное программирование — это немного другое
Ссылки, которые помогут вам разобраться с реактивным программированием или просто могут оказаться интересными:
Небольшое отступление. Если вы разделяете наши взгляды на программирование и создание продуктов, то приходите [12] — будем рады вас видеть в команде 2GIS Dialer.
Автор: lNevermore
Источник [13]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/news/63803
Ссылки в тексте:
[1] реактивного программирования: http://www.reactivemanifesto.org/
[2] 2GIS Dialer: http://apps.2gis.ru/?utm_source=news&utm_medium=habr&utm_campaign=post_reactive#dialer
[3] API 2ГИС: http://api.2gis.ru/?utm_source=news&utm_medium=habr&utm_campaign=post_reactive</a>
[4] RxJava : https://github.com/Netflix/RxJava
[5] SRP: http://en.wikipedia.org/wiki/Single_responsibility_principle
[6] картинка: https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png
[7] Bolts-Android: https://github.com/BoltsFramework/Bolts-Android
[8] Reactive-Streams: http://www.reactive-streams.org/
[9] мышление: http://www.braintools.ru
[10] Википедия RxJava.: https://github.com/Netflix/RxJava/wiki
[11] Очень толковая статья: http://mttkay.github.io/blog/2013/08/25/functional-reactive-programming-on-android-with-rxjava/
[12] приходите: http://job.2gis.ru/vacancy/novosibirsk/id/695/
[13] Источник: http://habrahabr.ru/post/228125/
Нажмите здесь для печати.