- Регистрация
- 9 Май 2015
- Сообщения
- 1,071
- Баллы
- 155
- Возраст
- 51
Около года назад наш iOS-разработчик компании Noveo, Александр, заинтересовался RxSwift. Столкнувшись с нехваткой документации, Саша решил самостоятельно упорядочить все приобретенные в ходе изучения фреймворка знания — для себя и для других. Результатом стала одна из его статей о Swift 2.2. Со времени ее публикации, конечно, и RxSwift, и сам Swift эволюционировали, и материал нуждался в обновлении, и другой iOS-разработчик из Noveo, Михаил, адаптировал материал для Swift 3. После редакции статья заиграла свежими красками, и на этом мы передаем слово нашим коллегам.
Заинтересовавшись темой функционального программирования, я встал на распутье: какой фреймворк выбрать для ознакомления? ReactiveCocoa — ветеран в iOS-кругах, информации по нему вдоволь. Но он вырос из Objective-C, и хотя это не является проблемой, все же в данный момент я в основном пишу на Swift, и хотелось бы взять решение, изначально спроектированное с учетом всех плюшек этого языка. RxSwift же — порт Reactive Extensions; последний, конечно, имеет долгую историю, но сам порт свежий и написан как раз под Swift. На нем я и решил остановиться.
Как выяснилось, документация по RxSwift несколько специфична: описание всех команд ведет на , а там в основном дается общая информация, у разработчиков еще не дошли руки сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, а есть такие, о которых в общей документации нет ничего, кроме упоминания.
Прочитав все главы вики с гитхаба RxSwift, я решил сразу поразбираться с официальными примерами; тут-то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка скопипастом гранатой. Я начал разбирать самые сложные для понимания команды, а потом перешел к тем, что были вроде и понятны, но задавая себе вопросы по ним, я лишь догадывался, как верно ответить, и уверенности в моих ответах у меня не было.
В общем, я решил проработать все операторы RxSwift. Лучший способ что-то понять в программировании — запустить код и посмотреть, как он отработает. Учитывая специфику реактивного программирования, лучше дополнить это схемами (они бывают очень полезны), ну и кратким описанием на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
Много картинок и текста ниже, очень много!
Предварительно я рекомендую просмотреть , у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые , есть бесплатная версия под .
Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, дам для каждой краткое описание, схему (если это имеет смысл), код, результат выполнения, а при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.
Вот , куда я склонировал официальный репозиторий RxSwift и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и , где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того, чтобы увидеть, как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — чтобы иметь под рукой документ, в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.
Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
Что ж, надеюсь, моя работа кому-то пригодится. Приступим.
Содержание
Создание Observable
Комбинирование Observable
Фильтрация
Трансформация
Операторы математические и агрегирования
Работа с ошибками
Операторы для работы с Connectable Observable
Вспомогательные методы
В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.
Функция example просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift):
public func example(_ description: String, action: (Void) -> Void) {
printExampleHeader(description)
action()
}
Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице, необходимо прописать
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
Также подразумевается, что читатель имеет общее представление о реактивном программировании в общем и об RxSwift в частности. Не знаю, есть ли смысл городить очередную вводную.
Создание Observable
Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver
example("\"asObservable\"") {
let variable = Variable(0)
variable
.asObservable()
.subscribe({ event in
print(event)
})
variable.value = 1
}
Консоль:
--- "asObservable" example ---
next(0)
next(1)
completed
В данном примере мы Variable преобразовали в Observable и подписались на его события.
Этот метод позволяет создавать Observable с нуля, полностью контролируя, какие элементы и когда он будет генерировать.
example("\"create\"") {
Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
observer.on(.next("A"))
observer.on(.next("B"))
observer.onCompleted()
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "create" example ---
next(1)
next(2)
next(A)
next(B)
completed
В данном примере мы создали Observable, который сгенерирует несколько значений, и в конце вызовется complete.
Этот оператор позволяет отложить создание Observable до момента подписки с помощью subscribe.
example("without \"deferred\"") {
var i = 1
let observable = Observable.just(i)
i = 2
observable
.subscribe({ event in
print(event)
})
}
example("with \"deferred\"") {
var i = 1
let observable = Observable.deferred({ Observable.just(i) })
i = 2
observable
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "deferred" example ---
next(1)
completed
--- with "deferred" example ---
next(2)
completed
В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значения i уже не влияет на генерируемый этой последовательностью элемент. Во втором же случае мы создаем Observable с помощью deferred и можем поменять значение i перед subscribe.
Пустая последовательность, заканчивающаяся Completed.
example("\"empty\"") {
Observable
.empty()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "empty" example ---
completed
Создаст последовательность, которая состоит из одного события — Error.
example("\"error\"") {
Observable
.error(RxError.unknown)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "error" example ---
error(Unknown error occured.)
Создает бесконечную последовательность, возрастающую с 0 с шагом 1 с указанной периодичностью.
example("\"interval\"") {
Observable
.interval(1, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "interval" example ---
next(0)
next(1)
next(2)
next(3)
.....
Создает последовательность из любого значения, которая завершается Completed.
example("\"just\"") {
Observable
.just(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "just" example ---
next(2)
completed
Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие.
example("\"never\"") {
Observable
.never()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "never" example ---
Последовательность из переменного количества элементов. После всех элементов генерируется Completed.
example("\"of\"") {
Observable
.of(2, 3, 5)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "of" example ---
next(2)
next(3)
next(5)
completed
В первом случае мы создали последовательность из двух чисел, во втором — из двух Observable, а затем объединили их между собой с помощью оператора merge.
Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed.
example("\"range\"") {
Observable
.range(start: 5, count: 3)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "range" example ---
next(5)
next(6)
next(7)
completed
Сгенерировались элементы, начиная с 5, 3 раза с шагом 1.
Бесконечно создавать указанный элемент, без задержек. Никогда не будут сгенерированы события Completed или Error.
example("\"repeatElement\"") {
Observable
.repeatElement(2, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- repeatElement example ---
Next(2)
Next(2)
Next(2)
.....
Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможностью задержки при старте. Никогда не будут сгенерированы события Completed или Error.
example("\"timer\"") {
Observable
.timer(2, period: 3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "timer" example ---
next(0)
next(1)
next(2)
.....
В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды.
Комбинирование Observable
SO = [Observable] или SO1, SO2 = Observable
RO = Observable
Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются.
example("\"amb\"") {
let subjectA = PublishSubject()
let subjectB = PublishSubject()
let subjectC = PublishSubject()
let subjectD = PublishSubject()
Observable
.amb([subjectA.asObservable(), subjectB.asObservable(), subjectC.asObservable(), subjectD.asObservable()])
.subscribe({ event in
print(event)
})
subjectC.onNext("C1")
subjectD.onNext("D1")
subjectB.onNext("B1")
subjectA.onNext("A1")
subjectC.onNext("C2")
subjectD.onNext("D2")
subjectA.onNext("A2")
}
Консоль:
--- "amb" example ---
next(C1)
next(C2)
Т.к. первым сгенерировал элемент subjectC, лишь его элементы дублируются в RO, остальные игнорируются.
SO = SO1, SO2,... SON = Observable
RO = Observable<f(T,T)>
Как только все Observable сгенерировали хотя бы по одному элементу, эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации элемента любым Observable генерируется новый результат функции с последними элементами из всех комбинируемых Observable.
example("\"combineLatest\"") {
let sequenceA = Observable
.timer(0.15, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.05, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.combineLatest(sequenceA, sequenceB){$0.0 + " - " + $0.1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "combineLatest" example ---
next(A0 - B2)
next(A1 - B2)
next(A2 - B2)
completed
В этом примере я создал Observable с помощью timer — для генерации элементов с разной задержкой, чтобы было видно, как перемешиваются элементы. К моменту появления первого элемента sequenceA появилось уже три элемента sequenceB. Поэтому первым элементом в RO последовательности стала пара объектов A0 — B2.
SO = Observable<Observable> или SO1, SO2 = Observable
RO = Observable
В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO.
example("\"concat\"") {
let sequenceA = Observable.of(1, 2, 3)
let sequenceB = Observable.of("A", "B", "C")
sequenceA
.concat(sequenceB)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "concat" example ---
next(1)
next(2)
next(3)
next(A)
next(B)
next(C)
completed
SO = Observable<Observable>
RO = Observable
Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable.
example("\"merge\"") {
let sequenceA = Observable
.timer(0.0, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.of(sequenceA, sequenceB)
.merge()
.subscribe({ event in
print(event)
})
}
}
Консоль:
--- "merge" example ---
next(A0)
next(B0)
next(B1)
next(A1)
next(B2)
next(A2)
completed
Последовательности сделаны с задержкой в генерации, и видно, что элементы в RO теперь вперемешку, в том порядке, в котором они были сгенерированы в исходных Observable.
SO = Observable
RO = Observable
В начало SO добавляются элементы, переданные в качестве аргумента.
example("\"startWith\"") {
Observable.of(1, 2, 3)
.startWith(6,7)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "startWith" example ---
next(6)
next(7)
next(1)
next(2)
next(3)
completed
SO = Observable<Observable>
RO = Observable
Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable.
example("\"switchLatest\" without delays") {
let variableA = Variable(0)
let variableB = Variable(100)
let variableC = Variable(variableA.asObservable())
variableC
.asObservable()
.switchLatest()
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 3
variableC.value = variableB.asObservable()
variableB.value = 4
variableA.value = 5
}
example("\"switchLatest\" with delays") {
var sequenceObserver: AnyObserver<Observable>?
Observable<Observable>
.create({ observer in
sequenceObserver = observer
return Disposables.create()
})
.switchLatest()
.debug("result\t\t")
.subscribe()
Observable
.timer(0, period: 0.3, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.subscribe(onNext: { element in
let observable = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.map({ $0 + element })
.take(3)
.debug("timer #\(element)\t")
sequenceObserver?.onNext(observable)
}, onCompleted: {
sequenceObserver?.onCompleted()
})
}
Консоль:
--- "switchLatest" without delays example ---
next(0)
next(1)
next(2)
next(3)
next(4)
completed
--- "switchLatest" with delays example ---
2017-02-02 19:18:22.976: result -> subscribed
2017-02-02 19:18:23.282: timer #1 -> subscribed
2017-02-02 19:18:23.283: timer #1 -> Event next(1)
2017-02-02 19:18:23.283: result -> Event next(1)
2017-02-02 19:18:23.384: timer #1 -> Event next(2)
2017-02-02 19:18:23.384: result -> Event next(2)
2017-02-02 19:18:23.483: timer #1 -> Event next(3)
2017-02-02 19:18:23.483: result -> Event next(3)
2017-02-02 19:18:23.483: timer #1 -> Event completed
2017-02-02 19:18:23.483: timer #1 -> isDisposed
2017-02-02 19:18:23.581: timer #2 -> subscribed
2017-02-02 19:18:23.582: timer #2 -> Event next(2)
2017-02-02 19:18:23.582: result -> Event next(2)
2017-02-02 19:18:23.683: timer #2 -> Event next(3)
2017-02-02 19:18:23.683: result -> Event next(3)
2017-02-02 19:18:23.783: timer #2 -> Event next(4)
2017-02-02 19:18:23.783: result -> Event next(4)
2017-02-02 19:18:23.783: timer #2 -> Event completed
2017-02-02 19:18:23.783: timer #2 -> isDisposed
2017-02-02 19:18:23.881: timer #3 -> subscribed
2017-02-02 19:18:23.882: timer #3 -> Event next(3)
2017-02-02 19:18:23.882: result -> Event next(3)
2017-02-02 19:18:23.983: timer #3 -> Event next(4)
2017-02-02 19:18:23.983: result -> Event next(4)
2017-02-02 19:18:24.083: timer #3 -> Event next(5)
2017-02-02 19:18:24.083: result -> Event next(5)
2017-02-02 19:18:24.083: timer #3 -> Event completed
2017-02-02 19:18:24.083: timer #3 -> isDisposed
2017-02-02 19:18:24.083: result -> Event completed
2017-02-02 19:18:24.083: result -> isDisposed
В первом примере показано, как команда работает в статике, когда мы руками переподключаем Observable.
Во втором примере оператором create создан Observable<Observable>, AnyObserver которого мы вынесли в переменную, по таймеру получающую новый Observable, который реализован в виде таймера. Т.к. задержки у таймеров разные, то можно наблюдать. как с помощью switchLatest в RO попадают значения из последнего сгенерированного таймера.
SO1, SO2 = Observable
RO = Observable<f(T,T)>
Как только O1 генерирует элемент, проверяется, сгенерирован ли хоть один элемент в O2, и если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента.
example("\"withLatestFrom\" O2, where O2 has previous value") {
let variableA = Variable(0)
let variableB = Variable(10)
variableA
.asObservable()
.withLatestFrom(variableB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 20
variableB.value = 30
variableA.value = 5
variableA.value = 6
}
example("\"withLatestFrom\" O2, where O2 doesn't have previous value") {
let publishA = PublishSubject()
let publishB = PublishSubject()
publishA
.asObservable()
.withLatestFrom(publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext(20)
publishB.onNext(30)
publishA.onNext(5)
publishA.onNext(6)
}
Консоль:
--- "withLatestFrom" O2, where O2 has previous value example ---
next(0 - 10)
next(1 - 10)
next(2 - 10)
next(5 - 30)
next(6 - 30)
completed
--- "withLatestFrom" O2, where O2 doesn't have previous value example ---
next(5 - 30)
next(6 - 30)
SO = Observable<Observable>
RO = Observable<f(T,T)>
Элементы RO представляют собой комбинацию из элементов, сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента.
example("\"zip\"") {
let publishA = PublishSubject()
let publishB = PublishSubject()
Observable
.zip(publishA.asObservable(), publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(0)
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext("A")
publishB.onNext("B")
publishA.onNext(3)
publishB.onNext("C")
}
Консоль:
--- "zip" example ---
next(0 - A)
next(1 - B)
next(2 - C)
Из примеров видно, что элементы комбинируются попарно в том порядке, в каком они были сгенерированы в исходных Observable.
Фильтрация
Пропускаем все повторяющиеся подряд идущие элементы.
example("\"distinctUntilChanged\"") {
Observable
.of(1, 1, 2, 3, 3, 1, 1, 2)
.distinctUntilChanged()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "distinctUntilChanged" example ---
next(1)
next(2)
next(3)
next(1)
next(2)
completed
Здесь тонкий момент: отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.
В RO попадает лишь элемент, выпущенный N по счету.
example("\"elementAt\"") {
Observable
.of(1, 8, 2, 3)
.elementAt(1)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "elementAt" example ---
next(8)
completed
Отбрасываются все элементы, которые не удовлетворяют заданным условиям.
example("\"filter\"") {
Observable
.of(2, 12, 5, 4, 30)
.filter({ $0 > 10 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "filter" example ---
next(12)
next(30)
completed
Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error.
example("\"ignoreElements\"") {
Observable
.of(1, 8, 2)
.ignoreElements()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "ignoreElements" example ---
completed
При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее.
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
let secondSequence = Observable
.timer(0, period: 0.17, scheduler: MainScheduler.instance)
.take(10)
firstSequence
.sample(secondSequence)
.subscribe({ event in
print(event)
})
Консоль:
--- "sample" example ---
next(0)
next(1)
next(3)
next(5)
next(6)
next(8)
next(9)
completed
Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом.
example("simple \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single()
.subscribe({ event in
print(event)
})
}
example("predicate \"single\" with success") {
Observable
.of(1, 2, 3, 4)
.single({ $0 == 2 })
.subscribe({ event in
print(event)
})
}
example("predicate \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single({ $0 < 0 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- simple "single" with error example ---
next(1)
error(Sequence contains more than one element.)
--- predicate "single" with success example ---
next(2)
completed
--- predicate "single" with error example ---
error(Sequence doesn't contain any elements.)
В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента.
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было.
Из SO отбрасываем первые N элементов.
example("\"skip\"") {
Observable
.of(1, 8, 2, 3)
.skip(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skip" example ---
next(2)
next(3)
completed
Из SO отбрасываем первые элементы, которые были сгенерированы в первые N.
example("\"skip\" with duration") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skip(0.25, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skip" with duration example ---
next(3)
next(4)
completed
Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью, переданной в качестве параметра.
example("\"skipUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.skipUntil(secondSequence)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipUntil" example ---
next(3)
next(4)
completed
Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду.
Отбрасываем из SO элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.
example("\"skipWhile\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skipWhile({ $0 < 3 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipWhile" example ---
next(3)
next(4)
completed
Отбрасываем из SO элементы до тех пор, пока пока функция, переданная в качестве параметра, возвращает true. Отличие от skipWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"skipWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.skipWhileWithIndex({ (value, index) -> Bool in
return value < 4 || index < 2
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipWhileWithIndex" example ---
next(5)
next(0)
next(7)
completed
Из SO берутся лишь первые N элементов.
example("\"take\"") {
Observable
.of(1, 2, 3, 4)
.take(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "take" example ---
next(1)
next(2)
completed
Из SO берутся лишь элементы, сгенерированные в первые N секунд.
example("\"take\" with duration") {
Observable
.timer(0, period: 0.16, scheduler: MainScheduler.instance)
.take(0.3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "take" with duration example ---
next(0)
next(1)
completed
Из SO берутся лишь последние N элементов. Что означает: если SO никогда не закончит генерировать элементы, в RO не попадет ни одного элемента.
example("\"takeLast\"") {
Observable.of(1, 2, 3, 4)
.takeLast(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeLast" example ---
next(3)
next(4)
completed
Второй пример приведен для иллюстрации в задержке генерации элементов в RO из-за ожидания завершения генерации элементов в SO.
Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью, переданной в качестве параметра.
example("\"takeUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.takeUntil(secondSequence)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeUntil" example ---
next(0)
next(1)
next(2)
completed
Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.
example("\"takeWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.takeWhileWithIndex({ (value, index) -> Bool in
return value < 4 || index < 2
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeWhileWithIndex" example ---
next(1)
next(2)
completed
Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true. Отличие от takeWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("takeWhileWithIndex") {
let sequence = [1,2,3,4,5,6].toObservable()
.takeWhileWithIndex{ (val, idx) in
val % 2 == 0 || idx < 3
}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed
Из SO берутся лишь элементы, после которых не было новых элементов N секунд.
example("\"debounce\"") {
let firstSequence = Observable
.timer(0, period: 1.0, scheduler: MainScheduler.instance)
.map({ "X\($0)" })
.take(7)
.debug("\tfirst\t")
let secondSequence = Observable
.timer(0, period: 1.4, scheduler: MainScheduler.instance)
.skip(1)
.map({ "Y\($0)" })
.take(4)
.debug("\tsecond\t")
Observable
.of(firstSequence, secondSequence)
.merge()
.debounce(0.9, scheduler: MainScheduler.instance)
.debug("\tdebounce\t")
.subscribe()
}
Консоль:
--- "debounce" example ---
2017-01-31 15:18:40.601: debounce -> subscribed
2017-01-31 15:18:40.604: first -> subscribed
2017-01-31 15:18:40.605: second -> subscribed
2017-01-31 15:18:40.618: first -> Event next(X0)
2017-01-31 15:18:41.519: debounce -> Event next(X0)
2017-01-31 15:18:41.606: first -> Event next(X1)
2017-01-31 15:18:42.006: second -> Event next(Y1)
2017-01-31 15:18:42.606: first -> Event next(X2)
2017-01-31 15:18:43.406: second -> Event next(Y2)
2017-01-31 15:18:43.606: first -> Event next(X3)
2017-01-31 15:18:44.507: debounce -> Event next(X3)
2017-01-31 15:18:44.606: first -> Event next(X4)
2017-01-31 15:18:44.806: second -> Event next(Y3)
2017-01-31 15:18:45.605: first -> Event next(X5)
2017-01-31 15:18:46.206: second -> Event next(Y4)
2017-01-31 15:18:46.206: second -> Event completed
2017-01-31 15:18:46.206: second -> isDisposed
2017-01-31 15:18:46.605: first -> Event next(X6)
2017-01-31 15:18:46.605: first -> Event completed
2017-01-31 15:18:46.605: first -> isDisposed
2017-01-31 15:18:46.605: debounce -> Event next(X6)
2017-01-31 15:18:46.605: debounce -> Event completed
2017-01-31 15:18:46.605: debounce -> isDisposed
В данном примере элементы генерируются c разными задержками. Поэтому debounce сработает всего несколько раз в момент, когда между элементами будет достаточный временной промежуток.
Трансформация
SO = Observable<>>
RO = Observable<[T]>
Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передаются count (максимальное число элементов в массиве) и timeSpan (время максимального ожидания наполнения текущего массива из элементов SO). Таким образом, элемент RO являет собой массив [T] длиной от 0 до count.
example("\"buffer\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.buffer(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "buffer" example ---
next([0, 1])
next([2, 3])
next([4])
next([5, 6])
next([7, 8])
completed
Максимальное число элементов в массиве указано равное трем, и оно никак не влияет на наполнение массива в данном примере. За максимальное время ожидания наполнения таймер успевает сгенерировать не более 2 элементов. Однако период таймера и время ожидания наполнения выбраны таким образом, что третий массив успеет получить лишь один объект.
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge).
example("\"flatMap\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMap({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "flatMap" example ---
next(1 --> 1)
next(2 --> 2)
next(2 --> 3)
next(3 --> 3)
next(3 --> 4)
next(3 --> 5)
completed
Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы, все последующие Observable, сгенерированные из SO, отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, если будет сгенерирован новый Observable, на него подпишутся, и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable.
example("\"flatMapFirst\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("\tвнешний\t\t")
.flatMapFirst({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
.debug("\tвнутренний\t")
})
.debug("\tрезультат\t")
.subscribe()
}
Консоль:
--- "flatMap" example ---
2017-01-31 17:41:02.078: результат -> subscribed
2017-01-31 17:41:02.079: внешний -> subscribed
2017-01-31 17:41:02.180: внешний -> Event next(1) // учитывается, т.к. впервые
2017-01-31 17:41:02.182: внутренний -> subscribed // начало 1
2017-01-31 17:41:02.281: внешний -> Event next(2) // не учитывается, т.к. 1 жив
2017-01-31 17:41:02.284: внутренний -> Event next(1 --> 1)
2017-01-31 17:41:02.284: результат -> Event next(1 --> 1) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.284: внутренний -> Event completed // окончание 1
2017-01-31 17:41:02.284: внутренний -> isDisposed
2017-01-31 17:41:02.381: внешний -> Event next(3) // учитывается, т.к. 1 окончен
2017-01-31 17:41:02.382: внутренний -> subscribed // начало 3
2017-01-31 17:41:02.382: внешний -> Event completed
2017-01-31 17:41:02.382: внешний -> isDisposed
2017-01-31 17:41:02.682: внутренний -> Event next(3 --> 3)
2017-01-31 17:41:02.682: результат -> Event next(3 --> 3) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.782: внутренний -> Event next(3 --> 4)
2017-01-31 17:41:02.782: результат -> Event next(3 --> 4) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -> Event next(3 --> 5)
2017-01-31 17:41:02.883: результат -> Event next(3 --> 5) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -> Event completed // окончание 3
2017-01-31 17:41:02.883: внутренний -> isDisposed
2017-01-31 17:41:02.883: результат -> Event completed
2017-01-31 17:41:02.883: результат -> isDisposed
В примере благодаря задержкам в генерации мы видим, что, пока не произойдет окончание первого Observable, никакой новый Observable его не заменит.
Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable.
example("\"flatMapLatest\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("внешний\t")
.flatMapLatest({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
.debug("внутренний\t")
})
.debug("результат\t")
.subscribe()
}
Консоль:
--- "flatMapLatest" example ---
2017-01-31 18:35:25.941: результат -> subscribed
2017-01-31 18:35:25.942: внешний -> subscribed
2017-01-31 18:35:26.044: внешний -> Event next(1) // Новый сигнал
2017-01-31 18:35:26.046: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.144: внешний -> Event next(2) // Новый сигнал
2017-01-31 18:35:26.145: внутренний -> isDisposed // Отписался от прошлого
2017-01-31 18:35:26.145: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.244: внешний -> Event next(3) // Новый сигнал
2017-01-31 18:35:26.245: внутренний -> isDisposed // Отписался от прошлого
2017-01-31 18:35:26.245: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.245: внешний -> Event completed // Новых сигналов не будет
2017-01-31 18:35:26.245: внешний -> isDisposed
2017-01-31 18:35:26.546: внутренний -> Event next(3 --> 3)
2017-01-31 18:35:26.546: результат -> Event next(3 --> 3) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.646: внутренний -> Event next(3 --> 4)
2017-01-31 18:35:26.646: результат -> Event next(3 --> 4) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -> Event next(3 --> 5)
2017-01-31 18:35:26.746: результат -> Event next(3 --> 5) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -> Event completed
2017-01-31 18:35:26.746: внутренний -> isDisposed
2017-01-31 18:35:26.746: результат -> Event completed
2017-01-31 18:35:26.746: результат -> isDisposed
В примере благодаря задержкам в генерации мы видим, что, как только генерируется новый Observable, происходит отписка от предыдущего Observable.
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"flatMapWithIndex\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMapWithIndex({ (value: Int, index: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value, index) --> \($0)"})
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "flatMapWithIndex" example ---
next((1, 0) --> 1)
next((2, 1) --> 2)
next((2, 1) --> 3)
next((3, 2) --> 3)
next((3, 2) --> 4)
next((3, 2) --> 5)
completed
Observable -> Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.
example("\"map\"") {
Observable
.of(1, 2, 3)
.map({ $0 * 5 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "map" example ---
next(5)
next(10)
next(15)
completed
Observable -> Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"mapWithIndex\"") {
Observable
.of("A", "B", "C")
.mapWithIndex({ $0.0 + "\($0.1)" })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "mapWithIndex" example ---
next(A0)
next(B1)
next(C2)
completed
SO = Observable
RO = Observable<Observable>
Элементы из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передаются count (максимальное число элементов, которые будут сгенерированы каждым Observable) и timeSpan (время максимального ожидания наполнения текущего Observable из элементов SO). Таким образом элемент RO являет собой Observable, число генерируемых элементов которого равно от 0 до N. Основное отличие от bufffer в том, что элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше).
example("\"window\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.window(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe(onNext: { (observable: Observable) in
observable
.subscribe({ event in
print(event)
})
})
}
Консоль:
--- "window" example ---
next(0)
next(1)
completed
next(2)
next(3)
completed
next(4)
completed
next(5)
next(6)
completed
next(7)
next(8)
completed
В примере используются временные задержки, что помогает добиться частичной наполненности генерируемых Observable.
Операторы математические и агрегирования
Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.
example("\"reduce\"") {
Observable
.of(1, 2, 3, 4)
.reduce(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "reduce" example ---
next(24)
completed
Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но, кроме этого, оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.
example("\"scan\"") {
Observable
.of(1, 2, 3, 4)
.scan(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "scan" example ---
next(1)
next(2)
next(6)
next(24)
completed
SO = Observable
RO = Observable<[T]>
Все элементы из SO после генерации терминального состояния объединяются в массив, и генерируются RO.
example("\"toArray\"") {
Observable
.of(1, 2, 3)
.toArray()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "toArray" example ---
next([1, 2, 3])
completed
Работа с ошибками
Позволяет перехватить сгенерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы.
example("without \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
example("with \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchError({ (error) -> Observable in
return Observable.just(3)
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "catchError" example ---
next(1)
next(2)
error(Unknown error occured.)
--- with "catchError" example ---
next(1)
next(2)
next(3)
completed
После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый Observable.
Позволяет перехватить сгенерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed.
example("without \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
example("with \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchErrorJustReturn(3)
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "catchErrorJustReturn" example ---
next(1)
next(2)
error(Unknown error occured.)
--- with "catchErrorJustReturn" example ---
next(1)
next(2)
next(3)
completed
После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый элемент.
Позволяет перехватить сгенерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде, что ошибка не повторится.
example("\"retry\" once") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.retry(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "retry" once example ---
next(1)
next(2)
next(1)
next(2)
error(Unknown error occured.)
Передаваемое в оператор целое число означает количество попыток дождаться успешного окончания. 0 — ни одной попытки, т.е. цепочка ни разу не будет запущена, и просто произойдет completed событие. 1 — такое же поведение, как будто оператор и не не был применен. 2 — исходная попытка + дополнительная, и т.д. Если в оператор не передать параметр, то количество попыток повторить будет бесконечным.
Позволяет перехватить сгенерированную ошибку из SO, и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO, и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO в надежде, что ошибка исчезнет. Если tryObservable заканчивается ошибкой, она пробрасывается в RO, и на этом выполнение заканчивается.
example("\"retryWhen\"") {
var counter = 0
let sequenceWithError = Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
counter += 1
switch counter {
case 0..
.create({ observer in
observer.on(.next(10))
// observer.onError(RxError.noElements)
return Disposables.create()
})
.debug("without\t")
let retrySequence = sequenceWithError
.retryWhen({ (observableError: Observable) -> Observable in
return observableError
.flatMap({ (error: RxError) -> Observable in
switch error {
case .unknown: return sequenceWithoutError
default: return Observable.error(error)
}
})
})
.subscribe()
}
Консоль:
--- "retryWhen" example ---
2017-02-03 15:23:18.563: with -> subscribed
2017-02-03 15:23:18.564: with -> Event next(1)
2017-02-03 15:23:18.564: with -> Event next(2)
2017-02-03 15:23:18.565: with -> Event error(Unknown error occured.)
2017-02-03 15:23:18.566: without -> subscribed
2017-02-03 15:23:18.566: without -> Event next(10)
2017-02-03 15:23:18.567: with -> isDisposed
2017-02-03 15:23:18.568: with -> subscribed
2017-02-03 15:23:18.568: with -> Event next(1)
2017-02-03 15:23:18.568: with -> Event next(2)
2017-02-03 15:23:18.568: with -> Event error(Unknown error occured.)
2017-02-03 15:23:18.569: without -> subscribed
2017-02-03 15:23:18.569: without -> Event next(10)
2017-02-03 15:23:18.569: with -> isDisposed
2017-02-03 15:23:18.570: with -> subscribed
2017-02-03 15:23:18.570: with -> Event next(1)
2017-02-03 15:23:18.570: with -> Event next(2)
2017-02-03 15:23:18.571: with -> Event next(3)
Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке ошибка исчезла. Если раскоментировать генерацию ошибки RxError.Overflow, мы её не перехватим в операторе retryWhen и пробросим в RO.
Операторы для работы с Connectable Observable
Позволяет проксировать элементы из исходной SO на Subject, переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.
example("\"multicast\"") {
let subject = PublishSubject()
let multicast = getExternalObservable()
.multicast(subject)
subject
.asObservable()
.debug("\tinstant\t")
.subscribe()
delay(0.25, closure: { _ in
multicast.connect()
})
delay(0.45, closure: { _ in
subject
.asObservable()
.debug("\tdelayed\t")
.subscribe()
})
}
func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(7)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "multicast" example ---
2017-02-02 20:20:14.446: instant -> subscribed
2017-02-02 20:20:14.721: instant -> Event next(2)
2017-02-02 20:20:14.743: instant -> Event next(3)
2017-02-02 20:20:14.844: instant -> Event next(4)
2017-02-02 20:20:14.901: delayed -> subscribed
2017-02-02 20:20:14.943: instant -> Event next(5)
2017-02-02 20:20:14.943: delayed -> Event next(5)
2017-02-02 20:20:15.043: instant -> Event next(6)
2017-02-02 20:20:15.043: delayed -> Event next(6)
2017-02-02 20:20:15.043: instant -> Event completed
2017-02-02 20:20:15.043: instant -> isDisposed
2017-02-02 20:20:15.043: delayed -> Event completed
2017-02-02 20:20:15.043: delayed -> isDisposed
publish = multicast + replay subject
Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe.
var disposeBag: DisposeBag? = DisposeBag()
example("\"publish\"") {
let publish = getExternalObservable()
.debug("\tsequence\t")
.publish()
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(2)
.subscribe(onNext: { value in
let disposable = publish
.asObservable()
.debug("\tdelayed #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.25, closure: { _ in
publish.connect()
})
delay(0.55, closure: { _ in
disposeBag = nil
})
}
func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(8)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "publish" example ---
2017-02-02 20:43:47.691: delayed #1 -> subscribed // ожидание connect
2017-02-02 20:43:47.791: delayed #2 -> subscribed // ожидание connect
2017-02-02 20:43:47.859: sequence -> subscribed // connect случился
2017-02-02 20:43:47.860: sequence -> Event next(2)
2017-02-02 20:43:47.860: delayed #1 -> Event next(2)
2017-02-02 20:43:47.860: delayed #2 -> Event next(2)
2017-02-02 20:43:47.889: sequence -> Event next(3)
2017-02-02 20:43:47.889: delayed #1 -> Event next(3)
2017-02-02 20:43:47.889: delayed #2 -> Event next(3)
2017-02-02 20:43:47.989: sequence -> Event next(4)
2017-02-02 20:43:47.989: delayed #1 -> Event next(4)
2017-02-02 20:43:47.989: delayed #2 -> Event next(4)
2017-02-02 20:43:48.089: sequence -> Event next(5)
2017-02-02 20:43:48.089: delayed #1 -> Event next(5)
2017-02-02 20:43:48.089: delayed #2 -> Event next(5)
2017-02-02 20:43:48.170: delayed #1 -> isDisposed // все отписались
2017-02-02 20:43:48.170: delayed #2 -> isDisposed // все отписались
2017-02-02 20:43:48.188: sequence -> Event next(6) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -> Event next(7) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -> Event completed
2017-02-02 20:43:48.289: sequence -> isDisposed
Как видно, хоть подписка и была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались, последовательность продолжила генерировать элементы.
Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable происходит подписка Connectable на SO.
Получается что-то вроде
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()
SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, происходит отписка и publishSequence от SO.
var disposeBag: DisposeBag? = DisposeBag()
example("\"refCount\"") {
let sequence = getExternalObservable()
.debug("\tsequence\t\t")
let refCount = sequence
.publish()
.refCount()
.debug("\trefCount\t\t")
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(2)
.subscribe(onNext: { value in
let disposable = refCount
.debug("\tsubscribe #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.35, closure: { _ in
disposeBag = nil
})
}
private func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "refCount" example ---
2017-02-02 21:03:14.487: subscribe #0 -> subscribed
2017-02-02 21:03:14.488: refCount -> subscribed
2017-02-02 21:03:14.488: sequence -> subscribed
2017-02-02 21:03:14.489: sequence -> Event next(0)
2017-02-02 21:03:14.489: refCount -> Event next(0)
2017-02-02 21:03:14.489: subscribe #0 -> Event next(0)
2017-02-02 21:03:14.572: sequence -> Event next(1)
2017-02-02 21:03:14.573: refCount -> Event next(1)
2017-02-02 21:03:14.573: subscribe #0 -> Event next(1)
2017-02-02 21:03:14.576: subscribe #1 -> subscribed
2017-02-02 21:03:14.576: refCount -> subscribed
2017-02-02 21:03:14.673: sequence -> Event next(2)
2017-02-02 21:03:14.673: refCount -> Event next(2)
2017-02-02 21:03:14.673: subscribe #0 -> Event next(2)
2017-02-02 21:03:14.673: refCount -> Event next(2)
2017-02-02 21:03:14.673: subscribe #1 -> Event next(2)
2017-02-02 21:03:14.773: sequence -> Event next(3)
2017-02-02 21:03:14.774: refCount -> Event next(3)
2017-02-02 21:03:14.774: subscribe #0 -> Event next(3)
2017-02-02 21:03:14.774: refCount -> Event next(3)
2017-02-02 21:03:14.774: subscribe #1 -> Event next(3)
2017-02-02 21:03:14.835: subscribe #0 -> isDisposed
2017-02-02 21:03:14.835: refCount -> isDisposed
2017-02-02 21:03:14.835: subscribe #1 -> isDisposed
2017-02-02 21:03:14.835: refCount -> isDisposed // отписался последний
2017-02-02 21:03:14.835: sequence -> isDisposed // отписка от SO
Если SO обычный, — конвертирует его в Connectable. После этого все, кто подпишутся на него после вызова connect(), мгновенно получат в качестве первых элементов последние сгенерированные N элементов. Даже если отпишутся все — Connectable будет продолжать генерировать элементы.
var disposeBag: DisposeBag? = DisposeBag()
example("\"replay\"") {
let sequence = getExternalObservable()
.replay(2)
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}
fileprivate func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}
Консоль:
--- "replay" example ---
2017-02-02 21:36:30.914: timer -> subscribed
2017-02-02 21:36:30.918: first -> subscribed
2017-02-02 21:36:30.931: timer -> Event next(0)
2017-02-02 21:36:31.016: timer -> Event next(1)
2017-02-02 21:36:31.116: timer -> Event next(2)
2017-02-02 21:36:31.216: timer -> Event next(3)
2017-02-02 21:36:31.248: first -> Event next(3)
2017-02-02 21:36:31.316: timer -> Event next(4)
2017-02-02 21:36:31.317: first -> Event next(4)
2017-02-02 21:36:31.416: timer -> Event next(5)
2017-02-02 21:36:31.416: first -> Event next(5)
2017-02-02 21:36:31.468: second -> subscribed
2017-02-02 21:36:31.468: second -> Event next(4) // мгновенно получил
2017-02-02 21:36:31.468: second -> Event next(5) // мгновенно получил
2017-02-02 21:36:31.515: timer -> Event next(6)
2017-02-02 21:36:31.516: first -> Event next(6)
2017-02-02 21:36:31.516: second -> Event next(6)
2017-02-02 21:36:31.616: timer -> Event next(7)
2017-02-02 21:36:31.617: first -> Event next(7)
2017-02-02 21:36:31.617: second -> Event next(7)
2017-02-02 21:36:31.688: first -> isDisposed // все отписались
2017-02-02 21:36:31.688: second -> isDisposed // все отписались
2017-02-02 21:36:31.716: timer -> Event next(8) // продолжают генерироваться
2017-02-02 21:36:31.816: timer -> Event next(9) // продолжают генерироваться
2017-02-02 21:36:31.817: timer -> Event completed
2017-02-02 21:36:31.817: timer -> isDisposed
Если SO обычный, — конвертирует его в Connectable. Все, кто подпишутся на него, после вызова connect() получат сначала все элементы, которые были сгенерированы ранее. Даже если отпишутся все, Connectable будет продолжать генерировать элементы.
var disposeBag: DisposeBag? = DisposeBag()
example("\"replayAll\"") {
let sequence = getExternalObservable()
.replayAll()
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}
fileprivate func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}
Консоль:
--- "replayAll" example ---
2017-02-02 21:39:27.174: timer -> subscribed
2017-02-02 21:39:27.178: first -> subscribed
2017-02-02 21:39:27.190: timer -> Event next(0)
2017-02-02 21:39:27.276: timer -> Event next(1)
2017-02-02 21:39:27.376: timer -> Event next(2)
2017-02-02 21:39:27.476: timer -> Event next(3)
2017-02-02 21:39:27.509: first -> Event next(3) // 1й элемент
2017-02-02 21:39:27.576: timer -> Event next(4)
2017-02-02 21:39:27.576: first -> Event next(4) // 2й элемент
2017-02-02 21:39:27.675: timer -> Event next(5)
2017-02-02 21:39:27.675: first -> Event next(5) // 3й элемент
2017-02-02 21:39:27.731: second -> subscribed // подписание
2017-02-02 21:39:27.731: second -> Event next(3) // сразу получил 1й
2017-02-02 21:39:27.731: second -> Event next(4) // сразу получил 2й
2017-02-02 21:39:27.731: second -> Event next(5) // сразу получил 3й
2017-02-02 21:39:27.775: timer -> Event next(6)
2017-02-02 21:39:27.775: first -> Event next(6)
2017-02-02 21:39:27.775: second -> Event next(6)
2017-02-02 21:39:27.875: timer -> Event next(7)
2017-02-02 21:39:27.876: first -> Event next(7)
2017-02-02 21:39:27.876: second -> Event next(7)
2017-02-02 21:39:27.946: first -> isDisposed // все отписались
2017-02-02 21:39:27.946: second -> isDisposed // все отписались
2017-02-02 21:39:27.975: timer -> Event next(8) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -> Event next(9) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -> Event completed
2017-02-02 21:39:28.076: timer -> isDisposed
Вспомогательные методы
debug
RO полностью дублирует SO, но логируются все события с временной меткой.
example("\"debug\"") {
Observable
.of(1, 2, 3)
.debug("sequence")
.subscribe{}
}
Консоль:
--- "debug" example ---
2017-02-03 10:01:07.746: sequence -> subscribed
2017-02-03 10:01:07.748: sequence -> Event next(1)
2017-02-03 10:01:07.748: sequence -> Event next(2)
2017-02-03 10:01:07.748: sequence -> Event next(3)
2017-02-03 10:01:07.748: sequence -> Event completed
2017-02-03 10:01:07.748: sequence -> isDisposed
RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO.
example("\"do\"") {
Observable
.of(1, 2, 3)
.do(onNext: { value in
print("do(\(value))")
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "do" example ---
do(1)
next(1)
do(2)
next(2)
do(3)
next(3)
completed
Дублирует элементы из SO в RO, но с временной задержкой, указанной в качестве параметра.
example("\"delaySubscription\"") {
var timer: Observable = getExternalObservable()
timer
.debug("\tnormal\t")
.subscribe()
timer
.delaySubscription(0.9, scheduler: MainScheduler.instance)
.debug("\tdelayed\t")
.subscribe()
}
private func getExternalObservable() -> Observable {
let observable = Observable
.timer(0, period: 0.2, scheduler: MainScheduler.instance)
.take(3)
defer {
observable.subscribe()
}
return observable
}
Консоль:
--- "delaySubscription" example ---
2017-02-03 10:58:13.985: normal -> subscribed
2017-02-03 10:58:13.989: delayed -> subscribed
2017-02-03 10:58:13.999: normal -> Event next(0)
2017-02-03 10:58:14.187: normal -> Event next(1)
2017-02-03 10:58:14.387: normal -> Event next(2)
2017-02-03 10:58:14.387: normal -> Event completed
2017-02-03 10:58:14.387: normal -> isDisposed
2017-02-03 10:58:14.890: delayed -> Event next(0)
2017-02-03 10:58:15.091: delayed -> Event next(1)
2017-02-03 10:58:15.290: delayed -> Event next(2)
2017-02-03 10:58:15.291: delayed -> Event completed
2017-02-03 10:58:15.291: delayed -> isDisposed
Указывает, на каком Scheduler должен выполнять свою работу Observer, особенно критично при работе с GUI.
example("\"observeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.observeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}
Консоль:
--- "observeOn" example ---
Main: false Event: next(A)
Main: false Event: next(B)
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(C)
Main: false Event: completed
Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке, хотя оба Observable были запущены на background.
Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable.
example("\"subscribe\" with failure") {
observable(failing: true)
}
example("\"subscribe\" with success") {
observable(failing: false)
}
func observable(failing: Bool) {
var observable: Observable!
switch failing {
case true: observable = Observable.error(RxError.argumentOutOfRange)
case false: observable = Observable.of(1)
}
observable
.subscribe(onNext: { value in
print("next: \(value)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
}
Консоль:
--- "subscribe" with failure example ---
error: Argument out of range.
disposed
--- "subscribe" with success example ---
next: 1
completed
disposed
Указывает, на каком Scheduler выполнять подписку на Observable. Редко используемый оператор. Для получения колбэков на нужном Scheduler следует пользоваться observeOn.
example("\"subscribeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.subscribeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}
Консоль:
--- "subscribeOn" example ---
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(A)
Main: false Event: next(B)
Main: false Event: next(C)
Main: false Event: completed
Дублирует элементы из SO в RO, но если в течение указанного времени SO не сгенерировало ни одного элемента, RO генерирует ошибку.
delay(0.0) { _ in
example("\"timeout\" with success") {
observable(timeout: 0.3, delay: 0.1)
.debug("\tsuccess\t")
.subscribe()
}
}
delay(0.5) { _ in
example("\"timeout\" with failure") {
observable(timeout: 0.3, delay: 0.4)
.debug("\tfailure\t")
.subscribe()
}
}
func observable(timeout: RxTimeInterval, delay: RxTimeInterval) -> Observable {
return Observable
.timer(delay, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
.timeout(timeout, scheduler: MainScheduler.instance)
}
Консоль:
--- "timeout" with success example ---
2017-02-03 12:01:31.344: success -> subscribed
2017-02-03 12:01:31.447: success -> Event next(0)
2017-02-03 12:01:31.447: success -> Event completed
2017-02-03 12:01:31.447: success -> isDisposed
--- "timeout" with failure example ---
2017-02-03 12:01:31.856: failure -> subscribed
2017-02-03 12:01:32.157: failure -> Event error(Sequence timeout.)
2017-02-03 12:01:32.157: failure -> isDisposed
using
Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable из ресурса, у которых будет единое время жизни.
example("\"using\"") {
Observable
.of(1,3)
.flatMap { value -> Observable in
return Observable
.using(resourceFactory(value), observableFactory: observableFactory())
}
.subscribe({ event in
print(event)
})
}
class Factory: Disposable {
let value: Int
init(_ value: Int) {
self.value = value
}
var observable: Observable {
let array = Array(repeating: value, count: value)
return Observable.from(array)
}
func dispose() {
print("Factory(\(value)) disposed")
}
}
func resourceFactory(_ value: Int) -> (() -> Factory) {
func resource() -> Factory {
return Factory(value)
}
return resource
}
func observableFactory() -> ((Factory) -> Observable) {
func observable(_ resource: Factory) -> Observable {
return resource.observable
}
return observable
}
Консоль:
--- "using" example ---
next(1)
Factory(1) disposed
next(3)
next(3)
next(3)
Factory(3) disposed
completed
Как видно, после того, как Observable закончил генерировать элементы, у нашего ресурса Factory был вызван метод dispose.
За материал выражаем благодарность .
Запись впервые появилась .
Заинтересовавшись темой функционального программирования, я встал на распутье: какой фреймворк выбрать для ознакомления? ReactiveCocoa — ветеран в iOS-кругах, информации по нему вдоволь. Но он вырос из Objective-C, и хотя это не является проблемой, все же в данный момент я в основном пишу на Swift, и хотелось бы взять решение, изначально спроектированное с учетом всех плюшек этого языка. RxSwift же — порт Reactive Extensions; последний, конечно, имеет долгую историю, но сам порт свежий и написан как раз под Swift. На нем я и решил остановиться.
Как выяснилось, документация по RxSwift несколько специфична: описание всех команд ведет на , а там в основном дается общая информация, у разработчиков еще не дошли руки сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, а есть такие, о которых в общей документации нет ничего, кроме упоминания.
Прочитав все главы вики с гитхаба RxSwift, я решил сразу поразбираться с официальными примерами; тут-то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с
В общем, я решил проработать все операторы RxSwift. Лучший способ что-то понять в программировании — запустить код и посмотреть, как он отработает. Учитывая специфику реактивного программирования, лучше дополнить это схемами (они бывают очень полезны), ну и кратким описанием на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
Много картинок и текста ниже, очень много!
Предварительно я рекомендую просмотреть , у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые , есть бесплатная версия под .
Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, дам для каждой краткое описание, схему (если это имеет смысл), код, результат выполнения, а при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.
Вот , куда я склонировал официальный репозиторий RxSwift и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и , где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того, чтобы увидеть, как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — чтобы иметь под рукой документ, в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.
Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
Что ж, надеюсь, моя работа кому-то пригодится. Приступим.
Содержание
Создание Observable
Комбинирование Observable
Фильтрация
Трансформация
Операторы математические и агрегирования
Работа с ошибками
Операторы для работы с Connectable Observable
Вспомогательные методы
В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.
Функция example просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift):
public func example(_ description: String, action: (Void) -> Void) {
printExampleHeader(description)
action()
}
Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице, необходимо прописать
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
Также подразумевается, что читатель имеет общее представление о реактивном программировании в общем и об RxSwift в частности. Не знаю, есть ли смысл городить очередную вводную.
Создание Observable
Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver
example("\"asObservable\"") {
let variable = Variable(0)
variable
.asObservable()
.subscribe({ event in
print(event)
})
variable.value = 1
}
Консоль:
--- "asObservable" example ---
next(0)
next(1)
completed
В данном примере мы Variable преобразовали в Observable и подписались на его события.
Этот метод позволяет создавать Observable с нуля, полностью контролируя, какие элементы и когда он будет генерировать.
example("\"create\"") {
Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
observer.on(.next("A"))
observer.on(.next("B"))
observer.onCompleted()
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "create" example ---
next(1)
next(2)
next(A)
next(B)
completed
В данном примере мы создали Observable, который сгенерирует несколько значений, и в конце вызовется complete.
Этот оператор позволяет отложить создание Observable до момента подписки с помощью subscribe.
example("without \"deferred\"") {
var i = 1
let observable = Observable.just(i)
i = 2
observable
.subscribe({ event in
print(event)
})
}
example("with \"deferred\"") {
var i = 1
let observable = Observable.deferred({ Observable.just(i) })
i = 2
observable
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "deferred" example ---
next(1)
completed
--- with "deferred" example ---
next(2)
completed
В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значения i уже не влияет на генерируемый этой последовательностью элемент. Во втором же случае мы создаем Observable с помощью deferred и можем поменять значение i перед subscribe.
Пустая последовательность, заканчивающаяся Completed.
example("\"empty\"") {
Observable
.empty()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "empty" example ---
completed
Создаст последовательность, которая состоит из одного события — Error.
example("\"error\"") {
Observable
.error(RxError.unknown)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "error" example ---
error(Unknown error occured.)
Создает бесконечную последовательность, возрастающую с 0 с шагом 1 с указанной периодичностью.
example("\"interval\"") {
Observable
.interval(1, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "interval" example ---
next(0)
next(1)
next(2)
next(3)
.....
Создает последовательность из любого значения, которая завершается Completed.
example("\"just\"") {
Observable
.just(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "just" example ---
next(2)
completed
Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие.
example("\"never\"") {
Observable
.never()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "never" example ---
Последовательность из переменного количества элементов. После всех элементов генерируется Completed.
example("\"of\"") {
Observable
.of(2, 3, 5)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "of" example ---
next(2)
next(3)
next(5)
completed
В первом случае мы создали последовательность из двух чисел, во втором — из двух Observable, а затем объединили их между собой с помощью оператора merge.
Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed.
example("\"range\"") {
Observable
.range(start: 5, count: 3)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "range" example ---
next(5)
next(6)
next(7)
completed
Сгенерировались элементы, начиная с 5, 3 раза с шагом 1.
Бесконечно создавать указанный элемент, без задержек. Никогда не будут сгенерированы события Completed или Error.
example("\"repeatElement\"") {
Observable
.repeatElement(2, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- repeatElement example ---
Next(2)
Next(2)
Next(2)
.....
Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможностью задержки при старте. Никогда не будут сгенерированы события Completed или Error.
example("\"timer\"") {
Observable
.timer(2, period: 3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "timer" example ---
next(0)
next(1)
next(2)
.....
В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды.
Комбинирование Observable
SO = [Observable] или SO1, SO2 = Observable
RO = Observable
Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются.
example("\"amb\"") {
let subjectA = PublishSubject()
let subjectB = PublishSubject()
let subjectC = PublishSubject()
let subjectD = PublishSubject()
Observable
.amb([subjectA.asObservable(), subjectB.asObservable(), subjectC.asObservable(), subjectD.asObservable()])
.subscribe({ event in
print(event)
})
subjectC.onNext("C1")
subjectD.onNext("D1")
subjectB.onNext("B1")
subjectA.onNext("A1")
subjectC.onNext("C2")
subjectD.onNext("D2")
subjectA.onNext("A2")
}
Консоль:
--- "amb" example ---
next(C1)
next(C2)
Т.к. первым сгенерировал элемент subjectC, лишь его элементы дублируются в RO, остальные игнорируются.
SO = SO1, SO2,... SON = Observable
RO = Observable<f(T,T)>
Как только все Observable сгенерировали хотя бы по одному элементу, эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации элемента любым Observable генерируется новый результат функции с последними элементами из всех комбинируемых Observable.
example("\"combineLatest\"") {
let sequenceA = Observable
.timer(0.15, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.05, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.combineLatest(sequenceA, sequenceB){$0.0 + " - " + $0.1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "combineLatest" example ---
next(A0 - B2)
next(A1 - B2)
next(A2 - B2)
completed
В этом примере я создал Observable с помощью timer — для генерации элементов с разной задержкой, чтобы было видно, как перемешиваются элементы. К моменту появления первого элемента sequenceA появилось уже три элемента sequenceB. Поэтому первым элементом в RO последовательности стала пара объектов A0 — B2.
SO = Observable<Observable> или SO1, SO2 = Observable
RO = Observable
В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO.
example("\"concat\"") {
let sequenceA = Observable.of(1, 2, 3)
let sequenceB = Observable.of("A", "B", "C")
sequenceA
.concat(sequenceB)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "concat" example ---
next(1)
next(2)
next(3)
next(A)
next(B)
next(C)
completed
SO = Observable<Observable>
RO = Observable
Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable.
example("\"merge\"") {
let sequenceA = Observable
.timer(0.0, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.of(sequenceA, sequenceB)
.merge()
.subscribe({ event in
print(event)
})
}
}
Консоль:
--- "merge" example ---
next(A0)
next(B0)
next(B1)
next(A1)
next(B2)
next(A2)
completed
Последовательности сделаны с задержкой в генерации, и видно, что элементы в RO теперь вперемешку, в том порядке, в котором они были сгенерированы в исходных Observable.
SO = Observable
RO = Observable
В начало SO добавляются элементы, переданные в качестве аргумента.
example("\"startWith\"") {
Observable.of(1, 2, 3)
.startWith(6,7)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "startWith" example ---
next(6)
next(7)
next(1)
next(2)
next(3)
completed
SO = Observable<Observable>
RO = Observable
Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable.
example("\"switchLatest\" without delays") {
let variableA = Variable(0)
let variableB = Variable(100)
let variableC = Variable(variableA.asObservable())
variableC
.asObservable()
.switchLatest()
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 3
variableC.value = variableB.asObservable()
variableB.value = 4
variableA.value = 5
}
example("\"switchLatest\" with delays") {
var sequenceObserver: AnyObserver<Observable>?
Observable<Observable>
.create({ observer in
sequenceObserver = observer
return Disposables.create()
})
.switchLatest()
.debug("result\t\t")
.subscribe()
Observable
.timer(0, period: 0.3, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.subscribe(onNext: { element in
let observable = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.map({ $0 + element })
.take(3)
.debug("timer #\(element)\t")
sequenceObserver?.onNext(observable)
}, onCompleted: {
sequenceObserver?.onCompleted()
})
}
Консоль:
--- "switchLatest" without delays example ---
next(0)
next(1)
next(2)
next(3)
next(4)
completed
--- "switchLatest" with delays example ---
2017-02-02 19:18:22.976: result -> subscribed
2017-02-02 19:18:23.282: timer #1 -> subscribed
2017-02-02 19:18:23.283: timer #1 -> Event next(1)
2017-02-02 19:18:23.283: result -> Event next(1)
2017-02-02 19:18:23.384: timer #1 -> Event next(2)
2017-02-02 19:18:23.384: result -> Event next(2)
2017-02-02 19:18:23.483: timer #1 -> Event next(3)
2017-02-02 19:18:23.483: result -> Event next(3)
2017-02-02 19:18:23.483: timer #1 -> Event completed
2017-02-02 19:18:23.483: timer #1 -> isDisposed
2017-02-02 19:18:23.581: timer #2 -> subscribed
2017-02-02 19:18:23.582: timer #2 -> Event next(2)
2017-02-02 19:18:23.582: result -> Event next(2)
2017-02-02 19:18:23.683: timer #2 -> Event next(3)
2017-02-02 19:18:23.683: result -> Event next(3)
2017-02-02 19:18:23.783: timer #2 -> Event next(4)
2017-02-02 19:18:23.783: result -> Event next(4)
2017-02-02 19:18:23.783: timer #2 -> Event completed
2017-02-02 19:18:23.783: timer #2 -> isDisposed
2017-02-02 19:18:23.881: timer #3 -> subscribed
2017-02-02 19:18:23.882: timer #3 -> Event next(3)
2017-02-02 19:18:23.882: result -> Event next(3)
2017-02-02 19:18:23.983: timer #3 -> Event next(4)
2017-02-02 19:18:23.983: result -> Event next(4)
2017-02-02 19:18:24.083: timer #3 -> Event next(5)
2017-02-02 19:18:24.083: result -> Event next(5)
2017-02-02 19:18:24.083: timer #3 -> Event completed
2017-02-02 19:18:24.083: timer #3 -> isDisposed
2017-02-02 19:18:24.083: result -> Event completed
2017-02-02 19:18:24.083: result -> isDisposed
В первом примере показано, как команда работает в статике, когда мы руками переподключаем Observable.
Во втором примере оператором create создан Observable<Observable>, AnyObserver которого мы вынесли в переменную, по таймеру получающую новый Observable, который реализован в виде таймера. Т.к. задержки у таймеров разные, то можно наблюдать. как с помощью switchLatest в RO попадают значения из последнего сгенерированного таймера.
SO1, SO2 = Observable
RO = Observable<f(T,T)>
Как только O1 генерирует элемент, проверяется, сгенерирован ли хоть один элемент в O2, и если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента.
example("\"withLatestFrom\" O2, where O2 has previous value") {
let variableA = Variable(0)
let variableB = Variable(10)
variableA
.asObservable()
.withLatestFrom(variableB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 20
variableB.value = 30
variableA.value = 5
variableA.value = 6
}
example("\"withLatestFrom\" O2, where O2 doesn't have previous value") {
let publishA = PublishSubject()
let publishB = PublishSubject()
publishA
.asObservable()
.withLatestFrom(publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext(20)
publishB.onNext(30)
publishA.onNext(5)
publishA.onNext(6)
}
Консоль:
--- "withLatestFrom" O2, where O2 has previous value example ---
next(0 - 10)
next(1 - 10)
next(2 - 10)
next(5 - 30)
next(6 - 30)
completed
--- "withLatestFrom" O2, where O2 doesn't have previous value example ---
next(5 - 30)
next(6 - 30)
SO = Observable<Observable>
RO = Observable<f(T,T)>
Элементы RO представляют собой комбинацию из элементов, сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента.
example("\"zip\"") {
let publishA = PublishSubject()
let publishB = PublishSubject()
Observable
.zip(publishA.asObservable(), publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(0)
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext("A")
publishB.onNext("B")
publishA.onNext(3)
publishB.onNext("C")
}
Консоль:
--- "zip" example ---
next(0 - A)
next(1 - B)
next(2 - C)
Из примеров видно, что элементы комбинируются попарно в том порядке, в каком они были сгенерированы в исходных Observable.
Фильтрация
Пропускаем все повторяющиеся подряд идущие элементы.
example("\"distinctUntilChanged\"") {
Observable
.of(1, 1, 2, 3, 3, 1, 1, 2)
.distinctUntilChanged()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "distinctUntilChanged" example ---
next(1)
next(2)
next(3)
next(1)
next(2)
completed
Здесь тонкий момент: отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.
В RO попадает лишь элемент, выпущенный N по счету.
example("\"elementAt\"") {
Observable
.of(1, 8, 2, 3)
.elementAt(1)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "elementAt" example ---
next(8)
completed
Отбрасываются все элементы, которые не удовлетворяют заданным условиям.
example("\"filter\"") {
Observable
.of(2, 12, 5, 4, 30)
.filter({ $0 > 10 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "filter" example ---
next(12)
next(30)
completed
Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error.
example("\"ignoreElements\"") {
Observable
.of(1, 8, 2)
.ignoreElements()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "ignoreElements" example ---
completed
При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее.
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
let secondSequence = Observable
.timer(0, period: 0.17, scheduler: MainScheduler.instance)
.take(10)
firstSequence
.sample(secondSequence)
.subscribe({ event in
print(event)
})
Консоль:
--- "sample" example ---
next(0)
next(1)
next(3)
next(5)
next(6)
next(8)
next(9)
completed
Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом.
example("simple \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single()
.subscribe({ event in
print(event)
})
}
example("predicate \"single\" with success") {
Observable
.of(1, 2, 3, 4)
.single({ $0 == 2 })
.subscribe({ event in
print(event)
})
}
example("predicate \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single({ $0 < 0 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- simple "single" with error example ---
next(1)
error(Sequence contains more than one element.)
--- predicate "single" with success example ---
next(2)
completed
--- predicate "single" with error example ---
error(Sequence doesn't contain any elements.)
В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента.
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было.
Из SO отбрасываем первые N элементов.
example("\"skip\"") {
Observable
.of(1, 8, 2, 3)
.skip(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skip" example ---
next(2)
next(3)
completed
Из SO отбрасываем первые элементы, которые были сгенерированы в первые N.
example("\"skip\" with duration") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skip(0.25, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skip" with duration example ---
next(3)
next(4)
completed
Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью, переданной в качестве параметра.
example("\"skipUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.skipUntil(secondSequence)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipUntil" example ---
next(3)
next(4)
completed
Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду.
Отбрасываем из SO элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.
example("\"skipWhile\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skipWhile({ $0 < 3 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipWhile" example ---
next(3)
next(4)
completed
Отбрасываем из SO элементы до тех пор, пока пока функция, переданная в качестве параметра, возвращает true. Отличие от skipWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"skipWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.skipWhileWithIndex({ (value, index) -> Bool in
return value < 4 || index < 2
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "skipWhileWithIndex" example ---
next(5)
next(0)
next(7)
completed
Из SO берутся лишь первые N элементов.
example("\"take\"") {
Observable
.of(1, 2, 3, 4)
.take(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "take" example ---
next(1)
next(2)
completed
Из SO берутся лишь элементы, сгенерированные в первые N секунд.
example("\"take\" with duration") {
Observable
.timer(0, period: 0.16, scheduler: MainScheduler.instance)
.take(0.3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "take" with duration example ---
next(0)
next(1)
completed
Из SO берутся лишь последние N элементов. Что означает: если SO никогда не закончит генерировать элементы, в RO не попадет ни одного элемента.
example("\"takeLast\"") {
Observable.of(1, 2, 3, 4)
.takeLast(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeLast" example ---
next(3)
next(4)
completed
Второй пример приведен для иллюстрации в задержке генерации элементов в RO из-за ожидания завершения генерации элементов в SO.
Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью, переданной в качестве параметра.
example("\"takeUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.takeUntil(secondSequence)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeUntil" example ---
next(0)
next(1)
next(2)
completed
Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.
example("\"takeWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.takeWhileWithIndex({ (value, index) -> Bool in
return value < 4 || index < 2
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "takeWhileWithIndex" example ---
next(1)
next(2)
completed
Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true. Отличие от takeWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("takeWhileWithIndex") {
let sequence = [1,2,3,4,5,6].toObservable()
.takeWhileWithIndex{ (val, idx) in
val % 2 == 0 || idx < 3
}
sequence.subscribe { e in
print(e)
}
}
Консоль:
--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed
Из SO берутся лишь элементы, после которых не было новых элементов N секунд.
example("\"debounce\"") {
let firstSequence = Observable
.timer(0, period: 1.0, scheduler: MainScheduler.instance)
.map({ "X\($0)" })
.take(7)
.debug("\tfirst\t")
let secondSequence = Observable
.timer(0, period: 1.4, scheduler: MainScheduler.instance)
.skip(1)
.map({ "Y\($0)" })
.take(4)
.debug("\tsecond\t")
Observable
.of(firstSequence, secondSequence)
.merge()
.debounce(0.9, scheduler: MainScheduler.instance)
.debug("\tdebounce\t")
.subscribe()
}
Консоль:
--- "debounce" example ---
2017-01-31 15:18:40.601: debounce -> subscribed
2017-01-31 15:18:40.604: first -> subscribed
2017-01-31 15:18:40.605: second -> subscribed
2017-01-31 15:18:40.618: first -> Event next(X0)
2017-01-31 15:18:41.519: debounce -> Event next(X0)
2017-01-31 15:18:41.606: first -> Event next(X1)
2017-01-31 15:18:42.006: second -> Event next(Y1)
2017-01-31 15:18:42.606: first -> Event next(X2)
2017-01-31 15:18:43.406: second -> Event next(Y2)
2017-01-31 15:18:43.606: first -> Event next(X3)
2017-01-31 15:18:44.507: debounce -> Event next(X3)
2017-01-31 15:18:44.606: first -> Event next(X4)
2017-01-31 15:18:44.806: second -> Event next(Y3)
2017-01-31 15:18:45.605: first -> Event next(X5)
2017-01-31 15:18:46.206: second -> Event next(Y4)
2017-01-31 15:18:46.206: second -> Event completed
2017-01-31 15:18:46.206: second -> isDisposed
2017-01-31 15:18:46.605: first -> Event next(X6)
2017-01-31 15:18:46.605: first -> Event completed
2017-01-31 15:18:46.605: first -> isDisposed
2017-01-31 15:18:46.605: debounce -> Event next(X6)
2017-01-31 15:18:46.605: debounce -> Event completed
2017-01-31 15:18:46.605: debounce -> isDisposed
В данном примере элементы генерируются c разными задержками. Поэтому debounce сработает всего несколько раз в момент, когда между элементами будет достаточный временной промежуток.
Трансформация
SO = Observable<>>
RO = Observable<[T]>
Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передаются count (максимальное число элементов в массиве) и timeSpan (время максимального ожидания наполнения текущего массива из элементов SO). Таким образом, элемент RO являет собой массив [T] длиной от 0 до count.
example("\"buffer\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.buffer(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "buffer" example ---
next([0, 1])
next([2, 3])
next([4])
next([5, 6])
next([7, 8])
completed
Максимальное число элементов в массиве указано равное трем, и оно никак не влияет на наполнение массива в данном примере. За максимальное время ожидания наполнения таймер успевает сгенерировать не более 2 элементов. Однако период таймера и время ожидания наполнения выбраны таким образом, что третий массив успеет получить лишь один объект.
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge).
example("\"flatMap\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMap({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "flatMap" example ---
next(1 --> 1)
next(2 --> 2)
next(2 --> 3)
next(3 --> 3)
next(3 --> 4)
next(3 --> 5)
completed
Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы, все последующие Observable, сгенерированные из SO, отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, если будет сгенерирован новый Observable, на него подпишутся, и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable.
example("\"flatMapFirst\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("\tвнешний\t\t")
.flatMapFirst({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
.debug("\tвнутренний\t")
})
.debug("\tрезультат\t")
.subscribe()
}
Консоль:
--- "flatMap" example ---
2017-01-31 17:41:02.078: результат -> subscribed
2017-01-31 17:41:02.079: внешний -> subscribed
2017-01-31 17:41:02.180: внешний -> Event next(1) // учитывается, т.к. впервые
2017-01-31 17:41:02.182: внутренний -> subscribed // начало 1
2017-01-31 17:41:02.281: внешний -> Event next(2) // не учитывается, т.к. 1 жив
2017-01-31 17:41:02.284: внутренний -> Event next(1 --> 1)
2017-01-31 17:41:02.284: результат -> Event next(1 --> 1) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.284: внутренний -> Event completed // окончание 1
2017-01-31 17:41:02.284: внутренний -> isDisposed
2017-01-31 17:41:02.381: внешний -> Event next(3) // учитывается, т.к. 1 окончен
2017-01-31 17:41:02.382: внутренний -> subscribed // начало 3
2017-01-31 17:41:02.382: внешний -> Event completed
2017-01-31 17:41:02.382: внешний -> isDisposed
2017-01-31 17:41:02.682: внутренний -> Event next(3 --> 3)
2017-01-31 17:41:02.682: результат -> Event next(3 --> 3) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.782: внутренний -> Event next(3 --> 4)
2017-01-31 17:41:02.782: результат -> Event next(3 --> 4) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -> Event next(3 --> 5)
2017-01-31 17:41:02.883: результат -> Event next(3 --> 5) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -> Event completed // окончание 3
2017-01-31 17:41:02.883: внутренний -> isDisposed
2017-01-31 17:41:02.883: результат -> Event completed
2017-01-31 17:41:02.883: результат -> isDisposed
В примере благодаря задержкам в генерации мы видим, что, пока не произойдет окончание первого Observable, никакой новый Observable его не заменит.
Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable.
example("\"flatMapLatest\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("внешний\t")
.flatMapLatest({ (value: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --> \($0)"})
.debug("внутренний\t")
})
.debug("результат\t")
.subscribe()
}
Консоль:
--- "flatMapLatest" example ---
2017-01-31 18:35:25.941: результат -> subscribed
2017-01-31 18:35:25.942: внешний -> subscribed
2017-01-31 18:35:26.044: внешний -> Event next(1) // Новый сигнал
2017-01-31 18:35:26.046: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.144: внешний -> Event next(2) // Новый сигнал
2017-01-31 18:35:26.145: внутренний -> isDisposed // Отписался от прошлого
2017-01-31 18:35:26.145: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.244: внешний -> Event next(3) // Новый сигнал
2017-01-31 18:35:26.245: внутренний -> isDisposed // Отписался от прошлого
2017-01-31 18:35:26.245: внутренний -> subscribed // flatMapLatest
2017-01-31 18:35:26.245: внешний -> Event completed // Новых сигналов не будет
2017-01-31 18:35:26.245: внешний -> isDisposed
2017-01-31 18:35:26.546: внутренний -> Event next(3 --> 3)
2017-01-31 18:35:26.546: результат -> Event next(3 --> 3) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.646: внутренний -> Event next(3 --> 4)
2017-01-31 18:35:26.646: результат -> Event next(3 --> 4) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -> Event next(3 --> 5)
2017-01-31 18:35:26.746: результат -> Event next(3 --> 5) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -> Event completed
2017-01-31 18:35:26.746: внутренний -> isDisposed
2017-01-31 18:35:26.746: результат -> Event completed
2017-01-31 18:35:26.746: результат -> isDisposed
В примере благодаря задержкам в генерации мы видим, что, как только генерируется новый Observable, происходит отписка от предыдущего Observable.
Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"flatMapWithIndex\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMapWithIndex({ (value: Int, index: Int) -> Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value, index) --> \($0)"})
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "flatMapWithIndex" example ---
next((1, 0) --> 1)
next((2, 1) --> 2)
next((2, 1) --> 3)
next((3, 2) --> 3)
next((3, 2) --> 4)
next((3, 2) --> 5)
completed
Observable -> Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.
example("\"map\"") {
Observable
.of(1, 2, 3)
.map({ $0 * 5 })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "map" example ---
next(5)
next(10)
next(15)
completed
Observable -> Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.
example("\"mapWithIndex\"") {
Observable
.of("A", "B", "C")
.mapWithIndex({ $0.0 + "\($0.1)" })
.subscribe({ event in
print(event)
})
}
Консоль:
--- "mapWithIndex" example ---
next(A0)
next(B1)
next(C2)
completed
SO = Observable
RO = Observable<Observable>
Элементы из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передаются count (максимальное число элементов, которые будут сгенерированы каждым Observable) и timeSpan (время максимального ожидания наполнения текущего Observable из элементов SO). Таким образом элемент RO являет собой Observable, число генерируемых элементов которого равно от 0 до N. Основное отличие от bufffer в том, что элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше).
example("\"window\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.window(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe(onNext: { (observable: Observable) in
observable
.subscribe({ event in
print(event)
})
})
}
Консоль:
--- "window" example ---
next(0)
next(1)
completed
next(2)
next(3)
completed
next(4)
completed
next(5)
next(6)
completed
next(7)
next(8)
completed
В примере используются временные задержки, что помогает добиться частичной наполненности генерируемых Observable.
Операторы математические и агрегирования
Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.
example("\"reduce\"") {
Observable
.of(1, 2, 3, 4)
.reduce(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "reduce" example ---
next(24)
completed
Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но, кроме этого, оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.
example("\"scan\"") {
Observable
.of(1, 2, 3, 4)
.scan(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}
Консоль:
--- "scan" example ---
next(1)
next(2)
next(6)
next(24)
completed
SO = Observable
RO = Observable<[T]>
Все элементы из SO после генерации терминального состояния объединяются в массив, и генерируются RO.
example("\"toArray\"") {
Observable
.of(1, 2, 3)
.toArray()
.subscribe({ event in
print(event)
})
}
Консоль:
--- "toArray" example ---
next([1, 2, 3])
completed
Работа с ошибками
Позволяет перехватить сгенерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы.
example("without \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
example("with \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchError({ (error) -> Observable in
return Observable.just(3)
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "catchError" example ---
next(1)
next(2)
error(Unknown error occured.)
--- with "catchError" example ---
next(1)
next(2)
next(3)
completed
После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый Observable.
Позволяет перехватить сгенерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed.
example("without \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}
example("with \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchErrorJustReturn(3)
.subscribe({ event in
print(event)
})
}
Консоль:
--- without "catchErrorJustReturn" example ---
next(1)
next(2)
error(Unknown error occured.)
--- with "catchErrorJustReturn" example ---
next(1)
next(2)
next(3)
completed
После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый элемент.
Позволяет перехватить сгенерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде, что ошибка не повторится.
example("\"retry\" once") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.retry(2)
.subscribe({ event in
print(event)
})
}
Консоль:
--- "retry" once example ---
next(1)
next(2)
next(1)
next(2)
error(Unknown error occured.)
Передаваемое в оператор целое число означает количество попыток дождаться успешного окончания. 0 — ни одной попытки, т.е. цепочка ни разу не будет запущена, и просто произойдет completed событие. 1 — такое же поведение, как будто оператор и не не был применен. 2 — исходная попытка + дополнительная, и т.д. Если в оператор не передать параметр, то количество попыток повторить будет бесконечным.
Позволяет перехватить сгенерированную ошибку из SO, и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO, и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO в надежде, что ошибка исчезнет. Если tryObservable заканчивается ошибкой, она пробрасывается в RO, и на этом выполнение заканчивается.
example("\"retryWhen\"") {
var counter = 0
let sequenceWithError = Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
counter += 1
switch counter {
case 0..
.create({ observer in
observer.on(.next(10))
// observer.onError(RxError.noElements)
return Disposables.create()
})
.debug("without\t")
let retrySequence = sequenceWithError
.retryWhen({ (observableError: Observable) -> Observable in
return observableError
.flatMap({ (error: RxError) -> Observable in
switch error {
case .unknown: return sequenceWithoutError
default: return Observable.error(error)
}
})
})
.subscribe()
}
Консоль:
--- "retryWhen" example ---
2017-02-03 15:23:18.563: with -> subscribed
2017-02-03 15:23:18.564: with -> Event next(1)
2017-02-03 15:23:18.564: with -> Event next(2)
2017-02-03 15:23:18.565: with -> Event error(Unknown error occured.)
2017-02-03 15:23:18.566: without -> subscribed
2017-02-03 15:23:18.566: without -> Event next(10)
2017-02-03 15:23:18.567: with -> isDisposed
2017-02-03 15:23:18.568: with -> subscribed
2017-02-03 15:23:18.568: with -> Event next(1)
2017-02-03 15:23:18.568: with -> Event next(2)
2017-02-03 15:23:18.568: with -> Event error(Unknown error occured.)
2017-02-03 15:23:18.569: without -> subscribed
2017-02-03 15:23:18.569: without -> Event next(10)
2017-02-03 15:23:18.569: with -> isDisposed
2017-02-03 15:23:18.570: with -> subscribed
2017-02-03 15:23:18.570: with -> Event next(1)
2017-02-03 15:23:18.570: with -> Event next(2)
2017-02-03 15:23:18.571: with -> Event next(3)
Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке ошибка исчезла. Если раскоментировать генерацию ошибки RxError.Overflow, мы её не перехватим в операторе retryWhen и пробросим в RO.
Операторы для работы с Connectable Observable
Позволяет проксировать элементы из исходной SO на Subject, переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.
example("\"multicast\"") {
let subject = PublishSubject()
let multicast = getExternalObservable()
.multicast(subject)
subject
.asObservable()
.debug("\tinstant\t")
.subscribe()
delay(0.25, closure: { _ in
multicast.connect()
})
delay(0.45, closure: { _ in
subject
.asObservable()
.debug("\tdelayed\t")
.subscribe()
})
}
func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(7)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "multicast" example ---
2017-02-02 20:20:14.446: instant -> subscribed
2017-02-02 20:20:14.721: instant -> Event next(2)
2017-02-02 20:20:14.743: instant -> Event next(3)
2017-02-02 20:20:14.844: instant -> Event next(4)
2017-02-02 20:20:14.901: delayed -> subscribed
2017-02-02 20:20:14.943: instant -> Event next(5)
2017-02-02 20:20:14.943: delayed -> Event next(5)
2017-02-02 20:20:15.043: instant -> Event next(6)
2017-02-02 20:20:15.043: delayed -> Event next(6)
2017-02-02 20:20:15.043: instant -> Event completed
2017-02-02 20:20:15.043: instant -> isDisposed
2017-02-02 20:20:15.043: delayed -> Event completed
2017-02-02 20:20:15.043: delayed -> isDisposed
publish = multicast + replay subject
Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe.
var disposeBag: DisposeBag? = DisposeBag()
example("\"publish\"") {
let publish = getExternalObservable()
.debug("\tsequence\t")
.publish()
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(2)
.subscribe(onNext: { value in
let disposable = publish
.asObservable()
.debug("\tdelayed #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.25, closure: { _ in
publish.connect()
})
delay(0.55, closure: { _ in
disposeBag = nil
})
}
func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(8)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "publish" example ---
2017-02-02 20:43:47.691: delayed #1 -> subscribed // ожидание connect
2017-02-02 20:43:47.791: delayed #2 -> subscribed // ожидание connect
2017-02-02 20:43:47.859: sequence -> subscribed // connect случился
2017-02-02 20:43:47.860: sequence -> Event next(2)
2017-02-02 20:43:47.860: delayed #1 -> Event next(2)
2017-02-02 20:43:47.860: delayed #2 -> Event next(2)
2017-02-02 20:43:47.889: sequence -> Event next(3)
2017-02-02 20:43:47.889: delayed #1 -> Event next(3)
2017-02-02 20:43:47.889: delayed #2 -> Event next(3)
2017-02-02 20:43:47.989: sequence -> Event next(4)
2017-02-02 20:43:47.989: delayed #1 -> Event next(4)
2017-02-02 20:43:47.989: delayed #2 -> Event next(4)
2017-02-02 20:43:48.089: sequence -> Event next(5)
2017-02-02 20:43:48.089: delayed #1 -> Event next(5)
2017-02-02 20:43:48.089: delayed #2 -> Event next(5)
2017-02-02 20:43:48.170: delayed #1 -> isDisposed // все отписались
2017-02-02 20:43:48.170: delayed #2 -> isDisposed // все отписались
2017-02-02 20:43:48.188: sequence -> Event next(6) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -> Event next(7) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -> Event completed
2017-02-02 20:43:48.289: sequence -> isDisposed
Как видно, хоть подписка и была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались, последовательность продолжила генерировать элементы.
Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable происходит подписка Connectable на SO.
Получается что-то вроде
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()
SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, происходит отписка и publishSequence от SO.
var disposeBag: DisposeBag? = DisposeBag()
example("\"refCount\"") {
let sequence = getExternalObservable()
.debug("\tsequence\t\t")
let refCount = sequence
.publish()
.refCount()
.debug("\trefCount\t\t")
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(2)
.subscribe(onNext: { value in
let disposable = refCount
.debug("\tsubscribe #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.35, closure: { _ in
disposeBag = nil
})
}
private func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}
Консоль:
--- "refCount" example ---
2017-02-02 21:03:14.487: subscribe #0 -> subscribed
2017-02-02 21:03:14.488: refCount -> subscribed
2017-02-02 21:03:14.488: sequence -> subscribed
2017-02-02 21:03:14.489: sequence -> Event next(0)
2017-02-02 21:03:14.489: refCount -> Event next(0)
2017-02-02 21:03:14.489: subscribe #0 -> Event next(0)
2017-02-02 21:03:14.572: sequence -> Event next(1)
2017-02-02 21:03:14.573: refCount -> Event next(1)
2017-02-02 21:03:14.573: subscribe #0 -> Event next(1)
2017-02-02 21:03:14.576: subscribe #1 -> subscribed
2017-02-02 21:03:14.576: refCount -> subscribed
2017-02-02 21:03:14.673: sequence -> Event next(2)
2017-02-02 21:03:14.673: refCount -> Event next(2)
2017-02-02 21:03:14.673: subscribe #0 -> Event next(2)
2017-02-02 21:03:14.673: refCount -> Event next(2)
2017-02-02 21:03:14.673: subscribe #1 -> Event next(2)
2017-02-02 21:03:14.773: sequence -> Event next(3)
2017-02-02 21:03:14.774: refCount -> Event next(3)
2017-02-02 21:03:14.774: subscribe #0 -> Event next(3)
2017-02-02 21:03:14.774: refCount -> Event next(3)
2017-02-02 21:03:14.774: subscribe #1 -> Event next(3)
2017-02-02 21:03:14.835: subscribe #0 -> isDisposed
2017-02-02 21:03:14.835: refCount -> isDisposed
2017-02-02 21:03:14.835: subscribe #1 -> isDisposed
2017-02-02 21:03:14.835: refCount -> isDisposed // отписался последний
2017-02-02 21:03:14.835: sequence -> isDisposed // отписка от SO
Если SO обычный, — конвертирует его в Connectable. После этого все, кто подпишутся на него после вызова connect(), мгновенно получат в качестве первых элементов последние сгенерированные N элементов. Даже если отпишутся все — Connectable будет продолжать генерировать элементы.
var disposeBag: DisposeBag? = DisposeBag()
example("\"replay\"") {
let sequence = getExternalObservable()
.replay(2)
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}
fileprivate func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}
Консоль:
--- "replay" example ---
2017-02-02 21:36:30.914: timer -> subscribed
2017-02-02 21:36:30.918: first -> subscribed
2017-02-02 21:36:30.931: timer -> Event next(0)
2017-02-02 21:36:31.016: timer -> Event next(1)
2017-02-02 21:36:31.116: timer -> Event next(2)
2017-02-02 21:36:31.216: timer -> Event next(3)
2017-02-02 21:36:31.248: first -> Event next(3)
2017-02-02 21:36:31.316: timer -> Event next(4)
2017-02-02 21:36:31.317: first -> Event next(4)
2017-02-02 21:36:31.416: timer -> Event next(5)
2017-02-02 21:36:31.416: first -> Event next(5)
2017-02-02 21:36:31.468: second -> subscribed
2017-02-02 21:36:31.468: second -> Event next(4) // мгновенно получил
2017-02-02 21:36:31.468: second -> Event next(5) // мгновенно получил
2017-02-02 21:36:31.515: timer -> Event next(6)
2017-02-02 21:36:31.516: first -> Event next(6)
2017-02-02 21:36:31.516: second -> Event next(6)
2017-02-02 21:36:31.616: timer -> Event next(7)
2017-02-02 21:36:31.617: first -> Event next(7)
2017-02-02 21:36:31.617: second -> Event next(7)
2017-02-02 21:36:31.688: first -> isDisposed // все отписались
2017-02-02 21:36:31.688: second -> isDisposed // все отписались
2017-02-02 21:36:31.716: timer -> Event next(8) // продолжают генерироваться
2017-02-02 21:36:31.816: timer -> Event next(9) // продолжают генерироваться
2017-02-02 21:36:31.817: timer -> Event completed
2017-02-02 21:36:31.817: timer -> isDisposed
Если SO обычный, — конвертирует его в Connectable. Все, кто подпишутся на него, после вызова connect() получат сначала все элементы, которые были сгенерированы ранее. Даже если отпишутся все, Connectable будет продолжать генерировать элементы.
var disposeBag: DisposeBag? = DisposeBag()
example("\"replayAll\"") {
let sequence = getExternalObservable()
.replayAll()
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}
fileprivate func getExternalObservable() -> Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}
Консоль:
--- "replayAll" example ---
2017-02-02 21:39:27.174: timer -> subscribed
2017-02-02 21:39:27.178: first -> subscribed
2017-02-02 21:39:27.190: timer -> Event next(0)
2017-02-02 21:39:27.276: timer -> Event next(1)
2017-02-02 21:39:27.376: timer -> Event next(2)
2017-02-02 21:39:27.476: timer -> Event next(3)
2017-02-02 21:39:27.509: first -> Event next(3) // 1й элемент
2017-02-02 21:39:27.576: timer -> Event next(4)
2017-02-02 21:39:27.576: first -> Event next(4) // 2й элемент
2017-02-02 21:39:27.675: timer -> Event next(5)
2017-02-02 21:39:27.675: first -> Event next(5) // 3й элемент
2017-02-02 21:39:27.731: second -> subscribed // подписание
2017-02-02 21:39:27.731: second -> Event next(3) // сразу получил 1й
2017-02-02 21:39:27.731: second -> Event next(4) // сразу получил 2й
2017-02-02 21:39:27.731: second -> Event next(5) // сразу получил 3й
2017-02-02 21:39:27.775: timer -> Event next(6)
2017-02-02 21:39:27.775: first -> Event next(6)
2017-02-02 21:39:27.775: second -> Event next(6)
2017-02-02 21:39:27.875: timer -> Event next(7)
2017-02-02 21:39:27.876: first -> Event next(7)
2017-02-02 21:39:27.876: second -> Event next(7)
2017-02-02 21:39:27.946: first -> isDisposed // все отписались
2017-02-02 21:39:27.946: second -> isDisposed // все отписались
2017-02-02 21:39:27.975: timer -> Event next(8) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -> Event next(9) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -> Event completed
2017-02-02 21:39:28.076: timer -> isDisposed
Вспомогательные методы
debug
RO полностью дублирует SO, но логируются все события с временной меткой.
example("\"debug\"") {
Observable
.of(1, 2, 3)
.debug("sequence")
.subscribe{}
}
Консоль:
--- "debug" example ---
2017-02-03 10:01:07.746: sequence -> subscribed
2017-02-03 10:01:07.748: sequence -> Event next(1)
2017-02-03 10:01:07.748: sequence -> Event next(2)
2017-02-03 10:01:07.748: sequence -> Event next(3)
2017-02-03 10:01:07.748: sequence -> Event completed
2017-02-03 10:01:07.748: sequence -> isDisposed
RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO.
example("\"do\"") {
Observable
.of(1, 2, 3)
.do(onNext: { value in
print("do(\(value))")
})
.subscribe({ event in
print(event)
})
}
Консоль:
--- "do" example ---
do(1)
next(1)
do(2)
next(2)
do(3)
next(3)
completed
Дублирует элементы из SO в RO, но с временной задержкой, указанной в качестве параметра.
example("\"delaySubscription\"") {
var timer: Observable = getExternalObservable()
timer
.debug("\tnormal\t")
.subscribe()
timer
.delaySubscription(0.9, scheduler: MainScheduler.instance)
.debug("\tdelayed\t")
.subscribe()
}
private func getExternalObservable() -> Observable {
let observable = Observable
.timer(0, period: 0.2, scheduler: MainScheduler.instance)
.take(3)
defer {
observable.subscribe()
}
return observable
}
Консоль:
--- "delaySubscription" example ---
2017-02-03 10:58:13.985: normal -> subscribed
2017-02-03 10:58:13.989: delayed -> subscribed
2017-02-03 10:58:13.999: normal -> Event next(0)
2017-02-03 10:58:14.187: normal -> Event next(1)
2017-02-03 10:58:14.387: normal -> Event next(2)
2017-02-03 10:58:14.387: normal -> Event completed
2017-02-03 10:58:14.387: normal -> isDisposed
2017-02-03 10:58:14.890: delayed -> Event next(0)
2017-02-03 10:58:15.091: delayed -> Event next(1)
2017-02-03 10:58:15.290: delayed -> Event next(2)
2017-02-03 10:58:15.291: delayed -> Event completed
2017-02-03 10:58:15.291: delayed -> isDisposed
Указывает, на каком Scheduler должен выполнять свою работу Observer, особенно критично при работе с GUI.
example("\"observeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.observeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}
Консоль:
--- "observeOn" example ---
Main: false Event: next(A)
Main: false Event: next(B)
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(C)
Main: false Event: completed
Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке, хотя оба Observable были запущены на background.
Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable.
example("\"subscribe\" with failure") {
observable(failing: true)
}
example("\"subscribe\" with success") {
observable(failing: false)
}
func observable(failing: Bool) {
var observable: Observable!
switch failing {
case true: observable = Observable.error(RxError.argumentOutOfRange)
case false: observable = Observable.of(1)
}
observable
.subscribe(onNext: { value in
print("next: \(value)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
}
Консоль:
--- "subscribe" with failure example ---
error: Argument out of range.
disposed
--- "subscribe" with success example ---
next: 1
completed
disposed
Указывает, на каком Scheduler выполнять подписку на Observable. Редко используемый оператор. Для получения колбэков на нужном Scheduler следует пользоваться observeOn.
example("\"subscribeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.subscribeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}
Консоль:
--- "subscribeOn" example ---
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(A)
Main: false Event: next(B)
Main: false Event: next(C)
Main: false Event: completed
Дублирует элементы из SO в RO, но если в течение указанного времени SO не сгенерировало ни одного элемента, RO генерирует ошибку.
delay(0.0) { _ in
example("\"timeout\" with success") {
observable(timeout: 0.3, delay: 0.1)
.debug("\tsuccess\t")
.subscribe()
}
}
delay(0.5) { _ in
example("\"timeout\" with failure") {
observable(timeout: 0.3, delay: 0.4)
.debug("\tfailure\t")
.subscribe()
}
}
func observable(timeout: RxTimeInterval, delay: RxTimeInterval) -> Observable {
return Observable
.timer(delay, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
.timeout(timeout, scheduler: MainScheduler.instance)
}
Консоль:
--- "timeout" with success example ---
2017-02-03 12:01:31.344: success -> subscribed
2017-02-03 12:01:31.447: success -> Event next(0)
2017-02-03 12:01:31.447: success -> Event completed
2017-02-03 12:01:31.447: success -> isDisposed
--- "timeout" with failure example ---
2017-02-03 12:01:31.856: failure -> subscribed
2017-02-03 12:01:32.157: failure -> Event error(Sequence timeout.)
2017-02-03 12:01:32.157: failure -> isDisposed
using
Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable из ресурса, у которых будет единое время жизни.
example("\"using\"") {
Observable
.of(1,3)
.flatMap { value -> Observable in
return Observable
.using(resourceFactory(value), observableFactory: observableFactory())
}
.subscribe({ event in
print(event)
})
}
class Factory: Disposable {
let value: Int
init(_ value: Int) {
self.value = value
}
var observable: Observable {
let array = Array(repeating: value, count: value)
return Observable.from(array)
}
func dispose() {
print("Factory(\(value)) disposed")
}
}
func resourceFactory(_ value: Int) -> (() -> Factory) {
func resource() -> Factory {
return Factory(value)
}
return resource
}
func observableFactory() -> ((Factory) -> Observable) {
func observable(_ resource: Factory) -> Observable {
return resource.observable
}
return observable
}
Консоль:
--- "using" example ---
next(1)
Factory(1) disposed
next(3)
next(3)
next(3)
Factory(3) disposed
completed
Как видно, после того, как Observable закончил генерировать элементы, у нашего ресурса Factory был вызван метод dispose.
За материал выражаем благодарность .
Запись впервые появилась .