Android Rx 源码分析
01.observable创建过程
看下创建被观察者(Observable)的过程,直接使用Observable.create()来创建Observable,看下源码
- 首先对创建的对象source进行非空判断,创建一个ObservableCreate对象出来,然后把自定义的ObservableOnSubscribe
作为参数传到ObservableCreate中去,最后就是调用 RxJavaPlugins.onAssembly()方法。 - 注意这个T使用泛型,实际开发用具体类型替代。可以看上面的代码案例!
1
2
3
4
5
6@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}- 首先对创建的对象source进行非空判断,创建一个ObservableCreate对象出来,然后把自定义的ObservableOnSubscribe
接着看看new ObservableCreate
(source)做了什么? - ObservableCreate是继承自Observable的,并且会把ObservableOnSubscribe对象给存起来。
接着看看RxJavaPlugins.onAssembly()方法源码
- 把创建的ObservableCreate的对象给返回回去
02.observer创建过程
- 看一下观察者observer创建的过程,直接通过new创建observer对象,看下源码
- 注意Observer类是一个接口,所以直接new的话,需要重写里面的几个抽象方法。
- 这里有个小小的疑惑?接口interface可以通过new来创建对象吗?如果是抽象类则是否可以new对象呢?
- 接口是可以new对象的,并且需要重写里面的抽象方法。比如Android中setOnClickListener中的listener就需要new接口,比如这里的Observer观察者就是一个接口并且可以new来创建。
- 抽象类是不可以new对象的。那么为什么呢?
03.subscribe订阅过程
调用方式observable.subscribe(observer),然后看一下源码
接着看看RxJavaPlugins类的onSubscribe()做了什么
- 把原来的observer返回而已
1
2
3
4
5
6
7
8
9@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}接着看看Observable类的subscribeActual()方法
- 抽象方法。Observable类的subscribeActual()中的方法是一个抽象方法,那么其具体实现在哪呢?回到前面创建被观察者的过程吗,最终会返回一个ObservableCreate对象,这个ObservableCreate就是Observable的子类,然后看看里面的代码……
1
protected abstract void subscribeActual(Observer<? super T> observer);
- 看ObservableCreate类中的subscribeActual方法,如下所示。
- subscribeActual()方法中首先会创建一个CreateEmitter对象,然后把自定义的观察者observer作为参数给传进去。
- 调用了observer.onSubscribe(parent),实际上就是调用观察者的onSubscribe()方法,即告诉观察者已经成功订阅到了被观察者。
1
2
3
4
5
6
7
8
9
10
11
12@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}- 接着看看CreateEmitter类
- CreateEmitter实现了ObservableEmitter接口和Disposable接口
1
2
3
4
5
6
7
8
9
10
11
12static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
}- 接着看看source.subscribe(parent)方法干什么
- 调用source.subscribe(parent),这里的source就是ObservableOnSubscribe对象,即这里会调用ObservableOnSubscribe的subscribe()方法。
- ObservableEmitter,顾名思义,就是被观察者发射器。所以,subscribe()里面的onNext()方法,onError()[异常时调用]和一个onComplete()会逐一被调用。这里的ObservableEmitter接口其具体实现为CreateEmitter,看看CreateEmitte类的onNext()方法和onComplete()的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
//具体定义的subscribe()方法如下
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//yc处理某些操作
try {
//这里只是假设try-catch异常
}catch (Exception e){
emitter.onError(e);
}
emitter.onNext("潇湘剑雨yc");
emitter.onComplete();
}
});
//CreateEmitte类的onNext(),onError()方法和onComplete()的实现
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}isDisposed()方法是干什么用的呢?
- isDisposed()方法能控制消息的走向,即能够切断消息的传递
04.disposable.dispose()切断消息
Disposable是一个接口,可以理解Disposable为一个连接器,调用dispose()后,这个连接器将会中断。其具体实现在CreateEmitter类,来看下CreateEmitter的dispose()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//Disposable是一个接口
public interface Disposable {
void dispose();
boolean isDisposed();
}
//CreateEmitter的dispose()
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}然后看看DisposableHelper类的操作
- 看到DisposableHelper是一个枚举类,并且只有一个值:DISPOSED。dispose()方法中会把一个原子引用field设为DISPOSED,即标记为中断状态。因此后面通过isDisposed()方法即可以判断连接器是否被中断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}回头看看CreateEmitter类中的onNext()方法,onError()[异常时调用]和onComplete()方法
- 如果没有dispose,observer.onNext()才会被调用到。onError()和onComplete()互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用dispose()。
1
2
3
4
5
6
7
8
9
10
11
12@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
//省略其他方法代码
05.线程切换源码
分为两部分:
- subscribeOn()
- observeOn()。
1
2
3
4
5
6
7
8//线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
//功能性操作符subscribeOn() & observeOn()作用
//线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
observable
// 1. 指定被观察者 生产事件的线程
.subscribeOn(Schedulers.io())
// 2. 指定观察者 接收 & 响应事件的线程
.observeOn(AndroidSchedulers.mainThread())首先看看observable.subscribeOn(Schedulers.io())部分的源代码
- subscribeOn()方法要传入一个Scheduler类对象作为参数,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。
- 可以看到,在subscribeOn源码中,首先会将当前的Observable(其具体实现为ObservableCreate)包装成一个新的ObservableSubscribeOn对象。RxJavaPlugins.onAssembly()也是将ObservableSubscribeOn对象原样返回而已。
1
2
3
4
5
6@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}- 接着看看Schedulers.io()源码。Schedulers.io()中使用了静态内部类的方式来创建出了一个单例IoScheduler对象出来,这个IoScheduler是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//Schedulers.io()
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
//静态代码块
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//看看IO = RxJavaPlugins.initIoScheduler(new IOTask())中的new IOTask()方法源码
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}多次调用subscribeOn方法为啥最终只会执行第一次设置,也就是2.5问题。为啥呢?
- 每调用一次subscribeOn()就会把旧的被观察者包装成一个新的被观察者,经过了四次调用之后,就变成了下面这个样子:图片摘自网络,非常棒。
- 被观察者被订阅时是从最外面的一层通知到里面的一层,那么当传到上图第三层时,也就是ObservableSubscribeOn(第一次)那一层时,管你之前是在哪个线程,subscribeOn(Schedulers.io())都会把线程切到IO线程中去执行,所以多次设置subscribeOn()时,只有第一次生效。
接着看看observeOn()方法源码
- 常见使用:.observeOn(AndroidSchedulers.mainThread()),大意是指定在Android主线程中执行。看源码可知,这里也是新包装一个ObservableObserveOn对象
1
2
3
4
5
6
7
8
9
10
11
12
13@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
06.观察者和被观察者总结
- 被订阅者创建过程总结
- Observable.create()中就是把我们自定义的ObservableOnSubscribe对象重新包装成一个ObservableCreate对象,然后返回这个ObservableCreate对象。
- 订阅者订阅过程总结
- Observable(被观察者)和Observer(观察者)建立连接(订阅)之后,会创建出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。
- 观察者没有订阅时被观察者是不会发送消息的,订阅之后,Observable(被观察者)才会开始发送事件。