RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕。我读源码时,确实有点似懂非懂的感觉。网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的。既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,留下核心代码带大家揭秘RxJava 的实现原理。

什么是RxJava

  • • Rx是Reactive Extensions的简写,翻译为响应的扩展。也就是通过由一方发出信息,另一方响应信息并作出处理的核心框架代码。
  • • 该框架由微软的架构师Erik Meijer领导的团队开发,并在2012年11月开源。
  • • Rx库支持.NET、JavaScript和C++等,现在已经支持几乎全部的流行编程语言了。
  • • Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io
  • • RxJava作为一个流行的框架,其源码依托在GitHub,除了支持RxJava,针对安卓系统也除了一个支持框架RxAndroid

2.RxJava简化代码

一般我们在安卓项目中,如果想从后台获取数据并刷新界面,代码大概如下,下面我们来看一个例子:

new Thread() {

@Override

public void run() {

super.run();

for (File folder : folders) {

File[] files = folder.listFiles();

for (File file : files) {

if (file.getName().endsWith(".png")) {

final Bitmap bitmap = getBitmapFromFile(file);

getActivity().runOnUiThread(new Runnable() {

@Override

public void run() {

imageCollectorView.addImage(bitmap);

}

});

}

}

}

}

}.start();

上面的代码经过多层嵌套后 可读性太差了!如果你用了RxJava 可以这样写:

Observable.from(folders)

.flatMap(new Func1<File, Observable<File>>() {

@Override

public Observable<File> call(File file) {

return Observable.from(file.listFiles());

}

})

.filter(new Func1<File, Boolean>() {

@Override

public Boolean call(File file) {

return file.getName().endsWith(".png");

}

})

.map(new Func1<File, Bitmap>() {

@Override

public Bitmap call(File file) {

return getBitmapFromFile(file);

}

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Action1<Bitmap>() {

@Override

public void call(Bitmap bitmap) {

imageCollectorView.addImage(bitmap);

}

});

这样写的好处就是减少层次嵌套 提高了代码的可读性,除了简化代码,RxJava还可以为每个方法提供特定的运行线程。

3.引入框架

目前RxJava已经升级为2.0版本,但为了能够更好的理解RxJava,我们可以从1.0版本开始学习。也为了让我们的安卓项目能够更好的使用RxJava,可以在项目中引入gradle脚本依赖:

compile 'io.reactivex:rxandroid:1.2.1'

compile 'io.reactivex:rxjava:1.1.6'

现在 我们的项目已经支持RxJava的功能了。

4.响应式的核心

所谓的响应式,无非就是存在这样的2个部分,一部分负责发送事件/消息,另一部分负责响应事件/消息。

以前如果我们想看新闻,一般需要通过看报纸。比如,你对某个报刊杂志比较感兴趣,那么你首先要做3件事:

  1. 1. 提供你家的地址
  2. 2. 找到对应的报社
  3. 3. 去报社订阅整个月的报纸

经过了上面的流程,以后每天只要有新的报刊资料出来了,报社都会将杂志发送到你家。

史上最浅显易懂的RxJava入门教程-LMLPHP

将上面的例子进行代码抽象,步骤如下:

  1. 1. 提供观察者(因为你是关心杂志内容的人 所以你是观察该事件的人)
  2. 2. 提供被观察者(只要有新的杂志出来 就需要通知关心的人 所以报社是被观察的对象)
  3. 3. 订阅(也就是 观察者&被观察者之间要相互关联 以便被观察的对象一变化 就会马上通知观察该事件的对象)

史上最浅显易懂的RxJava入门教程-LMLPHP

上面示例的演示代码如下:

//1.创建被观察者

Observable<String> observable =

Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

//4.开始发送事件

//事件有3个类型 分别是onNext() onCompleted() onError()

//onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。

subscriber.onNext("Hello Android !");

subscriber.onNext("Hello Java !");

subscriber.onNext("Hello C !");

subscriber.onCompleted();

}

});

//2.创建观察者

Subscriber<String> subscriber = new Subscriber<String>() {

@Override

public void onCompleted() {

Log.i(TAG, "onCompleted ");

}

@Override

public void onError(Throwable e) {

Log.i(TAG, "onError: "+e.getLocalizedMessage());

}

@Override

public void onNext(String s) {

Log.i(TAG, "onNext: "+s);

}

};

//3.订阅

observable.subscribe(subscriber);

输出如下:

com.m520it.rxjava I/IT520: onNext: Hello Android !

com.m520it.rxjava I/IT520: onNext: Hello Java !

com.m520it.rxjava I/IT520: onNext: Hello C !

com.m520it.rxjava I/IT520: onCompleted

代码运行的原理

  • • 上面的代码中,当观察者subscriber订阅了被观察者observable之后,系统会自动回调observable对象内部的call()。
  • • 在observable的call()方法实体中,发送了如onNext/onCompleted/onError事件后。
  • • 接着subscriber就能回调到到对应的方法。

5.被观察者变种

普通的Observable发送需要三个方法onNext, onError, onCompleted,而Single作为Observable的变种,只需要两个方法:

  • • onSuccess - Single发射单个的值到这个方法
  • • onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法

Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

final Single<String> single = Single.create(new Single.OnSubscribe<String>() {

@Override

public void call(SingleSubscriber<? super String> singleSubscriber) {

//先调用onNext() 最后调用onCompleted()

//singleSubscriber.onSuccess("Hello Android !");

//只调用onError();

singleSubscriber.onError(new NullPointerException("mock Exception !"));

}

});

Observer<String> observer = new Observer<String>() {

@Override

public void onCompleted() {

Log.i(TAG, "onCompleted ");

}

@Override

public void onError(Throwable e) {

Log.i(TAG, "onError: "+e.getLocalizedMessage());

}

@Override

public void onNext(String s) {

Log.i(TAG, "onNext: "+s);

}

};

single.subscribe(observer);

6.观察者变种

Observer观察者对象,上面我们用Subscriber对象代替。因为该对象本身就是继承了Observer。

该对象实现了onNext()&onCompleted()&onError()事件,我们如果对哪个事件比较关心,只需要实现对应的方法即可,代码如下:

//创建观察者

Subscriber<String> subscriber = new Subscriber<String>() {

@Override

public void onCompleted() {

Log.i(TAG, "onCompleted ");

}

@Override

public void onError(Throwable e) {

Log.i(TAG, "onError: "+e.getLocalizedMessage());

}

@Override

public void onNext(String s) {

Log.i(TAG, "onNext: "+s);

}

};

//订阅

observable.subscribe(subscriber);

上面的代码中,如果你只关心onNext()事件,但却不得不实现onCompleted()&onError()事件.这样的代码就显得很臃肿。鉴于这种需求,RxJava框架在订阅方面做了特定的调整,代码如下:

//为指定的onNext事件创建独立的接口

Action1<String> onNextAction = new Action1<String>() {

@Override

public void call(String s) {

Log.i(TAG, "call: "+s);

}

};

//订阅

observable.subscribe(onNextAction);

```

不知道大家注意到没有,subscribe()订阅的不再是观察者,而是特定的onNext接口对象。类似的函数如下,我们可以根据需要实现对应的订阅:

public Subscription subscribe(final Observer observer)

public Subscription subscribe(final Action1 onNext)

public Subscription subscribe(final Action1 onNext, Action1 onError)

public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有类似的功能:

public void forEach(final Action1 onNext)

public void forEach(final Action1 onNext, Action1 onError)

public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

##7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

###AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

以下贴出代码:

//创建被观察者final AsyncSubject<String> subject = AsyncSubject.create();//创建观察者

Subscriber<String> subscriber = new Subscriber<String>() {

@Override

public void onCompleted() {

Log.i(TAG, "onCompleted");

}

@Override

public void onError(Throwable e) {

Log.i(TAG, "onError");

}

@Override

public void onNext(String s) {

Log.i(TAG, "s:" + s);

}

};//订阅事件

subject.subscribe(subscriber);//被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。

subject.onNext("Hello Android ");

subject.onNext("Hello Java ");

subject.onCompleted();

输出:

s:Hello Java onCompleted

然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

subject.onNext("Hello Android ");

subject.onNext("Hello Java ");//因为发送了异常 所以onNext()无法被打印

subject.onError(null);

###BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:

BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {

@Override

public void onCompleted() {

Log.i(TAG, "onCompleted");

}

@Override

public void onError(Throwable e) {

Log.i(TAG, "onError");

}

@Override

public void onNext(Object o) {

Log.i(TAG, "onNext: " + o);

}

};

//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效//如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效

subject.subscribe(subscriber);

subject.onNext("Hello CPP !");

subject.onNext("Hello IOS !");

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

代码如下:

PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override

public void call(String s) {

Log.i(TAG, "onNextAction1 call: "+s);

}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override

public void call(String s) {

Log.i(TAG, "onNextAction2 call: "+s);

}

};

subject.onNext("Hello Android !");

subject.subscribe(onNextAction1);

subject.onNext("Hello Java !");

subject.subscribe(onNextAction2);

subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Java !

onNextAction1 call: Hello IOS !

onNextAction2 call: Hello IOS !

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

代码如下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override

public void call(String s) {

Log.i(TAG, "onNextAction1 call: "+s);

}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override

public void call(String s) {

Log.i(TAG, "onNextAction2 call: "+s);

}

};

subject.onNext("Hello Android !");

subject.subscribe(onNextAction1);

subject.onNext("Hello Java !");

subject.subscribe(onNextAction2);

subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Android !

onNextAction1 call: Hello Java !

onNextAction2 call: Hello Android !

onNextAction2 call: Hello Java !

onNextAction1 call: Hello IOS !

onNextAction2 call: Hello IOS !

###Subject总结

AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()

BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()

PublishSubject只会打印订阅后的任何事件。

ReplaySubject无论订阅在何时都会调用发送的事件。

好了,对于RxJava整个入门文章到这里就完全结束了,现在再来回看RxJava,你会发现,它就是在观察者模式的骨架下,通过丰富的操作符和便捷的异步操作来完成对于复杂业务的处理。

05-02 21:55