合并操作符(Combining Operators)是RxJava里面的一类操作符。主要有merge,concat,zip,amb,combineLast,group.这篇文章介绍merge和相关的方法。
合并操作符是用于处理多操作符的。这些操作符要处理的对象至少是两个。merge操作符可以合并两个Observable,变成一个Observable.过程如下图所示。 从上图可以看出,merge是不保证顺序的,两个Obervable元素是会交叉发射的。在官方文档里面这幅图并不是一个静态图,两个为1的元素是可以用鼠标拖动的。下面的两个1也会跟着移动。可以拖动成下面的样子: merge还有一个重要的特点是多个Observable之中的一个发生错误的时候,会导致整个Observable调用onError而导致程序结束,没有发生错误的Observable后续的内容将丢失。如下图所示。 总结就是两个特点: 1.不保证合并后Observable的执行顺序。 2.发生错误后,未执行完的后续内容将被丢弃。 特别是第二个特点,感觉像是缺点。因为如果要丢弃数据的话,我们肯定是不能够在项目中使用的,除非一些需要这个特性的功能,不过一般也不太可能。
下面通过代码演示这些特性。 首先是一个错误的例子。我们可能很自然的写下下面的代码:
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable
.merge(observable1, observable2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("CreateActivity", "onSubscribe");
}
@Override
public void onNext(Integer i) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d("ConbineActivity", "onError");
}
@Override
public void onComplete() {
Log.d("ConbineActivity", "onComplete");
}
});
输出如下。不管重复多少次都是下面的结果,这里可能就疑惑了,不是说好不保证顺序的吗?怎么又能保证顺序了呢?这是个错误例子,保证顺序的前提是在多个线程中,比如官方给出的一个例子。
onSubscribe
1 main
2 main
3 main
4 main
5 main
6 main
onComplete
下面的代码是官方给出的例子。和上面区别就是添加了subscribeOn(Schedulers.io())这段代码。使用了create来创建数据源,因为我们需要输出数据源的线程名字。这就变成了两个线程了,这个时候不保证执行顺序的特性就体现出来了。 不过多线程不是本来就不保证执行顺序的吗?是的,和多线程是没什么区别的,但这里最主要的点是合并多个Observable,顺不顺序是次要的。
Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity","Source1: emitting from " +Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity","Source2: emitting from " +Thread.currentThread().getName());
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onComplete();
}
});
Observable.merge(source1, source2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("CreateActivity", "onSubscribe");
}
@Override
public void onNext(Integer i) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d("ConbineActivity", "onError");
}
@Override
public void onComplete() {
Log.d("ConbineActivity", "onComplete");
}
});
输出结果如下。source1和source2是数据源是在两个不同的线程发布的,source1在新创建的io线程里面创建,source2在main线程。而观察者接受到数据全都是在main线程。我们可以发现发射源source1和source2的执行顺序可能不一样,观察者接受到的顺序也可能不一样。这样,merge不保证顺序的特性就已经体现出来了。
onSubscribe
Source2: emitting from main
Source1: emitting from RxCachedThreadScheduler-1
4 main
1 main
2 main
3 main
5 main
6 main
onComplete
接下来要特别第二个比较麻烦的问题。出现错误之后数据丢失的问题。 看下面的例子:
Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("No Network"));
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onComplete();
}
});
Observable.merge(source1, source2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("ConbineActivity", "onSubscribe");
}
@Override
public void onNext(Integer i) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d("ConbineActivity", "onError");
}
@Override
public void onComplete() {
Log.d("ConbineActivity", "onComplete");
}
});
这个例子和上一个例子是一样的,只是在第一个Observable的数据源里面调用了onError方法,这时候可能出现下面两种结果。 结果一: 这里出现很困惑的事情,为什么输出结果都是在RxCachedThreadScheduler-1这个线程里面,不是应该都是在主线程里面吗?这个情况如果在onNext方法里面添加了sleep方法会出现的非常频繁。不叫也是有概率出现的。这个情况非常严重,我甚至对rxjava的正确性和稳定性产生了非常大的怀疑。
onSubscribe
Source1: emitting from RxCachedThreadScheduler-1
Source2: emitting from main
1 RxCachedThreadScheduler-1
4 RxCachedThreadScheduler-1
5 RxCachedThreadScheduler-1
6 RxCachedThreadScheduler-1
2 RxCachedThreadScheduler-1
3 RxCachedThreadScheduler-1
W/System.err: java.lang.Exception: Low Power
D/ConbineActivity: onError
结果二: 输出4之后直接就发生错误了,我们的onError方法是在onNext(2)之后调用的,但这里onNext(1)和onNext(2)都没有输出,这也是非常有问题的。不知道是什么原因。
onSubscribe
Source2: emitting from main
Source1: emitting from RxCachedThreadScheduler-1
4 main
No Network
D/ConbineActivity: onError
没有一种情况是正常的,这让我对rxjava产生了很大的怀疑。我甚至怀疑rxjava2还只是个实验的产品,根本没有投入实际项目的基础,因为正确性和稳定性都是非常值得怀疑的。
我特地去github提了问题,项目人员回答我了。
A1) By default, errors can cut ahead of items. merge reuses the emitting thread and if that thread is blocked down the line, other sources may run to termination. When merge can run again, it will first check for errors and ignore any value from any source.
A2) Depends on which source wins as you are racing the main thread with the background thread.
大致意思是说,merge可以提前舍弃元素,就是一个元素虽然发出了,但还没来的及被接受就发生错误而被舍弃了。 而且观察者线程是谁先争取到就是谁。 根据他的回答,也就是他不认为这是错误。既然是“正确的”行为。那我们就不应该在实际项目中用merge。因为行为是不可靠的。所以我还是不是特别理解为什么要设计成这种特性,可能有特殊的情况使用到。
但这根本没法在项目中用啊。实际上,RxJava是提供在所有操作执行完后再抛异常的操作的。这就是mergeDelayError()方法。和merge使用方法是一样的,只是带来延迟错误的功能。这是我们想要的功能。 从下面这张图可以看出,紫色的球在异常发生后还是执行了,并没有被丢弃。这就是mergeDelayError()方法的作用。
使用起来和merge一样,如下面的代码。
Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("No Network"));
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onComplete();
}
});
Observable.mergeDelayError(source1, source2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("ConbineActivity", "onSubscribe");
}
@Override
public void onNext(Integer i) {
Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d("ConbineActivity", "onError");
}
@Override
public void onComplete() {
Log.d("ConbineActivity", "onComplete");
}
});
非常明显,在onNext(3)的时候发生异常,异常在onNext(6)执行完才抛出,符合我们的预期。
onSubscribe
Source1: emitting from RxCachedThreadScheduler-1
Source2: emitting from main
1 RxCachedThreadScheduler-1
2 RxCachedThreadScheduler-1
4 main
5 main
6 main
W/System.err: java.lang.Exception: No Network
onError
最后简单说一下mergeWith(). 这个方法实际就是个特例,就是两个Observable的时候可以使用。
Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("No Network"));
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onComplete();
}
});
source1.mergeWith(observable2)
.subcribeOn(Schedules.io())
.
我觉得这个方法没什么用,只是提供多一种写法而已。mergeDelayError是没有这种with方法的,所以也不通用,建议不要用。在项目中只用mergeDelayError就行。
|