Что нового
  • Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Знакомство С Rxswift: Примеры Кода Реактивного Программирования На Языке Swift

Sascha

Заместитель Администратора
Команда форума
Администратор
Регистрация
9 Май 2015
Сообщения
1,071
Баллы
155
Возраст
51
Около года назад наш iOS-разработчик компании Noveo, Александр, заинтересовался RxSwift. Столкнувшись с нехваткой документации, Саша решил самостоятельно упорядочить все приобретенные в ходе изучения фреймворка знания — для себя и для других. Результатом стала одна из его статей о Swift 2.2. Со времени ее публикации, конечно, и RxSwift, и сам Swift эволюционировали, и материал нуждался в обновлении, и другой iOS-разработчик из Noveo, Михаил, адаптировал материал для Swift 3. После редакции статья заиграла свежими красками, и на этом мы передаем слово нашим коллегам.

Заинтересовавшись темой функционального программирования, я встал на распутье: какой фреймворк выбрать для ознакомления? ReactiveCocoa — ветеран в iOS-кругах, информации по нему вдоволь. Но он вырос из Objective-C, и хотя это не является проблемой, все же в данный момент я в основном пишу на Swift, и хотелось бы взять решение, изначально спроектированное с учетом всех плюшек этого языка. RxSwift же — порт Reactive Extensions; последний, конечно, имеет долгую историю, но сам порт свежий и написан как раз под Swift. На нем я и решил остановиться.

Как выяснилось, документация по RxSwift несколько специфична: описание всех команд ведет на

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

, а там в основном дается общая информация, у разработчиков еще не дошли руки сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, а есть такие, о которых в общей документации нет ничего, кроме упоминания.

Прочитав все главы вики с гитхаба RxSwift, я решил сразу поразбираться с официальными примерами; тут-то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с копипастом гранатой. Я начал разбирать самые сложные для понимания команды, а потом перешел к тем, что были вроде и понятны, но задавая себе вопросы по ним, я лишь догадывался, как верно ответить, и уверенности в моих ответах у меня не было.

В общем, я решил проработать все операторы RxSwift. Лучший способ что-то понять в программировании — запустить код и посмотреть, как он отработает. Учитывая специфику реактивного программирования, лучше дополнить это схемами (они бывают очень полезны), ну и кратким описанием на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.

Много картинок и текста ниже, очень много!

Предварительно я рекомендую просмотреть

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

, у меня передана основная суть и специфика RxSwift команд, а не основы.
Так же можно «поиграться» с шариками в схемах, так называемые

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

, есть бесплатная версия под

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

.

Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, дам для каждой краткое описание, схему (если это имеет смысл), код, результат выполнения, а при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.


Вот

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

, куда я склонировал официальный репозиторий RxSwift и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
А вот и

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

, где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того, чтобы увидеть, как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — чтобы иметь под рукой документ, в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.

Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.

Что ж, надеюсь, моя работа кому-то пригодится. Приступим.

Содержание



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Создание Observable



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Комбинирование Observable



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Фильтрация



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Трансформация



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Операторы математические и агрегирования



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Работа с ошибками



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Операторы для работы с Connectable Observable



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Вспомогательные методы



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.

Функция example просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift):

public func example(_ description: String, action: (Void) -> Void) {
printExampleHeader(description)
action()
}

Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице, необходимо прописать

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

Также подразумевается, что читатель имеет общее представление о реактивном программировании в общем и об RxSwift в частности. Не знаю, есть ли смысл городить очередную вводную.

Создание Observable




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver

example("\"asObservable\"") {
let variable = Variable(0)
variable
.asObservable()
.subscribe({ event in
print(event)
})
variable.value = 1
}

Консоль:

--- "asObservable" example ---
next(0)
next(1)
completed

В данном примере мы Variable преобразовали в Observable и подписались на его события.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Этот метод позволяет создавать Observable с нуля, полностью контролируя, какие элементы и когда он будет генерировать.

example("\"create\"") {
Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
observer.on(.next("A"))
observer.on(.next("B"))
observer.onCompleted()
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "create" example ---
next(1)
next(2)
next(A)
next(B)
completed

В данном примере мы создали Observable, который сгенерирует несколько значений, и в конце вызовется complete.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Этот оператор позволяет отложить создание Observable до момента подписки с помощью subscribe.

example("without \"deferred\"") {
var i = 1
let observable = Observable.just(i)
i = 2
observable
.subscribe({ event in
print(event)
})
}

example("with \"deferred\"") {
var i = 1
let observable = Observable.deferred({ Observable.just(i) })
i = 2
observable
.subscribe({ event in
print(event)
})
}

Консоль:

--- without "deferred" example ---
next(1)
completed

--- with "deferred" example ---
next(2)
completed

В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значения i уже не влияет на генерируемый этой последовательностью элемент. Во втором же случае мы создаем Observable с помощью deferred и можем поменять значение i перед subscribe.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Пустая последовательность, заканчивающаяся Completed.



example("\"empty\"") {
Observable
.empty()
.subscribe({ event in
print(event)
})
}

Консоль:

--- "empty" example ---
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Создаст последовательность, которая состоит из одного события — Error.



example("\"error\"") {
Observable
.error(RxError.unknown)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "error" example ---
error(Unknown error occured.)




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Создает бесконечную последовательность, возрастающую с 0 с шагом 1 с указанной периодичностью.



example("\"interval\"") {
Observable
.interval(1, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "interval" example ---
next(0)
next(1)
next(2)
next(3)
.....




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Создает последовательность из любого значения, которая завершается Completed.



example("\"just\"") {
Observable
.just(2)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "just" example ---
next(2)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие.



example("\"never\"") {
Observable
.never()
.subscribe({ event in
print(event)
})
}

Консоль:

--- "never" example ---




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Последовательность из переменного количества элементов. После всех элементов генерируется Completed.



example("\"of\"") {
Observable
.of(2, 3, 5)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "of" example ---
next(2)
next(3)
next(5)
completed

В первом случае мы создали последовательность из двух чисел, во втором — из двух Observable, а затем объединили их между собой с помощью оператора merge.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed.



example("\"range\"") {
Observable
.range(start: 5, count: 3)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "range" example ---
next(5)
next(6)
next(7)
completed

Сгенерировались элементы, начиная с 5, 3 раза с шагом 1.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Бесконечно создавать указанный элемент, без задержек. Никогда не будут сгенерированы события Completed или Error.



example("\"repeatElement\"") {
Observable
.repeatElement(2, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}

Консоль:

--- repeatElement example ---
Next(2)
Next(2)
Next(2)
.....




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможностью задержки при старте. Никогда не будут сгенерированы события Completed или Error.



example("\"timer\"") {
Observable
.timer(2, period: 3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "timer" example ---
next(0)
next(1)
next(2)
.....

В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды.

Комбинирование Observable




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = [Observable] или SO1, SO2 = Observable
RO = Observable
Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются.


example("\"amb\"") {
let subjectA = PublishSubject()
let subjectB = PublishSubject()
let subjectC = PublishSubject()
let subjectD = PublishSubject()
Observable
.amb([subjectA.asObservable(), subjectB.asObservable(), subjectC.asObservable(), subjectD.asObservable()])
.subscribe({ event in
print(event)
})
subjectC.onNext("C1")
subjectD.onNext("D1")
subjectB.onNext("B1")
subjectA.onNext("A1")
subjectC.onNext("C2")
subjectD.onNext("D2")
subjectA.onNext("A2")
}

Консоль:

--- "amb" example ---
next(C1)
next(C2)

Т.к. первым сгенерировал элемент subjectC, лишь его элементы дублируются в RO, остальные игнорируются.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = SO1, SO2,... SON = Observable
RO = Observable<f(T,T)>
Как только все Observable сгенерировали хотя бы по одному элементу, эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации элемента любым Observable генерируется новый результат функции с последними элементами из всех комбинируемых Observable.


example("\"combineLatest\"") {
let sequenceA = Observable
.timer(0.15, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.05, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.combineLatest(sequenceA, sequenceB){$0.0 + " - " + $0.1}
.subscribe({ event in
print(event)
})
}

Консоль:

--- "combineLatest" example ---
next(A0 - B2)
next(A1 - B2)
next(A2 - B2)
completed

В этом примере я создал Observable с помощью timer — для генерации элементов с разной задержкой, чтобы было видно, как перемешиваются элементы. К моменту появления первого элемента sequenceA появилось уже три элемента sequenceB. Поэтому первым элементом в RO последовательности стала пара объектов A0 — B2.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable<Observable> или SO1, SO2 = Observable
RO = Observable
В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO.


example("\"concat\"") {
let sequenceA = Observable.of(1, 2, 3)
let sequenceB = Observable.of("A", "B", "C")
sequenceA
.concat(sequenceB)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "concat" example ---
next(1)
next(2)
next(3)
next(A)
next(B)
next(C)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable<Observable>
RO = Observable
Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable.


example("\"merge\"") {
let sequenceA = Observable
.timer(0.0, period: 0.15, scheduler: MainScheduler.instance)
.take(3)
.map({ "A\($0)" })
let sequenceB = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(3)
.map({ "B\($0)" })
Observable
.of(sequenceA, sequenceB)
.merge()
.subscribe({ event in
print(event)
})
}
}

Консоль:

--- "merge" example ---
next(A0)
next(B0)
next(B1)
next(A1)
next(B2)
next(A2)
completed

Последовательности сделаны с задержкой в генерации, и видно, что элементы в RO теперь вперемешку, в том порядке, в котором они были сгенерированы в исходных Observable.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable
RO = Observable
В начало SO добавляются элементы, переданные в качестве аргумента.


example("\"startWith\"") {
Observable.of(1, 2, 3)
.startWith(6,7)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "startWith" example ---
next(6)
next(7)
next(1)
next(2)
next(3)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable<Observable>
RO = Observable
Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable.


example("\"switchLatest\" without delays") {
let variableA = Variable(0)
let variableB = Variable(100)
let variableC = Variable(variableA.asObservable())
variableC
.asObservable()
.switchLatest()
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 3
variableC.value = variableB.asObservable()
variableB.value = 4
variableA.value = 5
}

example("\"switchLatest\" with delays") {
var sequenceObserver: AnyObserver<Observable>?
Observable<Observable>
.create({ observer in
sequenceObserver = observer
return Disposables.create()
})
.switchLatest()
.debug("result\t\t")
.subscribe()
Observable
.timer(0, period: 0.3, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.subscribe(onNext: { element in
let observable = Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.map({ $0 + element })
.take(3)
.debug("timer #\(element)\t")
sequenceObserver?.onNext(observable)
}, onCompleted: {
sequenceObserver?.onCompleted()
})
}

Консоль:

--- "switchLatest" without delays example ---
next(0)
next(1)
next(2)
next(3)
next(4)
completed

--- "switchLatest" with delays example ---
2017-02-02 19:18:22.976: result -> subscribed
2017-02-02 19:18:23.282: timer #1 -> subscribed
2017-02-02 19:18:23.283: timer #1 -> Event next(1)
2017-02-02 19:18:23.283: result -> Event next(1)
2017-02-02 19:18:23.384: timer #1 -> Event next(2)
2017-02-02 19:18:23.384: result -> Event next(2)
2017-02-02 19:18:23.483: timer #1 -> Event next(3)
2017-02-02 19:18:23.483: result -> Event next(3)
2017-02-02 19:18:23.483: timer #1 -> Event completed
2017-02-02 19:18:23.483: timer #1 -> isDisposed
2017-02-02 19:18:23.581: timer #2 -> subscribed
2017-02-02 19:18:23.582: timer #2 -> Event next(2)
2017-02-02 19:18:23.582: result -> Event next(2)
2017-02-02 19:18:23.683: timer #2 -> Event next(3)
2017-02-02 19:18:23.683: result -> Event next(3)
2017-02-02 19:18:23.783: timer #2 -> Event next(4)
2017-02-02 19:18:23.783: result -> Event next(4)
2017-02-02 19:18:23.783: timer #2 -> Event completed
2017-02-02 19:18:23.783: timer #2 -> isDisposed
2017-02-02 19:18:23.881: timer #3 -> subscribed
2017-02-02 19:18:23.882: timer #3 -> Event next(3)
2017-02-02 19:18:23.882: result -> Event next(3)
2017-02-02 19:18:23.983: timer #3 -> Event next(4)
2017-02-02 19:18:23.983: result -> Event next(4)
2017-02-02 19:18:24.083: timer #3 -> Event next(5)
2017-02-02 19:18:24.083: result -> Event next(5)
2017-02-02 19:18:24.083: timer #3 -> Event completed
2017-02-02 19:18:24.083: timer #3 -> isDisposed
2017-02-02 19:18:24.083: result -> Event completed
2017-02-02 19:18:24.083: result -> isDisposed

В первом примере показано, как команда работает в статике, когда мы руками переподключаем Observable.
Во втором примере оператором create создан Observable<Observable>, AnyObserver которого мы вынесли в переменную, по таймеру получающую новый Observable, который реализован в виде таймера. Т.к. задержки у таймеров разные, то можно наблюдать. как с помощью switchLatest в RO попадают значения из последнего сгенерированного таймера.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO1, SO2 = Observable
RO = Observable&lt;f(T,T)&gt;
Как только O1 генерирует элемент, проверяется, сгенерирован ли хоть один элемент в O2, и если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента.


example("\"withLatestFrom\" O2, where O2 has previous value") {
let variableA = Variable(0)
let variableB = Variable(10)
variableA
.asObservable()
.withLatestFrom(variableB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
variableA.value = 1
variableA.value = 2
variableB.value = 20
variableB.value = 30
variableA.value = 5
variableA.value = 6
}

example("\"withLatestFrom\" O2, where O2 doesn't have previous value") {
let publishA = PublishSubject()
let publishB = PublishSubject()
publishA
.asObservable()
.withLatestFrom(publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext(20)
publishB.onNext(30)
publishA.onNext(5)
publishA.onNext(6)
}

Консоль:

--- "withLatestFrom" O2, where O2 has previous value example ---
next(0 - 10)
next(1 - 10)
next(2 - 10)
next(5 - 30)
next(6 - 30)
completed

--- "withLatestFrom" O2, where O2 doesn't have previous value example ---
next(5 - 30)
next(6 - 30)




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable&lt;Observable&gt;
RO = Observable&lt;f(T,T)&gt;
Элементы RO представляют собой комбинацию из элементов, сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента.


example("\"zip\"") {
let publishA = PublishSubject()
let publishB = PublishSubject()
Observable
.zip(publishA.asObservable(), publishB.asObservable()) {"\($0) - \($1)"}
.subscribe({ event in
print(event)
})
publishA.onNext(0)
publishA.onNext(1)
publishA.onNext(2)
publishB.onNext("A")
publishB.onNext("B")
publishA.onNext(3)
publishB.onNext("C")
}

Консоль:

--- "zip" example ---
next(0 - A)
next(1 - B)
next(2 - C)

Из примеров видно, что элементы комбинируются попарно в том порядке, в каком они были сгенерированы в исходных Observable.

Фильтрация




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Пропускаем все повторяющиеся подряд идущие элементы.



example("\"distinctUntilChanged\"") {
Observable
.of(1, 1, 2, 3, 3, 1, 1, 2)
.distinctUntilChanged()
.subscribe({ event in
print(event)
})
}

Консоль:

--- "distinctUntilChanged" example ---
next(1)
next(2)
next(3)
next(1)
next(2)
completed

Здесь тонкий момент: отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




В RO попадает лишь элемент, выпущенный N по счету.



example("\"elementAt\"") {
Observable
.of(1, 8, 2, 3)
.elementAt(1)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "elementAt" example ---
next(8)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Отбрасываются все элементы, которые не удовлетворяют заданным условиям.



example("\"filter\"") {
Observable
.of(2, 12, 5, 4, 30)
.filter({ $0 &gt; 10 })
.subscribe({ event in
print(event)
})
}

Консоль:

--- "filter" example ---
next(12)
next(30)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error.



example("\"ignoreElements\"") {
Observable
.of(1, 8, 2)
.ignoreElements()
.subscribe({ event in
print(event)
})
}

Консоль:

--- "ignoreElements" example ---
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее.



let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
let secondSequence = Observable
.timer(0, period: 0.17, scheduler: MainScheduler.instance)
.take(10)
firstSequence
.sample(secondSequence)
.subscribe({ event in
print(event)
})

Консоль:

--- "sample" example ---
next(0)
next(1)
next(3)
next(5)
next(6)
next(8)
next(9)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом.



example("simple \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single()
.subscribe({ event in
print(event)
})
}

example("predicate \"single\" with success") {
Observable
.of(1, 2, 3, 4)
.single({ $0 == 2 })
.subscribe({ event in
print(event)
})
}

example("predicate \"single\" with error") {
Observable
.of(1, 2, 3, 4)
.single({ $0 &lt; 0 })
.subscribe({ event in
print(event)
})
}

Консоль:

--- simple "single" with error example ---
next(1)
error(Sequence contains more than one element.)

--- predicate "single" with success example ---
next(2)
completed

--- predicate "single" with error example ---
error(Sequence doesn't contain any elements.)

В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента.
Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO отбрасываем первые N элементов.



example("\"skip\"") {
Observable
.of(1, 8, 2, 3)
.skip(2)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "skip" example ---
next(2)
next(3)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO отбрасываем первые элементы, которые были сгенерированы в первые N.



example("\"skip\" with duration") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skip(0.25, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "skip" with duration example ---
next(3)
next(4)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью, переданной в качестве параметра.



example("\"skipUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.skipUntil(secondSequence)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "skipUntil" example ---
next(3)
next(4)
completed

Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Отбрасываем из SO элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.



example("\"skipWhile\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
.skipWhile({ $0 &lt; 3 })
.subscribe({ event in
print(event)
})
}

Консоль:

--- "skipWhile" example ---
next(3)
next(4)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Отбрасываем из SO элементы до тех пор, пока пока функция, переданная в качестве параметра, возвращает true. Отличие от skipWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.



example("\"skipWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.skipWhileWithIndex({ (value, index) -&gt; Bool in
return value &lt; 4 || index &lt; 2
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "skipWhileWithIndex" example ---
next(5)
next(0)
next(7)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся лишь первые N элементов.



example("\"take\"") {
Observable
.of(1, 2, 3, 4)
.take(2)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "take" example ---
next(1)
next(2)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся лишь элементы, сгенерированные в первые N секунд.



example("\"take\" with duration") {
Observable
.timer(0, period: 0.16, scheduler: MainScheduler.instance)
.take(0.3, scheduler: MainScheduler.instance)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "take" with duration example ---
next(0)
next(1)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся лишь последние N элементов. Что означает: если SO никогда не закончит генерировать элементы, в RO не попадет ни одного элемента.



example("\"takeLast\"") {
Observable.of(1, 2, 3, 4)
.takeLast(2)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "takeLast" example ---
next(3)
next(4)
completed

Второй пример приведен для иллюстрации в задержке генерации элементов в RO из-за ожидания завершения генерации элементов в SO.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью, переданной в качестве параметра.



example("\"takeUntil\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(5)
let secondSequence = Observable
.timer(0.25, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
firstSequence
.takeUntil(secondSequence)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "takeUntil" example ---
next(0)
next(1)
next(2)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true.



example("\"takeWhileWithIndex\"") {
Observable
.of(1,2,5,0,7)
.takeWhileWithIndex({ (value, index) -&gt; Bool in
return value &lt; 4 || index &lt; 2
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "takeWhileWithIndex" example ---
next(1)
next(2)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся элементы до тех пор, пока функция, переданная в качестве параметра, возвращает true. Отличие от takeWhile в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.



example("takeWhileWithIndex") {
let sequence = [1,2,3,4,5,6].toObservable()
.takeWhileWithIndex{ (val, idx) in
val % 2 == 0 || idx &lt; 3
}
sequence.subscribe { e in
print(e)
}
}

Консоль:

--- takeWhileWithIndex example ---
Next(1)
Next(2)
Next(3)
Next(4)
Completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Из SO берутся лишь элементы, после которых не было новых элементов N секунд.

example("\"debounce\"") {
let firstSequence = Observable
.timer(0, period: 1.0, scheduler: MainScheduler.instance)
.map({ "X\($0)" })
.take(7)
.debug("\tfirst\t")
let secondSequence = Observable
.timer(0, period: 1.4, scheduler: MainScheduler.instance)
.skip(1)
.map({ "Y\($0)" })
.take(4)
.debug("\tsecond\t")
Observable
.of(firstSequence, secondSequence)
.merge()
.debounce(0.9, scheduler: MainScheduler.instance)
.debug("\tdebounce\t")
.subscribe()
}

Консоль:

--- "debounce" example ---
2017-01-31 15:18:40.601: debounce -&gt; subscribed
2017-01-31 15:18:40.604: first -&gt; subscribed
2017-01-31 15:18:40.605: second -&gt; subscribed
2017-01-31 15:18:40.618: first -&gt; Event next(X0)
2017-01-31 15:18:41.519: debounce -&gt; Event next(X0)
2017-01-31 15:18:41.606: first -&gt; Event next(X1)
2017-01-31 15:18:42.006: second -&gt; Event next(Y1)
2017-01-31 15:18:42.606: first -&gt; Event next(X2)
2017-01-31 15:18:43.406: second -&gt; Event next(Y2)
2017-01-31 15:18:43.606: first -&gt; Event next(X3)
2017-01-31 15:18:44.507: debounce -&gt; Event next(X3)
2017-01-31 15:18:44.606: first -&gt; Event next(X4)
2017-01-31 15:18:44.806: second -&gt; Event next(Y3)
2017-01-31 15:18:45.605: first -&gt; Event next(X5)
2017-01-31 15:18:46.206: second -&gt; Event next(Y4)
2017-01-31 15:18:46.206: second -&gt; Event completed
2017-01-31 15:18:46.206: second -&gt; isDisposed
2017-01-31 15:18:46.605: first -&gt; Event next(X6)
2017-01-31 15:18:46.605: first -&gt; Event completed
2017-01-31 15:18:46.605: first -&gt; isDisposed
2017-01-31 15:18:46.605: debounce -&gt; Event next(X6)
2017-01-31 15:18:46.605: debounce -&gt; Event completed
2017-01-31 15:18:46.605: debounce -&gt; isDisposed

В данном примере элементы генерируются c разными задержками. Поэтому debounce сработает всего несколько раз в момент, когда между элементами будет достаточный временной промежуток.

Трансформация




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable&lt;&gt;&gt;
RO = Observable&lt;[T]&gt;
Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передаются count (максимальное число элементов в массиве) и timeSpan (время максимального ожидания наполнения текущего массива из элементов SO). Таким образом, элемент RO являет собой массив [T] длиной от 0 до count.


example("\"buffer\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.buffer(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "buffer" example ---
next([0, 1])
next([2, 3])
next([4])
next([5, 6])
next([7, 8])
completed

Максимальное число элементов в массиве указано равное трем, и оно никак не влияет на наполнение массива в данном примере. За максимальное время ожидания наполнения таймер успевает сгенерировать не более 2 элементов. Однако период таймера и время ожидания наполнения выбраны таким образом, что третий массив успеет получить лишь один объект.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge).



example("\"flatMap\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMap({ (value: Int) -&gt; Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --&gt; \($0)"})
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "flatMap" example ---
next(1 --&gt; 1)
next(2 --&gt; 2)
next(2 --&gt; 3)
next(3 --&gt; 3)
next(3 --&gt; 4)
next(3 --&gt; 5)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO превращается в отдельный Observable.
1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы, все последующие Observable, сгенерированные из SO, отбрасываются, на них не подписываемся.
2) как только O1 оканчивается, если будет сгенерирован новый Observable, на него подпишутся, и его элементы будут дублироваться в RO.
Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable.



example("\"flatMapFirst\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("\tвнешний\t\t")
.flatMapFirst({ (value: Int) -&gt; Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --&gt; \($0)"})
.debug("\tвнутренний\t")
})
.debug("\tрезультат\t")
.subscribe()
}

Консоль:

--- "flatMap" example ---
2017-01-31 17:41:02.078: результат -&gt; subscribed
2017-01-31 17:41:02.079: внешний -&gt; subscribed
2017-01-31 17:41:02.180: внешний -&gt; Event next(1) // учитывается, т.к. впервые
2017-01-31 17:41:02.182: внутренний -&gt; subscribed // начало 1
2017-01-31 17:41:02.281: внешний -&gt; Event next(2) // не учитывается, т.к. 1 жив
2017-01-31 17:41:02.284: внутренний -&gt; Event next(1 --&gt; 1)
2017-01-31 17:41:02.284: результат -&gt; Event next(1 --&gt; 1) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.284: внутренний -&gt; Event completed // окончание 1
2017-01-31 17:41:02.284: внутренний -&gt; isDisposed
2017-01-31 17:41:02.381: внешний -&gt; Event next(3) // учитывается, т.к. 1 окончен
2017-01-31 17:41:02.382: внутренний -&gt; subscribed // начало 3
2017-01-31 17:41:02.382: внешний -&gt; Event completed
2017-01-31 17:41:02.382: внешний -&gt; isDisposed
2017-01-31 17:41:02.682: внутренний -&gt; Event next(3 --&gt; 3)
2017-01-31 17:41:02.682: результат -&gt; Event next(3 --&gt; 3) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.782: внутренний -&gt; Event next(3 --&gt; 4)
2017-01-31 17:41:02.782: результат -&gt; Event next(3 --&gt; 4) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -&gt; Event next(3 --&gt; 5)
2017-01-31 17:41:02.883: результат -&gt; Event next(3 --&gt; 5) // РЕЗУЛЬТАТ
2017-01-31 17:41:02.883: внутренний -&gt; Event completed // окончание 3
2017-01-31 17:41:02.883: внутренний -&gt; isDisposed
2017-01-31 17:41:02.883: результат -&gt; Event completed
2017-01-31 17:41:02.883: результат -&gt; isDisposed

В примере благодаря задержкам в генерации мы видим, что, пока не произойдет окончание первого Observable, никакой новый Observable его не заменит.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable, элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable.



example("\"flatMapLatest\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.debug("внешний\t")
.flatMapLatest({ (value: Int) -&gt; Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value) --&gt; \($0)"})
.debug("внутренний\t")
})
.debug("результат\t")
.subscribe()
}

Консоль:

--- "flatMapLatest" example ---
2017-01-31 18:35:25.941: результат -&gt; subscribed
2017-01-31 18:35:25.942: внешний -&gt; subscribed
2017-01-31 18:35:26.044: внешний -&gt; Event next(1) // Новый сигнал
2017-01-31 18:35:26.046: внутренний -&gt; subscribed // flatMapLatest
2017-01-31 18:35:26.144: внешний -&gt; Event next(2) // Новый сигнал
2017-01-31 18:35:26.145: внутренний -&gt; isDisposed // Отписался от прошлого
2017-01-31 18:35:26.145: внутренний -&gt; subscribed // flatMapLatest
2017-01-31 18:35:26.244: внешний -&gt; Event next(3) // Новый сигнал
2017-01-31 18:35:26.245: внутренний -&gt; isDisposed // Отписался от прошлого
2017-01-31 18:35:26.245: внутренний -&gt; subscribed // flatMapLatest
2017-01-31 18:35:26.245: внешний -&gt; Event completed // Новых сигналов не будет
2017-01-31 18:35:26.245: внешний -&gt; isDisposed
2017-01-31 18:35:26.546: внутренний -&gt; Event next(3 --&gt; 3)
2017-01-31 18:35:26.546: результат -&gt; Event next(3 --&gt; 3) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.646: внутренний -&gt; Event next(3 --&gt; 4)
2017-01-31 18:35:26.646: результат -&gt; Event next(3 --&gt; 4) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -&gt; Event next(3 --&gt; 5)
2017-01-31 18:35:26.746: результат -&gt; Event next(3 --&gt; 5) // РЕЗУЛЬТАТ
2017-01-31 18:35:26.746: внутренний -&gt; Event completed
2017-01-31 18:35:26.746: внутренний -&gt; isDisposed
2017-01-31 18:35:26.746: результат -&gt; Event completed
2017-01-31 18:35:26.746: результат -&gt; isDisposed

В примере благодаря задержкам в генерации мы видим, что, как только генерируется новый Observable, происходит отписка от предыдущего Observable.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.



example("\"flatMapWithIndex\"") {
Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(3)
.flatMapWithIndex({ (value: Int, index: Int) -&gt; Observable in
return Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.skip(value)
.take(value)
.map({"\(value, index) --&gt; \($0)"})
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "flatMapWithIndex" example ---
next((1, 0) --&gt; 1)
next((2, 1) --&gt; 2)
next((2, 1) --&gt; 3)
next((3, 2) --&gt; 3)
next((3, 2) --&gt; 4)
next((3, 2) --&gt; 5)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Observable -&gt; Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.


example("\"map\"") {
Observable
.of(1, 2, 3)
.map({ $0 * 5 })
.subscribe({ event in
print(event)
})
}

Консоль:

--- "map" example ---
next(5)
next(10)
next(15)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Observable -&gt; Observable
Элементы SO преобразуются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что еще одним параметром, переданным в функцию, является индекс сгенерированного элемента.


example("\"mapWithIndex\"") {
Observable
.of("A", "B", "C")
.mapWithIndex({ $0.0 + "\($0.1)" })
.subscribe({ event in
print(event)
})
}

Консоль:

--- "mapWithIndex" example ---
next(A0)
next(B1)
next(C2)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable
RO = Observable&lt;Observable&gt;
Элементы из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передаются count (максимальное число элементов, которые будут сгенерированы каждым Observable) и timeSpan (время максимального ожидания наполнения текущего Observable из элементов SO). Таким образом элемент RO являет собой Observable, число генерируемых элементов которого равно от 0 до N. Основное отличие от bufffer в том, что элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше).


example("\"window\"") {
let firstSequence = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.window(timeSpan: 0.16, count: 3, scheduler: MainScheduler.instance)
.take(5)
.subscribe(onNext: { (observable: Observable) in
observable
.subscribe({ event in
print(event)
})
})
}

Консоль:

--- "window" example ---
next(0)
next(1)
completed
next(2)
next(3)
completed
next(4)
completed
next(5)
next(6)
completed
next(7)
next(8)
completed

В примере используются временные задержки, что помогает добиться частичной наполненности генерируемых Observable.

Операторы математические и агрегирования




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.



example("\"reduce\"") {
Observable
.of(1, 2, 3, 4)
.reduce(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}

Консоль:

--- "reduce" example ---
next(24)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но, кроме этого, оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.



example("\"scan\"") {
Observable
.of(1, 2, 3, 4)
.scan(1) {$0 * $1}
.subscribe({ event in
print(event)
})
}

Консоль:

--- "scan" example ---
next(1)
next(2)
next(6)
next(24)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




SO = Observable
RO = Observable&lt;[T]&gt;
Все элементы из SO после генерации терминального состояния объединяются в массив, и генерируются RO.


example("\"toArray\"") {
Observable
.of(1, 2, 3)
.toArray()
.subscribe({ event in
print(event)
})
}

Консоль:

--- "toArray" example ---
next([1, 2, 3])
completed

Работа с ошибками




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет перехватить сгенерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы.



example("without \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}

example("with \"catchError\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchError({ (error) -&gt; Observable in
return Observable.just(3)
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- without "catchError" example ---
next(1)
next(2)
error(Unknown error occured.)

--- with "catchError" example ---
next(1)
next(2)
next(3)
completed

После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый Observable.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет перехватить сгенерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed.



example("without \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.subscribe({ event in
print(event)
})
}

example("with \"catchErrorJustReturn\"") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.catchErrorJustReturn(3)
.subscribe({ event in
print(event)
})
}

Консоль:

--- without "catchErrorJustReturn" example ---
next(1)
next(2)
error(Unknown error occured.)

--- with "catchErrorJustReturn" example ---
next(1)
next(2)
next(3)
completed

После генерации очередного элемента была сгенерирована ошибка, но мы её перехватили и вернули взамен новый элемент.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет перехватить сгенерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде, что ошибка не повторится.



example("\"retry\" once") {
Observable
.create({ (observer) in
observer.onNext(1)
observer.onNext(2)
observer.onError(RxError.unknown)
observer.onNext(3)
return Disposables.create()
})
.retry(2)
.subscribe({ event in
print(event)
})
}

Консоль:

--- "retry" once example ---
next(1)
next(2)
next(1)
next(2)
error(Unknown error occured.)

Передаваемое в оператор целое число означает количество попыток дождаться успешного окончания. 0 — ни одной попытки, т.е. цепочка ни разу не будет запущена, и просто произойдет completed событие. 1 — такое же поведение, как будто оператор и не не был применен. 2 — исходная попытка + дополнительная, и т.д. Если в оператор не передать параметр, то количество попыток повторить будет бесконечным.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет перехватить сгенерированную ошибку из SO, и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO, и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO в надежде, что ошибка исчезнет. Если tryObservable заканчивается ошибкой, она пробрасывается в RO, и на этом выполнение заканчивается.



example("\"retryWhen\"") {
var counter = 0
let sequenceWithError = Observable
.create({ observer in
observer.on(.next(1))
observer.on(.next(2))
counter += 1
switch counter {
case 0..
.create({ observer in
observer.on(.next(10))
// observer.onError(RxError.noElements)
return Disposables.create()
})
.debug("without\t")
let retrySequence = sequenceWithError
.retryWhen({ (observableError: Observable) -&gt; Observable in
return observableError
.flatMap({ (error: RxError) -&gt; Observable in
switch error {
case .unknown: return sequenceWithoutError
default: return Observable.error(error)
}
})
})
.subscribe()
}

Консоль:

--- "retryWhen" example ---
2017-02-03 15:23:18.563: with -&gt; subscribed
2017-02-03 15:23:18.564: with -&gt; Event next(1)
2017-02-03 15:23:18.564: with -&gt; Event next(2)
2017-02-03 15:23:18.565: with -&gt; Event error(Unknown error occured.)
2017-02-03 15:23:18.566: without -&gt; subscribed
2017-02-03 15:23:18.566: without -&gt; Event next(10)
2017-02-03 15:23:18.567: with -&gt; isDisposed
2017-02-03 15:23:18.568: with -&gt; subscribed
2017-02-03 15:23:18.568: with -&gt; Event next(1)
2017-02-03 15:23:18.568: with -&gt; Event next(2)
2017-02-03 15:23:18.568: with -&gt; Event error(Unknown error occured.)
2017-02-03 15:23:18.569: without -&gt; subscribed
2017-02-03 15:23:18.569: without -&gt; Event next(10)
2017-02-03 15:23:18.569: with -&gt; isDisposed
2017-02-03 15:23:18.570: with -&gt; subscribed
2017-02-03 15:23:18.570: with -&gt; Event next(1)
2017-02-03 15:23:18.570: with -&gt; Event next(2)
2017-02-03 15:23:18.571: with -&gt; Event next(3)

Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке ошибка исчезла. Если раскоментировать генерацию ошибки RxError.Overflow, мы её не перехватим в операторе retryWhen и пробросим в RO.

Операторы для работы с Connectable Observable




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет проксировать элементы из исходной SO на Subject, переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.

example("\"multicast\"") {
let subject = PublishSubject()
let multicast = getExternalObservable()
.multicast(subject)
subject
.asObservable()
.debug("\tinstant\t")
.subscribe()
delay(0.25, closure: { _ in
multicast.connect()
})
delay(0.45, closure: { _ in
subject
.asObservable()
.debug("\tdelayed\t")
.subscribe()
})
}

func getExternalObservable() -&gt; Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(7)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}

Консоль:

--- "multicast" example ---
2017-02-02 20:20:14.446: instant -&gt; subscribed
2017-02-02 20:20:14.721: instant -&gt; Event next(2)
2017-02-02 20:20:14.743: instant -&gt; Event next(3)
2017-02-02 20:20:14.844: instant -&gt; Event next(4)
2017-02-02 20:20:14.901: delayed -&gt; subscribed
2017-02-02 20:20:14.943: instant -&gt; Event next(5)
2017-02-02 20:20:14.943: delayed -&gt; Event next(5)
2017-02-02 20:20:15.043: instant -&gt; Event next(6)
2017-02-02 20:20:15.043: delayed -&gt; Event next(6)
2017-02-02 20:20:15.043: instant -&gt; Event completed
2017-02-02 20:20:15.043: instant -&gt; isDisposed
2017-02-02 20:20:15.043: delayed -&gt; Event completed
2017-02-02 20:20:15.043: delayed -&gt; isDisposed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




publish = multicast + replay subject
Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe.



var disposeBag: DisposeBag? = DisposeBag()

example("\"publish\"") {
let publish = getExternalObservable()
.debug("\tsequence\t")
.publish()
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.skip(1)
.take(2)
.subscribe(onNext: { value in
let disposable = publish
.asObservable()
.debug("\tdelayed #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.25, closure: { _ in
publish.connect()
})
delay(0.55, closure: { _ in
disposeBag = nil
})
}

func getExternalObservable() -&gt; Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(8)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}

Консоль:

--- "publish" example ---
2017-02-02 20:43:47.691: delayed #1 -&gt; subscribed // ожидание connect
2017-02-02 20:43:47.791: delayed #2 -&gt; subscribed // ожидание connect
2017-02-02 20:43:47.859: sequence -&gt; subscribed // connect случился
2017-02-02 20:43:47.860: sequence -&gt; Event next(2)
2017-02-02 20:43:47.860: delayed #1 -&gt; Event next(2)
2017-02-02 20:43:47.860: delayed #2 -&gt; Event next(2)
2017-02-02 20:43:47.889: sequence -&gt; Event next(3)
2017-02-02 20:43:47.889: delayed #1 -&gt; Event next(3)
2017-02-02 20:43:47.889: delayed #2 -&gt; Event next(3)
2017-02-02 20:43:47.989: sequence -&gt; Event next(4)
2017-02-02 20:43:47.989: delayed #1 -&gt; Event next(4)
2017-02-02 20:43:47.989: delayed #2 -&gt; Event next(4)
2017-02-02 20:43:48.089: sequence -&gt; Event next(5)
2017-02-02 20:43:48.089: delayed #1 -&gt; Event next(5)
2017-02-02 20:43:48.089: delayed #2 -&gt; Event next(5)
2017-02-02 20:43:48.170: delayed #1 -&gt; isDisposed // все отписались
2017-02-02 20:43:48.170: delayed #2 -&gt; isDisposed // все отписались
2017-02-02 20:43:48.188: sequence -&gt; Event next(6) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -&gt; Event next(7) // продолжила генерировать
2017-02-02 20:43:48.289: sequence -&gt; Event completed
2017-02-02 20:43:48.289: sequence -&gt; isDisposed

Как видно, хоть подписка и была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались, последовательность продолжила генерировать элементы.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable происходит подписка Connectable на SO.
Получается что-то вроде
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()

SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, происходит отписка и publishSequence от SO.



var disposeBag: DisposeBag? = DisposeBag()

example("\"refCount\"") {
let sequence = getExternalObservable()
.debug("\tsequence\t\t")
let refCount = sequence
.publish()
.refCount()
.debug("\trefCount\t\t")
Observable
.timer(0.0, period: 0.1, scheduler: MainScheduler.instance)
.take(2)
.subscribe(onNext: { value in
let disposable = refCount
.debug("\tsubscribe #\(value)\t")
.subscribe()
disposeBag?.insert(disposable)
})
delay(0.35, closure: { _ in
disposeBag = nil
})
}

private func getExternalObservable() -&gt; Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.shareReplay(1)
defer {
timer.subscribe()
}
return timer
}

Консоль:

--- "refCount" example ---
2017-02-02 21:03:14.487: subscribe #0 -&gt; subscribed
2017-02-02 21:03:14.488: refCount -&gt; subscribed
2017-02-02 21:03:14.488: sequence -&gt; subscribed
2017-02-02 21:03:14.489: sequence -&gt; Event next(0)
2017-02-02 21:03:14.489: refCount -&gt; Event next(0)
2017-02-02 21:03:14.489: subscribe #0 -&gt; Event next(0)
2017-02-02 21:03:14.572: sequence -&gt; Event next(1)
2017-02-02 21:03:14.573: refCount -&gt; Event next(1)
2017-02-02 21:03:14.573: subscribe #0 -&gt; Event next(1)
2017-02-02 21:03:14.576: subscribe #1 -&gt; subscribed
2017-02-02 21:03:14.576: refCount -&gt; subscribed
2017-02-02 21:03:14.673: sequence -&gt; Event next(2)
2017-02-02 21:03:14.673: refCount -&gt; Event next(2)
2017-02-02 21:03:14.673: subscribe #0 -&gt; Event next(2)
2017-02-02 21:03:14.673: refCount -&gt; Event next(2)
2017-02-02 21:03:14.673: subscribe #1 -&gt; Event next(2)
2017-02-02 21:03:14.773: sequence -&gt; Event next(3)
2017-02-02 21:03:14.774: refCount -&gt; Event next(3)
2017-02-02 21:03:14.774: subscribe #0 -&gt; Event next(3)
2017-02-02 21:03:14.774: refCount -&gt; Event next(3)
2017-02-02 21:03:14.774: subscribe #1 -&gt; Event next(3)
2017-02-02 21:03:14.835: subscribe #0 -&gt; isDisposed
2017-02-02 21:03:14.835: refCount -&gt; isDisposed
2017-02-02 21:03:14.835: subscribe #1 -&gt; isDisposed
2017-02-02 21:03:14.835: refCount -&gt; isDisposed // отписался последний
2017-02-02 21:03:14.835: sequence -&gt; isDisposed // отписка от SO




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Если SO обычный, — конвертирует его в Connectable. После этого все, кто подпишутся на него после вызова connect(), мгновенно получат в качестве первых элементов последние сгенерированные N элементов. Даже если отпишутся все — Connectable будет продолжать генерировать элементы.



var disposeBag: DisposeBag? = DisposeBag()

example("\"replay\"") {
let sequence = getExternalObservable()
.replay(2)
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}

fileprivate func getExternalObservable() -&gt; Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}

Консоль:

--- "replay" example ---
2017-02-02 21:36:30.914: timer -&gt; subscribed
2017-02-02 21:36:30.918: first -&gt; subscribed
2017-02-02 21:36:30.931: timer -&gt; Event next(0)
2017-02-02 21:36:31.016: timer -&gt; Event next(1)
2017-02-02 21:36:31.116: timer -&gt; Event next(2)
2017-02-02 21:36:31.216: timer -&gt; Event next(3)
2017-02-02 21:36:31.248: first -&gt; Event next(3)
2017-02-02 21:36:31.316: timer -&gt; Event next(4)
2017-02-02 21:36:31.317: first -&gt; Event next(4)
2017-02-02 21:36:31.416: timer -&gt; Event next(5)
2017-02-02 21:36:31.416: first -&gt; Event next(5)
2017-02-02 21:36:31.468: second -&gt; subscribed
2017-02-02 21:36:31.468: second -&gt; Event next(4) // мгновенно получил
2017-02-02 21:36:31.468: second -&gt; Event next(5) // мгновенно получил
2017-02-02 21:36:31.515: timer -&gt; Event next(6)
2017-02-02 21:36:31.516: first -&gt; Event next(6)
2017-02-02 21:36:31.516: second -&gt; Event next(6)
2017-02-02 21:36:31.616: timer -&gt; Event next(7)
2017-02-02 21:36:31.617: first -&gt; Event next(7)
2017-02-02 21:36:31.617: second -&gt; Event next(7)
2017-02-02 21:36:31.688: first -&gt; isDisposed // все отписались
2017-02-02 21:36:31.688: second -&gt; isDisposed // все отписались
2017-02-02 21:36:31.716: timer -&gt; Event next(8) // продолжают генерироваться
2017-02-02 21:36:31.816: timer -&gt; Event next(9) // продолжают генерироваться
2017-02-02 21:36:31.817: timer -&gt; Event completed
2017-02-02 21:36:31.817: timer -&gt; isDisposed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Если SO обычный, — конвертирует его в Connectable. Все, кто подпишутся на него, после вызова connect() получат сначала все элементы, которые были сгенерированы ранее. Даже если отпишутся все, Connectable будет продолжать генерировать элементы.



var disposeBag: DisposeBag? = DisposeBag()

example("\"replayAll\"") {
let sequence = getExternalObservable()
.replayAll()
let disposable1 = sequence
.debug("\tfirst\t")
.subscribe()
disposeBag?.insert(disposable1)
delay(0.5, closure: { _ in
let disposable2 = sequence
.debug("\tsecond\t")
.subscribe()
disposeBag?.insert(disposable2)
})
delay(0.3, closure: { _ in
sequence.connect()
})
delay(0.7, closure: { _ in
disposeBag = nil
})
}

fileprivate func getExternalObservable() -&gt; Observable {
let timer = Observable
.timer(0, period: 0.1, scheduler: MainScheduler.instance)
.take(10)
.shareReplay(1)
defer {
timer.debug("\ttimer\t").subscribe()
}
return timer
}

Консоль:

--- "replayAll" example ---
2017-02-02 21:39:27.174: timer -&gt; subscribed
2017-02-02 21:39:27.178: first -&gt; subscribed
2017-02-02 21:39:27.190: timer -&gt; Event next(0)
2017-02-02 21:39:27.276: timer -&gt; Event next(1)
2017-02-02 21:39:27.376: timer -&gt; Event next(2)
2017-02-02 21:39:27.476: timer -&gt; Event next(3)
2017-02-02 21:39:27.509: first -&gt; Event next(3) // 1й элемент
2017-02-02 21:39:27.576: timer -&gt; Event next(4)
2017-02-02 21:39:27.576: first -&gt; Event next(4) // 2й элемент
2017-02-02 21:39:27.675: timer -&gt; Event next(5)
2017-02-02 21:39:27.675: first -&gt; Event next(5) // 3й элемент
2017-02-02 21:39:27.731: second -&gt; subscribed // подписание
2017-02-02 21:39:27.731: second -&gt; Event next(3) // сразу получил 1й
2017-02-02 21:39:27.731: second -&gt; Event next(4) // сразу получил 2й
2017-02-02 21:39:27.731: second -&gt; Event next(5) // сразу получил 3й
2017-02-02 21:39:27.775: timer -&gt; Event next(6)
2017-02-02 21:39:27.775: first -&gt; Event next(6)
2017-02-02 21:39:27.775: second -&gt; Event next(6)
2017-02-02 21:39:27.875: timer -&gt; Event next(7)
2017-02-02 21:39:27.876: first -&gt; Event next(7)
2017-02-02 21:39:27.876: second -&gt; Event next(7)
2017-02-02 21:39:27.946: first -&gt; isDisposed // все отписались
2017-02-02 21:39:27.946: second -&gt; isDisposed // все отписались
2017-02-02 21:39:27.975: timer -&gt; Event next(8) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -&gt; Event next(9) // продолжают генерироваться
2017-02-02 21:39:28.076: timer -&gt; Event completed
2017-02-02 21:39:28.076: timer -&gt; isDisposed

Вспомогательные методы



debug


RO полностью дублирует SO, но логируются все события с временной меткой.

example("\"debug\"") {
Observable
.of(1, 2, 3)
.debug("sequence")
.subscribe{}
}

Консоль:

--- "debug" example ---
2017-02-03 10:01:07.746: sequence -&gt; subscribed
2017-02-03 10:01:07.748: sequence -&gt; Event next(1)
2017-02-03 10:01:07.748: sequence -&gt; Event next(2)
2017-02-03 10:01:07.748: sequence -&gt; Event next(3)
2017-02-03 10:01:07.748: sequence -&gt; Event completed
2017-02-03 10:01:07.748: sequence -&gt; isDisposed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO.

example("\"do\"") {
Observable
.of(1, 2, 3)
.do(onNext: { value in
print("do(\(value))")
})
.subscribe({ event in
print(event)
})
}

Консоль:

--- "do" example ---
do(1)
next(1)
do(2)
next(2)
do(3)
next(3)
completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Дублирует элементы из SO в RO, но с временной задержкой, указанной в качестве параметра.



example("\"delaySubscription\"") {
var timer: Observable = getExternalObservable()
timer
.debug("\tnormal\t")
.subscribe()
timer
.delaySubscription(0.9, scheduler: MainScheduler.instance)
.debug("\tdelayed\t")
.subscribe()
}

private func getExternalObservable() -&gt; Observable {
let observable = Observable
.timer(0, period: 0.2, scheduler: MainScheduler.instance)
.take(3)
defer {
observable.subscribe()
}
return observable
}

Консоль:

--- "delaySubscription" example ---
2017-02-03 10:58:13.985: normal -&gt; subscribed
2017-02-03 10:58:13.989: delayed -&gt; subscribed
2017-02-03 10:58:13.999: normal -&gt; Event next(0)
2017-02-03 10:58:14.187: normal -&gt; Event next(1)
2017-02-03 10:58:14.387: normal -&gt; Event next(2)
2017-02-03 10:58:14.387: normal -&gt; Event completed
2017-02-03 10:58:14.387: normal -&gt; isDisposed
2017-02-03 10:58:14.890: delayed -&gt; Event next(0)
2017-02-03 10:58:15.091: delayed -&gt; Event next(1)
2017-02-03 10:58:15.290: delayed -&gt; Event next(2)
2017-02-03 10:58:15.291: delayed -&gt; Event completed
2017-02-03 10:58:15.291: delayed -&gt; isDisposed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Указывает, на каком Scheduler должен выполнять свою работу Observer, особенно критично при работе с GUI.

example("\"observeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.observeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}

Консоль:

--- "observeOn" example ---
Main: false Event: next(A)
Main: false Event: next(B)
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(C)
Main: false Event: completed

Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке, хотя оба Observable были запущены на background.




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable.

example("\"subscribe\" with failure") {
observable(failing: true)
}

example("\"subscribe\" with success") {
observable(failing: false)
}

func observable(failing: Bool) {
var observable: Observable!
switch failing {
case true: observable = Observable.error(RxError.argumentOutOfRange)
case false: observable = Observable.of(1)
}
observable
.subscribe(onNext: { value in
print("next: \(value)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
}

Консоль:

--- "subscribe" with failure example ---
error: Argument out of range.
disposed

--- "subscribe" with success example ---
next: 1
completed
disposed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Указывает, на каком Scheduler выполнять подписку на Observable. Редко используемый оператор. Для получения колбэков на нужном Scheduler следует пользоваться observeOn.

example("\"subscribeOn\"") {
DispatchQueue.global(qos: .background).async {
Observable
.of(1, 2, 3)
.subscribeOn(MainScheduler.instance)
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
Observable
.of("A", "B", "C")
.subscribe({ event in
print("Main: \(Thread.isMainThread)\t\tEvent: \(event)")
})
}
}

Консоль:

--- "subscribeOn" example ---
Main: true Event: next(1)
Main: true Event: next(2)
Main: true Event: next(3)
Main: true Event: completed
Main: false Event: next(A)
Main: false Event: next(B)
Main: false Event: next(C)
Main: false Event: completed




Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Дублирует элементы из SO в RO, но если в течение указанного времени SO не сгенерировало ни одного элемента, RO генерирует ошибку.



delay(0.0) { _ in
example("\"timeout\" with success") {
observable(timeout: 0.3, delay: 0.1)
.debug("\tsuccess\t")
.subscribe()
}
}

delay(0.5) { _ in
example("\"timeout\" with failure") {
observable(timeout: 0.3, delay: 0.4)
.debug("\tfailure\t")
.subscribe()
}
}

func observable(timeout: RxTimeInterval, delay: RxTimeInterval) -&gt; Observable {
return Observable
.timer(delay, period: 0.1, scheduler: MainScheduler.instance)
.take(1)
.timeout(timeout, scheduler: MainScheduler.instance)
}

Консоль:

--- "timeout" with success example ---
2017-02-03 12:01:31.344: success -&gt; subscribed
2017-02-03 12:01:31.447: success -&gt; Event next(0)
2017-02-03 12:01:31.447: success -&gt; Event completed
2017-02-03 12:01:31.447: success -&gt; isDisposed

--- "timeout" with failure example ---
2017-02-03 12:01:31.856: failure -&gt; subscribed
2017-02-03 12:01:32.157: failure -&gt; Event error(Sequence timeout.)
2017-02-03 12:01:32.157: failure -&gt; isDisposed


using


Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.




Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable из ресурса, у которых будет единое время жизни.

example("\"using\"") {
Observable
.of(1,3)
.flatMap { value -&gt; Observable in
return Observable
.using(resourceFactory(value), observableFactory: observableFactory())
}
.subscribe({ event in
print(event)
})
}

class Factory: Disposable {

let value: Int

init(_ value: Int) {
self.value = value
}

var observable: Observable {
let array = Array(repeating: value, count: value)
return Observable.from(array)
}

func dispose() {
print("Factory(\(value)) disposed")
}
}

func resourceFactory(_ value: Int) -&gt; (() -&gt; Factory) {
func resource() -&gt; Factory {
return Factory(value)
}
return resource
}

func observableFactory() -&gt; ((Factory) -&gt; Observable) {
func observable(_ resource: Factory) -&gt; Observable {
return resource.observable
}
return observable
}

Консоль:

--- "using" example ---
next(1)
Factory(1) disposed
next(3)
next(3)
next(3)
Factory(3) disposed
completed

Как видно, после того, как Observable закончил генерировать элементы, у нашего ресурса Factory был вызван метод dispose.




За материал выражаем благодарность

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

.


Запись

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

впервые появилась

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

.
 
Вверх