Android Rx 实际应用

01.最普通的案例分析

  • 如下所示,指定线程控制后简单的调用。

    • 第一种方式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    MovieModel model = MovieModel.getInstance();
    Disposable subscribe = model.getHomePage()
    //线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
    //功能性操作符subscribeOn() & observeOn()作用
    //线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
    // 1. 指定被观察者 生产事件的线程
    .subscribeOn(Schedulers.io())
    // 2. 指定观察者 接收 & 响应事件的线程
    .observeOn(AndroidSchedulers.mainThread())
    // 3. 最后再通过订阅(subscribe)连接观察者和被观察者
    .subscribe(new Consumer<MovieBean>() {
    @Override
    public void accept(MovieBean movieBean) throws Exception {
    LogUtils.e(TAG+"----"+"accept(MovieBean movieBean)");
    if (movieBean != null && movieBean.getRet() != null) {
    mView.setAdapterData(movieBean);
    }
    }
    }, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    LogUtils.e(TAG+"----"+"accept(Throwable throwable)");
    }
    }, new Action() {
    @Override
    public void run() throws Exception {
    LogUtils.e(TAG+"----"+"run()");
    }
    }, new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
    LogUtils.e(TAG+"----"+"accept(Disposable disposable)");
    }
    });
    mSubscriptions.add(subscribe);
    • 第二种方式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    GanKModel model = GanKModel.getInstance();
    model.getSearchResult(searchText, 10, mPage)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<SearchResult>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(SearchResult searchResult) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
    });

03.网络请求嵌套回调

  • 使用场景

    • 即在第1个网络请求成功后,继续再进行一次网络请求
    • 比如 先进行用户注册的网络请求, 待注册成功后回再继续发送用户登录 的网络请求
  • 一般处理代码案例

    • 估计有些开发者一开始就是这么进行操作的。哈哈!但是感觉有点麻烦!
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    // 发送注册网络请求的函数方法 
    private void register() {
    api.register(new RegisterRequest())
    .subscribeOn(Schedulers.io()) //在IO线程进行网络请求
    .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
    .subscribe(new Consumer<RegisterResponse>() {
    @Override
    public void accept(RegisterResponse registerResponse) throws Exception {
    Toast.makeText(MainActivity.this, "注册成功", Toast.LENGTH_SHORT).show();
    login(); //注册成功, 调用登录的方法
    }
    }, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    Toast.makeText(MainActivity.this, "注册失败", Toast.LENGTH_SHORT).show();
    }
    });
    }

    // 发送登录网络请求的函数方法
    private void login() {
    api.login(new LoginRequest())
    .subscribeOn(Schedulers.io()) //在IO线程进行网络请求
    .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
    .subscribe(new Consumer<LoginResponse>() {
    @Override
    public void accept(LoginResponse loginResponse) throws Exception {
    Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
    }
    }, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
    }
    });
    }
  • 网络嵌套代码案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    // 步骤1:创建Retrofit对象
    Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
    .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
    .build();

    // 步骤2:创建 网络请求接口 的实例
    GetRequest_Interface2 request = retrofit.create(GetRequest_Interface2.class);

    // 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
    observable1 = request.getCall();
    observable2 = request.getCall_2();

    observable1.subscribeOn(Schedulers.io()) // (初始被观察者)切换到IO线程进行网络请求1
    .observeOn(AndroidSchedulers.mainThread()) // (新观察者)切换到主线程 处理网络请求1的结果
    .doOnNext(new Consumer<Translation1>() {
    @Override
    public void accept(Translation1 result) throws Exception {
    Log.d(TAG, "第1次网络请求成功");
    result.show(); // 对第1次网络请求返回的结果进行操作 = 显示翻译结果
    }
    })
    .observeOn(Schedulers.io()) // (新被观察者,同时也是新观察者)切换到IO线程去发起登录请求
    // 特别注意:因为flatMap是对初始被观察者作变换,所以对于旧被观察者,它是新观察者,所以通过observeOn切换线程
    // 但对于初始观察者,它则是新的被观察者
    .flatMap(new Function<Translation1, ObservableSource<Translation2>>() { // 作变换,即作嵌套网络请求
    @Override
    public ObservableSource<Translation2> apply(Translation1 result) throws Exception {
    // 将网络请求1转换成网络请求2,即发送网络请求2
    return observable2;
    }
    })
    .observeOn(AndroidSchedulers.mainThread()) // (初始观察者)切换到主线程 处理网络请求2的结果
    .subscribe(new Consumer<Translation2>() {
    @Override
    public void accept(Translation2 result) throws Exception {
    Log.d(TAG, "第2次网络请求成功");
    result.show(); // 对第2次网络请求返回的结果进行操作 = 显示翻译结果
    }
    }, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    System.out.println("登录失败");
    }
    });

04.合并网络请求数据

  • 代码如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    // 步骤1:创建Retrofit对象
    Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
    .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
    .build();

    // 步骤2:创建 网络请求接口 的实例
    GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

    // 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
    observable1 = request.getCall().subscribeOn(Schedulers.io()); // 新开线程进行网络请求1
    observable2 = request.getCall_2().subscribeOn(Schedulers.io());// 新开线程进行网络请求2
    // 即2个网络请求异步 & 同时发送

    // 步骤4:通过使用Zip()对两个网络请求进行合并再发送
    Observable.zip(observable1, observable2,
    new BiFunction<Translation1, Translation2, String>() {
    // 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
    @Override
    public String apply(Translation1 translation1,
    Translation2 translation2) throws Exception {
    return translation1.show() + " & " +translation2.show();
    }
    }).observeOn(AndroidSchedulers.mainThread()) // 在主线程接收 & 处理数据
    .subscribe(new Consumer<String>() {
    // 成功返回数据时调用
    @Override
    public void accept(String combine_infro) throws Exception {
    // 结合显示2个网络请求的数据结果
    Log.d(TAG, "最终接收到的数据是:" + combine_infro);
    }
    }, new Consumer<Throwable>() {
    // 网络请求错误时调用
    @Override
    public void accept(Throwable throwable) throws Exception {
    System.out.println("登录失败");
    }
    });

05.重复发送网络请求

  • 需求分析

    • 采用Get方法对金山词霸API按规定时间重复发送网络请求,从而模拟 轮询 需求实现
  • 实现步骤

    1
    2
    3
    4
    5
    6
    7
    8
    添加依赖
    创建 接收服务器返回数据的类
    创建 用于描述网络请求 的接口(区别于传统Retrofit形式)
    创建 Retrofit 实例
    创建 网络请求接口实例 并 配置网络请求参数(区别于传统Retrofit形式)
    发送网络请求(区别于传统Retrofit形式)
    发送网络请求
    对返回的数据进行处理
  • 代码实现步骤

    • a.通过接口请求的数据创建实体类
    • b.创建用于描述网络请求的接口
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface GetRequest_Interface { 

    @GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
    Observable<Translation> getCall();
    // 注解里传入 网络请求 的部分URL地址
    // Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里 // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
    // 采用Observable<...>接口
    // getCall()是接受网络请求数据的方法

    }
    • c.具体实现可以参考下面代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    @Override
    protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    /*
    * 步骤1:采用interval()延迟发送
    **/
    Observable.interval(2,1, TimeUnit.SECONDS)
    // 参数说明:
    // 参数1 = 第1次延迟时间;
    // 参数2 = 间隔时间数字;
    // 参数3 = 时间单位;
    // 该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)

    /*
    * 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
    * 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
    **/
    .doOnNext(new Consumer<Long>() {
    @Override
    public void accept(Long integer) throws Exception {
    Log.d(TAG, "第 " + integer + " 次轮询" );

    /*
    * 步骤3:通过Retrofit发送网络请求
    **/
    // a. 创建Retrofit对象
    Retrofit retrofit = new Retrofit.Builder()
    // 设置 网络请求 Url
    .baseUrl("http://fy.iciba.com/")
    //设置使用Gson解析(记得加入依赖)
    .addConverterFactory(GsonConverterFactory.create())
    // 支持RxJava
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .build();

    // b. 创建 网络请求接口 的实例
    GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

    // c. 采用Observable<...>形式 对 网络请求 进行封装
    Observable<Translation> observable = request.getCall();
    // d. 通过线程切换发送网络请求
    // 切换到IO线程进行网络请求
    observable
    .subscribeOn(Schedulers.io())
    // 切换回到主线程 处理请求结果
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Translation>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Translation result) {
    // e.接收服务器返回的数据
    result.show() ;
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "请求失败");
    }

    @Override
    public void onComplete() {

    }
    });
    }
    })
    .subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {

    }
    @Override
    public void onNext(Long value) {

    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "对Complete事件作出响应");
    }
    });
    }

06.有条件网络请求轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// 步骤3:采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
// 步骤4:发送网络请求 & 通过repeatWhen()进行轮询
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,此处使用flatMap操作符接收上游的数据
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
// 此处有2种情况:
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 加入判断条件:当轮询次数 = 5次后,就停止轮询
if (i > 3) {
// 此处选择发送onError事件以结束轮询,因为可触发下游观察者的onError()方法回调
return Observable.error(new Throwable("轮询结束"));
}
// 若轮询次数<4次,则发送1Next事件以继续轮询
// 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
}
});

}
}).subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 切换回到主线程 处理请求结果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
}

@Override
public void onError(Throwable e) {
// 获取轮询结束信息
Log.d(TAG, e.toString());
}

@Override
public void onComplete() {

}
});
}

07.联合判断多个事件

  • 需求场景

    • 需要同时对多个事件进行联合判。如,填写表单时,需要表单里所有信息(姓名、年龄、职业等)都被填写后,才允许点击 “提交” 按钮
  • 功能说明

    • 此处采用填写表单作为联合判断功能展示。即,表单里所有信息(姓名、年龄、职业等)都被填写后,才允许点击 “提交” 按钮
  • 具体实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    /*
    * 步骤1:设置控件变量 & 绑定
    **/
    EditText name,age,job;
    Button list;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_combine);

    name = (EditText) findViewById(R.id.name);
    age = (EditText) findViewById(R.id.age);
    job = (EditText) findViewById(R.id.job);
    list = (Button) findViewById(R.id.list);

    /*
    * 步骤2:为每个EditText设置被观察者,用于发送监听事件
    * 说明:
    * 1. 此处采用了RxBinding,需要引入依赖:compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
    * 2. 传入EditText控件,点击任1个EditText撰写时,都会发送数据事件 = Function3()的返回值(下面会详细说明)
    * 3. 采用skip(1)原因:跳过 一开始EditText无任何输入时的空值
    **/
    Observable<CharSequence> nameObservable = RxTextView.textChanges(name).skip(1);
    Observable<CharSequence> ageObservable = RxTextView.textChanges(age).skip(1);
    Observable<CharSequence> jobObservable = RxTextView.textChanges(job).skip(1);
    /*
    * 步骤3:通过combineLatest()合并事件 & 联合判断
    **/
    Observable.combineLatest(nameObservable,ageObservable,jobObservable,new Function3<CharSequence, CharSequence, CharSequence,Boolean>() {
    @Override
    public Boolean apply(@NonNull CharSequence charSequence, @NonNull CharSequence charSequence2, @NonNull CharSequence charSequence3) throws Exception {
    /*
    * 步骤4:规定表单信息输入不能为空
    **/
    // 1. 姓名信息
    boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) ;
    // 除了设置为空,也可设置长度限制
    // boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) && (name.getText().toString().length() > 2 && name.getText().toString().length() < 9);

    // 2. 年龄信息
    boolean isUserAgeValid = !TextUtils.isEmpty(age.getText());
    // 3. 职业信息
    boolean isUserJobValid = !TextUtils.isEmpty(job.getText()) ;
    /*
    * 步骤5:返回信息 = 联合判断,即3个信息同时已填写,"提交按钮"才可点击
    **/
    return isUserNameValid && isUserAgeValid && isUserJobValid;
    }
    }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean s) throws Exception {
    /*
    * 步骤6:返回结果 & 设置按钮可点击样式
    **/
    Log.e(TAG, "提交按钮是否可点击: "+s);
    list.setEnabled(s);
    }
    });
    }

08.从本地内存网络获取数据

  • 从磁盘 / 内存缓存中 获取缓存数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    // 该2变量用于模拟内存缓存 & 磁盘缓存中的数据
    String memoryCache = null;
    String diskCache = "从磁盘缓存中获取数据";

    private String TAG = "RxJava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_combine);
    /*
    * 设置第1个Observable:检查内存缓存是否有该数据的缓存
    **/
    Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    // 先判断内存缓存有无数据
    if (memoryCache != null) {
    // 若有该数据,则发送
    emitter.onNext(memoryCache);
    } else {
    // 若无该数据,则直接发送结束事件
    emitter.onComplete();
    }
    }
    });

    /*
    * 设置第2个Observable:检查磁盘缓存是否有该数据的缓存
    **/
    Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    // 先判断磁盘缓存有无数据
    if (diskCache != null) {
    // 若有该数据,则发送
    emitter.onNext(diskCache);
    } else {
    // 若无该数据,则直接发送结束事件
    emitter.onComplete();
    }
    }
    });

    /*
    * 设置第3个Observable:通过网络获取数据
    **/
    Observable<String> network = Observable.just("从网络中获取数据");
    // 此处仅作网络请求的模拟

    /*
    * 通过concat() 和 firstElement()操作符实现缓存功能
    **/
    // 1. 通过concat()合并memory、disk、network 3个被观察者的事件(即检查内存缓存、磁盘缓存 & 发送网络请求)
    // 并将它们按顺序串联成队列
    Observable.concat(memory, disk, network)
    // 2. 通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
    .firstElement()
    // 即本例的逻辑为:
    // a. firstElement()取出第1个事件 = memory,即先判断内存缓存中有无数据缓存;由于memoryCache = null,即内存缓存中无数据,所以发送结束事件(视为无效事件)
    // b. firstElement()继续取出第2个事件 = disk,即判断磁盘缓存中有无数据缓存:由于diskCache ≠ null,即磁盘缓存中有数据,所以发送Next事件(有效事件)
    // c. 即firstElement()已发出第1个有效事件(disk事件),所以停止判断。
    // 3. 观察者订阅
    .subscribe(new Consumer<String>() {
    @Override
    public void accept( String s) throws Exception {
    Log.d(TAG,"最终获取的数据来源 = "+ s);
    }
    });
    }