- PVSM.RU - https://www.pvsm.ru -
Заинтересовавшись темой функционального программирования я встал на распутье, — какой фреймворк выбрать для ознакомления. ReactiveCocoa — ветеран в iOS кругах, по нему вдоволь информации. Но он вырос с Objective-C, и хотя это не является проблемой, но все же в данный момент я в основном пишу именно на Swift, — хотелось бы взять решение изначально спроектированное с учетом всех плюшек языка. RxSwift же порт Reactive Extensions, имеющего долгую историю, но сам порт свежий и написанный именно под Swift. На нем я и решил остановиться.
Но специфика документации по RxSwift в том, что описание всех команд ведет на reactivex.io [1], а там в основном дается общая информация, руки у разработчиков не дошли еще сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, есть те, о которых в общей документации нет ничего кроме упоминания.
Прочитав все главы вики с RxSwift гитхаба, я сразу решил поразбираться с официальными примерами, тут то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с копипастом гранатой. Я начал разбирать самые сложные для понимания команды, потом те, что вроде понятны, но задав себе вопросы по ним я понял, что лишь догадываюсь на то как верно ответить, но не уверен.
В общем ничтоже сумняшеся я решил проработать все операторы RxSwift. Лучший способ что то понять в программировании — запустить код и посмотреть как он отработает. Затем учитывая специфику реактивного программирования — очень полезны схемы, ну и краткое описание на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
Много картинок и текста под катом, очень много!
Предварительно я рекомендую просмотреть официальную документацию [2], у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые RxMarbles [3], есть бесплатная версия под iPhone/iPad [4]
Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, по каждой я дам краткое описание, схему(если это имеет смысл), код, результат выполнения, при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.
Вот ссылка на гитхаб [5], куда я склонировал официальный репозиторий RxSwift, и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и ссылка конкретно на PDF [6] где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того чтобы увидеть как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — иметь под рукой документ в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.
Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
Что ж, надеюсь моя работа кому то пригодится. Приступим.
Заметки [7]
asObservable [8]
create [9]
deferred [10]
empty [11]
error [12]
interval [13]
just [14]
never [15]
of [16]
range [17]
repeatElement [18]
timer [19]
amb [20]
combineLatest [21]
concat [22]
merge [23]
startWith [24]
switchLatest [25]
withLatestFrom [26]
zip [27]
distinctUntilChanged [28]
elementAt [29]
filter [30]
ignoreElements [31]
sample [32]
single [33]
skip [34]
skip (duration) [35]
skipUntil [36]
skipWhile [37]
skipWhileWithIndex [38]
take [39]
take (duration) [40]
takeLast [41]
takeUntil [42]
takeWhile [43]
takeWhileWithIndex [44]
throttle [45]
buffer [46]
flatMap [47]
flatMapFirst [48]
flatMapLatest [49]
flatMapWithIndex [50]
map [51]
mapWithIndex [52]
window [53]
reduce [54]
scan [55]
toArray [56]
catchError [57]
catchErrorJustReturn [58]
retry [59]
retryWhen [60]
multicast [61]
publish [62]
refCount [63]
reply [64]
replayAll [65]
debug [66]
doOn / doOnNext [67]
delaySubscription [68]
observeOn [69]
subscribe [70]
subscribeOn [71]
timeout [72]
using [73]
В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.
В качестве вспомогательного кода я буду пользоваться функцией createSequenceWithWait, она создает Observable из массива элементов с указанным интервалом между элементами.
public enum ResultType {
case Infinite
case Completed
case Error
}
public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> {
return Observable<U>.create{ observer in
for (idx, letter) in array.enumerate() {
let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC))
dispatch_after(time, dispatch_get_main_queue()) {
if let describer = describer {
let value = describer(value: letter)
observer.on(.Next(value))
} else {
observer.on(.Next(letter as! U))
}
}
}
if resultType != .Infinite {
let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC))
dispatch_after(allTime, dispatch_get_main_queue()) {
switch resultType {
case .Completed:
observer.onCompleted()
case .Error:
observer.onError(RxError.Unknown)
default:
break
}
}
}
return NopDisposable.instance
}
}
Функция example — просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift)
public func example(description: String, action: () -> ()) {
print("n--- (description) example ---")
action()
}
Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице — необходимо прописать
import XCPlayground
XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
Так же подразумевается, что читатель имеет общее представление о том, что такое реактивное программирование в общем и о RxSwift в частности. Не знаю есть ли смысл городить очередную вводную.
Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver
example("as Observable") {
let variable = Variable<Int>(0)
variable.asObservable().subscribe { e in
print(e)
}
variable.value = 1
}
Консоль:
--- as Observable example ---
Next(0)
Next(1)
Completed
В данном примере мы Variable преобразовали в Observable и подписались на его события
Этот метод позволяет создавать Observable с нуля, полностью контролируя какие элементы и когда он будет генерировать.
example("create") {
let firstSequence = Observable<AnyObject>.of(1, 2, 3)
let secondSequence = Observable<AnyObject>.of("A", "B", "C")
let multipleSequence = Observable<Observable<AnyObject>>.create { observer in
observer.on(.Next(firstSequence))
observer.on(.Next(secondSequence))
return NopDisposable.instance
}
let concatSequence = multipleSequence.concat()
concatSequence.subscribe { e in
print(e)
}
}
Консоль:
--- create example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
В данном примере мы создали Observable вручную, он сгенерирует два Observable, элементы которых мы затем объединим командой concat, на получившийся Observable мы и подпишемся
Этот оператор позволяет отложить создание Observable, до момента подписки с помощью subscribe
example("without deferred") {
var i = 1
let justObservable = Observable.just(i)
i = 2
_ = justObservable.subscribeNext{ print ("i = ($0)") }
}
example("with deferred") {
var i = 1
let deferredJustObservable = Observable.deferred{
Observable.just(i)
}
i = 2
_ = deferredJustObservable.subscribeNext{ print ("i = ($0)") }
}
Консоль:
--- without deferred example ---
i = 1
--- with deferred example ---
i = 2
В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значение i уже не влияет на генерируемый элемент это последовательностью. Во втором же случае мы создаем с помощью deferred, и мы можем поменять значение i перед subscribe
Пустая последовательность, заканчивающаяся Completed
example("empty") {
let sequence = Observable<Int>.empty()
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- empty example ---
Completed
Создаст последовательность, которая состоит из одного события — Error
example("error") {
let sequence = Observable<Int>
.error(RxError.Unknown)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- error example ---
Error(Unknown error occured.)
Создает бесконечную последовательность, возрастающую с 0 с шагом 1, с указанной периодичностью
example("interval") {
let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- interval example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
....
Создает последовательность из любого значения, которая завершается Completed
example("just") {
let sequence = Observable.just(100)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- just example ---
Next(100)
Completed
Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие
example("never") {
let sequence = Observable<Int>.never()
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- never example ---
Последовательность из variadic переменной, после всех элементов генерируется Completed
example("simple of") {
let sequence = Observable.of(1, 2)
sequence.subscribe { e in
print(e)
}
}
example("of for Observables") {
let firstSequence = Observable<AnyObject>.of(1, 2, 3)
let secondSequence = Observable<AnyObject>.of("A", "B", "C")
let bothSequence = Observable.of(firstSequence, secondSequence)
let mergedSequence = bothSequence.merge()
mergedSequence.subscribe { e in
print(e)
}
}
Консоль:
--- simple of example ---
Next(1)
Next(2)
Completed
--- of for Observables example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed
В первом случае мы создали последовательность из двух чисел. Во втором из двух Observable, а затем их объединили между собой с помощью оператора merge
Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed
example("range") {
let sequence = Observable.range(start: 5, count: 3)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- range example ---
Next(5)
Next(6)
Next(7)
Completed
Сгенерировались элементы начиная с 5, 3 раза с шагом 1
Бесконечно создавать указанный элемент, без задержек. Никогда не будет сгенерированы события Completed или Error
example("repeatElement") {
let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- repeatElement example ---
Next(1)
Next(2)
Next(3)
.....
Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможность задержки при старте. Никогда не будет сгенерированы события Completed или Error
example("timer") {
let sequence = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- timer example ---
Next(0)
Next(1)
Next(2)
В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды
SO = [Observable<T>] или SO1, SO2 = Observable<T>
RO = Observable<T>
Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются
example("amb") {
let subjectA = PublishSubject<Int>()
let subjectB = PublishSubject<Int>()
let subjectC = PublishSubject<Int>()
let subjectD = PublishSubject<Int>()
let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb()
ambSequence.subscribe { e in
print(e)
}
subjectC.onNext(0)
subjectA.onNext(3)
subjectB.onNext(102)
subjectC.onNext(1)
subjectD.onNext(45)
}
Консоль:
--- amb example ---
Next(0)
Next(1)
Т.к. первым сгенерировал элемент subjectC, — лишь его элементы дублируются в RO, остальные игнорируются
SO = SO1, SO2,... SON = Observable<T>
RO = Observable<f(T,T)>
Как только все Observable сгенерировали хотя бы по одному элементу — эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации любым Observable элемента — генерируется новый результат функции с последними элементами из всех комбинируемых Observable
example("combineLatest") {
let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in
"(element)"
}.debug("firstSequence")
let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in
"(element)"
}
.delaySubscription(3, scheduler: MainScheduler.instance)
.debug("secondSequence")
let concatSequence = Observable.combineLatest(firstSequence, secondSequence) {
first, second -> String in
"(first) - (second)"
}
concatSequence.subscribe { e in
print(e)
}
}
Консоль:
--- combineLatest example ---
2016-04-12 16:59:35.421: firstSequence -> subscribed
2016-04-12 16:59:35.422: secondSequence -> subscribed
2016-04-12 16:59:35.434: firstSequence -> Event Next(1)
2016-04-12 16:59:37.423: firstSequence -> Event Next(2)
2016-04-12 16:59:38.423: secondSequence -> Event Next(A)
Next(2 - A)
2016-04-12 16:59:39.423: firstSequence -> Event Next(3)
Next(3 - A)
2016-04-12 16:59:39.522: secondSequence -> Event Next(B)
Next(3 - B)
2016-04-12 16:59:40.622: secondSequence -> Event Next(C)
Next(3 - C)
2016-04-12 16:59:41.722: firstSequence -> Event Completed
2016-04-12 16:59:41.722: firstSequence -> disposed
2016-04-12 16:59:41.722: secondSequence -> Event Completed
2016-04-12 16:59:41.722: secondSequence -> disposed
Completed
В этом примере я создал Observable с помощью createSequenceWithWait, чтобы элементы генерировались с разной задержкой, чтобы было видно как перемешиваются элементы.
firstSequence успел сгенерировать 1 и 2, прежде чем secondSequence сгенерировал A, поэтому 1 отбросили, и первым выводом стало 2 — A
SO = Observable<Observable<T>> или SO1, SO2 = Observable<T>
RO = Observable<T>
В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, — элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO
example("concat object method") {
let firstSequence = Observable<AnyObject>.of(1, 2, 3)
let secondSequence = Observable<AnyObject>.of("A", "B", "C")
let concatSequence = firstSequence.concat(secondSequence)
concatSequence.subscribe { e in
print(e)
}
}
example("concat from array") {
let firstSequence = Observable.of(1,2,3)
let secondSequence = Observable.of(4,5,6)
let concatSequence = Observable.of(firstSequence, secondSequence)
.concat()
concatSequence.subscribe { e in
print(e)
}
}
Консоль:
--- concat object method example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed
--- concat from array example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(5)
Next(6)
Completed
В первом примере мы присоединяем второй Observable к первому.
Во втором генерируем последовательность из массива.
SO = Observable<Observable<T>>
RO = Observable<T>
Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable
example("simple merge") {
let firstSequence = Observable<AnyObject>.of(1, 2, 3)
let secondSequence = Observable<AnyObject>.of("A", "B", "C")
let bothSequence = Observable.of(firstSequence, secondSequence)
let mergedSequence = bothSequence.merge()
mergedSequence.subscribe { e in
print(e)
}
}
example("merge with wait") {
let firstSequence = createSequenceWithWait([1,2,3]) { element in
"(element)"
}
let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in
"(element)"
}
let bothSequence = Observable.of(firstSequence, secondSequence)
let mergedSequence = bothSequence.merge()
mergedSequence.subscribe { e in
print(e)
}
}
Консоль:
--- simple merge example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed
--- merge with wait example ---
Next(1)
Next(A)
Next(2)
Next(3)
Next(B)
Next(C)
Completed
В первом примере мы сливаем две последовательности созданные без задержки, в итоге первый Observable успевает сгенерировать все свои элементы перед тем как это начнет делать второй, результат оказывается идентичен concat
Во втором же случае последовательности сделаны с задержкой в генерации, и видно что элементы в RO теперь вперемешку, в том порядке в котором они были сгенерированы в исходных Observable
SO = Observable<T>
RO = Observable<T>
В начало SO добавляются элементы переданные в качестве аргумента
example("startWith") {
let sequence = Observable.of(1, 2, 3).startWith(0)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- startWith example ---
Next(0)
Next(1)
Next(2)
Next(3)
Completed
SO = Observable<Observable<T>>
RO = Observable<T>
Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable
example("switchLatest") {
let varA = Variable<Int>(0)
let varB = Variable<Int>(100)
let proxyVar = Variable(varA.asObservable())
let concatSequence = proxyVar.asObservable().switchLatest()
concatSequence.subscribe { e in
print(e)
}
varA.value = 1
varA.value = 2
varB.value = 3
proxyVar.value = varB.asObservable()
varB.value = 4
varA.value = 5
}
example("switchLatest") {
let observableA = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(10))
}
delay(3) {
observer.on(.Next(20))
}
delay(5) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("oA")
let observableB = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(100))
}
delay(1) {
observer.on(.Next(200))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("oB")
let observableC = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(1000))
}
delay(1) {
observer.on(.Next(2000))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("oC")
let subjects = [observableA, observableB, observableC]
let sequence:Observable<Observable<Int>> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0}
let switchLatestSequence:Observable<Int> = sequence.switchLatest()
switchLatestSequence.subscribe { e in
print(e)
}
}
Консоль:
--- switchLatest example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
Completed
--- switchLatest example ---
2016-04-12 17:15:22.710: oA -> subscribed
2016-04-12 17:15:22.711: oA -> Event Next(10)
Next(10)
2016-04-12 17:15:23.797: oA -> disposed // происходит отписка как только сгенерировался oB
2016-04-12 17:15:23.797: oB -> subscribed
2016-04-12 17:15:23.797: oB -> Event Next(100)
Next(100)
2016-04-12 17:15:24.703: oB -> disposed // происходит отписка как только сгенерировался oC
2016-04-12 17:15:24.703: oC -> subscribed
2016-04-12 17:15:24.703: oC -> Event Next(1000)
Next(1000)
2016-04-12 17:15:25.800: oC -> Event Next(2000)
Next(2000)
2016-04-12 17:15:26.703: oC -> Event Completed
2016-04-12 17:15:26.703: oC -> disposed
Completed
В первом примере показано как команда работает в статике, когда мы руками переподключаем Observable.
Во втором же у нас последовательности с задержками. observableA, observableB, observableC из SO генерируются раз в 1 секунду. Их же элементы генерируются с различными задержками.
SO1, SO2 = Observable<T>
RO = Observable<f(T,T)>
Как только O1 генерирует элемент проверяется сгенерирован ли хоть один элемент в O2, если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента
example("withLatestFrom") {
let varA = Variable<Int>(0)
let varB = Variable<Int>(10)
let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) {
"($0) - ($1)"
}
withLatestFromSequence.subscribe { e in
print(e)
}
varA.value = 1
varA.value = 2
varB.value = 20
varB.value = 30
varA.value = 5
varA.value = 6
}
Консоль:
--- withLatestFrom example ---
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
Next(5 - 30)
Next(6 - 30)
Completed
SO = Observable<Observable<T>>
RO = Observable<f(T,T)>
Элементы RO представляют собой комбинацию из элементов сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента
example("zip with simple Variable") {
let varA = Variable<Int>(0)
let varB = Variable<Int>(10)
let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "($0) - ($1)"
}
zippedSequence.subscribe { e in
print(e)
}
varA.value = 1
varA.value = 2
varB.value = 20
varB.value = 30
varA.value = 3
varA.value = 4
}
example("zip with PublishSubject") {
let subjectA = PublishSubject<Int>()
let subjectB = PublishSubject<Int>()
let zippedSequence = Observable.zip(subjectA, subjectB) { "($0) - ($1)"
}
zippedSequence.subscribe { e in
print(e)
}
subjectA.onNext(0)
subjectA.onNext(1)
subjectA.onNext(2)
subjectB.onNext(100)
subjectB.onNext(101)
subjectA.onNext(3)
subjectB.onNext(102)
subjectA.onNext(4)
}
Консоль:
--- zip with simple Variable example ---
Next(0 - 10)
Next(1 - 20)
Next(2 - 30)
Completed
--- zip with PublishSubject example ---
Next(0 - 100)
Next(1 - 101)
Next(2 - 102)
Из примеров видно, что элементы комбинируются в том порядке, в каком они были сгенерированы в исходных Observable
Пропускаем все повторяющиеся подряд идущие элементы
example("distinctUntilChanged") {
let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged()
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- distinctUntilChanged example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(1)
Completed
Здесь тонкий момент, что отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.
В RO попадает лишь элемент выпущенный N по счету
example("elementAt") {
let sequence = Observable.of(0, 10, 20, 30, 40)
.elementAt(2)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- elementAt example ---
Next(20)
Completed
Отбрасываются все элементы, которые не удовлетворяют заданным условиям
example("filter") {
let sequence = Observable.of(1, 20, 3, 40)
.filter{ $0 > 10}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- filter example ---
Next(20)
Next(40)
Completed
Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error
example("ignoreElements") {
let sequence = Observable.of(1, 2, 3, 4)
.ignoreElements()
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- ignoreElements example ---
Completed
При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее
example("sampler") {
let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler")
let sequence:Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- sampler example ---
2016-04-12 18:28:20.322: sampler -> subscribed
2016-04-12 18:28:21.323: sampler -> Event Next(0)
Next(1)
2016-04-12 18:28:22.324: sampler -> Event Next(1) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:23.323: sampler -> Event Next(2)
Next(2)
2016-04-12 18:28:24.323: sampler -> Event Next(3) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:25.323: sampler -> Event Next(4) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:26.323: sampler -> Event Next(5)
Next(3)
...
Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом
example("single generate error") {
let sequence = Observable.of(1, 2, 3, 4).single()
sequence.subscribe { e in
print(e)
}
}
example("single") {
let sequence = Observable.of(1, 2, 3, 5).single {
$0 % 2 == 0
}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- single generate error example ---
Next(1)
Error(Sequence contains more than one element.)
--- single example ---
Next(2)
Completed
В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было
Из SO отбрасываем первые N элементов
example("skip") {
let sequence = Observable.of(1, 2, 3, 4).skip(2)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- skip example ---
Next(3)
Next(4)
Completed
Из SO отбрасываем первые элементы, которые были сгенерированы в первые N
example("skip duration with wait") {
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- skip duration with wait example ---
Next(3)
Next(4)
Completed
Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью переданной в качестве параметра
example("skipUntil") {
let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 }
let secondSequence = Observable.just(1)
.delaySubscription(1, scheduler: MainScheduler.instance)
let skippedSequence = firstSequence.skipUntil(secondSequence)
skippedSequence.subscribe { e in
print(e)
}
}
Консоль:
--- skipUntil example ---
Next(3)
Next(4)
Completed
Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду
Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией
example("skipWhile") {
let firstSequence = [1,2,3,4,0].toObservable()
let skipSequence = firstSequence.skipWhile { $0 < 3 }
skipSequence.subscribe { e in
print(e)
}
}
Консоль:
--- skipWhile example ---
Next(3)
Next(4)
Next(0)
Completed
Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией. Отличие от skipWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента
example("skipWhileWithIndex") {
let firstSequence = [1,2,5,0,7].toObservable()
let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in
value < 4 || idx < 2
}
skipSequence.subscribe { e in
print(e)
}
}
Консоль:
--- skipWhileWithIndex example ---
Next(5)
Next(0)
Next(7)
Completed
Из SO берутся лишь первые N элементов
example("take") {
let sequence = Observable.of(1, 2, 3, 4).take(2)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- take example ---
Next(1)
Next(2)
Completed
Из SO берутся лишь элементы сгенерированные в первые N секунд
example("take duration with wait") {
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
let takeSequence = sequence.take(2, scheduler: MainScheduler.instance)
takeSequence.subscribe { e in
print(e)
}
}
Консоль:
--- take duration with wait example ---
Next(1)
Next(2)
Completed
Из SO берутся лишь последние N элементов. Что означает, если SO никогда не закончит генерировать элементы — в RO не попадет ни одного элемента.
example("takeLast") {
let sequence = Observable.of(1, 2, 3, 4).takeLast(2)
sequence.subscribe { e in
print(e)
}
}
example("takeLast with wait") {
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
let takeSequence = sequence.takeLast(2)
takeSequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeLast example ---
Next(3)
Next(4)
Completed
--- takeLast with wait example ---
Next(3)
Next(4)
Completed
Второй пример приведен для иллюстрации в задержке генерации элементов в RO из за ожидания завершения генерации элементов в SO
Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью переданной в качестве параметра
example("takeUntil") {
let stopSequence = Observable.just(1)
.delaySubscription(2, scheduler: MainScheduler.instance)
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
.takeUntil(stopSequence)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeUntil example ---
Next(1)
Next(2)
Completed
Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией
example("takeWhile") {
let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 }
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeWhile example ---
Next(1)
Next(2)
Completed
Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией. Отличие от takeWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента
example("takeWhileWithIndex") {
let sequence = [1,2,3,4,5,6].toObservable()
.takeWhileWithIndex{ (val, idx) in
val % 2 == 0 || idx < 3
}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed
Из SO берутся лишь элементы, после которых не было новых элементов N секунд.
example("throttle") {
let sequence = Observable.of(1, 2, 3, 4)
.throttle(1, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
example("throttle with wait") {
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
.throttle(0.5, scheduler: MainScheduler.instance)
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- throttle example ---
Next(4)
Completed
--- throttle with wait example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed
В первом случае SO генерирует элементы без задержек, поэтому лишь последний элемент не имеет после себя новых элементов.
Во втором примере элементы генерируются медленнее, чем N секунд переданные в throttle, поэтому за каждым генерируемым элементом достаточный временной промежуток.
SO = Observable<>>
RO = Observable<[T]>
Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передается count, — максимальное число элементов в массиве, и timeSpan время максимального ожидания наполнения текущего массива из элементов SO. Таким образом элемент RO, являет собой массив [T], длинной от 0 до count.
example("buffer") {
let varA = Variable<Int>(0)
let bufferSequence = varA.asObservable()
.buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
bufferSequence.subscribe { e in
print("(NSDate()) - (e)")
}
varA.value = 1
varA.value = 2
varA.value = 3
delay(3) {
varA.value = 4
varA.value = 5
delay(5) {
varA.value = 6
}
}
}
Консоль:
--- buffer example ---
2016-04-12 16:10:58 +0000 - Next([0, 1, 2])
2016-04-12 16:11:01 +0000 - Next([3])
2016-04-12 16:11:04 +0000 - Next([4, 5])
2016-04-12 16:11:07 +0000 - Next([6])
2016-04-12 16:11:07 +0000 - Completed
Длина массива была указана как 3, — как только были сгенерированы 3 элемента — в RO сгенерировался элемент [0, 1, 2]
После генерации элемента 3, — была задержка в 3 секунды, сработал таймаут, и массив не был заполнен полностью.
То же касается и задержки после генерации элемента 5
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge)
example("flatMap with wait") {
let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
let flatMapSequence:Observable<String> = sequence.flatMap{val in
createSequenceWithWait([10,11,12], waitTime: 2) { element in
"(element) - (val)"
}
}
flatMapSequence.subscribe { e in
print(e)
}
}
Консоль:
--- flatMap with wait example ---
Next(10 - 0)
Next(10 - 1)
Next(11 - 0)
Next(10 - 2)
Next(11 - 1)
Next(12 - 0)
Next(11 - 2)
Next(12 - 1)
Next(12 - 2)
Completed
Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы — все последующие Observable сгенерированные из SO отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, — если будет сгенерирован новый Observable — на него подпишутся и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable
example("flatMapFirst") {
let sequence:Observable<Int> = Observable.of(10, 20, 30)
.debug("sequence")
let flatMapSequence:Observable<String> = sequence
.flatMapFirst{val in
Observable.of(0, 1, 2)
.map{"($0) - (val)"
}
}
flatMapSequence.subscribe { e in
print(e)
}
}
example("flatMapFirst with delay") {
let subjectA = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(10))
}
delay(1) {
observer.on(.Next(20))
}
delay(7) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sA")
let subjectB = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(100))
}
delay(1) {
observer.on(.Next(200))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sB")
let subjectC = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(1000))
}
delay(1) {
observer.on(.Next(2000))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sC")
let subjects = [subjectA, subjectB, subjectC]
let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:4){$0}
.debug("sequence")
let flatMapSequence:Observable<Int> = sequence.flatMapFirst{val in
return subjects[val].asObservable()
}.debug("flatMapSequence")
flatMapSequence.subscribe { e in
print(e)
}
}
Консоль:
--- flatMapFirst example ---
2016-04-12 19:19:46.915: sequence -> subscribed
2016-04-12 19:19:46.916: sequence -> Event Next(10)
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
2016-04-12 19:19:46.918: sequence -> Event Next(20)
Next(0 - 20)
Next(1 - 20)
Next(2 - 20)
2016-04-12 19:19:46.919: sequence -> Event Next(30)
Next(0 - 30)
Next(1 - 30)
Next(2 - 30)
2016-04-12 19:19:46.921: sequence -> Event Completed
Completed
2016-04-12 19:19:46.921: sequence -> disposed
--- flatMapFirst with delay example ---
2016-04-12 19:19:46.925: flatMapSequence -> subscribed
2016-04-12 19:19:46.926: sequence -> subscribed
2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO генерирует 1й элемент
2016-04-12 19:19:46.935: sA -> subscribed // на его основе генерируется Observable sA, на который мы подписываемся
2016-04-12 19:19:46.936: sA -> Event Next(10)
2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10)
Next(10)
2016-04-12 19:19:47.936: sA -> Event Next(20)
2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20)
Next(20)
2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO генерирует 2й элемент, но на этот момент sA еще генерирует элементы, поэтому подписки на sB не произошло, он просто отбрасывается
2016-04-12 19:19:53.935: sA -> Event Completed
2016-04-12 19:19:53.936: sA -> disposed // sA закончил генерировать элементы, от него отписались
2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO генерирует 3й элемент
2016-04-12 19:19:55.137: sC -> subscribed // т.к. на этот момент нет активных Observable (от sA мы отписались, sB - мы отбросили) - мы можем подписаться на него
2016-04-12 19:19:55.137: sC -> Event Next(1000)
2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000)
Next(1000)
2016-04-12 19:19:56.236: sC -> Event Next(2000)
2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000)
Next(2000)
2016-04-12 19:19:57.335: sC -> Event Completed
2016-04-12 19:19:57.336: sC -> disposed
2016-04-12 19:19:58.926: sequence -> Event Completed
2016-04-12 19:19:58.926: flatMapSequence -> Event Completed
Completed
2016-04-12 19:19:58.926: sequence -> disposed
Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — это уже разрешено, поэтому в RO попадают элементы со всех Observable
А вот второй пример очень объемный, но позволят в подробностях наблюдать как происходят подписка/отписка и как это влияет на генерацию элементов
Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable
example("flatMapLatest") {
let sequence:Observable<Int> = Observable.of(10, 20, 30)
let flatMapSequence = sequence.flatMapLatest{val in Observable.of(0, 1, 2)
.map{"($0) - (val)"
}
}
flatMapSequence.subscribe { e in
print(e)
}
}
example("flatMapLatest with delay") {
let subjectA = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(10))
}
delay(3) {
observer.on(.Next(20))
}
delay(5) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sA")
let subjectB = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(100))
}
delay(1) {
observer.on(.Next(200))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sB")
let subjectC = Observable<Int>.create{ observer in
delay(0) {
observer.on(.Next(1000))
}
delay(1) {
observer.on(.Next(2000))
}
delay(2) {
observer.onCompleted()
}
return NopDisposable.instance
}.debug("sC")
let subjects = [subjectA, subjectB, subjectC]
let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:1) {$0}
.debug("sequence")
let flatMapSequence:Observable<Int> = sequence.flatMapLatest{val in
return subjects[val].asObservable()
}.debug("flatMapSequence")
flatMapSequence.subscribe { e in
print(e)
}
}
Консоль:
--- flatMapLatest example ---
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
Next(0 - 20)
Next(1 - 20)
Next(2 - 20)
Next(0 - 30)
Next(1 - 30)
Next(2 - 30)
Completed
--- flatMapLatest with delay example ---
2016-04-12 19:30:50.309: flatMapSequence -> subscribed
2016-04-12 19:30:50.310: sequence -> subscribed
2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO сгенерировала 1й элемент, и на его основе создали sA
2016-04-12 19:30:50.319: sA -> subscribed // подписались на sA
2016-04-12 19:30:50.319: sA -> Event Next(10) // он генерирует элемент
2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap генерирует новый элемент
Next(10) // и он попадает в RO
2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO сгенерировала 2й элемент, и на его основе создали sA
2016-04-12 19:30:51.311: sA -> disposed // и хоть sA не успел сгенерировать все элементы, от него мы отписываемся
2016-04-12 19:30:51.311: sB -> subscribed // и подписываемся на новый Observable sB
2016-04-12 19:30:51.311: sB -> Event Next(100)
2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100)
Next(100)
2016-04-12 19:30:52.310: sequence -> Event Next(2)
2016-04-12 19:30:52.311: sB -> disposed
2016-04-12 19:30:52.311: sC -> subscribed
2016-04-12 19:30:52.311: sC -> Event Next(1000)
2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000)
Next(1000)
2016-04-12 19:30:53.372: sequence -> Event Completed
2016-04-12 19:30:53.372: sequence -> disposed
2016-04-12 19:30:53.372: sC -> Event Next(2000)
2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000)
Next(2000)
2016-04-12 19:30:54.501: sC -> Event Completed
2016-04-12 19:30:54.501: sC -> disposed
2016-04-12 19:30:54.501: flatMapSequence -> Event Completed
Completed
Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — предыдущий уже успел сгенерировать все свои элементы, поэтому в RO попадают элементы со всех Observable
Во втором примере благодаря задержкам в генерации мы видим, что как только генерируется новый Observable — происходит отписка от предыдущего Observable
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента
example("flatMapWithIndex") {
let sequence:Observable<Int> = Observable.of(10, 20, 30)
let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in Observable.of("A", "B", "C").map{"index: ((idx)) - ($0) - (val)"} }
print(flatMapSequence.dynamicType)
flatMapSequence.subscribe { e in
print(e)
}
}
example("flatMapWithIndex with wait") {
let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in
createSequenceWithWait(["A","B","C"], waitTime: 2) { element in
"index: ((idx)) - (element) - (val)"
}
}
print(flatMapSequence.dynamicType)
flatMapSequence.subscribe { e in
print(e)
}
}
Консоль:
FlatMapWithIndex<Int, Observable<String>>
Next(index: (0) - A - 10)
Next(index: (0) - B - 10)
Next(index: (0) - C - 10)
Next(index: (1) - A - 20)
Next(index: (1) - B - 20)
Next(index: (1) - C - 20)
Next(index: (2) - A - 30)
Next(index: (2) - B - 30)
Next(index: (2) - C - 30)
Completed
--- flatMapWithIndex with wait example ---
FlatMapWithIndex<Int, Observable<String>>
Next(index: (0) - A - 0)
Next(index: (1) - A - 1)
Next(index: (0) - B - 0)
Next(index: (2) - A - 2)
Next(index: (1) - B - 1)
Next(index: (0) - C - 0)
Next(index: (2) - B - 2)
Next(index: (1) - C - 1)
Next(index: (2) - C - 2)
Completed
Observable<T> -> Observable<U>
Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.
example("map") {
let sequence = Observable.of(1, 2, 3)
.map{ "($0 * 5)" }
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- map example ---
Next(5)
Next(10)
Next(15)
Completed
Observable<T> -> Observable<U>
Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента
example("mapWithIndex") {
let sequence = Observable.of("A", "B", "C")
.mapWithIndex({ "($0) / ($1)" })
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- mapWithIndex example ---
Next(A / 0)
Next(B / 1)
Next(C / 2)
Completed
SO = Observable<T>
RO = Observable<Observable<T>>
Элемент из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передается count, — максимальное число элементов которые будут сгенерированы каждым Observable, и timeSpan — время максимального ожидания наполнения текущего Observable из элементов SO. Таким образом элемент RO, являет собой Observable число генерируемых элементов которого равно от 0 до N. Основным отличием от bufffer — элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer — мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше)
example("window") {
let varA = Variable<Int>(0)
let bufferSequence:Observable<Observable<Int>> = varA.asObservable()
.window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
.debug("bufferSequence")
bufferSequence.subscribe { e in
if case .Next(let observable) = e {
print("(NSDate()) - генерируем новый Observable")
observable.subscribe { val in
print(val)
}
}
}
varA.value = 1
varA.value = 2
varA.value = 3
delay(4) {
varA.value = 4
varA.value = 5
delay(4) {
varA.value = 6
}
}
}
Консоль:
--- window example ---
2016-04-12 19:51:54.372: bufferSequence -> subscribed
2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:54 +0000 - генерируем новый Observable
Next(0)
Next(1)
Next(2)
Completed
2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:54 +0000 - генерируем новый Observable
Next(3)
Completed
2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:57 +0000 - генерируем новый Observable
Next(4)
Next(5)
Completed
2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:52:00 +0000 - генерируем новый Observable
Next(6)
Completed
2016-04-12 19:52:02.895: bufferSequence -> Event Completed
В примере используются временные задержки что помогает добиться частичной наполненности генерируемых Observable
Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.
example("reduce") {
let sequence = Observable.of(1, 2, 3, 4)
.reduce(1) { $0 * $1 }
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- reduce example ---
Next(24)
Completed
Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но кроме этого оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.
example("scan") {
let sequence = Observable.of(1, 2, 3).scan(10) { result, element in
return result + element
}
sequence.subscribe { e in
print(e)
}
}
example("scan multiply") {
let sequence = Observable.of(2, 3, 5).scan(10) { result, element in
return result * element
}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- scan example ---
Next(11)
Next(13)
Next(16)
Completed
--- scan multiply example ---
Next(20)
Next(60)
Next(300)
Completed
SO = Observable<T>
RO = Observable<[T]>
Все элементы из SO после генерации терминального состояния объединяются в массив и генерируются RO
example("toArray") {
let sequence = Observable.of(1, 2, 3)
let arraySequence = sequence.toArray()
arraySequence.subscribe { e in
print(e)
}
}
Консоль:
--- toArray example ---
Next([1, 2, 3])
Completed
Позволяет перехватить генерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы
example("with catchError") {
let sequenceWithError = Observable<Int>.create { observer in
observer.on(.Next(1))
observer.on(.Next(2))
observer.on(.Next(3))
observer.on(.Next(4))
observer.onError(RxError.Unknown)
observer.on(.Next(5))
return NopDisposable.instance
}
let sequenceIgnoreError = sequenceWithError.catchError{ error in
return Observable.of(10, 11, 12)
}
sequenceIgnoreError.subscribe { e in
print(e)
}
}
Консоль:
--- with catchError example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(10)
Next(11)
Next(12)
Completed
После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен новый Observable
Позволяет перехватить генерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed
example("with catchErrorJustReturn") {
let sequenceWithError = Observable.of(1, 2, 3, 4)
.concat(Observable.error(RxError.Unknown))
.concat(Observable.just(5))
let sequenceIgnoreError = sequenceWithError.catchErrorJustReturn(-1)
sequenceIgnoreError.subscribe { e in
print(e)
}
}
Консоль:
--- with catchErrorJustReturn example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(-1)
Completed
После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен элемент -1
Позволяет перехватить генерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде что ошибка не повторится
example("retry full sequence") {
let sequenceWithError = Observable.of(1, 2, 3, 4).concat(Observable.error(RxError.Unknown))
let wholeSequenceWithErrorRetry = sequenceWithError.retry(2)
wholeSequenceWithErrorRetry.subscribe { e in
print(e)
}
}
Консоль:
--- retry full sequence example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(1)
Next(2)
Next(3)
Next(4)
Error(Unknown error occured.)
Т.к. был применен оператор retry(2) — мы один раз повторили продписку на SO, но ошибка повторилась, и была сгенерирована в RO
Таким образом retry(1) — не сделает ни одного повтора
Позволяет перехватить сгенерированную ошибку из SO и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO, в надежде что ошибка исчезнет. Если tryObservable заканчивается ошибкой — она пробрасывается в RO и на этом выполнение заканчивается
example("retryWhen") {
var counter = 0
let sequenceWithError = Observable<Int>.create { observer in
observer.on(.Next(1))
observer.on(.Next(2))
observer.on(.Next(3))
observer.on(.Next(4))
counter += 1
if counter < 3 {
observer.onError(RxError.Unknown)
} /*else {
observer.onError(RxError.Overflow)
}*/
observer.on(.Next(5))
return NopDisposable.instance
}.debug("with error")
let sequenceWithoutError = Observable<Int>.create { observer in
observer.on(.Next(10))
//observer.onError(RxError.NoElements)
return NopDisposable.instance
}.debug("without error")
let retrySequence = sequenceWithError.retryWhen{ (error: Observable<RxError>) -> Observable<Int> in
let seq:Observable<Int> = error.flatMap { (generatedError: RxError) -> Observable<Int> in
if case .Unknown = generatedError {
return sequenceWithoutError
}
return Observable<Int>.error(generatedError)
}
return seq
}//.debug()
retrySequence.subscribe { e in
print(e)
}
}
Консоль:
--- retryWhen example ---
2016-04-12 20:18:04.484: with error -> subscribed
2016-04-12 20:18:04.485: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.486: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.486: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.487: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.)
2016-04-12 20:18:04.488: without error -> subscribed
2016-04-12 20:18:04.488: without error -> Event Next(10)
2016-04-12 20:18:04.489: with error -> disposed
2016-04-12 20:18:04.489: with error -> subscribed
2016-04-12 20:18:04.489: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.490: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.490: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.490: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.)
2016-04-12 20:18:04.491: without error -> subscribed
2016-04-12 20:18:04.492: without error -> Event Next(10)
2016-04-12 20:18:04.492: with error -> disposed
2016-04-12 20:18:04.492: with error -> subscribed
2016-04-12 20:18:04.493: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.493: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.493: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.494: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.494: with error -> Event Next(5)
Next(5)
Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке — ошибка исчезла. Если раскоментировать генерацию ошибку RxError.Overflow — мы её не перехватим в операторе retryWhen и пробросим в RO
Позволяет проксировать элементы из исходной SO на Subject переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.
example("multicast") {
let subject = PublishSubject<Int>()
let firstSequence = createSequenceWithWait([0,1,2,3,4,5]) { $0 }
.multicast(subject)
delay(2) {
_ = subject.subscribe { e in
print("first: (e)")
}
}
delay(3) {
_ = subject.subscribe { e in
print("second: (e)")
}
}
firstSequence.connect()
}
Консоль:
--- multicast example ---
first: Next(2)
first: Next(3)
second: Next(3)
first: Next(4)
second: Next(4)
first: Next(5)
second: Next(5)
first: Completed
second: Completed
publish = multicast + replay subject
Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe
example("subscribe connectable sequnce with connect") {
let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence").publish()
var disposable1: Disposable!
var disposable2: Disposable!
disposable1 = sequence.subscribe { e in
print("first: (e)")
}
delay(2) {
disposable2 = sequence.subscribe { e in
print("second: (e)")
}
}
delay(4) {
sequence.connect()
}
delay(8) {
disposable1.dispose()
disposable2.dispose()
}
}
Консоль:
--- subscribe connectable sequnce with connect example ---
2016-04-12 21:35:32.130: sequence -> subscribed
2016-04-12 21:35:33.131: sequence -> Event Next(0)
first: Next(0)
second: Next(0)
2016-04-12 21:35:34.131: sequence -> Event Next(1)
first: Next(1)
second: Next(1)
2016-04-12 21:35:35.132: sequence -> Event Next(2)
first: Next(2)
second: Next(2)
2016-04-12 21:35:36.132: sequence -> Event Next(3)
2016-04-12 21:35:37.132: sequence -> Event Next(4)
Как видно, хоть подписка была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались — последовательность продолжила генерировать элементы
Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable — происходит подписка Connectable на SO.
Получается что то вроде
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()
SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, — происходит отписка и publishSequence от SO
example("with refCount") {
let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence")
let publishSequence = sequence.publish() // создаем Connectable Observable
let refCountSequence = publishSequence.refCount().debug("refCountSequence")
let subscription1 = refCountSequence.subscribe{ e in
print("first: (e)")
}
let subscription2 = refCountSequence.subscribe{ e in
print("second: (e)")
}
delay(2) {
subscription1.dispose() // отписываемся в первый раз
}
delay(3) {
subscription2.dispose() // здесь мы отписываемся во второй, больше подписок на Observable созданный с помощью refCount нет. Поэтому этот Observable отпсиывается от SO
}
delay(5) {
_ = refCountSequence.subscribe { e in
print("after: (e)")
}
}
}
Консоль:
--- with refCount example ---
2016-04-12 20:25:24.154: refCountSequence -> subscribed // подписались на refCountSequence в 1й раз
2016-04-12 20:25:24.155: sequence -> subscribed // после этого publishSequence подписался на SO
2016-04-12 20:25:24.156: refCountSequence -> subscribed // // подписались на refCountSequence во 2й раз
2016-04-12 20:25:25.156: sequence -> Event Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
first: Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
second: Next(0)
2016-04-12 20:25:26.156: sequence -> Event Next(1)
2016-04-12 20:25:26.156: refCountSequence -> Event Next(1)
first: Next(1)
2016-04-12 20:25:26.157: refCountSequence -> Event Next(1)
second: Next(1)
2016-04-12 20:25:26.353: refCountSequence -> disposed // отписались от refCountSequence в 1й раз
2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO продолжает генерировать элементы, т.к. есть еще одна подписка на refCountSequence
2016-04-12 20:25:27.157: refCountSequence -> Event Next(2)
second: Next(2)
2016-04-12 20:25:27.390: refCountSequence -> disposed // отписались от refCountSequence во 2й раз
2016-04-12 20:25:27.390: sequence -> disposed // подписок на refCountSequence больше н еосталось, поэтому publishSequence отписался на SO
2016-04-12 20:25:29.157: refCountSequence -> subscribed // подписались на refCountSequence снова
2016-04-12 20:25:29.157: sequence -> subscribed // т.к. это первая подписка - publishSequence заново подписался на SO
2016-04-12 20:25:30.158: sequence -> Event Next(0)
2016-04-12 20:25:30.159: refCountSequence -> Event Next(0)
after: Next(0)
2016-04-12 20:25:31.158: sequence -> Event Next(1)
2016-04-12 20:25:31.159: refCountSequence -> Event Next(1)
after: Next(1)
2016-04-12 20:25:32.159: sequence -> Event Next(2)
2016-04-12 20:25:32.159: refCountSequence -> Event Next(2)
after: Next(2)
....
далее бесконечно генерируются элементы
Если SO обычный, — конвертирует его в Connectable. И после этого все кто подпишутся на него после вызова connect() — мгновенно получат в качестве первых элементов последние генерированные N элементов. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы
example("replay") {
let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(2)
let firstDisposable = firstSequence.subscribe { e in
print("first: (e)")
}
firstSequence.connect()
var secondDisposable: Disposable!
delay(3) {
secondDisposable = firstSequence.subscribe { e in
print("second: (e)")
}
}
delay(4) {
firstDisposable.dispose()
}
delay(5) {
secondDisposable.dispose()
}
delay(7) {
firstSequence.subscribe { e in
print("third: (e)")
}
}
}
Консоль:
--- replay example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(1)
second: Next(2)
first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
second: Next(3)
second: Next(4) //после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
//тут мы после задержки подписываемся в третий раз
third: Next(5)
third: Next(6)
Если SO обычный, — конвертирует его в Connectable. Все кто подпишутся на него после вызова connect() — получат сначала все элементы, которые были генерированы ранее. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы
example("replayAll") {
let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replayAll()
let firstDisposable = firstSequence.subscribe { e in
print("first: (e)")
}
firstSequence.connect()
var secondDisposable: Disposable!
delay(3) {
secondDisposable = firstSequence.subscribe { e in
print("second: (e)")
}
}
delay(4) {
firstDisposable.dispose()
}
delay(5) {
secondDisposable.dispose()
}
delay(7) {
firstSequence.subscribe { e in
print("third: (e)")
}
}
}
Консоль:
--- replayAll example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(0)
second: Next(1)
second: Next(2)
first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
second: Next(3)
second: Next(4)
//после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
//тут мы после задержки подписываемся в третий раз
third: Next(0)
third: Next(1)
third: Next(2)
third: Next(3)
third: Next(4)
third: Next(5)
third: Next(6)
third: Next(7)
RO полностью дублирует SO, но логируются все события с временной меткой
example("debug") {
let sequence = Observable<AnyObject>.of(1, 2, 3)
.debug("sequence")
.subscribe{}
}
Консоль:
--- debug example ---
2016-04-12 21:41:08.467: sequence -> subscribed
2016-04-12 21:41:08.469: sequence -> Event Next(1)
2016-04-12 21:41:08.469: sequence -> Event Next(2)
2016-04-12 21:41:08.469: sequence -> Event Next(3)
2016-04-12 21:41:08.469: sequence -> Event Completed
RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO
example("simple doOn") {
let firstSequence = Observable.of(1,2).doOn{e in
print(e)
}
firstSequence.subscribeNext{ e in // замечу что логирование производится не в методе subscribe, как обычно, а в doOn
}
}
Консоль:
--- simple doOn example ---
Next(1)
Next(2)
Completed
Дублирует элементы из SO в RO, но с временной задержкой указанной в качестве параметра
example("delaySubscription") {
let sequence = Observable.of(1, 2, 3).debug("sequence")
.delaySubscription(3, scheduler: MainScheduler.instance).debug("delayed sequence")
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- delaySubscription example ---
2016-04-12 21:44:05.226: delayed sequence -> subscribed // подписка на delayed sequence произошла в 5 секунд
2016-04-12 21:44:08.228: sequence -> subscribed // а подписка на SO произошла только через указанных 3 секунды
2016-04-12 21:44:08.229: sequence -> Event Next(1)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(1)
Next(1)
2016-04-12 21:44:08.229: sequence -> Event Next(2)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(2)
Next(2)
2016-04-12 21:44:08.229: sequence -> Event Next(3)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(3)
Next(3)
2016-04-12 21:44:08.230: sequence -> Event Completed
2016-04-12 21:44:08.230: delayed sequence -> Event Completed
Completed
2016-04-12 21:44:08.230: sequence -> disposed
Указывает на каком Scheduler выполнять свою работу Observer, особенно критично при работе с GUI
example("without observeOn") {
let sequence = Observable<AnyObject>.of(1, 2, 3)
sequence.subscribe { e in
print("(NSThread.currentThread())(e)")
}
}
example("with observeOn") {
let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
let sequence = Observable<AnyObject>.of(1, 2, 3)
sequence.observeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
.subscribe { e in
print("(NSThread.currentThread())(e)")
}
}
Консоль:
--- without observeOn example ---
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed
--- with observeOn example ---
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed
Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке
Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable
example("subscribe") {
let firstSequence = Observable.of(1)
firstSequence.subscribe { e in
print(e)
}
firstSequence.subscribeCompleted {
print("!completed")
}
firstSequence.subscribeNext{next in
print("next: (next)")
}
}
example("subscribeNext") {
let firstSequence = Observable.of(1)
firstSequence.subscribeNext{next in
print("next: (next)")
}
}
example("subscribeCompleted") {
let firstSequence = Observable.of(1)
firstSequence.subscribeCompleted {
print("!completed")
}
}
example("subscribeError") {
let firstSequence = Observable<Int>.error(RxError.ArgumentOutOfRange)
firstSequence.subscribeError {e in
print("!error (e)")
}
}
Консоль:
--- subscribe example ---
Next(1)
Completed
!completed
next: 1
--- subscribeNext example ---
next: 1
--- subscribeCompleted example ---
!completed
--- subscribeError example ---
!error Argument out of range.
Продемонстрированы 4 формы: subscribe, subscribeNext, subscribeCompleted, subscribeError
Указывает на каком Scheduler выполнять свою работу Observable, особенно критично при работе с GUI
example("without subscribeOn") {
let sequence = Observable<AnyObject>.of(1, 2, 3)
sequence.subscribe { e in
print("(NSThread.currentThread()) (e)")
}
}
example("with subscribeOn") {
let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
let sequence = Observable<AnyObject>.of(1, 2, 3)
.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
sequence.subscribe { e in
print("(NSThread.currentThread()) (e)")
}
}
Консоль:
--- without subscribeOn example ---
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(1)
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(2)
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(3)
<NSThread: 0x7fef30413290>{number = 1, name = main} Completed
--- with subscribeOn example ---
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(1)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(2)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(3)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Completed
Дублирует элементы из SO в RO, но если в течении указанного времени SO не сгенерировало ни одного элемента — RO генерирует ошибку
example("failed timeout ") {
let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
let timeoutSequence = sequence.timeout(0.9, scheduler: MainScheduler.instance)
timeoutSequence.subscribe { e in
print(e)
}
}
Консоль:
--- failed timeout example ---
Next(1)
Error(Sequence timeout.)
Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable, у которых будет единое время жизни
class FakeDisposable: Disposable {
func dispose() {
print("disposed")
}
}
example("using") {
let sequence = Observable.using({
return FakeDisposable()
}, observableFactory: { d in
Observable.just(1)
}) as Observable<Int>
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- using example ---
Next(1)
Completed
disposed
Как видно, после того как Observable закончил генерировать элементы, у нашего ресурса FakeDisposable был вызван метод dispose
Автор: SparkLone
Источник [123]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/ios/117791
Ссылки в тексте:
[1] reactivex.io: http://reactivex.io/
[2] официальную документацию: https://github.com/ReactiveX/RxSwift
[3] RxMarbles: http://rxmarbles.com/
[4] iPhone/iPad: https://itunes.apple.com/ru/app/rxmarbles/id1087272442?mt=8
[5] ссылка на гитхаб: https://github.com/sparklone/RxSwift
[6] ссылка конкретно на PDF: https://github.com/sparklone/RxSwift/blob/master/RXSwift%20operators.pdf
[7] Заметки: #Intro
[8] asObservable: #asObservable
[9] create: #create
[10] deferred: #deferred
[11] empty: #empty
[12] error: #error
[13] interval: #interval
[14] just: #just
[15] never: #never
[16] of: #of
[17] range: #range
[18] repeatElement: #repeatElement
[19] timer: #timer
[20] amb: #amb
[21] combineLatest: #combineLatest
[22] concat: #concat
[23] merge: #merge
[24] startWith: #startWith
[25] switchLatest: #switchLatest
[26] withLatestFrom: #withLatestFrom
[27] zip: #zip
[28] distinctUntilChanged: #distinctUntilChanged
[29] elementAt: #elementAt
[30] filter: #filter
[31] ignoreElements: #ignoreElements
[32] sample: #sample
[33] single: #single
[34] skip: #skip
[35] skip (duration): #skipDuration
[36] skipUntil: #skipUntil
[37] skipWhile: #skipWhile
[38] skipWhileWithIndex: #skipWhileWithIndex
[39] take: #take
[40] take (duration): #takeDuration
[41] takeLast: #takeLast
[42] takeUntil: #takeUntil
[43] takeWhile: #takeWhile
[44] takeWhileWithIndex: #takeWhileWithIndex
[45] throttle: #throttle
[46] buffer: #buffer
[47] flatMap: #flatMap
[48] flatMapFirst: #flatMapFirst
[49] flatMapLatest: #flatMapLatest
[50] flatMapWithIndex: #flatMapWithIndex
[51] map: #map
[52] mapWithIndex: #mapWithIndex
[53] window: #window
[54] reduce: #reduce
[55] scan: #scan
[56] toArray: #toArray
[57] catchError: #catchError
[58] catchErrorJustReturn: #catchErrorJustReturn
[59] retry: #retry
[60] retryWhen: #retryWhen
[61] multicast: #multicast
[62] publish: #publish
[63] refCount: #refCount
[64] reply: #reply
[65] replayAll: #replayAll
[66] debug: #debug
[67] doOn / doOnNext: #doOn
[68] delaySubscription: #delaySubscription
[69] observeOn: #observeOn
[70] subscribe: #subscribe
[71] subscribeOn: #subscribeOn
[72] timeout: #timeout
[73] using: #using
[74] asObservable: http://reactivex.io/documentation/operators/from.html
[75] create: http://reactivex.io/documentation/operators/create.html
[76] deferred: http://reactivex.io/documentation/operators/defer.html
[77] empty: http://reactivex.io/documentation/operators/empty-never-throw.html
[78] interval: http://reactivex.io/documentation/operators/interval.html
[79] just: http://reactivex.io/documentation/operators/just.html
[80] range: http://reactivex.io/documentation/operators/range.html
[81] repeatElement: http://reactivex.io/documentation/operators/repeat.html
[82] timer: http://reactivex.io/documentation/operators/timer.html
[83] amb: http://reactivex.io/documentation/operators/amb.html
[84] combineLatest: http://reactivex.io/documentation/operators/combinelatest.html
[85] concat: http://reactivex.io/documentation/operators/concat.html
[86] merge: http://reactivex.io/documentation/operators/merge.html
[87] startWith: http://reactivex.io/documentation/operators/startwith.html
[88] switchLatest: http://reactivex.io/documentation/operators/switch.html
[89] zip: http://reactivex.io/documentation/operators/zip.html
[90] distinctUntilChanged: http://reactivex.io/documentation/operators/distinct.html
[91] elementAt: http://reactivex.io/documentation/operators/elementat.html
[92] filter: http://reactivex.io/documentation/operators/filter.html
[93] ignoreElements: http://reactivex.io/documentation/operators/ignoreelements.html
[94] sample: http://reactivex.io/documentation/operators/sample.html
[95] single: http://reactivex.io/documentation/operators/first.html
[96] skip: http://reactivex.io/documentation/operators/skip.html
[97] skipUntil: http://reactivex.io/documentation/operators/skipuntil.html
[98] skipWhile: http://reactivex.io/documentation/operators/skipwhile.html
[99] take: http://reactivex.io/documentation/operators/take.html
[100] takeLast: http://reactivex.io/documentation/operators/takelast.html
[101] takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
[102] takeWhile: http://reactivex.io/documentation/operators/takewhile.html
[103] throttle: http://reactivex.io/documentation/operators/debounce.html
[104] buffer: http://reactivex.io/documentation/operators/buffer.html
[105] flatMap: http://reactivex.io/documentation/operators/flatmap.html
[106] map: http://reactivex.io/documentation/operators/map.html
[107] window: http://reactivex.io/documentation/operators/window.html
[108] reduce: http://reactivex.io/documentation/operators/reduce.html
[109] scan: http://reactivex.io/documentation/operators/scan.html
[110] toArray: http://reactivex.io/documentation/operators/to.html
[111] catchError: http://reactivex.io/documentation/operators/catch.html
[112] retry: http://reactivex.io/documentation/operators/retry.html
[113] multicast: http://reactivex.io/documentation/operators/publish.html
[114] refCount: http://reactivex.io/documentation/operators/refcount.html
[115] replay: http://reactivex.io/documentation/operators/replay.html
[116] do / doOnNext: http://reactivex.io/documentation/operators/do.html
[117] delaySubscription: http://reactivex.io/documentation/operators/delay.html
[118] observeOn: http://reactivex.io/documentation/operators/observeon.html
[119] subscribe: http://reactivex.io/documentation/operators/subscribe.html
[120] subscribeOn: http://reactivex.io/documentation/operators/subscribeon.html
[121] timeout: http://reactivex.io/documentation/operators/timeout.html
[122] using: http://reactivex.io/documentation/operators/using.html
[123] Источник: https://habrahabr.ru/post/281292/
Нажмите здесь для печати.