ReactiveX 是一个库,用于通过使用可观察序列来编写异步的、基于事件的程序。

它扩展了观察者模式以支持数据、事件序列,并添加了允许你以声明方式组合序列的操作符,同时抽象对低层线程、同步、线程安全等。

本文主要作为 RxSwift 的入门文章,对 RxSwift 中的一些基础内容、常用实践,做些介绍。

Observables aka Sequences

Basics

观察者模式(Observable(Element>)和正常序列(Sequence)的等价性对于理解 Rx 是相当重要的。

每个 Observable 序列知识一个序列。Observable 与 Swift 的 Sequence 相比,其主要优点是可以异步接收元素。这是 RxSwift 的核心。

  • Observable(Observable) 与 Sequence 等价
  • Observable.subscribe 方法与 Sequence.makeIterator方法等价
  • Observer(callback)需要被传递到 Observable.subscribe 方法来接受序列元素,而不是在返回的 iterator 上调用 next() 方法

Sequence 是一个简单、熟悉的概念,很容易可视化。

人是具有巨大视觉皮层的生物。当我们可以轻松地想想一个概念时,理解它就容易多了。

我们可以通过尝试模拟每个Rx操作符内的事件状态机到序列上的高级别操作来接触认知负担。

如果我们不使用 Rx 而是使用模型异步系统(model asynchronous systems),这可能意味着我们的代码会充满状态机和瞬态,这些正式我们需要模拟的,而不是抽象。

ListSequence 可能是数学家和程序员首先学习的概念之一。

这是一个数字的序列:

--1--2--3--4--5--6--|   // 正常结束

另一个字符序列:

--a--b--a--a--a---d---X     // terminates with error

一些序列是有限的,而一些序列是无限的,比如一个按钮点击的列:

---tap-tap-------tap--->

这些被叫做 marble diagram。可以在rxmarbles.com了解更多的 marble diagram。

如果我们将序列愈发指定为正则表达式,它将如下所示:

next*(error | completed)?

这描述了以下内容:

  • Sequence 可以有 0 个 或者多个元素
  • 一旦收到 errorcompleted 事件,这个 Sequence 就不能再产生其他元素

在 Rx 中, Sequence 被描述为一个 push interface(也叫做 callbak)。

enum Event<Element>  {
    case next(Element)      // next element of a sequence
    case error(Swift.Error) // sequence failed with error
    case completed          // sequence terminated successfully
}

class Observable<Element> {
    func subscribe(_ observer: Observer<Element>) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

当序列发送 errorcompleted 事件时,将释放计算序列元素的所有内部资源。

要立即取消序列元素的生成,并释放资源,可以在返回的订阅(subscription)上调用 dispose

如果一个序列在有限时间内结束,则不调用 dispose 或者不使用 disposed(by: disposeBag) 不会造成任何永久性资源泄漏。但是,这些资源会一直被使用,直到序列完成(完成产生元素,或者返回一个错误)。

如果一个序列没有自行终止,比如一系列的按钮点击,资源会被永久分配,直到 dispose 被手动调用(在 disposeBag 内调用,使用 takeUntil 操作符,或者其他方式)。

使用 dispose bag 或者 takeUtil 操作符是一个确保资源被清除的鲁棒(robust)的方式。即使序列将在有限时间内终止,我们也推荐在生产环境中使用它们。

Disposing

被观察的序列(observed sequence)有另一种终止的方式。当我们使用完一个序列并且想要释放分配用于计算即将到来的元素的所有资源时,我们可以在一个订阅上调用 dispose

这时一个使用 interval 操作符的例子:

let scheduler = SerialDispatchQueueScheduler(qos: .default)
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }

Thread.sleep(forTimeInterval: 2.0)

subscription.dispose()

上边的例子打印:

0
1
2
3
4
5

注意,你通常不希望调用 dispose,这知识一个例子。手动调用 dispose 通常是一种糟糕的代码味道。dispose 订阅有更好的方式,比如 DisposeBagtakeUntil操作符、或者一些其他的机制。

那么,上边的代码是否可以在 dispose 被执行后,打印任何东西?答案是,是情况而定。

  • 如果上边的 scheduler 是串行调度器(serial scheduler),比如 MainSchedulerdispose 在相同的串行调度器上调用,那么答案就是 no。
  • 否则,答案是 yes。

你仅仅有两个过程在并行执行。

  • 一个在产生元素
  • 另一个 dispose 订阅

“可以在之后打印某些内容吗?”这个问题,在这两个过程在不同调度上执行的情况下甚至没有意义。

如果我们的代码是这样的:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(MainScheduler.instance)
            .subscribe { event in
                print(event)
            }

// ....

subscription.dispose() // called from main thread

dispose 调用返回后,不会打印任何东西。

同样,在这个例子中:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(serialScheduler)
            .subscribe { event in
                print(event)
            }

// ...

subscription.dispose() // executing on same `serialScheduler`

dispose 调用返回后,也不会打印任何东西。

Dispose Bag

当一个 DisposeBag 被释放时,它会在每一个可被 dispose 的对象(disposables)上调用 dispose

它没有 dispose 方法,因此不允许故意显式地调用 dispose。如果需要立即清理,我们可以创建一个新的 DisposeBag

self.disposeBag = DisposeBag()

这将清除旧的引用,并引起资源清理。

如果仍然需要手动清理,可以使用 CompositeDisposable。它具有所需的行为,但一旦调用了 dispose 方法,它将立即处理任何新添加可被dispose的对象(disposable)。

Take until

另一种在 dealloc 时自动处理(dispose)订阅的方式是使用 takeUtil 操作符。

sequence
    .takeUntil(self.rx.deallocated)
    .subscribe {
        print($0)
    }

Implicit Observable guarantees

还有一些额外的保证,所有的序列产生者(sequence producers、Observable s),必须遵守.

它们在哪一个线程上产生元素无关紧要,但如果它们生成一个元素并发送给观察者observer.on(.next(nextElement)),那么在 observer.on 方法执行完成前,它们不能发送下一个元素。

如果 .next 事件还没有完成,那么生产者也不能发送终止 .completed.error

简而言之,考虑以下示例:

someObservable
  .subscribe { (e: Event<Element>) in
      print("Event processing started")
      // processing
      print("Event processing ended")
  }

它始终打印:

Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended

它永远无法打印:

Event processing started
Event processing started
Event processing ended
Event processing ended

Creating your own Observable (aka observable sequence)

关于观察者有一个重要的事情需要理解。

创建 observable 时,它不会仅仅因为它已创建而执行任何工作。

确实,Observable 可以通过多种方式产生元素。其中一些会导致副作用,一些会影响现有的运行过程,例如点击鼠标事件等。

但是,如果只调用一个返回 Observable 的方法,那么没有序列生成,也没有副作用。Observable仅仅定义序列的生成方法以及用于元素生成的参数。序列生成始于 subscribe 方法被调用。

例如,假设你有一个类似原型的方法:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")

// no requests are performed, no work is being done, no URL requests were fired

let cancel = searchForMe
  // sequence generation starts now, URL requests are fired
  .subscribe(onNext: { results in
      print(results)
  })

有许多方法可以生成你自己的 Observable 序列,最简单方法或许是使用 create 函数。

RxSwift 提供了一个方法可以创建一个序列,这个序列订阅时返回一个元素。这个方法是 just。我们亲自实现一下:

func myJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

myJust(0)
    .subscribe(onNext: { n in
      print(n)
    })

这会打印:

0

不错,create 函数是什么?

它只是一个便利方法,使你可以使用 Swift 的闭包轻松实现 subscribe 方法。像 subscribe 方法一样,它带有一个参数 observer,并返回 disposable。

以这种方式实现的序列实际上是同步的(synchronous)。它将生成元素,并在 subscribe 调用返回 disposable 表示订阅前终止。因此,它返回的 disposable 并不重要,生成元素的过程不会被中断。

当生成同步序列,通常用于返回的 disposable 是 NopDisposable 的单例。

现在,我们来创建一个从数组中返回元素的 observable。

func myFrom<E>(_ sequence: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in sequence {
            observer.on(.next(element))
        }

        observer.on(.completed)
        return Disposables.create()
    }
}

let stringCounter = myFrom(["first", "second"])

print("Started ----")

// first time
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("----")

// again
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("Ended ----")

上边的例子会打印:

Started ----
first
second
----
first
second
Ended ----

Creating an Observable that perfroms work

OK,现在更有趣了。我们来创建前边示例中使用的 interval 操作符。

这相当于 dispatch queue schedulers 的实际实现

func myInterval(_ interval: TimeInterval) -> Observable<Int> {
    return Observable.create { observer in
        print("Subscribed")
        let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
        timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval)

        let cancel = Disposables.create {
            print("Disposed")
            timer.cancel()
        }

        var next = 0
        timer.setEventHandler {
            if cancel.isDisposed {
                return
            }
            observer.on(.next(next))
            next += 1
        }
        timer.resume()

        return cancel
    }
}
let counter = myInterval(0.1)

print("Started ----")

let subscription = counter
    .subscribe(onNext: { n in
        print(n)
    })


Thread.sleep(forTimeInterval: 0.5)

subscription.dispose()

print("Ended ----")

上边的示例会打印:

Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----

如果这样写:

let counter = myInterval(0.1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

那么打印如下:

Started ----
Subscribed
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Disposed
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

订阅后的每个订阅者(subscriber)同行会生成自己独立的元素序列。默认情况下,操作符是无状态的。无状态的操作符远多于有状态的操作符。

Sharing subscription and share operator

但是,如果你希望多个观察者从一个订阅共享事件(元素),该怎么办?

有两件事需要定义:

  • 如何处理在新订阅者有兴趣观察它们之前收到的过去的元素(replay lastest only, replay all, replay last n)
  • 如何决定何时出发共享的订阅(refCount, manual or some other algorithm)

通常是一个这样的组合,replay(1).refCount,也就是 share(replay: 1)

let counter = myInterval(0.1)
    .share(replay: 1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

这将打印:

Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

请注意现在只有一个 Subscribed 和 Disposed 事件。

对 URL 可观察对象(observable)的行为是等效的。

下面的例子展示了如何的 HTTP 请求封装在 Rx 中,这种封装非常像 interval 操作符的模式。

extension Reactive where Base: URLSession {
    public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> {
        return Observable.create { observer in
            let task = self.dataTaskWithRequest(request) { (data, response, error) in
                guard let response = response, let data = data else {
                    observer.on(.error(error ?? RxCocoaURLError.Unknown))
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
                    return
                }

                observer.on(.next(data, httpResponse))
                observer.on(.completed)
            }

            task.resume()

            return Disposables.create {
                task.cancel()
            }
        }
    }
}

Operator

RxSwift 实现了许多操作符。

所有操作符的的 marble diagram 可以在 ReactiveX.io 看到。

Playgrouds 里边几乎有所有操作符的演示。

如果你需要一个操作符,并且不知道如何找到它,这里有一个操作符的决策树

Custom operators

有两种方式可以创建自定义的操作符。

Easy way

所有的内部代码都使用高度优化的运算符版本,因此它们不是最好的教程材料。这就是为什么我们非常鼓励使用标准运算符。

幸运的是,有一种简单的方法来创建操作符。创建新的操作符实际上就是创建可观察对象,前边的章节已经描述了如何做到这一点。

来看一下为优化的 map 操作符的实现:

extension ObservableType {
    func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> {
        return Observable.create { observer in
            let subscription = self.subscribe { e in
                    switch e {
                    case .next(let value):
                        let result = transform(value)
                        observer.on(.next(result))
                    case .error(let error):
                        observer.on(.error(error))
                    case .completed:
                        observer.on(.completed)
                    }
                }

            return subscription
        }
    }
}


现在可以使用自定义的 map 了:

let subscription = myInterval(0.1)
    .myMap { e in
        return "This is simply \(e)"
    }
    .subscribe(onNext: { n in
        print(n)
    })

这将打印:

Subscribed
This is simply 0
This is simply 1
This is simply 2
This is simply 3
This is simply 4
This is simply 5
This is simply 6
This is simply 7
This is simply 8
...

Life happens

那么,如果用自定义运算符解决某些情况太难了呢? 你可以退出 Rx monad,在命令性世界中执行操作,然后使用 Subjects 再次将结果隧道传输到Rx。

下边的例子是不应该被经常实践的,是糟糕的代码味道,但是你可以这么做。

let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth()

magicBeings
  .subscribe(onNext: { being in     // exit the Rx monad
      self.doSomeStateMagic(being)
  })
  .disposed(by: disposeBag)

//
//  Mess
//
let kitten = globalParty(   // calculate something in messy world
  being,
  UIApplication.delegate.dataSomething.attendees
)
kittens.on(.next(kitten))   // send result back to rx

//
// Another mess
//
let kittens = BehaviorRelay(value: firstKitten) // again back in Rxmonad
kittens.asObservable()
  .map { kitten in
    return kitten.purr()
  }
  // ....

每一次你这样写的时候,其他人可能在其他地方写这样的代码:

kittens
  .subscribe(onNext: { kitten in
    // do something with kitten
  })
  .disposed(by: disposeBag)

所以,不要尝试这么做。

Error handling

有两种错误机制。

Asynchrouous error handling mechanism in observables

错误处理非常直接,如果一个序列以错误而终止,则所有依赖的序列都将以错误而终止。这是通常的短路逻辑。

你可以使用 catch 操作符从可观察对象的失败中恢复,有各种各样的可以让你详细指定恢复。

还有 retry 操作符,可以在序列出错的情况下重试。

KVO

KVO 是一个 Objective-C 的机制。这意味着他没有考虑类型安全,该项目试图解决这个问题的一部分。

有两种内置的方式支持 KVO:

// KVO
extension Reactive where Base: NSObject {
    public func observe<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {}
}

#if !DISABLE_SWIZZLING
// KVO
extension Reactive where Base: NSObject {
    public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions) -> Observable<E?> {}
}
#endif

看一下观察 UIView 的 frame 的例子,注意 UIKit 并不遵从 KVO,但是这样可以

view
  .rx.observe(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

view
  .rx.observeWeakly(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

rx.observe

rx.observe 有更好的性能,因为它知识对 KVO 机制的包装,但是它使用场景有限。

  • 它可用于观察从所有权图表中的self或祖先开始的 path(retainSelf = false)

  • 它可用于观察从所有权图中的后代开始的 path(retainSelf = true)

  • path 必须只包含 strong 属性,否则你可能会因为在 dealloc 之前没有取消注册KVO观察者而导致系统崩溃。

例如:

self.rx.observe(CGRect.self, "view.frame", retainSelf: false)

rx.observeWeakly

rx.observeWeaklyrx.observe 慢一些,因为它必须在若引用的情况下处理对象释放。

它不仅适用于 rx.observe 适用的所有场景,还适用于:

  • 因为它不会持有被观察的对象,所以它可以用来观察所有权关系位置的任意对象
  • 它可以用来观察 weak 属性

Observing structs

KVO 是 Objective-C 的机制,所以它重度以来 NSValue

RxCocoa 内置支持 KVO 观察 CGRectCGSizeCGPoint 结构体。

当观察其他结构体时,需要手动从 NSValue 中提前值。

这里有展示如何通过实现 KVORepresentable 协议,为其他的结构体扩展 KVO 观察和 *rx.observe**方法。

UI layer tips

在绑定到 UIKit 控件时,Observable 需要在 UI 层中满足某些要求。

Threading

Observable 需要在 MainScheduler 发送值,这只是普通的 UIKit/Cocoa 要求。

你的 API 最好在 MainScheduler 上返回结果。如果你试图从后台线程绑定一些东西到 UI,在 Debug build 中,RxCocoa 通常会抛出异常来通知你。

可以通过添加 observeOn(MainScheduler.instance) 来修复该问题。

Error

你无法将失败绑定到 UIKit 控件,因为这是为定义的行为。

如果你不知道 Observable 是否可以失败,你可以通过使用 catchErrorJustReturn(valueThatIsReturnedWhenErrorHappens) 来确保它不会失败,但是错误发生后,基础序列仍将完成。

如果所需行为是基础序列继续产生元素,则需要某些版本的 retry 操作符。

Sharing subscription

你通常希望在 UI 层中共享订阅,你不希望单独的 HTTP 调用将相同的数据绑定到多个 UI 元素。

假设你有这样的代码:

let searchResults = searchText
    .throttle(0.3, $.mainScheduler)
    .distinctUntilChanged
    .flatMapLatest { query in
        API.getSearchResults(query)
            .retry(3)
            .startWith([]) // clears results on new search term
            .catchErrorJustReturn([])
    }
    .share(replay: 1)    // <- notice the `share` operator

你通常想要的是在计算后共享搜索结果,这就是 share 的含义。

在 UI 层中,在转换链的末尾添加 share 通常是一个很好的经验法则。因为你想要共享计算结果,而不是把 searcResults 绑定到多个 UI 元素时,触发多个 HTTP 连接。

另外,请参阅 Driver,它旨在透明地包装这些 share 调用,确保在主 UI 县城上观察元素,并且不会将错误绑定到 UI。


原文为RxSwift/Getting Started,本文在原文基础上依自身需要略有修改。

10-08 16:22