RxJava最核心的两个东西是Observables(被观察者,事件源)和Observers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。 一个Observable可以发出零个或者多个事件。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。
1.创建Observables
Observable<String> observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext("test");
}
});
2.创建Observer
Observer<String> subscriber = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("tag",s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
3.订阅观察者
observable.subscribe(subscriber);
一旦subscriber订阅了observable,observable就会调用subscriber对象的onNext方法,subscriber就会打印出Hello World!
简化写法
Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行
Observable<String> hello_world = Observable.just("hello world");
Observer?使用?Consumer代替 ,最后简化为:
Observable.just("hello world").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag",s);
}
});
线程调度:
subscribeOn() :subscribeOn ?用于指定?subscribe() ?时所发生的线程
observeOn ?() :??observeOn ?方法用于指定下游?Observer ?回调发生的线程
subscribeOn() ?指定的就是发射事件的线程,observerOn ?指定的就是订阅者接收事件的线程。
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;Schedulers.newThread() 代表一个常规的新线程;AndroidSchedulers.mainThread() 代表Android的主线程
操作符
map ?操作符可以将一个?Observable ?对象通过某种关系转换为另一个Observable ?对象。可以进行网络数据解析
.map(new Function<原类型,?新类型>()?
String转int?
Observable.map(new Function<String,Integer>() {
@Override
public Integer apply(@NonNull String str) throws Exception {
return Integer.valueOf(str);
}
});
通过某种关系?改变Observable ?对象的类型
1)通过 Observable.create() 方法,调用 OkHttp 网络请求; 2)通过 map 操作符,将 Response 转换为 bean 类; 3)通过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操作; 4)调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ; 5)通过 subscribe(),根据请求成功或者失败来更新 UI
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:转换前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "成功:" + data.toString() + "\n");
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
}
});
concat ?可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个?Observable ?终止(onComplete ) 后才会订阅下一个?Observable 。?
Observable.concat(Observable.just(1,2,3),Observable.just(4,5,6)).subscribe(
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
flatMap 实现多个网络请求依次依赖? 例如注册后自动登录
zip ?操作符可以将多个?Observable ?的数据结合为一个数据源再发射出去
Observable.zip(getName(), getAge(), new BiFunction<String, Integer, String>() {
@NonNull
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return "name:"+s+"age:"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag",s);
}
});
}
private Observable<Integer> getAge() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(15);
emitter.onNext(16);
emitter.onNext(17);
}
});
}
private Observable<String> getName() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("wang");
emitter.onNext("long");
emitter.onNext("jiang");
}
});
distinct()? 去重
Observable.just(1,2,2,1,3,3).distinct()
filter 过滤掉不符合条件的值
Observable.just(1,2,2,1,3,3).distinct().filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >=2 ; //只传递 大于等于2的数值
}
})
doOnNext() 在onNext执行前 ,做一些操作。
skip 跳过几个输入后在接收
take 最多接收几个输入
timer 和 interval 都是做延时处理的
第一次 隔2秒,接下来每隔3秒 接收一个输入
Observable.interval(2,3,TimeUnit.SECONDS);
原文链接:这可能是最好的RxJava 2.x 教程(完结版) - 简书 (jianshu.com)
|