RxJava目前有两种发布和订阅模式
第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:
@NonNull
Flowable<@NonNull Object> observeOn =Flowable.create(e -> {
e.onNext(1); Thread.sleep(1000);
e.onNext(2); Thread.sleep(1000);
e.onNext(3); Thread.sleep(1000);
e.onNext(4); Thread.sleep(1000);
e.onNext(5); Thread.sleep(1000);
e.onNext(6); Thread.sleep(1000);
}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());
Thread.sleep(1000 * 1);
observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
Thread.sleep(1000 * 1);
observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6
我们create中lambda的方法会再次运行
相似的ReplaySubject
还有一种当订阅后会重新处理已发送的数据ReplaySubject
ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);
replaySubject.onNext(1);
replaySubject.onNext(2);
replaySubject.onNext(3);
System.out.println("subscribe 1");
replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
replaySubject.onNext(4);
replaySubject.onNext(5);
System.out.println("subscribe 2");
replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
replaySubject.onNext(6);
控制台输出为
subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6
但是Flowable.create和ReplaySubject的模式不太一样。 对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。 对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。
第二种是HOT模式。
1、使用cold + publish()方法修改cold为hot。
@NonNull
ConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {
e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());
e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());
e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());
e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());
e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());
e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();
System.out.println("connect");
publish.connect();
System.out.println("subscribe 1");
publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
Thread.sleep(3000 * 1);
System.out.println("subscribe 2");
publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
下面是输出,可以看到没有出现重放。
connect
subscribe 1
main + next + 1
1 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 2
2 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 3
3 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 4
subscribe 2
4 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-2 + next + 5
5 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
6 RxComputationThreadPool-1
2、或者使用Subject对象 这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject
PublishSubject<Integer> publishSubject = PublishSubject.create();
publishSubject.onNext(1);
publishSubject.onNext(2);
publishSubject.onNext(3);
System.out.println("subscribe 1");
publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
publishSubject.onNext(4);
publishSubject.onNext(5);
System.out.println("subscribe 2");
publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
publishSubject.onNext(6);
执行过后的打印
subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
|