IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> RxJava:concat(连接)、 concatDelayError、 concatEager的使用 -> 正文阅读

[移动开发]RxJava:concat(连接)、 concatDelayError、 concatEager的使用


在这里插入图片描述

concat(串行连接数据)

concat,翻译为连接。这个方法的作用是将多个数据源发射的数据,按照顺序进行连接。数据的顺序,只与发射源的顺序相关,与数据源的发射时间无关。就是说,即使第二个发射源先完成了所有数据的发送,发射的数据也要排到第一个发射源的数据之后。

测试简单连接:

    @Test
    public void testConcat(){
        List<Observable<Integer>> list = new ArrayList<>();
        list.add(Observable.just(1));
        list.add(Observable.just(2));
        list.add(Observable.just(3));
        list.add(Observable.just(4));
        Observable.concat(list).subscribe(i->{
            System.out.println(i);
        });
    }

输出:

1
2
3
4

测试给第一个数据源添加延时:

@Test
public void testConcatDelay() throws InterruptedException {
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1).delay(5, TimeUnit.SECONDS));
    list.add(Observable.just(2));
    list.add(Observable.just(3));
    list.add(Observable.just(4));
    Observable.concat(list).subscribe(i->{
        System.out.println(i);
    });
    Thread.sleep(10*1000);
}

输出:

1
2
3
4

可见,即使发射 1 的时间延迟了5s,1 仍然排在数据的首位。

concatDelayError

这里引出另外一个方法:concatDelayError 。

如果在concat过程中发生了错误,RxJava会如何处理:

测试:

@Test
public void testConcatError(){
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1).doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            throw new NullPointerException();
        }
    }));
    list.add(Observable.just(2));
    list.add(Observable.just(3));
    list.add(Observable.just(4));
    Observable.concat(list).subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("ConcatTest.onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("ConcatTest.onError");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("integer = " + integer);
        }
    });
}

输出:

ConcatTest.onError

只要有一个发射源产生了错误,concat就会停止执行。

concatDelayError方法的出现正是为了应对这种场景,当数据源发生错误时,concatDelayError允许继续连接其他数据源,并且在最后,进入onError。

测试:

@Test
public void testConcatDelayError(){
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1).doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            throw new NullPointerException();
        }
    }));
    list.add(Observable.just(2));
    list.add(Observable.just(3));
    list.add(Observable.just(4));
    Observable.concatDelayError(list).subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("ConcatTest.onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("ConcatTest.onError");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("integer = " + integer);
        }
    });
}

输出:

integer = 2
integer = 3
integer = 4
ConcatTest.onError

concatEager(并行连接数据)

我们继续关注concat方法,数据源延时发射的问题,我们给两个数据源添加延时

@Test
public void testConcatDelayTimeOut() throws InterruptedException {
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1).delay(2, TimeUnit.SECONDS));
    list.add(Observable.just(2).delay(2, TimeUnit.SECONDS));
    Observable.concat(list).subscribe(i->{
        System.out.println(i);
    });
    Thread.sleep(3*1000);
}

输出:

1

这里2没有被发射出去,因为concat默认是串行的,只有前一个Observable执行完成之后,才开始执行下一个。test方法执行到第3s时,方法已经执行完毕,而第二个Observable的延时还在执行,导致2没有被发射。

concatEager提供了并行的解决方案,一旦开始订阅,所有的Observable都开始执行。

测试:

@Test
public void testConcatEager() throws InterruptedException {
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1).delay(2, TimeUnit.SECONDS));
    list.add(Observable.just(2).delay(2, TimeUnit.SECONDS));
    Observable.concatEager(list).subscribe(i->{
        System.out.println(i);
    });
    Thread.sleep(3*1000);
}

输出

1
2
  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-01-29 23:11:58  更:2022-01-29 23:14:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 12:34:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码