IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> RxJava 两种生产和消费模式,(冷)cold和(热)hot -> 正文阅读

[移动开发]RxJava 两种生产和消费模式,(冷)cold和(热)hot

RxJava目前有两种发布和订阅模式

第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:

@NonNull
		Flowable<@NonNull Object> observeOn =Flowable.create(e -> {
			e.onNext(1); Thread.sleep(1000);
			e.onNext(2); Thread.sleep(1000);
			e.onNext(3); Thread.sleep(1000);
			e.onNext(4); Thread.sleep(1000);
			e.onNext(5); Thread.sleep(1000);
			e.onNext(6); Thread.sleep(1000);
		}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());
		Thread.sleep(1000 * 1);
		observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		Thread.sleep(1000 * 1);
		observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。

RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

我们create中lambda的方法会再次运行

相似的ReplaySubject

还有一种当订阅后会重新处理已发送的数据ReplaySubject

ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);
		replaySubject.onNext(1);
		replaySubject.onNext(2);
		replaySubject.onNext(3);
		System.out.println("subscribe 1");
		replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		replaySubject.onNext(4);
		replaySubject.onNext(5);
		System.out.println("subscribe 2");
		replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		replaySubject.onNext(6);

控制台输出为

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

但是Flowable.create和ReplaySubject的模式不太一样。
对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。
对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。

第二种是HOT模式。

1、使用cold + publish()方法修改cold为hot。

@NonNull
		ConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {
			e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());
			e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());
			e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());
			e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());
			e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());
			e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());
		}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();
		System.out.println("connect");
		publish.connect();
		System.out.println("subscribe 1");
		publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		Thread.sleep(3000 * 1);
		System.out.println("subscribe 2");
		publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

下面是输出,可以看到没有出现重放。

connect
subscribe 1
main + next + 1
1 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 2
2 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 3
3 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 4
subscribe 2
4 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-2 + next + 5
5 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
6 RxComputationThreadPool-1

2、或者使用Subject对象
这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject

		PublishSubject<Integer> publishSubject = PublishSubject.create();
		publishSubject.onNext(1);
		publishSubject.onNext(2);
		publishSubject.onNext(3);
		System.out.println("subscribe 1");
		publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		publishSubject.onNext(4);
		publishSubject.onNext(5);
		System.out.println("subscribe 2");
		publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		publishSubject.onNext(6);

执行过后的打印

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-09-13 11:27:25  更:2022-09-13 11:31:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年10日历 -2025/10/25 18:47:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码