Android Rx基础介绍
01.Rx基础介绍
1.1 Rx简单定义
- 简单解释
- RxJava 是一个基于事件流、实现异步操作的库
- GitHub官方解释
- 代码地址:https://github.com/ReactiveX/RxJava
- 官方解释
- RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
- It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
- 翻译如下,借助有道词典
- RxJava是反应性扩展的JavaVM实现:一个通过使用可观察序列组合异步和基于事件的程序的库。
- 它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的操作符,同时抽象出对诸如低级别线程、同步、线程安全和并发数据结构等问题的关注。
1.2 RxJava三个基本元素
在 RxJava 中,一个实现了
Observer
接口的对象可以订阅一个Observable
类的实例。- 订阅者对
Observable
发射的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable
发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable
的通知。 - RxJava 提供了一套异步编程的 API,并且支持链式调用,所以使用RxJava编写的代码的逻辑会非常简洁
- 订阅者对
RxJava 有以下三个最基本的元素:
- 1.被观察者(Observable)
- 2.观察者(Observer)
- 3.订阅(subscribe)
Rxjava的扩展观察者模式中有4个角色:
1
2
3
4
5角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式- 举例说明
1
2
3
4
5角色 作用 类比
被观察者(Observable) 产生事件 公众号
观察者(Observer) 接收事件,并给出响应动作 程序员
订阅(Subscribe) 连接 被观察者 & 观察者 关注公众号
事件(Event) 被观察者 & 观察者 沟通的载体 内容文章- 一个微信公众号会不断产生新的内容,如果我们对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给我们。我们收到新内容时,如果是我们感兴趣的,就会点进去看下;如果是广告的话,就可能直接忽略掉。这就是我们生活中遇到的典型的观察者模式。
- 微信公众号就是一个被观察者(Observable),不断的产生内容(事件),而程序员就是一个观察者(Observer) ,通过订阅(subscribe)就能够接受到微信公众号(被观察者)推送的内容(事件),接收文章(事件)后处理消息【可以看,也可以忽略】。
创建被观察者
1
2
3
4
5
6
7
8
9
10
11Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) {
Log.e(TAG, "subscribe");
Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});创建观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Observer<Integer> observer = new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
public void onComplete() {
Log.e(TAG, "onComplete");
}
};完成观察者与被观察者之间的订阅关系
1
observable.subscribe(observer);
也可以以链式调用的方式来完成订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> e) {
Log.e(TAG, "subscribe");
Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
public void onComplete() {
Log.e(TAG, "onComplete");
}
});最终的输出结果是一样的
1
2
3
4
5
6
7onSubscribe
subscribe
currentThread name: main
onNext: 1
onNext: 2
onNext: 3
onComplete被观察者发送的事件类型有以下几种
事件种类 作用 onNext() 发送该事件时,观察者会回调 onNext() 方法 onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送 onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送
2.了解一些基础概念
2.1 RxJava事件类型
- RxJava中的事件分为三种类型:Next事件、Complete事件和Error事件。如下所示:
- Next 常规事件 被观察者可以发送无数个Next事件,观察者也可以接受无数个Next事件
- Complete 结束事件 被观察者发送Complete事件后可以继续发送事件,观察者收到Complete事件后将不会接受其他任何事件
- Error 异常事件 被观察者发送Error事件后,其他事件将被终止发送,观察者收到Error事件后将不会接受其他任何事件
2.2 RxJava的消息订阅步骤
这里把消息订阅步骤拆分一下,流程如下
- 创建被观察者(Observable),定义要发送的事件。
- 创建观察者(Observer),接受事件并做出响应操作。
- 观察者通过订阅(subscribe)被观察者把它们连接到一起。
代码如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35//1.创建被订阅者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
});
//1.1线程调度
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
//2.创建订阅者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//3.观察者通过订阅与被观察者绑定一起
observable.subscribe(observer);
2.3 如何切断消息
如何切断消息
- 直接先上代码。要切断消息的传递很简单,调用下Disposable的dispose()方法即可。调用dispose()之后,被观察者虽然能继续发送消息,但是观察者却收不到消息。
1
2
3
4//切断链接操作
if (compositeDisposable != null && !compositeDisposable.isDisposed()) {
compositeDisposable.dispose()
}那么如何关联消息呢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21private val compositeDisposable: CompositeDisposable by lazy {
CompositeDisposable()
}
val disposable: Disposable = model.getHomeList(page)
//网络请求在子线程,所以是在io线程,避免阻塞线程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe ({ bean ->
LogUtils.e("getHomeList-----"+"onNext")
mView.setDataView(bean)
}, { t ->
LogUtils.e("getHomeList-----"+"onError"+t.localizedMessage)
if(NetworkUtils.isConnected()){
mView.setDataErrorView()
}else{
mView.setNetWorkErrorView()
}
}
)
compositeDisposable.add(disposable)
2.4 Rx调度/切换
2.4.1 主要作用
- 指定 被观察者Observable/ 观察者Observer的工作线程类型。
2.4.2 为什么要进行RxJava线程控制(调度 / 切换)
- 若被观察者 (Observable) / 观察者(Observer)在主线程被创建,那么他们的工作(生产事件 / 接收& 响应事件)就会发生在主线程
- 因为创建被观察者 (Observable) / 观察者(Observer)的线程 = 主线程
- 所以生产事件 / 接收& 响应事件都发生在主线程
2.4.3 冲突
- 对于一般的需求场景,需要在子线程中实现耗时的操作;然后回到主线程实现 UI操作
- 应用到 RxJava模型中,可理解为:
- 被观察者Observable在子线程中生产事件(如实现耗时操作等等)
- 观察者Observer在主线程接收&响应事件(即实现UI操作)
2.4.4 解决方案
- 为了解决上述冲突,即实现真正的异步操作,我们需要对RxJava进行 线程控制(也称为调度 / 切换)
- 采用 RxJava内置的线程调度器( Scheduler ),即通过功能性操作符subscribeOn() & observeOn()实现
2.4.5 如何调用代码
- Observer(观察者)的onSubscribe()方法运行在当前线程中。
- Observable(被观察者)中的subscribe()运行在subscribeOn()指定的线程中。
- Observer(观察者)的onNext()和onComplete()等方法运行在observeOn()指定的线程中。
1
2
3
4
5
6
7
8//线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
//功能性操作符subscribeOn() & observeOn()作用
//线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
observable
// 1. 指定被观察者 生产事件的线程
.subscribeOn(Schedulers.io())
// 2. 指定观察者 接收 & 响应事件的线程
.observeOn(AndroidSchedulers.mainThread())
2.5 多次设置subscribeOn()的问题
这种操作会出现什么问题呢?
- 只要你测试一下,就可以知道只有第一次的subscribeOn()起作用。那么这是为啥呢?这里先留个疑问,后面源码分析会解答。
1
2
3
4.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(Schedulers.computation())//第三次
.subscribeOn(AndroidSchedulers.mainThread())//第四次
03.一些简单思考
3.1 subscribeOn()多次调用
Observable.subscribeOn()多次调用
- 测试结果:被观察者的线程 = 第一次指定的线程 = 新的工作线程,第二次指定的线程(主线程)无效
- 若Observable.subscribeOn()多次指定被观察者。生产事件的线程,则只有第一次指定有效,其余的指定线程无效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22MovieModel model = MovieModel.getInstance();
Disposable subscribe = model.getHomePage()
//线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
//功能性操作符subscribeOn() & observeOn()作用
//线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
// 1. 指定被观察者 生产事件的线程
// 被观察者的线程 = 第一次指定的线程 = 新的工作线程,第二次指定的线程(主线程)无效
.subscribeOn(Schedulers.io()) // 第一次指定io流线程
.subscribeOn(Schedulers.newThread()) // 第二次指定被观察者线程 = 新线程
// 2. 指定观察者 接收 & 响应事件的线程
.observeOn(AndroidSchedulers.mainThread())
// 3. 最后再通过订阅(subscribe)连接观察者和被观察者
.subscribe(new Consumer<MovieBean>() {
@Override
public void accept(MovieBean movieBean) throws Exception {
LogUtils.e(TAG+"----"+"accept(MovieBean movieBean)");
if (movieBean != null && movieBean.getRet() != null) {
mView.setAdapterData(movieBean);
}
}
});
mSubscriptions.add(subscribe);
3.2 Observable.observeOn()多次指定
- 测试结果:每调用一次observeOn(),观察者的线程就会切换一次
- 若Observable.observeOn()多次指定观察者 接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换