1、创建Observable
调用一个create方法创建Observable,会检查ObservableOnSubscribe是否为null,使用RxJavaPlugins的onAssembly 装配一个新建的ObservableCreate,他是Observable的子类
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
在onAssembly会执行下hook方法f,方便创建Observable对象时再做一些处理
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
create方法创建Observable为ObservableCreate,并且内部包含了一个ObservableOnSubscribe对象
2、subscribe
创建Observable后就可以使用订阅方法subscribe指定Observer了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "");
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
...
}
}
调用subscribeActual开始真正订阅,这是个抽象方法,我们创建的Observable是ObservableCreate,实际上是在ObservableCreate实现的
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActua中,先调用观察者的onSubscribe方法通知已经订阅好,准备发送数据了,Observer的onSubscribe方法接受的是Disposable,CreateEmitter实现了Disposable接口。最后调用ObservableOnSubscribe的subscribe方法传入CreateEmitter,可以使用CreateEmitter发送数据。
3、发送数据或事件
ObservableOnSubscribe的subscribe方法中发送数据。传入CreateEmitter,所以发送的数据或事件就是调用的CreateEmitter的onNext方法
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}
4、subscribeOn
subscribeOn指定线程调度器
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
subscribeOn创建并返回一个新的被观察者ObservableSubscribeOn也是Observer的子类,他持有一个之前创建的被观察者引用。用create创建的被观察者叫ObservableCreate,subscribeOn创建的被观察者叫ObservableSubscribeOn。在创建ObservableSubscribeOn时候传入了this,他指向的是用create创建的被观察者叫ObservableCreate。 ObservableSubscribeOn的subscribeActual方法的实现
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
SubscribeTask任务就是调用被观察者ObservableCreate的Subscribe方法,实现了线程调度器中运行Subscribe方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
多次使用subscribeOn只有一次起作用,原因分析: 比如按照顺序指定线程调度器为I/O,和newThread,第一次调用subscribeOn指定I/O线程调度器时,创建了一个被观察者ObservableSubscribeOn,里面包含了create方法创建的被观察者叫ObservableCreate的引用,第二次调用subscribeOn指定newThread线程调度器时,又创建创建了一个被观察者ObservableSubscribeOn,里面包含了第一个ObservableSubscribeOn的引用,最后调用第二个ObservableSubscribeOn的subscribe方法指定观察者时,按顺序执行,最终会触发ObservableCreate的subscribe方法执行,而ObservableCreate的subscribe方法在哪个线程调度器取决于指向它的ObservableSubscribeOn线程调度器是哪个
5、observeOn
observeOn控制在什么样的线程中接受事件
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn也创建了新的观察者ObservableObserveOn,也持有上一个被观察者也就是ObservableCreate的引用,执行Observable的subscribe方法指定观察者时触发subscribeActual时调用,这时是调用的ObservableObserveOn的subscribeActual方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
创建一个新的观察者ObserveOnObserver包装原来的观察者,还包含一个Scheduler.Worker,所以最后发送出来的数据回传给ObserveOnObserver,在指定线程中接受处理。 多次调用observeOn都会起作用,因为每次调用observeOn都会创建ObserveOnObserver,每个ObserveOnObserver都会绑定对应的线程调度器接受和处理事件
|