Android Rx 源码分析

01.observable创建过程

  • 看下创建被观察者(Observable)的过程,直接使用Observable.create()来创建Observable,看下源码

    • 首先对创建的对象source进行非空判断,创建一个ObservableCreate对象出来,然后把自定义的ObservableOnSubscribe作为参数传到ObservableCreate中去,最后就是调用 RxJavaPlugins.onAssembly()方法。
    • 注意这个T使用泛型,实际开发用具体类型替代。可以看上面的代码案例!
    1
    2
    3
    4
    5
    6
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
  • 接着看看new ObservableCreate(source)做了什么?

    • ObservableCreate是继承自Observable的,并且会把ObservableOnSubscribe对象给存起来。
    • image
  • 接着看看RxJavaPlugins.onAssembly()方法源码

    • 把创建的ObservableCreate的对象给返回回去
    • image

02.observer创建过程

  • 看一下观察者observer创建的过程,直接通过new创建observer对象,看下源码
    • 注意Observer类是一个接口,所以直接new的话,需要重写里面的几个抽象方法。
    • image
  • 这里有个小小的疑惑?接口interface可以通过new来创建对象吗?如果是抽象类则是否可以new对象呢?
    • 接口是可以new对象的,并且需要重写里面的抽象方法。比如Android中setOnClickListener中的listener就需要new接口,比如这里的Observer观察者就是一个接口并且可以new来创建。
    • 抽象类是不可以new对象的。那么为什么呢?

03.subscribe订阅过程

  • 调用方式observable.subscribe(observer),然后看一下源码

    • image
  • 接着看看RxJavaPlugins类的onSubscribe()做了什么

    • 把原来的observer返回而已
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
    return apply(f, source, observer);
    }
    return observer;
    }
  • 接着看看Observable类的subscribeActual()方法

    • 抽象方法。Observable类的subscribeActual()中的方法是一个抽象方法,那么其具体实现在哪呢?回到前面创建被观察者的过程吗,最终会返回一个ObservableCreate对象,这个ObservableCreate就是Observable的子类,然后看看里面的代码……
    1
    protected abstract void subscribeActual(Observer<? super T> observer);
    • 看ObservableCreate类中的subscribeActual方法,如下所示。
      • subscribeActual()方法中首先会创建一个CreateEmitter对象,然后把自定义的观察者observer作为参数给传进去。
      • 调用了observer.onSubscribe(parent),实际上就是调用观察者的onSubscribe()方法,即告诉观察者已经成功订阅到了被观察者。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
    source.subscribe(parent);
    } catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    parent.onError(ex);
    }
    }
    • 接着看看CreateEmitter类
      • CreateEmitter实现了ObservableEmitter接口和Disposable接口
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
    this.observer = observer;
    }
    }
    • 接着看看source.subscribe(parent)方法干什么
      • 调用source.subscribe(parent),这里的source就是ObservableOnSubscribe对象,即这里会调用ObservableOnSubscribe的subscribe()方法。
      • ObservableEmitter,顾名思义,就是被观察者发射器。所以,subscribe()里面的onNext()方法,onError()[异常时调用]和一个onComplete()会逐一被调用。这里的ObservableEmitter接口其具体实现为CreateEmitter,看看CreateEmitte类的onNext()方法和onComplete()的实现:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    public interface ObservableOnSubscribe<T> {

    /**
    * Called for each Observer that subscribes.
    * @param emitter the safe emitter instance, never null
    * @throws Exception on error
    */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }

    //具体定义的subscribe()方法如下
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    //yc处理某些操作
    try {
    //这里只是假设try-catch异常
    }catch (Exception e){
    emitter.onError(e);
    }
    emitter.onNext("潇湘剑雨yc");
    emitter.onComplete();
    }
    });



    //CreateEmitte类的onNext(),onError()方法和onComplete()的实现
    @Override
    public void onNext(T t) {
    if (t == null) {
    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
    return;
    }
    if (!isDisposed()) {
    observer.onNext(t);
    }
    }

    @Override
    public void onError(Throwable t) {
    if (!tryOnError(t)) {
    RxJavaPlugins.onError(t);
    }
    }

    @Override
    public boolean tryOnError(Throwable t) {
    if (t == null) {
    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
    try {
    observer.onError(t);
    } finally {
    dispose();
    }
    return true;
    }
    return false;
    }

    @Override
    public void onComplete() {
    if (!isDisposed()) {
    try {
    observer.onComplete();
    } finally {
    dispose();
    }
    }
    }
  • isDisposed()方法是干什么用的呢?

    • isDisposed()方法能控制消息的走向,即能够切断消息的传递

04.disposable.dispose()切断消息

  • Disposable是一个接口,可以理解Disposable为一个连接器,调用dispose()后,这个连接器将会中断。其具体实现在CreateEmitter类,来看下CreateEmitter的dispose()方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //Disposable是一个接口
    public interface Disposable {
    void dispose();
    boolean isDisposed();
    }

    //CreateEmitter的dispose()
    @Override
    public void dispose() {
    DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
    }
  • 然后看看DisposableHelper类的操作

    • 看到DisposableHelper是一个枚举类,并且只有一个值:DISPOSED。dispose()方法中会把一个原子引用field设为DISPOSED,即标记为中断状态。因此后面通过isDisposed()方法即可以判断连接器是否被中断。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public enum DisposableHelper implements Disposable {
    DISPOSED;

    public static boolean isDisposed(Disposable d) {
    return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
    current = field.getAndSet(d);
    if (current != d) {
    if (current != null) {
    current.dispose();
    }
    return true;
    }
    }
    return false;
    }
    }
  • 回头看看CreateEmitter类中的onNext()方法,onError()[异常时调用]和onComplete()方法

    • 如果没有dispose,observer.onNext()才会被调用到。onError()和onComplete()互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用dispose()。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Override
    public void onNext(T t) {
    if (t == null) {
    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
    return;
    }
    if (!isDisposed()) {
    observer.onNext(t);
    }
    }

    //省略其他方法代码

05.线程切换源码

  • 分为两部分:

    • subscribeOn()
    • observeOn()。
    1
    2
    3
    4
    5
    6
    7
    8
    //线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
    //功能性操作符subscribeOn() & observeOn()作用
    //线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
    observable
    // 1. 指定被观察者 生产事件的线程
    .subscribeOn(Schedulers.io())
    // 2. 指定观察者 接收 & 响应事件的线程
    .observeOn(AndroidSchedulers.mainThread())
  • 首先看看observable.subscribeOn(Schedulers.io())部分的源代码

    • subscribeOn()方法要传入一个Scheduler类对象作为参数,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。
    • 可以看到,在subscribeOn源码中,首先会将当前的Observable(其具体实现为ObservableCreate)包装成一个新的ObservableSubscribeOn对象。RxJavaPlugins.onAssembly()也是将ObservableSubscribeOn对象原样返回而已。
    1
    2
    3
    4
    5
    6
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    • 接着看看Schedulers.io()源码。Schedulers.io()中使用了静态内部类的方式来创建出了一个单例IoScheduler对象出来,这个IoScheduler是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    //Schedulers.io()
    @NonNull
    public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
    }

    //静态代码块
    static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

    //看看IO = RxJavaPlugins.initIoScheduler(new IOTask())中的new IOTask()方法源码
    static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
    return IoHolder.DEFAULT;
    }
    }
    static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
    }
  • 多次调用subscribeOn方法为啥最终只会执行第一次设置,也就是2.5问题。为啥呢?

    • 每调用一次subscribeOn()就会把旧的被观察者包装成一个新的被观察者,经过了四次调用之后,就变成了下面这个样子:图片摘自网络,非常棒。
    • 被观察者被订阅时是从最外面的一层通知到里面的一层,那么当传到上图第三层时,也就是ObservableSubscribeOn(第一次)那一层时,管你之前是在哪个线程,subscribeOn(Schedulers.io())都会把线程切到IO线程中去执行,所以多次设置subscribeOn()时,只有第一次生效。
    • image
  • 接着看看observeOn()方法源码

    • 常见使用:.observeOn(AndroidSchedulers.mainThread()),大意是指定在Android主线程中执行。看源码可知,这里也是新包装一个ObservableObserveOn对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

06.观察者和被观察者总结

  • 被订阅者创建过程总结
    • Observable.create()中就是把我们自定义的ObservableOnSubscribe对象重新包装成一个ObservableCreate对象,然后返回这个ObservableCreate对象。
  • 订阅者订阅过程总结
    • Observable(被观察者)和Observer(观察者)建立连接(订阅)之后,会创建出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。
    • 观察者没有订阅时被观察者是不会发送消息的,订阅之后,Observable(被观察者)才会开始发送事件。