IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RxJava源码分析 -> 正文阅读

[Java知识库]RxJava源码分析

1、创建Observable

调用一个create方法创建Observable,会检查ObservableOnSubscribe是否为null,使用RxJavaPlugins的onAssembly
装配一个新建的ObservableCreate,他是Observable的子类

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //返回装配的Observable,即新建的ObservableCreate
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

在onAssembly会执行下hook方法f,方便创建Observable对象时再做一些处理

 @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

create方法创建Observable为ObservableCreate,并且内部包含了一个ObservableOnSubscribe对象

2、subscribe

创建Observable后就可以使用订阅方法subscribe指定Observer了

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
    	//检查Observer是否为null
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
        	//如果设置过subscribe的hook方法则执行hook方法
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "");
            //开始真正订阅
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }

调用subscribeActual开始真正订阅,这是个抽象方法,我们创建的Observable是ObservableCreate,实际上是在ObservableCreate实现的

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用观察者的onSubscribe方法
        observer.onSubscribe(parent);
        try {
        	//source是ObservableOnSubscribe对象,创建Observable时传入
        	//调用ObservableOnSubscribe的subscribe方法传入CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActua中,先调用观察者的onSubscribe方法通知已经订阅好,准备发送数据了,Observer的onSubscribe方法接受的是Disposable,CreateEmitter实现了Disposable接口。最后调用ObservableOnSubscribe的subscribe方法传入CreateEmitter,可以使用CreateEmitter发送数据。

3、发送数据或事件

ObservableOnSubscribe的subscribe方法中发送数据。传入CreateEmitter,所以发送的数据或事件就是调用的CreateEmitter的onNext方法

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
		...
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
            	//如果没有关闭开关,调用观察者的onNext方法,将数据传给观察者
                observer.onNext(t);
            }
        }

       ...
    }

4、subscribeOn

subscribeOn指定线程调度器

 	@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //创建并返回一个新的被观察者ObservableSubscribeOn,他持有一个之前创建的被观察者引用
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

subscribeOn创建并返回一个新的被观察者ObservableSubscribeOn也是Observer的子类,他持有一个之前创建的被观察者引用。用create创建的被观察者叫ObservableCreate,subscribeOn创建的被观察者叫ObservableSubscribeOn。在创建ObservableSubscribeOn时候传入了this,他指向的是用create创建的被观察者叫ObservableCreate。
ObservableSubscribeOn的subscribeActual方法的实现

	@Override
    public void subscribeActual(final Observer<? super T> observer) {
    	//创建新的观察者包装传进来的观察者
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
		//调用观察者onSubscribe
        observer.onSubscribe(parent);
		//调用调度器执行任务,返回Disposable用于取消任务
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

SubscribeTask任务就是调用被观察者ObservableCreate的Subscribe方法,实现了线程调度器中运行Subscribe方法

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
        	//调用被观察者的Subscribe方法
        	//source是创建ObservableSubscribeOn时候传入的SubscribeCreate对象
            source.subscribe(parent);
        }
    }

多次使用subscribeOn只有一次起作用,原因分析:
比如按照顺序指定线程调度器为I/O,和newThread,第一次调用subscribeOn指定I/O线程调度器时,创建了一个被观察者ObservableSubscribeOn,里面包含了create方法创建的被观察者叫ObservableCreate的引用,第二次调用subscribeOn指定newThread线程调度器时,又创建创建了一个被观察者ObservableSubscribeOn,里面包含了第一个ObservableSubscribeOn的引用,最后调用第二个ObservableSubscribeOn的subscribe方法指定观察者时,按顺序执行,最终会触发ObservableCreate的subscribe方法执行,而ObservableCreate的subscribe方法在哪个线程调度器取决于指向它的ObservableSubscribeOn线程调度器是哪个

5、observeOn

observeOn控制在什么样的线程中接受事件

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //返回新建的被观察者ObservableObserveOn
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

observeOn也创建了新的观察者ObservableObserveOn,也持有上一个被观察者也就是ObservableCreate的引用,执行Observable的subscribe方法指定观察者时触发subscribeActual时调用,这时是调用的ObservableObserveOn的subscribeActual方法

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
			//调用指向的被观察者的subscribe,创建一个新的观察者ObserveOnObserver
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

创建一个新的观察者ObserveOnObserver包装原来的观察者,还包含一个Scheduler.Worker,所以最后发送出来的数据回传给ObserveOnObserver,在指定线程中接受处理。
多次调用observeOn都会起作用,因为每次调用observeOn都会创建ObserveOnObserver,每个ObserveOnObserver都会绑定对应的线程调度器接受和处理事件

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 23:24:06  更:2021-07-29 23:24:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/28 2:11:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码