1.map
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "这是新转换后的数据" + integer;
}
}).subscribe(new io.reactivex.Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i("aaaa= " ,s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i("aaaa= " ,"onComplete");
}
});
? ? map操作符,可以说是的被观察者转换器。 通过指定一个Funcation对象,将被观察者(Observable)转换成新的被观察者(Observable)对象并发射,观察者会收到新的被观察者并处理
? ? 本来发射的数据是 数字1,然后观察者接收到的是 “ 这是新的观察数据===: 1” ? ? 流程: ?被观察者.create(事件发射器).map(转换器).subscribe(观察者)
?2.flatMap()操作符(concatMap)
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(i+"");
}
return Observable.fromIterable(list);
}
}).subscribe(new io.reactivex.Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i("aaaa= " ,s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i("aaaa= " ,"onComplete");
}
});
输出:
?最终输出的是flatMap方法里面操作过后数据。
与上面的flatMap作用基本一样,与flatMap唯一不同的是concat能保证Observer接收到Observable集合发送事件的顺序
3.buffer() 操作符
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3)
.subscribe(new io.reactivex.Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
for (Integer integer : integers) {
Log.i("aaaa= " ,integer + "");
}
Log.i("aaaa= " ,"==============");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i("aaaa= ", "onComplete");
}
});
buffer(3),从第一个开始分割
可以简单地理解为把一组数据分成若干小组发射出去,而不是单个单个地发射出去
输出结果:
?
?
|