Android Rx 实际应用
01.最普通的案例分析
如下所示,指定线程控制后简单的调用。
- 第一种方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35MovieModel model = MovieModel.getInstance();
Disposable subscribe = model.getHomePage()
//线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn()
//功能性操作符subscribeOn() & observeOn()作用
//线程控制,即指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型
// 1. 指定被观察者 生产事件的线程
.subscribeOn(Schedulers.io())
// 2. 指定观察者 接收 & 响应事件的线程
.observeOn(AndroidSchedulers.mainThread())
// 3. 最后再通过订阅(subscribe)连接观察者和被观察者
.subscribe(new Consumer<MovieBean>() {
@Override
public void accept(MovieBean movieBean) throws Exception {
LogUtils.e(TAG+"----"+"accept(MovieBean movieBean)");
if (movieBean != null && movieBean.getRet() != null) {
mView.setAdapterData(movieBean);
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtils.e(TAG+"----"+"accept(Throwable throwable)");
}
}, new Action() {
@Override
public void run() throws Exception {
LogUtils.e(TAG+"----"+"run()");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtils.e(TAG+"----"+"accept(Disposable disposable)");
}
});
mSubscriptions.add(subscribe);- 第二种方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25GanKModel model = GanKModel.getInstance();
model.getSearchResult(searchText, 10, mPage)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<SearchResult>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(SearchResult searchResult) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
03.网络请求嵌套回调
使用场景
- 即在第1个网络请求成功后,继续再进行一次网络请求
- 比如 先进行用户注册的网络请求, 待注册成功后回再继续发送用户登录 的网络请求
一般处理代码案例
- 估计有些开发者一开始就是这么进行操作的。哈哈!但是感觉有点麻烦!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// 发送注册网络请求的函数方法
private void register() {
api.register(new RegisterRequest())
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
Toast.makeText(MainActivity.this, "注册成功", Toast.LENGTH_SHORT).show();
login(); //注册成功, 调用登录的方法
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "注册失败", Toast.LENGTH_SHORT).show();
}
});
}
// 发送登录网络请求的函数方法
private void login() {
api.login(new LoginRequest())
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});
}网络嵌套代码案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface2 request = retrofit.create(GetRequest_Interface2.class);
// 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
observable1 = request.getCall();
observable2 = request.getCall_2();
observable1.subscribeOn(Schedulers.io()) // (初始被观察者)切换到IO线程进行网络请求1
.observeOn(AndroidSchedulers.mainThread()) // (新观察者)切换到主线程 处理网络请求1的结果
.doOnNext(new Consumer<Translation1>() {
@Override
public void accept(Translation1 result) throws Exception {
Log.d(TAG, "第1次网络请求成功");
result.show(); // 对第1次网络请求返回的结果进行操作 = 显示翻译结果
}
})
.observeOn(Schedulers.io()) // (新被观察者,同时也是新观察者)切换到IO线程去发起登录请求
// 特别注意:因为flatMap是对初始被观察者作变换,所以对于旧被观察者,它是新观察者,所以通过observeOn切换线程
// 但对于初始观察者,它则是新的被观察者
.flatMap(new Function<Translation1, ObservableSource<Translation2>>() { // 作变换,即作嵌套网络请求
@Override
public ObservableSource<Translation2> apply(Translation1 result) throws Exception {
// 将网络请求1转换成网络请求2,即发送网络请求2
return observable2;
}
})
.observeOn(AndroidSchedulers.mainThread()) // (初始观察者)切换到主线程 处理网络请求2的结果
.subscribe(new Consumer<Translation2>() {
@Override
public void accept(Translation2 result) throws Exception {
Log.d(TAG, "第2次网络请求成功");
result.show(); // 对第2次网络请求返回的结果进行操作 = 显示翻译结果
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("登录失败");
}
});
04.合并网络请求数据
代码如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
observable1 = request.getCall().subscribeOn(Schedulers.io()); // 新开线程进行网络请求1
observable2 = request.getCall_2().subscribeOn(Schedulers.io());// 新开线程进行网络请求2
// 即2个网络请求异步 & 同时发送
// 步骤4:通过使用Zip()对两个网络请求进行合并再发送
Observable.zip(observable1, observable2,
new BiFunction<Translation1, Translation2, String>() {
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
@Override
public String apply(Translation1 translation1,
Translation2 translation2) throws Exception {
return translation1.show() + " & " +translation2.show();
}
}).observeOn(AndroidSchedulers.mainThread()) // 在主线程接收 & 处理数据
.subscribe(new Consumer<String>() {
// 成功返回数据时调用
@Override
public void accept(String combine_infro) throws Exception {
// 结合显示2个网络请求的数据结果
Log.d(TAG, "最终接收到的数据是:" + combine_infro);
}
}, new Consumer<Throwable>() {
// 网络请求错误时调用
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("登录失败");
}
});
05.重复发送网络请求
需求分析
- 采用Get方法对金山词霸API按规定时间重复发送网络请求,从而模拟 轮询 需求实现
实现步骤
1
2
3
4
5
6
7
8添加依赖
创建 接收服务器返回数据的类
创建 用于描述网络请求 的接口(区别于传统Retrofit形式)
创建 Retrofit 实例
创建 网络请求接口实例 并 配置网络请求参数(区别于传统Retrofit形式)
发送网络请求(区别于传统Retrofit形式)
发送网络请求
对返回的数据进行处理代码实现步骤
- a.通过接口请求的数据创建实体类
- b.创建用于描述网络请求的接口
1
2
3
4
5
6
7
8
9
10public interface GetRequest_Interface {
@GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
Observable<Translation> getCall();
// 注解里传入 网络请求 的部分URL地址
// Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里 // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
// 采用Observable<...>接口
// getCall()是接受网络请求数据的方法
}- c.具体实现可以参考下面代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
/*
* 步骤1:采用interval()延迟发送
**/
Observable.interval(2,1, TimeUnit.SECONDS)
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
// 该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
/*
* 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
* 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
**/
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
Log.d(TAG, "第 " + integer + " 次轮询" );
/*
* 步骤3:通过Retrofit发送网络请求
**/
// a. 创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
// 设置 网络请求 Url
.baseUrl("http://fy.iciba.com/")
//设置使用Gson解析(记得加入依赖)
.addConverterFactory(GsonConverterFactory.create())
// 支持RxJava
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
// b. 创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// c. 采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
// d. 通过线程切换发送网络请求
// 切换到IO线程进行网络请求
observable
.subscribeOn(Schedulers.io())
// 切换回到主线程 处理请求结果
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
result.show() ;
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败");
}
@Override
public void onComplete() {
}
});
}
})
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
06.有条件网络请求轮询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();
// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// 步骤3:采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
// 步骤4:发送网络请求 & 通过repeatWhen()进行轮询
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,此处使用flatMap操作符接收上游的数据
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
// 此处有2种情况:
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 加入判断条件:当轮询次数 = 5次后,就停止轮询
if (i > 3) {
// 此处选择发送onError事件以结束轮询,因为可触发下游观察者的onError()方法回调
return Observable.error(new Throwable("轮询结束"));
}
// 若轮询次数<4次,则发送1Next事件以继续轮询
// 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
}
});
}
}).subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 切换回到主线程 处理请求结果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
}
@Override
public void onError(Throwable e) {
// 获取轮询结束信息
Log.d(TAG, e.toString());
}
@Override
public void onComplete() {
}
});
}
07.联合判断多个事件
需求场景
- 需要同时对多个事件进行联合判。如,填写表单时,需要表单里所有信息(姓名、年龄、职业等)都被填写后,才允许点击 “提交” 按钮
功能说明
- 此处采用填写表单作为联合判断功能展示。即,表单里所有信息(姓名、年龄、职业等)都被填写后,才允许点击 “提交” 按钮
具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60/*
* 步骤1:设置控件变量 & 绑定
**/
EditText name,age,job;
Button list;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_combine);
name = (EditText) findViewById(R.id.name);
age = (EditText) findViewById(R.id.age);
job = (EditText) findViewById(R.id.job);
list = (Button) findViewById(R.id.list);
/*
* 步骤2:为每个EditText设置被观察者,用于发送监听事件
* 说明:
* 1. 此处采用了RxBinding,需要引入依赖:compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
* 2. 传入EditText控件,点击任1个EditText撰写时,都会发送数据事件 = Function3()的返回值(下面会详细说明)
* 3. 采用skip(1)原因:跳过 一开始EditText无任何输入时的空值
**/
Observable<CharSequence> nameObservable = RxTextView.textChanges(name).skip(1);
Observable<CharSequence> ageObservable = RxTextView.textChanges(age).skip(1);
Observable<CharSequence> jobObservable = RxTextView.textChanges(job).skip(1);
/*
* 步骤3:通过combineLatest()合并事件 & 联合判断
**/
Observable.combineLatest(nameObservable,ageObservable,jobObservable,new Function3<CharSequence, CharSequence, CharSequence,Boolean>() {
@Override
public Boolean apply(@NonNull CharSequence charSequence, @NonNull CharSequence charSequence2, @NonNull CharSequence charSequence3) throws Exception {
/*
* 步骤4:规定表单信息输入不能为空
**/
// 1. 姓名信息
boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) ;
// 除了设置为空,也可设置长度限制
// boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) && (name.getText().toString().length() > 2 && name.getText().toString().length() < 9);
// 2. 年龄信息
boolean isUserAgeValid = !TextUtils.isEmpty(age.getText());
// 3. 职业信息
boolean isUserJobValid = !TextUtils.isEmpty(job.getText()) ;
/*
* 步骤5:返回信息 = 联合判断,即3个信息同时已填写,"提交按钮"才可点击
**/
return isUserNameValid && isUserAgeValid && isUserJobValid;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean s) throws Exception {
/*
* 步骤6:返回结果 & 设置按钮可点击样式
**/
Log.e(TAG, "提交按钮是否可点击: "+s);
list.setEnabled(s);
}
});
}
08.从本地内存网络获取数据
从磁盘 / 内存缓存中 获取缓存数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70// 该2变量用于模拟内存缓存 & 磁盘缓存中的数据
String memoryCache = null;
String diskCache = "从磁盘缓存中获取数据";
private String TAG = "RxJava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_combine);
/*
* 设置第1个Observable:检查内存缓存是否有该数据的缓存
**/
Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 先判断内存缓存有无数据
if (memoryCache != null) {
// 若有该数据,则发送
emitter.onNext(memoryCache);
} else {
// 若无该数据,则直接发送结束事件
emitter.onComplete();
}
}
});
/*
* 设置第2个Observable:检查磁盘缓存是否有该数据的缓存
**/
Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 先判断磁盘缓存有无数据
if (diskCache != null) {
// 若有该数据,则发送
emitter.onNext(diskCache);
} else {
// 若无该数据,则直接发送结束事件
emitter.onComplete();
}
}
});
/*
* 设置第3个Observable:通过网络获取数据
**/
Observable<String> network = Observable.just("从网络中获取数据");
// 此处仅作网络请求的模拟
/*
* 通过concat() 和 firstElement()操作符实现缓存功能
**/
// 1. 通过concat()合并memory、disk、network 3个被观察者的事件(即检查内存缓存、磁盘缓存 & 发送网络请求)
// 并将它们按顺序串联成队列
Observable.concat(memory, disk, network)
// 2. 通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
.firstElement()
// 即本例的逻辑为:
// a. firstElement()取出第1个事件 = memory,即先判断内存缓存中有无数据缓存;由于memoryCache = null,即内存缓存中无数据,所以发送结束事件(视为无效事件)
// b. firstElement()继续取出第2个事件 = disk,即判断磁盘缓存中有无数据缓存:由于diskCache ≠ null,即磁盘缓存中有数据,所以发送Next事件(有效事件)
// c. 即firstElement()已发出第1个有效事件(disk事件),所以停止判断。
// 3. 观察者订阅
.subscribe(new Consumer<String>() {
@Override
public void accept( String s) throws Exception {
Log.d(TAG,"最终获取的数据来源 = "+ s);
}
});
}