- 依赖
- 五种被观察者
- 订阅者
- 创建操作符
- 转换操作符
- 组合操作符
- 功能操作符
- 过滤操作符
- 条件操作符
- 自定义Observer
- 对上下游所处线程进行封装
1.依赖
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'
2.五种被观察者 (1)Observer
Observer observer = new Observer<Object>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe()");
disposable = d;
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext()" + o);
disposable.dispose();
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError()" + e);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
};
3.订阅者
4.创建操作符 (1) create()
(2)just() 简化Create操作,最多只能传入10个参数 (3)fromArray() 简化Create操作 可能传入无限个参数
(4)fromFuture() 与线程安全有关 (5)fromIterable() 可以传入继承了Iterable接口的集合 (6)range() 可以方便确定范围
5.转换操作符 (1)map() 可以将被观察者发送的数据类型转变为其他类型
(2)flatMap() 可以将事件序列中的元素进行加工,一一返回新的被观察者(s)
可以处理嵌套for循环、嵌套网络请求的情况
(3)concatMap() 在flatMap的基础上保证转发出来的事件是有序的
(4)buffer() 从需要发射的事件序列中放进缓存区,达一定数量时一起发送
6.组合操作符 (1)concat() 将多个被观察者所需要发送的事件组合在一个被观察者中,一起发送(即调用一个被观察者,多次调用onNext()方法)最多发送4个
(2)concatArray() 在concat()的基础上可以传无限个被观察者
(3)merge() 在concat()的基础上,实现并行操作
7.功能操作符 (1)subscribeOn() 确定上游事件所处线程 (2)observeOn() 确定下游事件所处线程 (3)参数
- Schedulers.newThread() — 总是启用新线程,并在新线程中执行操作;多用于耗时操作
- Schedulers.io() — 通常用于网络、读写文件等IO密集型的操作,行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效
- AndroidSchedulers.mainThread() — Android的主线程;用于更新UI
- Schedulers.computation() — 代表CPU计算密集型的操作
8.过滤操作符 (1)filter() 满足条件true则通过;不满足条件false过滤掉
9.条件操作符
10.自定义Observer
- 继承Observer
- 确定泛型类型
- 重写4个主要方法
- 使用模板设计模式
abstract class APIResponse<T>(val context : Context)
: Observer<LoginResponseWrapper<T>>{
private var isShow = true
constructor(context: Context,isShow : Boolean) : this(context){
this.isShow = isShow
}
abstract fun success(data : T ?)
abstract fun failure(errorMsg: String ? )
override fun onSubscribe(d: Disposable) {
if (isShow) {
LoadingDialog.show(context)
}
}
override fun onNext(t: LoginResponseWrapper<T>) {
if (t.data == null) {
failure("登录失败了,请检查原因:msg:${t.errorMsg}")
} else {
success(t.data)
}
}
override fun onError(e: Throwable) {
LoadingDialog.cancel()
failure(e.message)
}
override fun onComplete() {
LoadingDialog.cancel()
}
}
11.对上下游所处线程进行封装
private static <UD> ObservableTransformer<UD, UD> rxud() {
return new ObservableTransformer<UD, UD>() {
@Override
public ObservableSource<UD> apply(Observable<UD> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<UD, UD>() {
@Override
public UD apply(UD ud) throws Exception {
Log.d(TAG, "apply: 我监听到你了,居然再执行");
return ud;
}
});
}
};
}
|