IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> RxJava和RxAndroid学习记录 -> 正文阅读

[移动开发]RxJava和RxAndroid学习记录

目录

1?概念和说明

1.1 响应式编程

1.2 RxJava

1.3 关于RxJava和RxAndroid

1.4 关于响应式编程和普通编程

2. 基本使用

2.1 基本元素关系图

2.2 代码示例:

2.3 关于subscribe()

2.4 线程调度

????????2.4.1 线程调度

????????2.4.2 RxJava内置的常用的线程项:

下一篇说操作符。


附:RxJava源码地址:?https://github.com/ReactiveX/RxJava

1?概念和说明

1.1 响应式编程

? ? ? ? 即rective programming, 是一种通过异步数据流来构建事物关系的编程模型,用事件来驱动事务

1.2 RxJava

? ? ? ? 一个通过观察者模式实现响应式编程

优点:a. 代码体现为为链式调用,而非嵌套式。

? ? ? ? ? ?b. 方便的实现了线程调度数据转换(即中间事务的转换)

? ? ? ? ? ?c. android中常用来和retrofit配合

1.3 关于RxJava和RxAndroid

????????两者差不多,RxAndroid是RxJava针对Android平台做了一些调整。可以理解为RxAndroid是专供 android的RxJava。

1.4 关于响应式编程和普通编程

? ? ? ? 场景:有三个任务A、B、C,B的执行依赖于A的执行结果,同样,C的执行结果依赖于B,则两种编程的作废分别为:

? ? ? ? 普通编程:先执行A,A执行完后执行B,B执行完执行C,即除了3个任务本身的执行过程外,3个任务之间的依赖执行逻辑也要由开发者编写。

? ? ? ? 响应式编程:会将3个任务做依赖关系绑定:C 依赖 B 依赖 A,二这个依赖绑定的先后执行逻辑由语言或调用库来支持,开发者只需专注于3个任务本身的执行逻辑,而不必处理3个任务间的执行逻辑,只需要把它委托给语言或调用库即可。之后,B会自动响应A的执行结果,C会自动响应B的执行结果。

? ? ? ? 总结:响应式编程优雅的处理了任务(业务或事务)间的依赖关系。

?????

2. 基本使用

2.1 基本元素关系图

---->? observables 是被观察者、事件源,用来处理事件的派发。

---->? observer/subscriber 是观察者、订阅者,观察/订阅目标是observables,observables派发出来的事件由他处理。

????????observer 和 subscriber 都是观察者/订阅者,后者是前者的扩展,内部增加了 onStart()方法:在时间发送之前订阅,用于做一些准备工作;还增加了 unSubscribe(),用于取消订阅。

---->??subscribe 是一个动作,将两者建立“订阅”关系,一旦订阅关系建立,observer/subscriber就可以立即接受、响应observables的变化。

2.2 代码示例:

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("emit A");
                observableEmitter.onNext("emit B");
                observableEmitter.onNext("emit C");
                observableEmitter.onComplete();
            }
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext");
                System.out.println(s);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        observable.subscribe(observer);

执行结果:

????????onSubscribe? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????onNext? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????emit A? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????onNext? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????emit B????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
????????onNext? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????emit C????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
????????onComplete? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

---->??observer 的 onSubscribe(Disposable disposable)方法在执行订阅动作后立即执行回调,在这里可以用??disposable 取消订阅或者将该对象保存起来以后再取消订阅。

---->??oberver 的 onNext()在emitter.onNext()执行后回调执行,表示响应被观察者事件,onNext()方法会执行多次,表示有多个事件,具体执行次数由被观察者决定。

---->??observer 的 onComplete()在被观察者的 onComplete()执行后回调执行,表示事件全部完成,通常在这里做一些收尾工作。

---->??observer 的 onError()在被观察者执行过程中的任何位置出现异常时回调执行,表示事件执行异常。同 onComplete()互斥。

? ? ? ? 通过以上说明可以发现:在 observer 中,除了 onSubscribe(Disposable d)以外,其他方法都和创建?observable 时发射器?observableEmitter 的方法同名,且都在同名方法执行后回调。比如,observableEmitter 执行一次 onNext(),则 observer 中的 onNext()方法执行一次。

????????onComplete()和onError()互斥,两者智能触发其一(且其中一个触发后,onNext()变不会再执行):

a.?onComplete()触发后,后续 observableEmitter 调用任何方法都不在生效;

b. onError()触发后,如果 observableEmitter 再调用?onComplete()或 onError(),RxJava会抛出异常,开发者需要自行保证唯一性。

? ? ? ? 特别需要注意的是:ObservableOnSubscribe 的?subscribe()方法在每次有新的observer加入时,都会在 observer 的 onSubscribe()回调后触发,这就保证了所有的观察者都可以接收到事件。

2.3 关于subscribe()

????????即订阅动作,他将观察者和被观察者建立了订阅关系,通过该方法,observable 回调 observer 的对应方法,从而达到 观察者相应被观察者发出的事件(实际上被观察者只负责产生事件,他是事件源,真正发送事件的是他在订阅时,即subscribe()被调用时)。

? ? ? ? 有多个重载的方法:

    // 1.观察者对被观察者发送的任何事件都响应
    @SchedulerSupport("none")
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");

        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            this.subscribeActual(observer);
        } catch (NullPointerException var4) {
            throw var4;
        } catch (Throwable var5) {
            Exceptions.throwIfFatal(var5);
            RxJavaPlugins.onError(var5);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(var5);
            throw npe;
        }
    }

    // 2.观察者对被观察者发出的事件作出响应(被观察者还可以继续发送事件)
    @SchedulerSupport("none")
    public final Disposable subscribe() {
        return this.subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    // 3.观察者只对被观察者发出的onNext()事件作出响应    
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return this.subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    
    // 4.观察者只对被观察者发出的onNext()、onError()事件作出响应  
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    // 5.观察者只对被观察者发出的onNext()、onError()、onComplete()事件作出响应  
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
        return this.subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }

    // 6.观察者只对被观察者发出的onNext()、onError()、onComplete()、onSubscribe()事件作出响应
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
        LambdaObserver<T> ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);
        this.subscribe((Observer)ls);
        return ls;
    }

? ? ? ? 其中,2、3、4、5都this到6,而6最后又调用了1。另外,上面增加了两个新类:ConsumerAction?:observer 中的 onNext()、onComplete()、onError()被 Consumer.accept()代替;observer中的 onComplete()方法被 Action.run()方法代替。因此,如果只关心observer中的一个或几个回调方法,则可以通过使用?Consumer 或?Action 来替换observer注册到被观察者中。 比如:上面的示例代码,如果只关心onNext(),则可以写为:

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("emit A");
                observableEmitter.onNext("emit B");
                observableEmitter.onNext("emit C");
                observableEmitter.onComplete();
            }
        });

        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        };

        observable.subscribe(consumer);

2.4 线程调度

????????2.4.1 线程调度

????????默认情况下,被观察者和观察者处于同一线程中。而异步才是RxJava的核心,所有需要用到线程调用,其实现通过 Scheduler 来实现:

a. subscribeOn():指定被被观察者在subscribe所发生的线程(即指定事件源的线程)。

? ? ? ? 多次指定发射事件的线程只有第一次指定有效,也就是多次调用只有第一次有效,之后的调用全被忽略。

b. observerOn():指定观察者/订阅者接受/响应事件的线程(即指定接受事件的线程)。

? ? ? ? 多次指定接受事件的线程是可以的,每指定一次,线程就切换一次,所以以最终指定的为准,即多次指定,以最后一次为准。

????????2.4.2 RxJava内置的常用的线程项:

a. Schedulers.io(): 代表执行io操作的线程,常用于网络请求、文件读写等io密集型操作的地方。行为模式和 new Thread 类似,只是其内部维护了一个无上限的线程池,更高效。

b.?Schedulers.newThread():总是启用新线程,在新线程中执行当前事务。

c.?Schedulers.complutation():代表cpu计算密集型的操作,既不会被io操作限制性能的操作,如图形计算等。内部维护了一个固定大小(大小为cpu的核数)的线程池。不要把io操作放到该线程项中,浪费cpu的资源。

d. AndroidSchedulers.mainThread():android的主线程,用于更新UI,为android独有。

下一篇说操作符

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 20:53:18  更:2022-10-08 20:57:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/19 20:52:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码