在MainActivity6
2.讲解
就是把多个上流拼接后,再打印
1.startWith
将两个被观察者拼接起来,拼接的后面运行。下面运行结果是1000,2000,3000,1,2,3。 被拼接的前面运行
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
.startWith(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1000);
emitter.onNext(2000);
emitter.onNext(3000);
emitter.onComplete();
}
}))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d(TAG, "accept: "+integer);
}
});
2.concatWith
拼接的后面运行,这个跟startWith反过来。下面运行结果是1,2,3,1000,2000,3000,
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
.concatWith(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1000);
emitter.onNext(2000);
emitter.onNext(3000);
emitter.onComplete();
}
}))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d(TAG, "accept: "+integer);
}
});
3.concat
把几个拼接起来,然后依次执行。下面运行结果是1,2,3,4
Observable.concat(
Observable.just(1),
Observable.just(2),
Observable.just(3),
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("4");
emitter.onComplete();
}
})
)
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Throwable {
Log.d(TAG, "accept: "+serializable);
}
});
4.merge
拼接被观察者(上游),然后并列运行。并列运行的第一个值随机,可以是第一个被观察者的值,也可以是2,或者3被观察者。 intervalRange的第一个参数是打印的值,第二个是执行几次,第三个是第一个执行的延迟时间,第四个是间隔时间,第五个是第四个的间隔时间的单位
Observable<Long> longObservable1 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
Observable<Long> longObservable2 = Observable.intervalRange(6, 5, 1, 2, TimeUnit.SECONDS);
Observable<Long> longObservable3 = Observable.intervalRange(11, 5, 1, 2, TimeUnit.SECONDS);
Observable.merge(longObservable1,longObservable2,longObservable3)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
Log.d(TAG, "accept: "+aLong);
}
});
运行结果:
5.zip
对被观察者进行各个值得拼接,如“英语”与77拼接,再加“”等值,就变成“课程英语77”。这里zip的1,2个参数为被观察者Observable。
Observable<String> stringObservable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("英语");
emitter.onNext("数学");
emitter.onNext("语文");
emitter.onNext("物理");
emitter.onComplete();
}
});
Observable<Integer> stringObservable2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(77);
emitter.onNext(88);
emitter.onNext(99);
emitter.onComplete();
}
});
Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, Integer, StringBuffer>() {
@Override
public StringBuffer apply(String s, Integer integer) throws Throwable {
return new StringBuffer().append("课程"+s).append("==").append(integer);
}
}).subscribe(new Observer<StringBuffer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: 准备进入考试,考试了……");
}
@Override
public void onNext(@NonNull StringBuffer stringBuffer) {
Log.d(TAG, "onNext: 考试结果输出:"+stringBuffer.toString());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 考试结束");
}
});
运行结果:
|