concat(串行连接数据)
concat,翻译为连接。这个方法的作用是将多个数据源发射的数据,按照顺序进行连接。数据的顺序,只与发射源的顺序相关,与数据源的发射时间无关。就是说,即使第二个发射源先完成了所有数据的发送,发射的数据也要排到第一个发射源的数据之后。
测试简单连接:
@Test
public void testConcat(){
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concat(list).subscribe(i->{
System.out.println(i);
});
}
输出:
1
2
3
4
测试给第一个数据源添加延时:
@Test
public void testConcatDelay() throws InterruptedException {
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concat(list).subscribe(i->{
System.out.println(i);
});
Thread.sleep(10*1000);
}
输出:
1
2
3
4
可见,即使发射 1 的时间延迟了5s,1 仍然排在数据的首位。
concatDelayError
这里引出另外一个方法:concatDelayError 。
如果在concat过程中发生了错误,RxJava会如何处理:
测试:
@Test
public void testConcatError(){
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new NullPointerException();
}
}));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concat(list).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("ConcatTest.onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("ConcatTest.onError");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer = " + integer);
}
});
}
输出:
ConcatTest.onError
只要有一个发射源产生了错误,concat就会停止执行。
concatDelayError方法的出现正是为了应对这种场景,当数据源发生错误时,concatDelayError允许继续连接其他数据源,并且在最后,进入onError。
测试:
@Test
public void testConcatDelayError(){
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new NullPointerException();
}
}));
list.add(Observable.just(2));
list.add(Observable.just(3));
list.add(Observable.just(4));
Observable.concatDelayError(list).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("ConcatTest.onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("ConcatTest.onError");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer = " + integer);
}
});
}
输出:
integer = 2
integer = 3
integer = 4
ConcatTest.onError
concatEager(并行连接数据)
我们继续关注concat方法,数据源延时发射的问题,我们给两个数据源添加延时
@Test
public void testConcatDelayTimeOut() throws InterruptedException {
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(2, TimeUnit.SECONDS));
list.add(Observable.just(2).delay(2, TimeUnit.SECONDS));
Observable.concat(list).subscribe(i->{
System.out.println(i);
});
Thread.sleep(3*1000);
}
输出:
1
这里2没有被发射出去,因为concat默认是串行的,只有前一个Observable执行完成之后,才开始执行下一个。test方法执行到第3s时,方法已经执行完毕,而第二个Observable的延时还在执行,导致2没有被发射。
concatEager提供了并行的解决方案,一旦开始订阅,所有的Observable都开始执行。
测试:
@Test
public void testConcatEager() throws InterruptedException {
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1).delay(2, TimeUnit.SECONDS));
list.add(Observable.just(2).delay(2, TimeUnit.SECONDS));
Observable.concatEager(list).subscribe(i->{
System.out.println(i);
});
Thread.sleep(3*1000);
}
输出
1
2
|