参考文档
RxJava-Android开发_Essentials中文翻译版_官方中文版-Android文档类资源-CSDN下载
包含:
Android开发-RxJava详解.pdf(偏向于Android开发的指导)
RxJava Essentials_中文翻译版.pdf(国外大牛Ivan.Morgillo编写的RxJava教程)
RxJava官方中文文档.pdf(比较全面,包含所有操作符的说明。这是RxJava1的文档,目前通常使用RxJava2,RxJava2基于RxJava1扩充了操作符和少量修改,整体用法和思维是一致的)
1 RxJava基本介绍
1.1 概念
? ? ? ? RxJava是采用响应式编程思想的异步操作框架,设计初衷是让异步编程变得简洁,灵活。Android 提供的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁(逻辑简洁)。 RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁(逻辑简洁),这得益于RxJava编码特征:减少嵌套、多采用链式调用,简化代码逻辑。
1.2 内容概括
? ? ? ? 两个对象 + 操作符
两个对象:被观察者、观察者
操作符(方法):
- 创建操作:create、defer、empty/never/throw、from、interval、just、range、repeat、start、timer
- 变换操作:buffer、flatMap、groupBy、map、scan、window
- 过滤操作:debounce、distinct、elementAt、filter、first、ignoreElements、Last、Sample、Skip、SkipLast、Take、TakeLast
- 结合操作:and/then/when、combineLatest、join、merge、startWith、switch、zip
- 错误处理:catch、retry
- 辅助操作:delay、do、materialize/dematerialize、observeOn、serialize、subscribe、subscribeOn、timeInterval、timeout、timestamp、using、to
- 条件和布尔操作:all/contains/amb、defaultEmpty、SequenceEqual、skipUntil/skipWhile、takeUntil/TakeWhile
- 算数和聚合操作:average/concat/reduce、max/min/count/sum
- 异步操作:start、toAsync、、、
- 连接操作:connect、publish、refCount、replay
- 转换操作:toFuture、、、
- 阻塞操作:forEach、、、
- 字符串操作:byLine、docede、、、
2 使用案例解析
2.1 操作符just、map组合
2.1.1 场景描述
? ? ? ? 输入网络图片路径path(String),通过网络请求获取原图Bitmap,然后裁剪Bitmap,并且转成Drawable。最后给文字添加背景图片Drawable。
2.1.2 主要代码
private void doRxJava1(String path) {
Log.d(TAG, "doRxJava1: start");
Observable.just(path)
// 需求1:获取图片
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String path) throws Exception {
Log.d(TAG, "io线程 apply需求1:正在获取网络图片,需要3秒...");
// String --> Bitmap
Bitmap bitmap = obtainPictureByHttp(path);
return bitmap;
}
})
// 需求2:裁剪图片
.map(new Function<Bitmap, Drawable>() {
@Override
public Drawable apply(@NonNull Bitmap bitmap) throws Exception {
Log.d(TAG, "io线程 apply需求2:裁剪图片");
// Bitmap --> Drawable
Drawable drawable = cropPicture(bitmap);
return drawable;
}
})
// .map(todo 实现更多需求)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "调用subscribe的线程 onSubscribe: ");
}
@Override
public void onNext(@NonNull Drawable drawable) {
Log.d(TAG, "main线程 onNext: 设置背景图片" + drawable);
mTextView.setBackground(drawable);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "main线程 onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "main线程 onComplete: 过程结束");
}
});
}
//********************************************************************************
//模拟耗时操作,获取网络图片:String --> Bitmap
private Bitmap obtainPictureByHttp(String path) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Drawable drawable = getDrawable(R.drawable.picture1);
Bitmap bitmap = ((BitmapDrawable) drawable).getBitmap();
return bitmap;
}
//裁剪图片: Bitmap --> Drawable
private Drawable cropPicture(Bitmap bitmap) {
// ...
Drawable drawable = new BitmapDrawable(bitmap);
return drawable;
}
2.1.3 测试结果
?2.1.4 解析
just(T...): 将传入的参数依次发送出来
map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列
2.2 操作符create
2.2.1 场景描述
? ? ? ? 初次获取网络文本,并显示,3秒后,再次获取网络文本,并显示。
2.2.2 主要代码
private void doRxJava2(String path) {
Log.d(TAG, "doRxJava2: start");
Observable.create(new ObservableOnSubscribe<String>() { // 所有泛型一致
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "io线程 apply操作1:正在获取网络文本text111");
emitter.onNext("text111");
Thread.sleep(3000);
Log.d(TAG, "io线程 apply操作2:正在获取网络文本text222");
emitter.onNext("text222");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "调用subscribe的线程 onSubscribe: ");
}
@Override
public void onNext(@NonNull String text) {
Log.d(TAG, "main线程 onNext: 设置文本" + text);
mTextView.setText(text);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "main线程 onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "main线程 onComplete: 过程结束");
}
});
}
2.2.3 测试结果
2.3 操作符flatMap、filter、map组合
2.3.1 场景描述
????????界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图 片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数 组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意 的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行
2.3.2 主要代码(直接用java多线程)
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
2.3.3 主要代码(采用RxJava)?
private void showPictures(File[] folders) {
Observable.fromArray(folders)
.flatMap(new Function<File, Observable<File>>() {
@Override
public Observable<File> apply(File file) {
return Observable.fromArray(file.listFiles());
}
})
.filter(new Predicate<File>() {
@Override
public boolean test(@NonNull File file) throws Exception {
return file.getName().endsWith(".png");
}
})
.map(new Function<File, Bitmap>() {
@Override
public Bitmap apply(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onNext(@NonNull Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() { }
});
}
2.4 取消订阅
每天记录学习的新知识 :Disposable和CompositeDisposable_清风徐来辽的博客-CSDN博客Disposable1.简介:rxjava虽然好用,但是总所周知,容易遭层内存泄漏。也就说在订阅了事件后没有及时取阅,导致在activity或者fragment销毁后仍然占用着内存,无法释放。而disposable便是这个订阅事件,可以用来取消订阅。2.方法调用:2.1.主动解除订阅dispose():2.2查询是否解除订阅 true 代表 已经解除订阅isDisposed():...https://blog.csdn.net/weixin_35691921/article/details/104992346
2.5 线程切换
observeOn()与subscribeOn()用于切换线程
observeOn()与subscribeOn()的详解_江东橘子的博客-CSDN博客_subscribeon和observeonRxjava 提供了subscribeOn()方法来用于每个observable对象的操作符在哪个线程上运行Rxjava 提供了ObserveOn()方法来用于每个Subscriber(Observer)对象的操作符在哪个线程上运行线程切换的时候subscribeOn()只被执行一次 。如果出现多次,那么以第一次出现是用的那个线程为准。 ObserverOnobsehttps://blog.csdn.net/michael1112/article/details/78688099
2.6?Rxjava操作符(defer,compose,retryWhen)
2.7 emitter.onNext
2.7.1 Demo代码
private void doRxJava4(String path) {
Log.d(TAG, "doRxJava2: start");
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "io线程 第一次发射 text111");
emitter.onNext("text111");
Log.d(TAG, "io线程 第二次发射 text222");
emitter.onNext("text222");
Log.d(TAG, "io线程 发射Complete信号");
emitter.onComplete();
}
})
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String text) throws Exception {
Log.d(TAG, "io线程 map " + text);
return text;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "main线程 onSubscribe: ");
}
@Override
public void onNext(@NonNull String text) {
Log.d(TAG, "main线程 onNext " + text);
mTextView.setText(text);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "main线程 onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "main线程 onComplete: 过程结束");
}
});
}
2.7.2 测试结果
2.8?observer.onNext
2.8.1 Demo代码
private void doRxJava4(String path) {
Log.d(TAG, "doRxJava2: start");
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "io线程 第一次发射 text111");
emitter.onNext("text111");
Log.d(TAG, "io线程 第二次发射 text222");
emitter.onNext("text222");
Log.d(TAG, "io线程 发射Complete信号");
emitter.onComplete();
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String text) throws Exception {
Log.d(TAG, "io线程 flatMap apply " + text);
return new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
Log.d(TAG, "io线程 observer.onNext " + text);
observer.onNext(text);
observer.onNext(text);
}
};
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "main线程 onSubscribe: ");
}
@Override
public void onNext(@NonNull String text) {
Log.d(TAG, "main线程 onNext " + text);
mTextView.setText(text);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "main线程 onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "main线程 onComplete: 过程结束");
}
});
}
2.8.2 测试结果
?
|