- PVSM.RU - https://www.pvsm.ru -

RxSwift шпаргалка по операторам (+ PDF)

RxSwift шпаргалка по операторам (+ PDF) - 1

Заинтересовавшись темой функционального программирования я встал на распутье, — какой фреймворк выбрать для ознакомления. ReactiveCocoa — ветеран в iOS кругах, по нему вдоволь информации. Но он вырос с Objective-C, и хотя это не является проблемой, но все же в данный момент я в основном пишу именно на Swift, — хотелось бы взять решение изначально спроектированное с учетом всех плюшек языка. RxSwift же порт Reactive Extensions, имеющего долгую историю, но сам порт свежий и написанный именно под Swift. На нем я и решил остановиться.
Но специфика документации по RxSwift в том, что описание всех команд ведет на reactivex.io [1], а там в основном дается общая информация, руки у разработчиков не дошли еще сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, есть те, о которых в общей документации нет ничего кроме упоминания.
Прочитав все главы вики с RxSwift гитхаба, я сразу решил поразбираться с официальными примерами, тут то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с копипастом гранатой. Я начал разбирать самые сложные для понимания команды, потом те, что вроде понятны, но задав себе вопросы по ним я понял, что лишь догадываюсь на то как верно ответить, но не уверен.
В общем ничтоже сумняшеся я решил проработать все операторы RxSwift. Лучший способ что то понять в программировании — запустить код и посмотреть как он отработает. Затем учитывая специфику реактивного программирования — очень полезны схемы, ну и краткое описание на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
Много картинок и текста под катом, очень много!

Предварительно я рекомендую просмотреть официальную документацию [2], у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые RxMarbles [3], есть бесплатная версия под iPhone/iPad [4]

Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, по каждой я дам краткое описание, схему(если это имеет смысл), код, результат выполнения, при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.

Вот ссылка на гитхаб [5], куда я склонировал официальный репозиторий RxSwift, и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и ссылка конкретно на PDF [6] где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того чтобы увидеть как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — иметь под рукой документ в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.

Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
Что ж, надеюсь моя работа кому то пригодится. Приступим.

Содержание

Заметки [7]

Создание Observable

asObservable [8]
create [9]
deferred [10]
empty [11]
error [12]
interval [13]
just [14]
never [15]
of [16]
range [17]
repeatElement [18]
timer [19]

Комбинирование Observable

amb [20]
combineLatest [21]
concat [22]
merge [23]
startWith [24]
switchLatest [25]
withLatestFrom [26]
zip [27]

Фильтрация

distinctUntilChanged [28]
elementAt [29]
filter [30]
ignoreElements [31]
sample [32]
single [33]
skip [34]
skip (duration) [35]
skipUntil [36]
skipWhile [37]
skipWhileWithIndex [38]
take [39]
take (duration) [40]
takeLast [41]
takeUntil [42]
takeWhile [43]
takeWhileWithIndex [44]
throttle [45]

Трансформация

buffer [46]
flatMap [47]
flatMapFirst [48]
flatMapLatest [49]
flatMapWithIndex [50]
map [51]
mapWithIndex [52]
window [53]

Операторы математические и агрегирования

reduce [54]
scan [55]
toArray [56]

Работа с ошибками

catchError [57]
catchErrorJustReturn [58]
retry [59]
retryWhen [60]

Операторы для работы с Connectable Observable

multicast [61]
publish [62]
refCount [63]
reply [64]
replayAll [65]

Вспомогательные методы

debug [66]
doOn / doOnNext [67]
delaySubscription [68]
observeOn [69]
subscribe [70]
subscribeOn [71]
timeout [72]
using [73]

В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.

В качестве вспомогательного кода я буду пользоваться функцией createSequenceWithWait, она создает Observable из массива элементов с указанным интервалом между элементами.

public enum ResultType {
    case Infinite
    case Completed
    case Error
}

public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> {
    return Observable<U>.create{ observer  in
        for (idx, letter) in array.enumerate() {
            let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC))
            dispatch_after(time, dispatch_get_main_queue()) {
                if let describer = describer {
                    let value = describer(value: letter)
                    observer.on(.Next(value))
                } else {
                    observer.on(.Next(letter as! U))
                }
                
            }
        }
        
        if resultType != .Infinite {
            let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC))
            dispatch_after(allTime, dispatch_get_main_queue()) {
                switch resultType {
                case .Completed:
                    observer.onCompleted()
                case .Error:
                    observer.onError(RxError.Unknown)
                default:
                    break
                }
                
            }
        }
        
        return NopDisposable.instance
    }
}

Функция example — просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift)

public func example(description: String, action: () -> ()) {
    print("n--- (description) example ---")
    action()
}

Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице — необходимо прописать

import XCPlayground
XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

Так же подразумевается, что читатель имеет общее представление о том, что такое реактивное программирование в общем и о RxSwift в частности. Не знаю есть ли смысл городить очередную вводную.

Создание Observable


asObservable [74]

Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver

example("as Observable") {
    let variable = Variable<Int>(0)
    
    variable.asObservable().subscribe { e in
        print(e)
    }
    variable.value = 1
}

Консоль:

--- as Observable example ---
Next(0)
Next(1)
Completed

В данном примере мы Variable преобразовали в Observable и подписались на его события


create [75]

Этот метод позволяет создавать Observable с нуля, полностью контролируя какие элементы и когда он будет генерировать.

example("create") {
    let firstSequence = Observable<AnyObject>.of(1, 2, 3)
    let secondSequence = Observable<AnyObject>.of("A", "B", "C")
    
    let multipleSequence = Observable<Observable<AnyObject>>.create { observer in
        observer.on(.Next(firstSequence))
        observer.on(.Next(secondSequence))
        return NopDisposable.instance
    }
    let concatSequence = multipleSequence.concat()
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- create example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)

В данном примере мы создали Observable вручную, он сгенерирует два Observable, элементы которых мы затем объединим командой concat, на получившийся Observable мы и подпишемся


deferred [76]

Этот оператор позволяет отложить создание Observable, до момента подписки с помощью subscribe

example("without deferred") {
    var i = 1
    let justObservable = Observable.just(i)
    i = 2
    _ = justObservable.subscribeNext{ print ("i = ($0)") }
}

example("with deferred") {
    var i = 1
    let deferredJustObservable = Observable.deferred{
        Observable.just(i)
    }
    i = 2
    _ = deferredJustObservable.subscribeNext{ print ("i = ($0)") }
}

Консоль:

--- without deferred example ---
i = 1

--- with deferred example ---
i = 2

В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значение i уже не влияет на генерируемый элемент это последовательностью. Во втором же случае мы создаем с помощью deferred, и мы можем поменять значение i перед subscribe


empty [77]

Пустая последовательность, заканчивающаяся Completed

RxSwift шпаргалка по операторам (+ PDF) - 2

example("empty") {
    let sequence = Observable<Int>.empty()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- empty example ---
Completed

error [77]

Создаст последовательность, которая состоит из одного события — Error

RxSwift шпаргалка по операторам (+ PDF) - 3

example("error") {
    let sequence = Observable<Int>
        .error(RxError.Unknown)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- error example ---
Error(Unknown error occured.)

interval [78]

Создает бесконечную последовательность, возрастающую с 0 с шагом 1, с указанной периодичностью

RxSwift шпаргалка по операторам (+ PDF) - 4

example("interval") {
    let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- interval example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
....

just [79]

Создает последовательность из любого значения, которая завершается Completed

RxSwift шпаргалка по операторам (+ PDF) - 5

example("just") {
    let sequence = Observable.just(100)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- just example ---
Next(100)
Completed

never [77]

Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие

RxSwift шпаргалка по операторам (+ PDF) - 6

example("never") {
    let sequence = Observable<Int>.never()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- never example ---

of [74]

Последовательность из variadic переменной, после всех элементов генерируется Completed

RxSwift шпаргалка по операторам (+ PDF) - 7

example("simple of") {
    let sequence = Observable.of(1, 2)

    sequence.subscribe { e in
        print(e)
    }
}

example("of for Observables") {
    let firstSequence = Observable<AnyObject>.of(1, 2, 3)
    let secondSequence = Observable<AnyObject>.of("A", "B", "C")
    
    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- simple of example ---
Next(1)
Next(2)
Completed

--- of for Observables example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

В первом случае мы создали последовательность из двух чисел. Во втором из двух Observable, а затем их объединили между собой с помощью оператора merge


range [80]

Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed

RxSwift шпаргалка по операторам (+ PDF) - 8

example("range") {
    let sequence = Observable.range(start: 5, count: 3)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- range example ---
Next(5)
Next(6)
Next(7)
Completed

Сгенерировались элементы начиная с 5, 3 раза с шагом 1


repeatElement [81]

Бесконечно создавать указанный элемент, без задержек. Никогда не будет сгенерированы события Completed или Error

RxSwift шпаргалка по операторам (+ PDF) - 9

example("repeatElement") {
    let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- repeatElement example ---
Next(1)
Next(2)
Next(3)
.....

timer [82]

Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможность задержки при старте. Никогда не будет сгенерированы события Completed или Error

RxSwift шпаргалка по операторам (+ PDF) - 10

example("timer") {
    let sequence = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- timer example ---
Next(0)
Next(1)
Next(2)

В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды

Комбинирование Observable


amb [83]

SO = [Observable<T>] или SO1, SO2 = Observable<T>
RO = Observable<T>

Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются

RxSwift шпаргалка по операторам (+ PDF) - 11

example("amb") {
    let subjectA = PublishSubject<Int>()
    let subjectB = PublishSubject<Int>()
    let subjectC = PublishSubject<Int>()
    let subjectD = PublishSubject<Int>()
    
    let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb()
    ambSequence.subscribe { e in
        print(e)
    }
    
    subjectC.onNext(0)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectC.onNext(1)
    subjectD.onNext(45)
}

Консоль:

--- amb example ---
Next(0)
Next(1)

Т.к. первым сгенерировал элемент subjectC, — лишь его элементы дублируются в RO, остальные игнорируются


combineLatest [84]

SO = SO1, SO2,... SON = Observable<T>
RO = Observable<f(T,T)> 

Как только все Observable сгенерировали хотя бы по одному элементу — эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации любым Observable элемента — генерируется новый результат функции с последними элементами из всех комбинируемых Observable

RxSwift шпаргалка по операторам (+ PDF) - 12

example("combineLatest") {
    let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in
        "(element)"
    }.debug("firstSequence")
    let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in
        "(element)"
        }
        .delaySubscription(3, scheduler: MainScheduler.instance)
        .debug("secondSequence")
    
    let concatSequence = Observable.combineLatest(firstSequence, secondSequence) {
        first, second -> String in
        "(first) - (second)"
    }
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- combineLatest example ---
2016-04-12 16:59:35.421: firstSequence -> subscribed
2016-04-12 16:59:35.422: secondSequence -> subscribed
2016-04-12 16:59:35.434: firstSequence -> Event Next(1)
2016-04-12 16:59:37.423: firstSequence -> Event Next(2)
2016-04-12 16:59:38.423: secondSequence -> Event Next(A)
Next(2 - A)
2016-04-12 16:59:39.423: firstSequence -> Event Next(3)
Next(3 - A)
2016-04-12 16:59:39.522: secondSequence -> Event Next(B)
Next(3 - B)
2016-04-12 16:59:40.622: secondSequence -> Event Next(C)
Next(3 - C)
2016-04-12 16:59:41.722: firstSequence -> Event Completed
2016-04-12 16:59:41.722: firstSequence -> disposed
2016-04-12 16:59:41.722: secondSequence -> Event Completed
2016-04-12 16:59:41.722: secondSequence -> disposed
Completed

В этом примере я создал Observable с помощью createSequenceWithWait, чтобы элементы генерировались с разной задержкой, чтобы было видно как перемешиваются элементы.
firstSequence успел сгенерировать 1 и 2, прежде чем secondSequence сгенерировал A, поэтому 1 отбросили, и первым выводом стало 2 — A


concat [85]

SO = Observable<Observable<T>> или SO1, SO2 = Observable<T>
RO = Observable<T>

В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, — элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO

RxSwift шпаргалка по операторам (+ PDF) - 13

example("concat object method") {
    let firstSequence = Observable<AnyObject>.of(1, 2, 3)
    let secondSequence = Observable<AnyObject>.of("A", "B", "C")
    let concatSequence = firstSequence.concat(secondSequence)
    concatSequence.subscribe { e in
        print(e)
    }
}

example("concat from array") {
    let firstSequence = Observable.of(1,2,3)
    let secondSequence = Observable.of(4,5,6)
    let concatSequence = Observable.of(firstSequence, secondSequence)
        .concat()
    
    concatSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- concat object method example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

--- concat from array example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(5)
Next(6)
Completed

В первом примере мы присоединяем второй Observable к первому.
Во втором генерируем последовательность из массива.


merge [86]

SO = Observable<Observable<T>>
RO = Observable<T>

Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable

RxSwift шпаргалка по операторам (+ PDF) - 14

example("simple merge") {
    let firstSequence = Observable<AnyObject>.of(1, 2, 3)
    let secondSequence = Observable<AnyObject>.of("A", "B", "C")

    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

example("merge with wait") {
    let firstSequence = createSequenceWithWait([1,2,3]) { element in
        "(element)"
    }
    let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in
        "(element)"
    }

    let bothSequence = Observable.of(firstSequence, secondSequence)
    let mergedSequence = bothSequence.merge()
    
    mergedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- simple merge example ---
Next(1)
Next(2)
Next(3)
Next(A)
Next(B)
Next(C)
Completed

--- merge with wait example ---
Next(1)
Next(A)
Next(2)
Next(3)
Next(B)
Next(C)
Completed

В первом примере мы сливаем две последовательности созданные без задержки, в итоге первый Observable успевает сгенерировать все свои элементы перед тем как это начнет делать второй, результат оказывается идентичен concat
Во втором же случае последовательности сделаны с задержкой в генерации, и видно что элементы в RO теперь вперемешку, в том порядке в котором они были сгенерированы в исходных Observable


startWith [87]

SO = Observable<T>
RO = Observable<T>

В начало SO добавляются элементы переданные в качестве аргумента

RxSwift шпаргалка по операторам (+ PDF) - 15

example("startWith") {
    let sequence = Observable.of(1, 2, 3).startWith(0)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- startWith example ---
Next(0)
Next(1)
Next(2)
Next(3)
Completed

switchLatest [88]

SO = Observable<Observable<T>>
RO = Observable<T> 

Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable

RxSwift шпаргалка по операторам (+ PDF) - 16

example("switchLatest") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(100)
    
    let proxyVar = Variable(varA.asObservable())
    let concatSequence = proxyVar.asObservable().switchLatest()
    
    concatSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2
    varB.value = 3
    proxyVar.value = varB.asObservable()
    varB.value = 4
    varA.value = 5
}


example("switchLatest") {
    let observableA = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(3) {
            observer.on(.Next(20))
        }
        delay(5) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oA")
    let observableB = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oB")
    
    let observableC = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(1000))
        }
        delay(1) {
            observer.on(.Next(2000))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("oC")
    
        let subjects = [observableA, observableB, observableC]
        let sequence:Observable<Observable<Int>> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0}
        let switchLatestSequence:Observable<Int> = sequence.switchLatest()
        switchLatestSequence.subscribe { e in
            print(e)
        }
}

Консоль:


--- switchLatest example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
Completed

--- switchLatest example ---
2016-04-12 17:15:22.710: oA -> subscribed
2016-04-12 17:15:22.711: oA -> Event Next(10)
Next(10)
2016-04-12 17:15:23.797: oA -> disposed // происходит отписка как только сгенерировался oB
2016-04-12 17:15:23.797: oB -> subscribed
2016-04-12 17:15:23.797: oB -> Event Next(100)
Next(100)
2016-04-12 17:15:24.703: oB -> disposed // происходит отписка как только сгенерировался oC
2016-04-12 17:15:24.703: oC -> subscribed 
2016-04-12 17:15:24.703: oC -> Event Next(1000)
Next(1000)
2016-04-12 17:15:25.800: oC -> Event Next(2000)
Next(2000)
2016-04-12 17:15:26.703: oC -> Event Completed
2016-04-12 17:15:26.703: oC -> disposed
Completed

В первом примере показано как команда работает в статике, когда мы руками переподключаем Observable.
Во втором же у нас последовательности с задержками. observableA, observableB, observableC из SO генерируются раз в 1 секунду. Их же элементы генерируются с различными задержками.


withLatestFrom [84]

SO1, SO2 = Observable<T>
RO = Observable<f(T,T)>

Как только O1 генерирует элемент проверяется сгенерирован ли хоть один элемент в O2, если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента

RxSwift шпаргалка по операторам (+ PDF) - 17

example("withLatestFrom") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(10)
    
    let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) {
        "($0) - ($1)"
    }
    withLatestFromSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2 
    varB.value = 20
    
    varB.value = 30
    varA.value = 5
    varA.value = 6
}

Консоль:

--- withLatestFrom example ---
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
Next(5 - 30)
Next(6 - 30)
Completed

zip [89]

SO = Observable<Observable<T>>
RO = Observable<f(T,T)> 

Элементы RO представляют собой комбинацию из элементов сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента

RxSwift шпаргалка по операторам (+ PDF) - 18

example("zip with simple Variable") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(10)
    
    let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "($0) - ($1)"
    }
    
    zippedSequence.subscribe { e in
        print(e)
    }
    varA.value = 1
    varA.value = 2
    varB.value = 20
    
    varB.value = 30
    varA.value = 3
    varA.value = 4
}

example("zip with PublishSubject") {
    let subjectA = PublishSubject<Int>()
    let subjectB = PublishSubject<Int>()

    let zippedSequence = Observable.zip(subjectA, subjectB) { "($0) - ($1)"
    }
    
    zippedSequence.subscribe { e in
        print(e)
    }
    subjectA.onNext(0)
    subjectA.onNext(1)
    subjectA.onNext(2)
    subjectB.onNext(100)
    subjectB.onNext(101)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectA.onNext(4)
}

Консоль:

--- zip with simple Variable example ---
Next(0 - 10)
Next(1 - 20)
Next(2 - 30)
Completed

--- zip with PublishSubject example ---
Next(0 - 100)
Next(1 - 101)
Next(2 - 102)

Из примеров видно, что элементы комбинируются в том порядке, в каком они были сгенерированы в исходных Observable

Фильтрация


distinctUntilChanged [90]

Пропускаем все повторяющиеся подряд идущие элементы

RxSwift шпаргалка по операторам (+ PDF) - 19

example("distinctUntilChanged") {
    let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- distinctUntilChanged example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(1)
Completed

Здесь тонкий момент, что отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.


elementAt [91]

В RO попадает лишь элемент выпущенный N по счету

RxSwift шпаргалка по операторам (+ PDF) - 20

example("elementAt") {
    let sequence = Observable.of(0, 10, 20, 30, 40)
        .elementAt(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- elementAt example ---
Next(20)
Completed

filter [92]

Отбрасываются все элементы, которые не удовлетворяют заданным условиям

RxSwift шпаргалка по операторам (+ PDF) - 21

example("filter") {
    let sequence = Observable.of(1, 20, 3, 40)
        .filter{ $0 > 10}
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- filter example ---
Next(20)
Next(40)
Completed

ignoreElements [93]

Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error

RxSwift шпаргалка по операторам (+ PDF) - 22

example("ignoreElements") {
    let sequence = Observable.of(1, 2, 3, 4)
        .ignoreElements()
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- ignoreElements example ---
Completed

sample [94]

При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее

RxSwift шпаргалка по операторам (+ PDF) - 23

example("sampler") {
    let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler")
    
    let sequence:Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- sampler example ---
2016-04-12 18:28:20.322: sampler -> subscribed
2016-04-12 18:28:21.323: sampler -> Event Next(0)
Next(1)
2016-04-12 18:28:22.324: sampler -> Event Next(1) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:23.323: sampler -> Event Next(2)
Next(2)
2016-04-12 18:28:24.323: sampler -> Event Next(3) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:25.323: sampler -> Event Next(4) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
2016-04-12 18:28:26.323: sampler -> Event Next(5)
Next(3)
...

single [95]

Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом

RxSwift шпаргалка по операторам (+ PDF) - 24

example("single generate error") {
    let sequence = Observable.of(1, 2, 3, 4).single()
    sequence.subscribe { e in
        print(e)
    }
}

example("single") {
    let sequence = Observable.of(1, 2, 3, 5).single {
        $0 % 2 == 0
    }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- single generate error example ---
Next(1)
Error(Sequence contains more than one element.)

--- single example ---
Next(2)
Completed

В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было


skip [96]

Из SO отбрасываем первые N элементов

RxSwift шпаргалка по операторам (+ PDF) - 25

example("skip") {
    let sequence = Observable.of(1, 2, 3, 4).skip(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skip example ---
Next(3)
Next(4)
Completed

skip (duration) [96]

Из SO отбрасываем первые элементы, которые были сгенерированы в первые N

RxSwift шпаргалка по операторам (+ PDF) - 26

example("skip duration with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skip duration with wait example ---
Next(3)
Next(4)
Completed

skipUntil [97]

Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью переданной в качестве параметра

RxSwift шпаргалка по операторам (+ PDF) - 27

example("skipUntil") {
    let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let secondSequence = Observable.just(1)
        .delaySubscription(1, scheduler: MainScheduler.instance)
    let skippedSequence = firstSequence.skipUntil(secondSequence)
    
    skippedSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skipUntil example ---
Next(3)
Next(4)
Completed

Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду


skipWhile [98]

Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией

RxSwift шпаргалка по операторам (+ PDF) - 28

example("skipWhile") {
    let firstSequence = [1,2,3,4,0].toObservable()
    let skipSequence = firstSequence.skipWhile { $0 < 3 }
    
    skipSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skipWhile example ---
Next(3)
Next(4)
Next(0)
Completed

skipWhileWithIndex [98]

Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией. Отличие от skipWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

RxSwift шпаргалка по операторам (+ PDF) - 29

example("skipWhileWithIndex") {
    let firstSequence = [1,2,5,0,7].toObservable()
    let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in
        value < 4 || idx < 2
    }
    skipSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- skipWhileWithIndex example ---
Next(5)
Next(0)
Next(7)
Completed

take [99]

Из SO берутся лишь первые N элементов

RxSwift шпаргалка по операторам (+ PDF) - 30

example("take") {
    let sequence = Observable.of(1, 2, 3, 4).take(2)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- take example ---
Next(1)
Next(2)
Completed

take (duration) [99]

Из SO берутся лишь элементы сгенерированные в первые N секунд

RxSwift шпаргалка по операторам (+ PDF) - 31

example("take duration with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let takeSequence = sequence.take(2, scheduler: MainScheduler.instance)
    takeSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- take duration with wait example ---
Next(1)
Next(2)
Completed

takeLast [100]

Из SO берутся лишь последние N элементов. Что означает, если SO никогда не закончит генерировать элементы — в RO не попадет ни одного элемента.

RxSwift шпаргалка по операторам (+ PDF) - 32

example("takeLast") {
    let sequence = Observable.of(1, 2, 3, 4).takeLast(2)
    sequence.subscribe { e in
        print(e)
    }
}

example("takeLast with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let takeSequence = sequence.takeLast(2)
    takeSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeLast example ---
Next(3)
Next(4)
Completed

--- takeLast with wait example ---
Next(3)
Next(4)
Completed

Второй пример приведен для иллюстрации в задержке генерации элементов в RO из за ожидания завершения генерации элементов в SO


takeUntil [101]

Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью переданной в качестве параметра

RxSwift шпаргалка по операторам (+ PDF) - 33

example("takeUntil") {
    let stopSequence = Observable.just(1)
        .delaySubscription(2, scheduler: MainScheduler.instance)
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        .takeUntil(stopSequence)
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeUntil example ---
Next(1)
Next(2)
Completed

takeWhile [102]

Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией

RxSwift шпаргалка по операторам (+ PDF) - 34

example("takeWhile") {
    let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeWhile example ---
Next(1)
Next(2)
Completed

takeWhileWithIndex [102]

Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией. Отличие от takeWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

RxSwift шпаргалка по операторам (+ PDF) - 35

example("takeWhileWithIndex") {
    let sequence = [1,2,3,4,5,6].toObservable()
        .takeWhileWithIndex{ (val, idx) in
            val % 2 == 0 || idx < 3
    }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed

throttle [103]

Из SO берутся лишь элементы, после которых не было новых элементов N секунд.

RxSwift шпаргалка по операторам (+ PDF) - 36

example("throttle") {
    let sequence = Observable.of(1, 2, 3, 4)
        .throttle(1, scheduler: MainScheduler.instance)
    sequence.subscribe { e in
        print(e)
    }
}

example("throttle with wait") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        .throttle(0.5, scheduler: MainScheduler.instance)
    
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- throttle example ---
Next(4)
Completed

--- throttle with wait example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed

В первом случае SO генерирует элементы без задержек, поэтому лишь последний элемент не имеет после себя новых элементов.
Во втором примере элементы генерируются медленнее, чем N секунд переданные в throttle, поэтому за каждым генерируемым элементом достаточный временной промежуток.

Трансформация


buffer [104]

SO = Observable<>>
RO = Observable<[T]>

Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передается count, — максимальное число элементов в массиве, и timeSpan время максимального ожидания наполнения текущего массива из элементов SO. Таким образом элемент RO, являет собой массив [T], длинной от 0 до count.

RxSwift шпаргалка по операторам (+ PDF) - 37

example("buffer") {
    let varA = Variable<Int>(0)
    
    let bufferSequence = varA.asObservable()
        .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
    bufferSequence.subscribe { e in
        print("(NSDate()) - (e)")
    }
    varA.value = 1
    varA.value = 2
    varA.value = 3
    delay(3) {
        varA.value = 4
        varA.value = 5
        delay(5) {
            varA.value = 6
        }
    }
}

Консоль:

--- buffer example ---
2016-04-12 16:10:58 +0000 - Next([0, 1, 2])
2016-04-12 16:11:01 +0000 - Next([3]) 
2016-04-12 16:11:04 +0000 - Next([4, 5])
2016-04-12 16:11:07 +0000 - Next([6])
2016-04-12 16:11:07 +0000 - Completed

Длина массива была указана как 3, — как только были сгенерированы 3 элемента — в RO сгенерировался элемент [0, 1, 2]
После генерации элемента 3, — была задержка в 3 секунды, сработал таймаут, и массив не был заполнен полностью.
То же касается и задержки после генерации элемента 5


flatMap [105]

Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge)

RxSwift шпаргалка по операторам (+ PDF) - 38

example("flatMap with wait") {
    let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
    let flatMapSequence:Observable<String> = sequence.flatMap{val in
        createSequenceWithWait([10,11,12], waitTime: 2) { element in
            "(element) - (val)"
        }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- flatMap with wait example ---
Next(10 - 0)
Next(10 - 1)
Next(11 - 0)
Next(10 - 2)
Next(11 - 1)
Next(12 - 0)
Next(11 - 2)
Next(12 - 1)
Next(12 - 2)
Completed

flatMapFirst [105]

Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы — все последующие Observable сгенерированные из SO отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, — если будет сгенерирован новый Observable — на него подпишутся и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable

RxSwift шпаргалка по операторам (+ PDF) - 39

example("flatMapFirst") {
    let sequence:Observable<Int> = Observable.of(10, 20, 30)
        .debug("sequence")
    let flatMapSequence:Observable<String> = sequence
        .flatMapFirst{val in
            Observable.of(0, 1, 2)
                .map{"($0) - (val)"
            }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}


example("flatMapFirst with delay") {
    let subjectA = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(1) {
            observer.on(.Next(20))
        }
        delay(7) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sA")
    let subjectB = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sB")

    let subjectC = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(1000))
        }
        delay(1) {
            observer.on(.Next(2000))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
    }.debug("sC")

    let subjects = [subjectA, subjectB, subjectC]
    let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:4){$0}
        .debug("sequence")
    let flatMapSequence:Observable<Int> = sequence.flatMapFirst{val in
        return subjects[val].asObservable()
        }.debug("flatMapSequence")
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:


--- flatMapFirst example ---
2016-04-12 19:19:46.915: sequence -> subscribed
2016-04-12 19:19:46.916: sequence -> Event Next(10)
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
2016-04-12 19:19:46.918: sequence -> Event Next(20)
Next(0 - 20)
Next(1 - 20)
Next(2 - 20)
2016-04-12 19:19:46.919: sequence -> Event Next(30)
Next(0 - 30)
Next(1 - 30)
Next(2 - 30)
2016-04-12 19:19:46.921: sequence -> Event Completed
Completed
2016-04-12 19:19:46.921: sequence -> disposed

--- flatMapFirst with delay example ---
2016-04-12 19:19:46.925: flatMapSequence -> subscribed
2016-04-12 19:19:46.926: sequence -> subscribed
2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO генерирует 1й элемент
2016-04-12 19:19:46.935: sA -> subscribed // на его основе генерируется Observable sA, на  который мы подписываемся
2016-04-12 19:19:46.936: sA -> Event Next(10)
2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10)
Next(10)
2016-04-12 19:19:47.936: sA -> Event Next(20)
2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20)
Next(20)
2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO генерирует 2й элемент, но на этот момент sA еще генерирует элементы, поэтому подписки на sB не произошло, он просто отбрасывается
2016-04-12 19:19:53.935: sA -> Event Completed 
2016-04-12 19:19:53.936: sA -> disposed // sA закончил генерировать элементы, от него отписались
2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO генерирует 3й элемент
2016-04-12 19:19:55.137: sC -> subscribed // т.к. на этот момент нет активных Observable (от sA мы отписались, sB - мы отбросили) - мы можем подписаться на него
2016-04-12 19:19:55.137: sC -> Event Next(1000)
2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000)
Next(1000)
2016-04-12 19:19:56.236: sC -> Event Next(2000)
2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000)
Next(2000)
2016-04-12 19:19:57.335: sC -> Event Completed
2016-04-12 19:19:57.336: sC -> disposed
2016-04-12 19:19:58.926: sequence -> Event Completed
2016-04-12 19:19:58.926: flatMapSequence -> Event Completed
Completed
2016-04-12 19:19:58.926: sequence -> disposed

Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — это уже разрешено, поэтому в RO попадают элементы со всех Observable
А вот второй пример очень объемный, но позволят в подробностях наблюдать как происходят подписка/отписка и как это влияет на генерацию элементов


flatMapLatest [105]

Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable

RxSwift шпаргалка по операторам (+ PDF) - 40

example("flatMapLatest") {
    let sequence:Observable<Int> = Observable.of(10, 20, 30)
    let flatMapSequence = sequence.flatMapLatest{val in Observable.of(0, 1, 2)
        .map{"($0) - (val)"
        }
    }
    flatMapSequence.subscribe { e in
        print(e)
    }
}

example("flatMapLatest with delay") {
    let subjectA = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(10))
        }
        delay(3) {
            observer.on(.Next(20))
        }
        delay(5) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("sA")
    let subjectB = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(100))
        }
        delay(1) {
            observer.on(.Next(200))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("sB")
    
    let subjectC = Observable<Int>.create{ observer in
        delay(0) {
            observer.on(.Next(1000))
        }
        delay(1) {
            observer.on(.Next(2000))
        }
        delay(2) {
            observer.onCompleted()
        }
        return NopDisposable.instance
        }.debug("sC")
    
    let subjects = [subjectA, subjectB, subjectC]
    let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:1) {$0}
        .debug("sequence")
    let flatMapSequence:Observable<Int> = sequence.flatMapLatest{val in
        return subjects[val].asObservable()
        }.debug("flatMapSequence")
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- flatMapLatest example ---
Next(0 - 10)
Next(1 - 10)
Next(2 - 10)
Next(0 - 20)
Next(1 - 20)
Next(2 - 20)
Next(0 - 30)
Next(1 - 30)
Next(2 - 30)
Completed

--- flatMapLatest with delay example ---
2016-04-12 19:30:50.309: flatMapSequence -> subscribed
2016-04-12 19:30:50.310: sequence -> subscribed
2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO сгенерировала 1й элемент, и на его основе создали sA
2016-04-12 19:30:50.319: sA -> subscribed // подписались на sA
2016-04-12 19:30:50.319: sA -> Event Next(10) // он генерирует элемент
2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap генерирует новый элемент 
Next(10) // и он попадает в RO
2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO сгенерировала 2й элемент, и на его основе создали sA
2016-04-12 19:30:51.311: sA -> disposed // и хоть sA не успел сгенерировать все элементы, от него мы отписываемся
2016-04-12 19:30:51.311: sB -> subscribed // и подписываемся на новый Observable sB
2016-04-12 19:30:51.311: sB -> Event Next(100)
2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100)
Next(100)
2016-04-12 19:30:52.310: sequence -> Event Next(2)
2016-04-12 19:30:52.311: sB -> disposed
2016-04-12 19:30:52.311: sC -> subscribed
2016-04-12 19:30:52.311: sC -> Event Next(1000)
2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000)
Next(1000)
2016-04-12 19:30:53.372: sequence -> Event Completed
2016-04-12 19:30:53.372: sequence -> disposed
2016-04-12 19:30:53.372: sC -> Event Next(2000)
2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000)
Next(2000)
2016-04-12 19:30:54.501: sC -> Event Completed
2016-04-12 19:30:54.501: sC -> disposed
2016-04-12 19:30:54.501: flatMapSequence -> Event Completed
Completed

Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — предыдущий уже успел сгенерировать все свои элементы, поэтому в RO попадают элементы со всех Observable
Во втором примере благодаря задержкам в генерации мы видим, что как только генерируется новый Observable — происходит отписка от предыдущего Observable


flatMapWithIndex [105]

Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

RxSwift шпаргалка по операторам (+ PDF) - 41

example("flatMapWithIndex") {
    let sequence:Observable<Int> = Observable.of(10, 20, 30)
    let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx  in Observable.of("A", "B", "C").map{"index: ((idx)) - ($0) - (val)"} }
    print(flatMapSequence.dynamicType)
    
    flatMapSequence.subscribe { e in
        print(e)
    }
}

example("flatMapWithIndex with wait") {
    let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
    let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in
        createSequenceWithWait(["A","B","C"], waitTime: 2) { element in
            "index: ((idx)) - (element) - (val)"
        }
    }
    print(flatMapSequence.dynamicType)
    
    flatMapSequence.subscribe { e in
        print(e)
    }
}

Консоль:

FlatMapWithIndex<Int, Observable<String>>
Next(index: (0) - A - 10)
Next(index: (0) - B - 10)
Next(index: (0) - C - 10)
Next(index: (1) - A - 20)
Next(index: (1) - B - 20)
Next(index: (1) - C - 20)
Next(index: (2) - A - 30)
Next(index: (2) - B - 30)
Next(index: (2) - C - 30)
Completed

--- flatMapWithIndex with wait example ---
FlatMapWithIndex<Int, Observable<String>>
Next(index: (0) - A - 0)
Next(index: (1) - A - 1)
Next(index: (0) - B - 0)
Next(index: (2) - A - 2)
Next(index: (1) - B - 1)
Next(index: (0) - C - 0)
Next(index: (2) - B - 2)
Next(index: (1) - C - 1)
Next(index: (2) - C - 2)
Completed

map [106]

Observable<T> -> Observable<U>

Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.

RxSwift шпаргалка по операторам (+ PDF) - 42

example("map") {
    let sequence = Observable.of(1, 2, 3)
        .map{ "($0 * 5)" }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- map example ---
Next(5)
Next(10)
Next(15)
Completed

mapWithIndex [106]

Observable<T> -> Observable<U>

Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента

RxSwift шпаргалка по операторам (+ PDF) - 43

example("mapWithIndex") {
    let sequence = Observable.of("A", "B", "C")
        .mapWithIndex({ "($0) / ($1)" })
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- mapWithIndex example ---
Next(A / 0)
Next(B / 1)
Next(C / 2)
Completed

window [107]

SO = Observable<T>
RO = Observable<Observable<T>>

Элемент из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передается count, — максимальное число элементов которые будут сгенерированы каждым Observable, и timeSpan — время максимального ожидания наполнения текущего Observable из элементов SO. Таким образом элемент RO, являет собой Observable число генерируемых элементов которого равно от 0 до N. Основным отличием от bufffer — элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer — мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше)

RxSwift шпаргалка по операторам (+ PDF) - 44

example("window") {
    let varA = Variable<Int>(0)
    
    let bufferSequence:Observable<Observable<Int>> = varA.asObservable()
        .window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
        .debug("bufferSequence")
    bufferSequence.subscribe { e in
        if case .Next(let observable) = e {
            print("(NSDate()) - генерируем новый Observable")
            observable.subscribe { val in
                print(val)
                
            }
        }
    }
    varA.value = 1
    varA.value = 2
    varA.value = 3
    delay(4) {
        varA.value = 4
        
        varA.value = 5
        delay(4) {
            varA.value = 6
        }
    }
}

Консоль:


--- window example ---
2016-04-12 19:51:54.372: bufferSequence -> subscribed
2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:54 +0000 - генерируем новый Observable
Next(0)
Next(1)
Next(2)
Completed
2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:54 +0000 - генерируем новый Observable
Next(3)
Completed
2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:51:57 +0000 - генерируем новый Observable
Next(4)
Next(5)
Completed
2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
2016-04-12 16:52:00 +0000 - генерируем новый Observable
Next(6)
Completed
2016-04-12 19:52:02.895: bufferSequence -> Event Completed

В примере используются временные задержки что помогает добиться частичной наполненности генерируемых Observable

Операторы математические и агрегирования


reduce [108]

Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.

RxSwift шпаргалка по операторам (+ PDF) - 45

example("reduce") {
    let sequence = Observable.of(1, 2, 3, 4)
        .reduce(1) { $0 * $1 }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- reduce example ---
Next(24)
Completed

scan [109]

Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но кроме этого оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.

RxSwift шпаргалка по операторам (+ PDF) - 46

example("scan") {
    let sequence = Observable.of(1, 2, 3).scan(10) { result, element in
        return result + element
    }
    sequence.subscribe { e in
        print(e)
    }
}

example("scan multiply") {
    let sequence = Observable.of(2, 3, 5).scan(10) { result, element in
        return result * element
    }
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- scan example ---
Next(11)
Next(13)
Next(16)
Completed

--- scan multiply example ---
Next(20)
Next(60)
Next(300)
Completed

toArray [110]

SO = Observable<T>
RO = Observable<[T]>   

Все элементы из SO после генерации терминального состояния объединяются в массив и генерируются RO

RxSwift шпаргалка по операторам (+ PDF) - 47

example("toArray") {
    let sequence = Observable.of(1, 2, 3)

    let arraySequence = sequence.toArray()
    arraySequence.subscribe { e in
            print(e)
        }
}

Консоль:

--- toArray example ---
Next([1, 2, 3])
Completed

Работа с ошибками


catchError [111]

Позволяет перехватить генерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы

RxSwift шпаргалка по операторам (+ PDF) - 48

example("with catchError") {
    let sequenceWithError = Observable<Int>.create { observer in
        observer.on(.Next(1))
        observer.on(.Next(2))
        observer.on(.Next(3))
        observer.on(.Next(4))
        observer.onError(RxError.Unknown)
        observer.on(.Next(5))
        return NopDisposable.instance
    }
    let sequenceIgnoreError = sequenceWithError.catchError{ error in
        return Observable.of(10, 11, 12)
    }
    
    sequenceIgnoreError.subscribe { e in
        print(e)
    }
}

Консоль:

--- with catchError example ---
Next(1)
Next(2)
Next(3)
Next(4) 
Next(10)
Next(11)
Next(12)
Completed

После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен новый Observable


catchErrorJustReturn [111]

Позволяет перехватить генерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed

RxSwift шпаргалка по операторам (+ PDF) - 49

example("with catchErrorJustReturn") {
    let sequenceWithError = Observable.of(1, 2, 3, 4)
        .concat(Observable.error(RxError.Unknown))
        .concat(Observable.just(5))
    let sequenceIgnoreError = sequenceWithError.catchErrorJustReturn(-1)
    sequenceIgnoreError.subscribe { e in
        print(e)
    }
}

Консоль:

--- with catchErrorJustReturn example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(-1)
Completed

После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен элемент -1


retry [112]

Позволяет перехватить генерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде что ошибка не повторится

RxSwift шпаргалка по операторам (+ PDF) - 50

example("retry full sequence") {
    let sequenceWithError = Observable.of(1, 2, 3, 4).concat(Observable.error(RxError.Unknown))
    let wholeSequenceWithErrorRetry = sequenceWithError.retry(2)
    
    wholeSequenceWithErrorRetry.subscribe { e in
        print(e)
    }
}

Консоль:

--- retry full sequence example ---
Next(1)
Next(2)
Next(3)
Next(4)
Next(1)
Next(2)
Next(3)
Next(4)
Error(Unknown error occured.)

Т.к. был применен оператор retry(2) — мы один раз повторили продписку на SO, но ошибка повторилась, и была сгенерирована в RO
Таким образом retry(1) — не сделает ни одного повтора


retryWhen [112]

Позволяет перехватить сгенерированную ошибку из SO и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO, в надежде что ошибка исчезнет. Если tryObservable заканчивается ошибкой — она пробрасывается в RO и на этом выполнение заканчивается

RxSwift шпаргалка по операторам (+ PDF) - 51

example("retryWhen") {
    var counter = 0
    let sequenceWithError = Observable<Int>.create { observer in
        observer.on(.Next(1))
        observer.on(.Next(2))
        observer.on(.Next(3))
        observer.on(.Next(4))
        counter += 1
        if counter < 3 {
            observer.onError(RxError.Unknown)
        } /*else {
            observer.onError(RxError.Overflow)
        }*/
        observer.on(.Next(5))
        return NopDisposable.instance
    }.debug("with error")
    let sequenceWithoutError = Observable<Int>.create { observer in
        observer.on(.Next(10))
//observer.onError(RxError.NoElements)
        return NopDisposable.instance
        }.debug("without error")
    let retrySequence = sequenceWithError.retryWhen{ (error: Observable<RxError>) -> Observable<Int> in
        let seq:Observable<Int> = error.flatMap { (generatedError: RxError) -> Observable<Int> in
            if case .Unknown = generatedError {
                return sequenceWithoutError
            }
            return Observable<Int>.error(generatedError)
        }
        return seq

    }//.debug()
 
    retrySequence.subscribe { e in
        print(e)
    }
}

Консоль:


--- retryWhen example ---
2016-04-12 20:18:04.484: with error -> subscribed
2016-04-12 20:18:04.485: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.486: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.486: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.487: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.)
2016-04-12 20:18:04.488: without error -> subscribed
2016-04-12 20:18:04.488: without error -> Event Next(10)
2016-04-12 20:18:04.489: with error -> disposed
2016-04-12 20:18:04.489: with error -> subscribed
2016-04-12 20:18:04.489: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.490: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.490: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.490: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.)
2016-04-12 20:18:04.491: without error -> subscribed
2016-04-12 20:18:04.492: without error -> Event Next(10)
2016-04-12 20:18:04.492: with error -> disposed
2016-04-12 20:18:04.492: with error -> subscribed
2016-04-12 20:18:04.493: with error -> Event Next(1)
Next(1)
2016-04-12 20:18:04.493: with error -> Event Next(2)
Next(2)
2016-04-12 20:18:04.493: with error -> Event Next(3)
Next(3)
2016-04-12 20:18:04.494: with error -> Event Next(4)
Next(4)
2016-04-12 20:18:04.494: with error -> Event Next(5)
Next(5)

Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке — ошибка исчезла. Если раскоментировать генерацию ошибку RxError.Overflow — мы её не перехватим в операторе retryWhen и пробросим в RO

Операторы для работы с Connectable Observable


multicast [113]

Позволяет проксировать элементы из исходной SO на Subject переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.

example("multicast") {
    let subject = PublishSubject<Int>()
    let firstSequence = createSequenceWithWait([0,1,2,3,4,5]) { $0 }
        .multicast(subject)
    delay(2) {
        _ = subject.subscribe { e in
            print("first: (e)")
        }
    }
    delay(3) {
        _ = subject.subscribe { e in
            print("second: (e)")
        }
    }
    firstSequence.connect()
}

Консоль:

--- multicast example ---
first: Next(2)
first: Next(3)
second: Next(3)
first: Next(4)
second: Next(4)
first: Next(5)
second: Next(5)
first: Completed
second: Completed

publish [113]

publish = multicast + replay subject
Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe

RxSwift шпаргалка по операторам (+ PDF) - 52

example("subscribe connectable sequnce with connect") {
    let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence").publish()
    var disposable1: Disposable!
    var disposable2: Disposable!
    disposable1 = sequence.subscribe { e in
        print("first: (e)")
    }
    delay(2) {
        disposable2 = sequence.subscribe { e in
            print("second: (e)")
        }
    }
    
    delay(4) {
        sequence.connect()
    }
    
    delay(8) {
        disposable1.dispose()
        disposable2.dispose()
    }
}

Консоль:

--- subscribe connectable sequnce with connect example ---
2016-04-12 21:35:32.130: sequence -> subscribed
2016-04-12 21:35:33.131: sequence -> Event Next(0)
first: Next(0)
second: Next(0)
2016-04-12 21:35:34.131: sequence -> Event Next(1)
first: Next(1)
second: Next(1)
2016-04-12 21:35:35.132: sequence -> Event Next(2)
first: Next(2)
second: Next(2)
2016-04-12 21:35:36.132: sequence -> Event Next(3)
2016-04-12 21:35:37.132: sequence -> Event Next(4)

Как видно, хоть подписка была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались — последовательность продолжила генерировать элементы


refCount [114]

Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable — происходит подписка Connectable на SO.
Получается что то вроде
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()

SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, — происходит отписка и publishSequence от SO

RxSwift шпаргалка по операторам (+ PDF) - 53

example("with refCount") {
    let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence")
    let publishSequence = sequence.publish() // создаем Connectable Observable
    let refCountSequence = publishSequence.refCount().debug("refCountSequence")
    let subscription1 = refCountSequence.subscribe{ e in
        print("first: (e)")
    }
    let subscription2 = refCountSequence.subscribe{ e in
        print("second: (e)")
    }
    
    delay(2) {
        subscription1.dispose() // отписываемся в первый раз
        
    }
    delay(3) {
        subscription2.dispose() // здесь мы отписываемся во второй, больше подписок на Observable созданный с помощью refCount  нет. Поэтому этот Observable отпсиывается от SO
        
    }
    delay(5) {
        _ = refCountSequence.subscribe { e in
            print("after: (e)")
        }
    }
}

Консоль:

--- with refCount example ---
2016-04-12 20:25:24.154: refCountSequence -> subscribed // подписались на refCountSequence в 1й раз
2016-04-12 20:25:24.155: sequence -> subscribed // после этого publishSequence подписался на SO
2016-04-12 20:25:24.156: refCountSequence -> subscribed // // подписались на refCountSequence во 2й раз
2016-04-12 20:25:25.156: sequence -> Event Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
first: Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
second: Next(0)
2016-04-12 20:25:26.156: sequence -> Event Next(1)
2016-04-12 20:25:26.156: refCountSequence -> Event Next(1)
first: Next(1)
2016-04-12 20:25:26.157: refCountSequence -> Event Next(1)
second: Next(1)
2016-04-12 20:25:26.353: refCountSequence -> disposed // отписались от refCountSequence в 1й раз
2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO продолжает генерировать элементы, т.к. есть еще одна подписка на refCountSequence
2016-04-12 20:25:27.157: refCountSequence -> Event Next(2)
second: Next(2)
2016-04-12 20:25:27.390: refCountSequence -> disposed // отписались от refCountSequence во 2й раз
2016-04-12 20:25:27.390: sequence -> disposed // подписок на refCountSequence больше н еосталось, поэтому publishSequence отписался на SO
2016-04-12 20:25:29.157: refCountSequence -> subscribed // подписались на refCountSequence снова
2016-04-12 20:25:29.157: sequence -> subscribed // т.к. это первая подписка - publishSequence заново подписался на SO
2016-04-12 20:25:30.158: sequence -> Event Next(0)
2016-04-12 20:25:30.159: refCountSequence -> Event Next(0)
after: Next(0)
2016-04-12 20:25:31.158: sequence -> Event Next(1)
2016-04-12 20:25:31.159: refCountSequence -> Event Next(1)
after: Next(1)
2016-04-12 20:25:32.159: sequence -> Event Next(2)
2016-04-12 20:25:32.159: refCountSequence -> Event Next(2)
after: Next(2)
.... 
далее бесконечно генерируются элементы

replay [115]

Если SO обычный, — конвертирует его в Connectable. И после этого все кто подпишутся на него после вызова connect() — мгновенно получат в качестве первых элементов последние генерированные N элементов. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы

RxSwift шпаргалка по операторам (+ PDF) - 54

example("replay") {
    let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(2)
    
    let firstDisposable = firstSequence.subscribe { e in
        print("first: (e)")
    }
    
    firstSequence.connect()
    
    var secondDisposable: Disposable!
    delay(3) {
        secondDisposable = firstSequence.subscribe { e in
        print("second: (e)")
        }
    }
    
    delay(4) {
        firstDisposable.dispose()
    }
    delay(5) {
        secondDisposable.dispose()
    }
    
    delay(7) {
        firstSequence.subscribe { e in
            print("third: (e)")
        }
    }
}

Консоль:

--- replay example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(1)
second: Next(2)
first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
second: Next(3)
second: Next(4) //после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
//тут мы после задержки подписываемся в третий раз
third: Next(5)
third: Next(6)

replayAll [115]

Если SO обычный, — конвертирует его в Connectable. Все кто подпишутся на него после вызова connect() — получат сначала все элементы, которые были генерированы ранее. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы

RxSwift шпаргалка по операторам (+ PDF) - 55

example("replayAll") {
    let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replayAll()
    
    let firstDisposable = firstSequence.subscribe { e in
        print("first: (e)")
    }
    
    firstSequence.connect()
    
    var secondDisposable: Disposable!
    delay(3) {
        secondDisposable = firstSequence.subscribe { e in
            print("second: (e)")
        }
    }
    
    delay(4) {
        firstDisposable.dispose()
    }
    delay(5) {
        secondDisposable.dispose()
    }
    
    delay(7) {
        firstSequence.subscribe { e in
            print("third: (e)")
        }
    }
}

Консоль:

--- replayAll example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(0)
second: Next(1)
second: Next(2)
first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
second: Next(3)
second: Next(4)
//после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
//тут мы после задержки подписываемся в третий раз
third: Next(0)
third: Next(1)
third: Next(2)
third: Next(3)
third: Next(4)
third: Next(5)
third: Next(6)
third: Next(7)

Вспомогательные методы


debug

RO полностью дублирует SO, но логируются все события с временной меткой

example("debug") {
    let sequence = Observable<AnyObject>.of(1, 2, 3)
        .debug("sequence")
        .subscribe{}
}

Консоль:

--- debug example ---
2016-04-12 21:41:08.467: sequence -> subscribed
2016-04-12 21:41:08.469: sequence -> Event Next(1)
2016-04-12 21:41:08.469: sequence -> Event Next(2)
2016-04-12 21:41:08.469: sequence -> Event Next(3)
2016-04-12 21:41:08.469: sequence -> Event Completed

do / doOnNext [116]

RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO

example("simple doOn") {
    let firstSequence = Observable.of(1,2).doOn{e in
        print(e)
    }
    firstSequence.subscribeNext{ e in // замечу что логирование производится не в методе subscribe, как обычно, а в doOn
    }
}

Консоль:

--- simple doOn example ---
Next(1)
Next(2)
Completed

delaySubscription [117]

Дублирует элементы из SO в RO, но с временной задержкой указанной в качестве параметра

RxSwift шпаргалка по операторам (+ PDF) - 56

example("delaySubscription") {
    let sequence = Observable.of(1, 2, 3).debug("sequence")
        .delaySubscription(3, scheduler: MainScheduler.instance).debug("delayed sequence")
    
    
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- delaySubscription example ---
2016-04-12 21:44:05.226: delayed sequence -> subscribed // подписка на delayed sequence произошла в 5 секунд
2016-04-12 21:44:08.228: sequence -> subscribed // а подписка на SO произошла только через указанных 3 секунды
2016-04-12 21:44:08.229: sequence -> Event Next(1)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(1)
Next(1)
2016-04-12 21:44:08.229: sequence -> Event Next(2)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(2)
Next(2)
2016-04-12 21:44:08.229: sequence -> Event Next(3)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(3)
Next(3)
2016-04-12 21:44:08.230: sequence -> Event Completed
2016-04-12 21:44:08.230: delayed sequence -> Event Completed
Completed
2016-04-12 21:44:08.230: sequence -> disposed

observeOn [118]

Указывает на каком Scheduler выполнять свою работу Observer, особенно критично при работе с GUI

example("without observeOn") {
    let sequence = Observable<AnyObject>.of(1, 2, 3)
    
    sequence.subscribe { e in
        print("(NSThread.currentThread())(e)")
    }
}

example("with observeOn") {
    let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
    let sequence = Observable<AnyObject>.of(1, 2, 3)

    sequence.observeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
        .subscribe { e in
            print("(NSThread.currentThread())(e)")
    }
}

Консоль:

--- without observeOn example ---
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3)
<NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed

--- with observeOn example ---
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3)
<NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed

Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке


subscribe [119]

Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable

example("subscribe") {
    let firstSequence = Observable.of(1)
    
    firstSequence.subscribe { e in
        print(e)
    }
    
    firstSequence.subscribeCompleted {
        print("!completed")
    }
    
    firstSequence.subscribeNext{next in
        print("next: (next)")
    }
}

example("subscribeNext") {
    let firstSequence = Observable.of(1)
    
    firstSequence.subscribeNext{next in
        print("next: (next)")
    }
}

example("subscribeCompleted") {
    let firstSequence = Observable.of(1)
    
    firstSequence.subscribeCompleted {
        print("!completed")
    }
}

example("subscribeError") {
    let firstSequence = Observable<Int>.error(RxError.ArgumentOutOfRange)
    
    firstSequence.subscribeError {e in
        print("!error (e)")
    }
}

Консоль:


--- subscribe example ---
Next(1)
Completed
!completed
next: 1

--- subscribeNext example ---
next: 1

--- subscribeCompleted example ---
!completed

--- subscribeError example ---
!error Argument out of range.

Продемонстрированы 4 формы: subscribe, subscribeNext, subscribeCompleted, subscribeError


subscribeOn [120]

Указывает на каком Scheduler выполнять свою работу Observable, особенно критично при работе с GUI

example("without subscribeOn") {
    let sequence = Observable<AnyObject>.of(1, 2, 3)
    sequence.subscribe { e in
        print("(NSThread.currentThread()) (e)")
    }
}

example("with subscribeOn") {
    let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
    let sequence = Observable<AnyObject>.of(1, 2, 3)
        .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
    
    sequence.subscribe { e in
        print("(NSThread.currentThread()) (e)")
    }
}

Консоль:

--- without subscribeOn example ---
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(1)
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(2)
<NSThread: 0x7fef30413290>{number = 1, name = main} Next(3)
<NSThread: 0x7fef30413290>{number = 1, name = main} Completed

--- with subscribeOn example ---
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(1)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(2)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Next(3)
<NSThread: 0x7fef305c0db0>{number = 3, name = (null)} Completed

timeout [121]

Дублирует элементы из SO в RO, но если в течении указанного времени SO не сгенерировало ни одного элемента — RO генерирует ошибку

RxSwift шпаргалка по операторам (+ PDF) - 57

example("failed timeout ") {
    let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
    let timeoutSequence = sequence.timeout(0.9, scheduler: MainScheduler.instance)
    timeoutSequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- failed timeout  example ---
Next(1)
Error(Sequence timeout.)

using [122]

Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable, у которых будет единое время жизни

class FakeDisposable: Disposable {
    func dispose() {
        print("disposed")
    }
}

example("using") {
    let sequence = Observable.using({
        return FakeDisposable()
        }, observableFactory: { d in
            Observable.just(1)
    }) as Observable<Int>

    
    sequence.subscribe { e in
        print(e)
    }
}

Консоль:

--- using example ---
Next(1)
Completed
disposed

Как видно, после того как Observable закончил генерировать элементы, у нашего ресурса FakeDisposable был вызван метод dispose

Автор: SparkLone

Источник [123]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/ios/117791

Ссылки в тексте:

[1] reactivex.io: http://reactivex.io/

[2] официальную документацию: https://github.com/ReactiveX/RxSwift

[3] RxMarbles: http://rxmarbles.com/

[4] iPhone/iPad: https://itunes.apple.com/ru/app/rxmarbles/id1087272442?mt=8

[5] ссылка на гитхаб: https://github.com/sparklone/RxSwift

[6] ссылка конкретно на PDF: https://github.com/sparklone/RxSwift/blob/master/RXSwift%20operators.pdf

[7] Заметки: #Intro

[8] asObservable: #asObservable

[9] create: #create

[10] deferred: #deferred

[11] empty: #empty

[12] error: #error

[13] interval: #interval

[14] just: #just

[15] never: #never

[16] of: #of

[17] range: #range

[18] repeatElement: #repeatElement

[19] timer: #timer

[20] amb: #amb

[21] combineLatest: #combineLatest

[22] concat: #concat

[23] merge: #merge

[24] startWith: #startWith

[25] switchLatest: #switchLatest

[26] withLatestFrom: #withLatestFrom

[27] zip: #zip

[28] distinctUntilChanged: #distinctUntilChanged

[29] elementAt: #elementAt

[30] filter: #filter

[31] ignoreElements: #ignoreElements

[32] sample: #sample

[33] single: #single

[34] skip: #skip

[35] skip (duration): #skipDuration

[36] skipUntil: #skipUntil

[37] skipWhile: #skipWhile

[38] skipWhileWithIndex: #skipWhileWithIndex

[39] take: #take

[40] take (duration): #takeDuration

[41] takeLast: #takeLast

[42] takeUntil: #takeUntil

[43] takeWhile: #takeWhile

[44] takeWhileWithIndex: #takeWhileWithIndex

[45] throttle: #throttle

[46] buffer: #buffer

[47] flatMap: #flatMap

[48] flatMapFirst: #flatMapFirst

[49] flatMapLatest: #flatMapLatest

[50] flatMapWithIndex: #flatMapWithIndex

[51] map: #map

[52] mapWithIndex: #mapWithIndex

[53] window: #window

[54] reduce: #reduce

[55] scan: #scan

[56] toArray: #toArray

[57] catchError: #catchError

[58] catchErrorJustReturn: #catchErrorJustReturn

[59] retry: #retry

[60] retryWhen: #retryWhen

[61] multicast: #multicast

[62] publish: #publish

[63] refCount: #refCount

[64] reply: #reply

[65] replayAll: #replayAll

[66] debug: #debug

[67] doOn / doOnNext: #doOn

[68] delaySubscription: #delaySubscription

[69] observeOn: #observeOn

[70] subscribe: #subscribe

[71] subscribeOn: #subscribeOn

[72] timeout: #timeout

[73] using: #using

[74] asObservable: http://reactivex.io/documentation/operators/from.html

[75] create: http://reactivex.io/documentation/operators/create.html

[76] deferred: http://reactivex.io/documentation/operators/defer.html

[77] empty: http://reactivex.io/documentation/operators/empty-never-throw.html

[78] interval: http://reactivex.io/documentation/operators/interval.html

[79] just: http://reactivex.io/documentation/operators/just.html

[80] range: http://reactivex.io/documentation/operators/range.html

[81] repeatElement: http://reactivex.io/documentation/operators/repeat.html

[82] timer: http://reactivex.io/documentation/operators/timer.html

[83] amb: http://reactivex.io/documentation/operators/amb.html

[84] combineLatest: http://reactivex.io/documentation/operators/combinelatest.html

[85] concat: http://reactivex.io/documentation/operators/concat.html

[86] merge: http://reactivex.io/documentation/operators/merge.html

[87] startWith: http://reactivex.io/documentation/operators/startwith.html

[88] switchLatest: http://reactivex.io/documentation/operators/switch.html

[89] zip: http://reactivex.io/documentation/operators/zip.html

[90] distinctUntilChanged: http://reactivex.io/documentation/operators/distinct.html

[91] elementAt: http://reactivex.io/documentation/operators/elementat.html

[92] filter: http://reactivex.io/documentation/operators/filter.html

[93] ignoreElements: http://reactivex.io/documentation/operators/ignoreelements.html

[94] sample: http://reactivex.io/documentation/operators/sample.html

[95] single: http://reactivex.io/documentation/operators/first.html

[96] skip: http://reactivex.io/documentation/operators/skip.html

[97] skipUntil: http://reactivex.io/documentation/operators/skipuntil.html

[98] skipWhile: http://reactivex.io/documentation/operators/skipwhile.html

[99] take: http://reactivex.io/documentation/operators/take.html

[100] takeLast: http://reactivex.io/documentation/operators/takelast.html

[101] takeUntil: http://reactivex.io/documentation/operators/takeuntil.html

[102] takeWhile: http://reactivex.io/documentation/operators/takewhile.html

[103] throttle: http://reactivex.io/documentation/operators/debounce.html

[104] buffer: http://reactivex.io/documentation/operators/buffer.html

[105] flatMap: http://reactivex.io/documentation/operators/flatmap.html

[106] map: http://reactivex.io/documentation/operators/map.html

[107] window: http://reactivex.io/documentation/operators/window.html

[108] reduce: http://reactivex.io/documentation/operators/reduce.html

[109] scan: http://reactivex.io/documentation/operators/scan.html

[110] toArray: http://reactivex.io/documentation/operators/to.html

[111] catchError: http://reactivex.io/documentation/operators/catch.html

[112] retry: http://reactivex.io/documentation/operators/retry.html

[113] multicast: http://reactivex.io/documentation/operators/publish.html

[114] refCount: http://reactivex.io/documentation/operators/refcount.html

[115] replay: http://reactivex.io/documentation/operators/replay.html

[116] do / doOnNext: http://reactivex.io/documentation/operators/do.html

[117] delaySubscription: http://reactivex.io/documentation/operators/delay.html

[118] observeOn: http://reactivex.io/documentation/operators/observeon.html

[119] subscribe: http://reactivex.io/documentation/operators/subscribe.html

[120] subscribeOn: http://reactivex.io/documentation/operators/subscribeon.html

[121] timeout: http://reactivex.io/documentation/operators/timeout.html

[122] using: http://reactivex.io/documentation/operators/using.html

[123] Источник: https://habrahabr.ru/post/281292/