IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> RxJava 合并操作符 merge和mergeDelayError -> 正文阅读

[移动开发]RxJava 合并操作符 merge和mergeDelayError

合并操作符(Combining Operators)是RxJava里面的一类操作符。主要有merge,concat,zip,amb,combineLast,group.这篇文章介绍merge和相关的方法。

合并操作符是用于处理多操作符的。这些操作符要处理的对象至少是两个。merge操作符可以合并两个Observable,变成一个Observable.过程如下图所示。
在这里插入图片描述
从上图可以看出,merge是不保证顺序的,两个Obervable元素是会交叉发射的。在官方文档里面这幅图并不是一个静态图,两个为1的元素是可以用鼠标拖动的。下面的两个1也会跟着移动。可以拖动成下面的样子:
在这里插入图片描述
merge还有一个重要的特点是多个Observable之中的一个发生错误的时候,会导致整个Observable调用onError而导致程序结束,没有发生错误的Observable后续的内容将丢失。如下图所示。
在这里插入图片描述
总结就是两个特点:
1.不保证合并后Observable的执行顺序。
2.发生错误后,未执行完的后续内容将被丢弃。
特别是第二个特点,感觉像是缺点。因为如果要丢弃数据的话,我们肯定是不能够在项目中使用的,除非一些需要这个特性的功能,不过一般也不太可能。

下面通过代码演示这些特性。
首先是一个错误的例子。我们可能很自然的写下下面的代码:

 		Observable<Integer> observable1 = Observable.just(1, 2, 3)Observable<Integer> observable2 = Observable.just(4, 5, 6);
        Observable
                .merge(observable1, observable2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        //数据开关,直接关闭
                        //d.dispose();
                        Log.d("CreateActivity", "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer i) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d("ConbineActivity", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("ConbineActivity", "onComplete");
                    }
                });

输出如下。不管重复多少次都是下面的结果,这里可能就疑惑了,不是说好不保证顺序的吗?怎么又能保证顺序了呢?这是个错误例子,保证顺序的前提是在多个线程中,比如官方给出的一个例子。

 onSubscribe
 1 main
 2 main
 3 main
 4 main
 5 main
 6 main
 onComplete

下面的代码是官方给出的例子。和上面区别就是添加了subscribeOn(Schedulers.io())这段代码。使用了create来创建数据源,因为我们需要输出数据源的线程名字。这就变成了两个线程了,这个时候不保证执行顺序的特性就体现出来了。
不过多线程不是本来就不保证执行顺序的吗?是的,和多线程是没什么区别的,但这里最主要的点是合并多个Observable,顺不顺序是次要的。

 		Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity","Source1: emitting from " +Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity","Source2: emitting from " +Thread.currentThread().getName());
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onComplete();
            }
        });

        Observable.merge(source1, source2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        //数据开关,直接关闭
                        //d.dispose();
                        Log.d("CreateActivity", "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer i) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d("ConbineActivity", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("ConbineActivity", "onComplete");
                    }
                });

输出结果如下。source1和source2是数据源是在两个不同的线程发布的,source1在新创建的io线程里面创建,source2在main线程。而观察者接受到数据全都是在main线程。我们可以发现发射源source1和source2的执行顺序可能不一样,观察者接受到的顺序也可能不一样。这样,merge不保证顺序的特性就已经体现出来了。

 onSubscribe
 Source2: emitting from main
 Source1: emitting from RxCachedThreadScheduler-1
 4 main
 1 main
 2 main
 3 main
 5 main
 6 main
 onComplete

接下来要特别第二个比较麻烦的问题。出现错误之后数据丢失的问题。
看下面的例子:

Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("No Network"));
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onComplete();
            }
        });

        Observable.merge(source1, source2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        //数据开关,直接关闭
                        //d.dispose();
                        Log.d("ConbineActivity", "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer i) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d("ConbineActivity", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("ConbineActivity", "onComplete");
                    }
                });

这个例子和上一个例子是一样的,只是在第一个Observable的数据源里面调用了onError方法,这时候可能出现下面两种结果。
结果一:
这里出现很困惑的事情,为什么输出结果都是在RxCachedThreadScheduler-1这个线程里面,不是应该都是在主线程里面吗?这个情况如果在onNext方法里面添加了sleep方法会出现的非常频繁。不叫也是有概率出现的。这个情况非常严重,我甚至对rxjava的正确性和稳定性产生了非常大的怀疑。

 onSubscribe
 Source1: emitting from RxCachedThreadScheduler-1
 Source2: emitting from main
 1 RxCachedThreadScheduler-1
 4 RxCachedThreadScheduler-1
 5 RxCachedThreadScheduler-1
 6 RxCachedThreadScheduler-1
 2 RxCachedThreadScheduler-1
 3 RxCachedThreadScheduler-1
 W/System.err: java.lang.Exception: Low Power
 //这里省略一堆异常信息
 D/ConbineActivity: onError

结果二:
输出4之后直接就发生错误了,我们的onError方法是在onNext(2)之后调用的,但这里onNext(1)和onNext(2)都没有输出,这也是非常有问题的。不知道是什么原因。

 onSubscribe
 Source2: emitting from main
 Source1: emitting from RxCachedThreadScheduler-1
 4 main
 No Network
 //some error output code
 D/ConbineActivity: onError

没有一种情况是正常的,这让我对rxjava产生了很大的怀疑。我甚至怀疑rxjava2还只是个实验的产品,根本没有投入实际项目的基础,因为正确性和稳定性都是非常值得怀疑的。

我特地去github提了问题,项目人员回答我了。

A1) By default, errors can cut ahead of items. merge reuses the emitting thread and if that thread is blocked down the line, other sources may run to termination. When merge can run again, it will first check for errors and ignore any value from any source.

A2) Depends on which source wins as you are racing the main thread with the background thread.

大致意思是说,merge可以提前舍弃元素,就是一个元素虽然发出了,但还没来的及被接受就发生错误而被舍弃了。
而且观察者线程是谁先争取到就是谁。
根据他的回答,也就是他不认为这是错误。既然是“正确的”行为。那我们就不应该在实际项目中用merge。因为行为是不可靠的。所以我还是不是特别理解为什么要设计成这种特性,可能有特殊的情况使用到。

但这根本没法在项目中用啊。实际上,RxJava是提供在所有操作执行完后再抛异常的操作的。这就是mergeDelayError()方法。和merge使用方法是一样的,只是带来延迟错误的功能。这是我们想要的功能。
从下面这张图可以看出,紫色的球在异常发生后还是执行了,并没有被丢弃。这就是mergeDelayError()方法的作用。
在这里插入图片描述

使用起来和merge一样,如下面的代码。

        Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("No Network"));
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        
        
 
        Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onComplete();
            }
        });

        Observable.mergeDelayError(source1, source2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        //数据开关,直接关闭
                        //d.dispose();
                        Log.d("ConbineActivity", "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer i) {

//                        try {
//                            Thread.sleep(500);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }

                        Log.d("ConbineActivity", i + " " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d("ConbineActivity", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("ConbineActivity", "onComplete");
                    }
                });

非常明显,在onNext(3)的时候发生异常,异常在onNext(6)执行完才抛出,符合我们的预期。

onSubscribe
Source1: emitting from RxCachedThreadScheduler-1
Source2: emitting from main
1 RxCachedThreadScheduler-1
2 RxCachedThreadScheduler-1
4 main
5 main
6 main
W/System.err: java.lang.Exception: No Network
onError

最后简单说一下mergeWith().
这个方法实际就是个特例,就是两个Observable的时候可以使用。

 Observable<Integer> source1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source1: emitting from " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Exception("No Network"));
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        
 Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("ConbineActivity", "Source2: emitting from " + Thread.currentThread().getName());
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onNext(6);
                emitter.onComplete();
            }
        });

source1.mergeWith(observable2)
           .subcribeOn(Schedules.io())
           .

我觉得这个方法没什么用,只是提供多一种写法而已。mergeDelayError是没有这种with方法的,所以也不通用,建议不要用。在项目中只用mergeDelayError就行。

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-05-11 16:33:45  更:2022-05-11 16:33:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 1:25:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码