最近准备梳理一下Kotlin ,先复习一遍RxJava 思想,做个学习笔记+伪代码,整个脉络分为三个部分。
(一)使用场景
RxJava 是重量级、最复杂的框架(没有之一),JakeWharton 的巅峰之作,操作符非常丰富、特别庞大,学关键的内容,学思维方式,看PPT资料,学两遍。
为什么要学习RxJava?
改变思维(Rx思维)来提升效率,响应式编程/异步事件流编程
Rx思维:起点(分发事件)—>…—>终点(消费事件),中间不会断掉且可以做拦截,链条式思维
学习资料
【以下五部分难度逐步提升!!!】
一、核心思想(基础?)
传统思维:不同项目(程序员)实现有不同的思路,封装、Thread、线程池……
dialog—>Thread /AsyncTask /…—>Handler —>UI
Rx思维/卡片式编程/观察者设计模式: 起点/被观察者/Observable—>订阅—>终点/观察者/Observer
封装线程调度,方便多处使用线程切换:
private final static <UD> ObservableTransformer<UD, UD> rxud {
return new ObservableTransformer<UD, UD>(){
@override
public ObservableSource<UD> apply(Observable<UD> upstream){
return upstream.subscribeOn(Schedulers.io())
.observerOn(AndroidSchedulers.mainThread())
.map(new Function<UD, UD>(){
Log.d(TAG, "balabala...");
return null;
});
}
}
}
事件触发:
public void reJavaDownloadImageAction(View view){
Observable.just(PATH)
.map(new Function<String, Bitmap>(){
...
return bitmap;
})
.map(new Function<Bitmap, Bitmap>(){
...
return newBitmap;
})
.map(new Function<Bitmap, Bitmap>(){
Log.d(TAG, "balabala...");
return newBitmap;
})
.compose(rxud())
.subscribe(
new Observer<Bitmap>() {
onSubScribe(Disposable d)
onNext(Bitmap bitmap){
image.setImageBitmap(bitmap);
}
onError(Throwable e)
onComplete()
}
);
}
二、RxJava 配合Retrofit (常用??)
常用的网络模型框架开发组合套装: Retrofit (通过OkHttp 请求网络)—>RxJava (仅处理返回数据)—>UI 注:Retrofit 不是网络框架,是个强大的封装框架,负责管理
网络请求接口
interface WanAndroidApi{
@GET("project/tree/json")
Observable<ProjectBean> getProject();
@GET("project/list/{pageIndex}/json")
Observable<ProjectItem> getProject(@Path("pageIndex")int pageIndex, @Query("cid")int cid);
}
具体工具类封装:
HttpUtil {
public static String BASE_URL="https://www.wanandroid.com/";
public static void setBaseUrl(String baseUrl) {
BASE_URL = baseUrl;
}
public static Retrofit getOnlineCookieRetrofit() {
OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
HttpLoggingInterceptor logInterceptor = new HttpLoggingInterceptor(new HttpLogger());
logInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient okHttpClient = httpBuilder
.addInterceptor(logInterceptor)
.addNetworkInterceptor(new StethoInterceptor())
.readTimeout(1000, TimeUnit.SECONDS)
.connectTimeout(1000, TimeUnit.SECONDS)
.writeTimeout(1000, TimeUnit.SECONDS)
.build();
return new Retrofit.Builder().baseUrl(BASE_URL)
.client(okHttpClient)
.addConverterFactory(GsonConverterFactory.create(new Gson()))
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
}
Activity 中使用:
private WanAndroidApi api;
onCreate() {
api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
}
onClick(View view){
api.getProject()
.subscribeOn(Schedulers.io)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data-> {
Log.d(TAG, "getProjectAction:" + data);
});
}
onItemClick(View view){
}
三、View 防抖(大公司/RxBinding ???)
定义:瞬间连续点击/自动化脚本……
onClick(View view){
antiShakeAction();
}
@SuppressLint("CheckResult")
antiShakeAction() {
Button bt_anti_shake = findViewById();
RxView.click(bt_anti_shake)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>(){
@override
public void accept(Object o) throws Exception {
api.getProject()
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<ProjectBean>(){
@override
public void accept(ProjectBean projectBean) throws Exception{
for(ProjectBean.DataBean dataBean : projectBean.getData()){
api.getProjectItem(1, dataBean.getId())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<ProjectItem>(){
@override
public void accept(ProjectItem projectItem) throws Exception{
Log.d(TAG, "accept:"+projectItem);
};
});
}
};
});
};
});
}
四、网络嵌套(实用????)
下一步操作依赖于上一步操作得结果,先查询主数据,再查询具体的数据,如上
使用flatMap 操作符解决,自己会分发N多数据
onClick(View view){
antiShakeActionUpdate();
}
@SuppressLint("CheckResult")
antiShakeActionUpdate() {
Button bt_anti_shake = findViewById();
RxView.click(bt_anti_shake)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.observeOn(Schedules.io())
.flatMap(new Function<Object, ObservableSource<ProjectBean>>(){
@override
public ObservableSource<ProjectBean> apply(Object o)throws Exception {
return api.getProject();
}
})
.flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>(){
@override
public ObservableSource<ProjectBean> apply(ProjectBean projectBean)throws Exception {
return Observable.formIterable(projectBean.getData());
}
})
.flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>(){
@override
public ObservableSource<ProjectItem> apply(ProjectItem projectItem)throws Exception {
return api.getProjectItem(1, projectItem.getId());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ProjectItem projectItem>(){
@override
public void accept(ProjectItem projectItem) throws Exception {
}
});
}
五、doOnNext 运用(难度最高?????)
使用场景:主线程和异步线程之间频繁的线程切换(银行的业务……)
Retrofit + RxJava 实现模拟案例:
- show progressDialog
- 请求服务器注册操作
- 注册完成之后更新注册UI
- 马上登录服务器操作
- 登录完成之后更新登录UI
LoginRequest {}
RegisterRequest {}
LoginResponse {}
RegisterResponse {}
interface IRequestNetwork {
public Observable<RegisterResponse> registerAction(@Body RegisterRequest registerRequest);
public Observable<LoginResponse> loginAction(@Body LoginRequest loginRequest);
}
request(View view) {
MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.registerAction(new RegisterRequest())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<RegisterResponse>() {
accept(RegisterResponse registerResponse) {
}
});
MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.loginAction(new LoginRequest())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<LoginResponse>() {
accept(LoginResponse loginResponse) {
}
});
}
Disposable disposable;
requestBetter(View view) {
disposable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.registerAction(new RegisterRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<RegisterResponse>(){
accept(RegisterResponse registerResponse) {
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>(){
@override
public ObservableSource<LoginResponse> apply(LoginResponse loginResponse)throws Exception {
Obsevable<LoginResponse> loginResponseObsevable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.loginAction(new LoginRequest());
return loginResponseObsevable;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<LoginResponse>() {
onSubscribe(Disposable d) {
progressDialog.show();
disposable = d;
}
onNext()
onError()
onComplete()
});
}
onDestory() {
if(disposable!=null && !disposable.isDisposed()){
disposable.dispose();
}
}
思考:说说自己对RxJava 核心思想的理解?
有一个起点和一个终点,起点开始流向我们的事件,把事件流向终点,在流向过程中可以添加拦截,拦截时可以对事件进行改变操作,终点只关心它上一个拦截,根据上一个拦截的变化而变化。
(二)模式与原理
源码、流程分析,观察者设计模式,map 变换操作符原理
被观察者抽象层-?—?┐ 观察者抽象层 Observable ├—引用—→Observer ↑ | ↑ 实现 | 实现 | | | 被观察者实现层【容器】 | 观察者实现层 ObservableImpl?-?–┘ ObserverImpl
RxJava 的Hook点,对整个项目全局(静态)的RxJava 的监听、拦截
RxJavaPlugins.onAssembly(...)
RxJava 1.x 预留onAssembly() 方法无操作 ↓ 优化 RxJava 2.x 调用setOnObservableAssembly() 方法赋值,优先执行 ↓ RxJava 3.x ↓ …
Hook机制:钩子程序,逆向
结论:很多操作符都会经过全局onAssembly 监听
RxJava 的观察者模式和标准的观察者设计模式不同
- 创建Observable
- 创建Observer
- 使用
subscribe() 订阅 - 使用
map() (发送单个)/flatMap() (发送多次)
洋葱模型:流程是U型结构封包裹、拆包裹
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("A");
}
})
.map(new Function<String, Bitmap>())
.subscribe(new Observer<String>(){
onSubscribe()
onNext(String s)
onError()
onComplete()
});
标准的观察者模式中,有一个被观察者+N多观察者,被观察者发生改变,所有的观察者随之得到改变事件。
RxJava 中多个被观察者observable/map() +一个观察者Observer,订阅后马上触发。严格来讲RxJava 叫发布订阅模式,多出一个抽象层做转换,实质上不是观察者模式,达到的效果一样。
RxJava 中的装饰模型,U型结构概括不够完整:
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("A");
}
})
.map(new Function<String, Bitmap>())
.map(new Function<Integer, Boolean>())
.subscribe(new Observer<String>(){
onSubscribe()
onNext(String s)
onError()
onComplete()
});
由内而外:
- ObservableCreate
- ObservableMap
- ObservableMap
- …
- Subscribe
上下来回的流程:
- 装饰模型↓(订阅/触发导火线—>必须调用,否则后续无法执行)
- 封包裹↑
- 拆包裹/执行流程
onNext() ↓
背压使用/策略/原理:
生产的速度>消费的速度,导致的内存泄漏 使用Flowable 解决背压。
Single 是Observable 简化版,有局限性
思考:用自己的理解画出map变换操作符详细思路、流程图。
(三)原理与自定义操作符
线程切换(线程调度)原理、自定义RxView 操作符
异步事件流编程:
执行过程中可以随意的切换线程
RxAndroid 在客户端开发中必引入,配合使用
- RxJava 80%
- RxAndroid 20%
AndroidSchedulers.mainThread()
create() 最原始的方式,执行过程是可控的,just() 内部封装,不可控。
.subscribeOn(
Schedulers.io()
)
.subscribe()
onCreate() {
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>(){
apply(Scheduler scheduler) {
Log.d(TAG, "全局监听scheduler:"+scheduler);
return scheduler;
}
});
RxJavaPlugins.setInitIoSchedulerHandler(new Function<Callable<Scheduler>, Scheduler>(){
apply(Callable<Scheduler> schedulerCallable) {
Log.d(TAG, "全局监听schedulerCallable"+schedulerCallable);
return schedulerCallable.call();
}
});
}
RxJavaPlugins.java
static {
...
IO = RxJavaPlugins.initIoScheduler(new IOTask());
NEW_THREAD = RxJavaPlugins.initIoScheduler(new NewThreadTask());
...
}
DEFAULT = new IoScheduler();
DEFAULT = new NewThreadScheduler();
结论: 经过了层层包装,最终交给线程池管控。
除了onSubscribe() 都是异步线程。
测试终点切回主线程
new Thread() {
run();
test();
}.start();
test() {
...
.observerOn(
AndroidSchedulers.mainThread()
)
}
通过Handler 切换回主线程,Looper.getMainLooper() 能保证100%在主线程
DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
如果当前是主线程就不做切换,节省开销。
自定义操作符RxView :防抖控件
操作符就是函数。
RxView {
TAG = RxView.class.getSimpleName();
Observable<Object> clicks(View view) {
return ViewClickObservable(view);
}
}
ViewClickObservable extends Observable<Object> {
private final View view;
private static final Object EVENT = new Object();
private static Object EVENT2;
public ViewClickObservable(View view) {
this.view = view;
EVENT2 = view;
}
subscribeActual(Observer<? super Object> observer) {
MyLisener myListener = new MyListener(view, observer);
observer.onSubscribe(myListener);
this.view.setOnClickListener(myListener);
}
static final class MyListener implements View.OnClickListener, Disposable {
private final View view;
private final Observer<Object> observer;
private final AtomicBoolean isDisposable = new AtomicBoolean();
public MyListener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
onClick(View v){
if(isDisposed() == false) {
observer.onNext(EVENT)
}
}
dispose(){
if(isDisposable.compareAndSet(false, true)){
if(Looper.myLooper() == Looper.getMainLooper()){
view.setOnClickListener(null);
} else {
AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
run() {
view.setOnClickListener(null);
}
});
}
}
}
isDisposed(){
return isDisposable.get();
}
}
}
在 Activity 中使用:
onCreate() {
Button button = findViewById();
RxView.clicks(button)
.throttleFirst(2000, TimeUnit.SECONDS)
.subscribe(new Consumer<Object>(){
accept(Object o) {
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmiiter<String> e){
e.onNext("aaaa");
}
})
.subscribe(new Consumer<Strign>(){
accept(String s){
Log.d(TAG, s);
}
});
}
});
}
思考:自己对RxJava 线程切换的理解?
|