Android Rx基础介绍

01.Rx基础介绍

1.1 Rx简单定义

  • 简单解释
    • RxJava 是一个基于事件流、实现异步操作的库
  • GitHub官方解释
    • 代码地址:https://github.com/ReactiveX/RxJava
    • 官方解释
      • RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
      • It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
    • 翻译如下,借助有道词典
      • RxJava是反应性扩展的JavaVM实现:一个通过使用可观察序列组合异步和基于事件的程序的库。
      • 它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的操作符,同时抽象出对诸如低级别线程、同步、线程安全和并发数据结构等问题的关注。

1.2 RxJava三个基本元素

  • 在 RxJava 中,一个实现了 Observer 接口的对象可以订阅一个 Observable 类的实例。

    • 订阅者对 Observable发射的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待 Observable 发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应 Observable 的通知。
    • RxJava 提供了一套异步编程的 API,并且支持链式调用,所以使用RxJava编写的代码的逻辑会非常简洁
  • RxJava 有以下三个最基本的元素:

    • 1.被观察者(Observable)
    • 2.观察者(Observer)
    • 3.订阅(subscribe)
  • Rxjava的扩展观察者模式中有4个角色:

    1
    2
    3
    4
    5
    角色 					作用 							类比
    被观察者(Observable) 产生事件 顾客
    观察者(Observer) 接收事件,并给出响应动作 厨房
    订阅(Subscribe) 连接 被观察者 & 观察者 服务员
    事件(Event) 被观察者 & 观察者 沟通的载体 菜式
    • 举例说明
    1
    2
    3
    4
    5
    角色 					作用 							类比
    被观察者(Observable) 产生事件 公众号
    观察者(Observer) 接收事件,并给出响应动作 程序员
    订阅(Subscribe) 连接 被观察者 & 观察者 关注公众号
    事件(Event) 被观察者 & 观察者 沟通的载体 内容文章
    • 一个微信公众号会不断产生新的内容,如果我们对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给我们。我们收到新内容时,如果是我们感兴趣的,就会点进去看下;如果是广告的话,就可能直接忽略掉。这就是我们生活中遇到的典型的观察者模式。
    • 微信公众号就是一个被观察者(Observable),不断的产生内容(事件),而程序员就是一个观察者(Observer) ,通过订阅(subscribe)就能够接受到微信公众号(被观察者)推送的内容(事件),接收文章(事件)后处理消息【可以看,也可以忽略】。
  • 创建被观察者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) {
    Log.e(TAG, "subscribe");
    Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
    e.onNext(1);
    e.onNext(2);
    e.onNext(3);
    e.onComplete();
    }
    });
  • 创建观察者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e) {
    Log.e(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    };
  • 完成观察者与被观察者之间的订阅关系

    1
    observable.subscribe(observer);
  • 也可以以链式调用的方式来完成订阅

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) {
    Log.e(TAG, "subscribe");
    Log.e(TAG, "currentThread name: " + Thread.currentThread().getName());
    e.onNext(1);
    e.onNext(2);
    e.onNext(3);
    e.onComplete();
    }
    }).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e) {
    Log.e(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });
  • 最终的输出结果是一样的

    1
    2
    3
    4
    5
    6
    7
    onSubscribe
    subscribe
    currentThread name: main
    onNext: 1
    onNext: 2
    onNext: 3
    onComplete
  • 被观察者发送的事件类型有以下几种

    事件种类 作用
    onNext() 发送该事件时,观察者会回调 onNext() 方法
    onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
    onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送

2.了解一些基础概念

2.1 RxJava事件类型

  • RxJava中的事件分为三种类型:Next事件、Complete事件和Error事件。如下所示:
    • Next 常规事件 被观察者可以发送无数个Next事件,观察者也可以接受无数个Next事件
    • Complete 结束事件 被观察者发送Complete事件后可以继续发送事件,观察者收到Complete事件后将不会接受其他任何事件
    • Error 异常事件 被观察者发送Error事件后,其他事件将被终止发送,观察者收到Error事件后将不会接受其他任何事件

2.2 RxJava的消息订阅步骤

  • 这里把消息订阅步骤拆分一下,流程如下

    • 创建被观察者(Observable),定义要发送的事件。
    • 创建观察者(Observer),接受事件并做出响应操作。
    • 观察者通过订阅(subscribe)被观察者把它们连接到一起。
  • 代码如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    //1.创建被订阅者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
    });
    //1.1线程调度
    observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

    //2.创建订阅者
    Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
    };
    //3.观察者通过订阅与被观察者绑定一起
    observable.subscribe(observer);

2.3 如何切断消息

  • 如何切断消息

    • 直接先上代码。要切断消息的传递很简单,调用下Disposable的dispose()方法即可。调用dispose()之后,被观察者虽然能继续发送消息,但是观察者却收不到消息。
    1
    2
    3
    4
    //切断链接操作
    if (compositeDisposable != null && !compositeDisposable.isDisposed()) {
    compositeDisposable.dispose()
    }
  • 那么如何关联消息呢

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private val compositeDisposable: CompositeDisposable by lazy {
    CompositeDisposable()
    }

    val disposable: Disposable = model.getHomeList(page)
    //网络请求在子线程,所以是在io线程,避免阻塞线程
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe ({ bean ->
    LogUtils.e("getHomeList-----"+"onNext")
    mView.setDataView(bean)
    }, { t ->
    LogUtils.e("getHomeList-----"+"onError"+t.localizedMessage)
    if(NetworkUtils.isConnected()){
    mView.setDataErrorView()
    }else{
    mView.setNetWorkErrorView()
    }
    }
    )
    compositeDisposable.add(disposable)

2.4 Rx调度/切换

  • 2.4.1 主要作用

    • 指定 被观察者Observable/ 观察者Observer的工作线程类型。
  • 2.4.2 为什么要进行RxJava线程控制(调度 / 切换)

    • 若被观察者 (Observable) / 观察者(Observer)在主线程被创建,那么他们的工作(生产事件 / 接收& 响应事件)就会发生在主线程
    • 因为创建被观察者 (Observable) / 观察者(Observer)的线程 = 主线程
    • 所以生产事件 / 接收& 响应事件都发生在主线程
  • 2.4.3 冲突

    • 对于一般的需求场景,需要在子线程中实现耗时的操作;然后回到主线程实现 UI操作
    • 应用到 RxJava模型中,可理解为:
    • 被观察者Observable在子线程中生产事件(如实现耗时操作等等)
    • 观察者Observer在主线程接收&响应事件(即实现UI操作)
  • 2.4.4 解决方案

    • 为了解决上述冲突,即实现真正的异步操作,我们需要对RxJava进行 线程控制(也称为调度 / 切换)
    • 采用 RxJava内置的线程调度器( Scheduler ),即通过功能性操作符subscribeOn() & observeOn()实现
    • image
  • 2.4.5 如何调用代码

    • Observer(观察者)的onSubscribe()方法运行在当前线程中。
    • Observable(被观察者)中的subscribe()运行在subscribeOn()指定的线程中。
    • Observer(观察者)的onNext()和onComplete()等方法运行在observeOn()指定的线程中。
    1
    2
    3
    4
    5
    6
    7
    8
    //线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
    //功能性操作符subscribeOn() & observeOn()作用
    //线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
    observable
    // 1. 指定被观察者 生产事件的线程
    .subscribeOn(Schedulers.io())
    // 2. 指定观察者 接收 & 响应事件的线程
    .observeOn(AndroidSchedulers.mainThread())

2.5 多次设置subscribeOn()的问题

  • 这种操作会出现什么问题呢?

    • 只要你测试一下,就可以知道只有第一次的subscribeOn()起作用。那么这是为啥呢?这里先留个疑问,后面源码分析会解答。
    1
    2
    3
    4
    .subscribeOn(Schedulers.io())//第一次
    .subscribeOn(Schedulers.newThread())//第二次
    .subscribeOn(Schedulers.computation())//第三次
    .subscribeOn(AndroidSchedulers.mainThread())//第四次

03.一些简单思考

3.1 subscribeOn()多次调用

  • Observable.subscribeOn()多次调用

    • 测试结果:被观察者的线程 = 第一次指定的线程 = 新的工作线程,第二次指定的线程(主线程)无效
    • 若Observable.subscribeOn()多次指定被观察者。生产事件的线程,则只有第一次指定有效,其余的指定线程无效
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    MovieModel model = MovieModel.getInstance();
    Disposable subscribe = model.getHomePage()
    //线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
    //功能性操作符subscribeOn() & observeOn()作用
    //线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
    // 1. 指定被观察者 生产事件的线程
    // 被观察者的线程 = 第一次指定的线程 = 新的工作线程,第二次指定的线程(主线程)无效
    .subscribeOn(Schedulers.io()) // 第一次指定io流线程
    .subscribeOn(Schedulers.newThread()) // 第二次指定被观察者线程 = 新线程
    // 2. 指定观察者 接收 & 响应事件的线程
    .observeOn(AndroidSchedulers.mainThread())
    // 3. 最后再通过订阅(subscribe)连接观察者和被观察者
    .subscribe(new Consumer<MovieBean>() {
    @Override
    public void accept(MovieBean movieBean) throws Exception {
    LogUtils.e(TAG+"----"+"accept(MovieBean movieBean)");
    if (movieBean != null && movieBean.getRet() != null) {
    mView.setAdapterData(movieBean);
    }
    }
    });
    mSubscriptions.add(subscribe);

3.2 Observable.observeOn()多次指定

  • 测试结果:每调用一次observeOn(),观察者的线程就会切换一次
    • 若Observable.observeOn()多次指定观察者 接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换