final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this.t = t; this.mapper = mapper; }
@Override public void onSubscribe(Disposable d) { t.onSubscribe(d); }
@Override public void onSuccess(T value) { R v; try { //外面是判空,相当于就是mapper.apply(value),这个方法其实就是我们自己的map方法 v = ObjectHelper.requireNonNull(mapper.apply(value), “The mapper function returned a null value.”); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } //将map方法处理后的事件,传递给下游 t.onSuccess(v); }
@Override public void onError(Throwable e) { t.onError(e); } } }
看到这儿我们可以发现,事件流向是上游的被观察者流向观察者,在操作符中,因为操作符自身是继承了被观察者(在此处为Single),而在其自身中,有一个内部类是观察者(在此处为实现了SingleObserver的MapSingleObserver),事件由上游的被观察者,流向下游的观察者,而所有的操作符的结构都是一样的,每个操作符都只需要给上游操作符提供Observer,并给下游提供一个Observable,内部结构就是,从上游流向下游内部的observer被观察者,然后此下游的观察者observable会调用它自己下游的内部observer,这样,整条链就能运行了。
由此可知,Rxjava中,每个操作符内部都实现了一整套PUSH模型的接口体系
由特殊到普通 现在回到最普通的Rxjava写法
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onComplete(); } }).map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return integer+1; } }).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { }
@Override public void onNext(Integer integer) { }
@Override public void onError(Throwable e) { }
@Override public void onComplete() { } });
先看create方法的源码
public static Observable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, “source is null”); return RxJavaPlugins.onAssembly(new ObservableCreate(source)); }
通过上面的分析,我们一眼可以看出,就相当于new ObservableCreate(source)
public final class ObservableCreate extends Observable { final ObservableOnSubscribe source;
public ObservableCreate(ObservableOnSubscribe source) { this.source = source; }
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter parent = new CreateEmitter(observer);//1 observer.onSubscribe(parent);//2
try { source.subscribe(parent);//3 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } static final class CreateEmitter extends AtomicReference implements ObservableEmitter, Disposable { … } … }
这个类比较长,我们先只看我们关心的部分。只以看到我们喜爱的subscribeActual方法,在订阅时,会调用到此方法。
再来逐句分析,在运行1语句时,new CreateEmitter,看到CreateEmitter的源码
//实现了ObservableEmitter,ObservableEmitter是Emitter的子类,用于发射上游数据 static final class CreateEmitter extends AtomicReference implements ObservableEmitter, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//下游的observer CreateEmitter(Observer<? super T> observer) { this.observer = observer; }
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sou
《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
【docs.qq.com/doc/DSkNLaERkbnFoS0ZF】 完整内容开源分享
rces.")); return; } if (!isDisposed()) { //把事件传递给下游observer,调用观察者的onNext方法 observer.onNext(t); } } … }
再回到ObservableCreate的源码,它是被观察者Observable的子类,
- 先在1时new了一个发射器CreateEmitter对象,然后我们把自定义的下游观察者observer作为参数传了进去,这里同样也是包装起来,这个CreateEmitter实现了ObservableEmitter和Disposable接口
- 在2语句时,触发我们自定义的observer的onSubscribe(Disposable)方法,实际就是调用观察者的onSubscribe方法,告诉观察者已经成功订阅到被观察者了;
- 再执行在语句3,source.subscribe(parent);就和我们分析Map一样了,就是订阅,把事件从上游传到下游
小结
Observable(被观察者)和Observer(观察者)建立连接,也就是订阅之后,会创建出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中,观察者对发射器中发出的事件做出响应事件。可以看到,订阅成功之后,Observabel才会开始发送事件
切断消息源码分析
现在我们再来看dispose的实现。Disposabel是一个接口,可以理解Disposable是一个连接器,调用dispose后,这个连接就会中断。其具体实现在CreateEmitter类。我们现在主要来分析一下它的这一块源码。
在CreateEmitter中的dispose()方法
@Override public void dispose() { DisposableHelper.dispose(this); }
就是调用的DisposableHelper的dispose方法
public enum DisposableHelper implements Disposable { /**
- The singleton instance representing a terminal, disposed state, don’t leak it.
*/ DISPOSED ; …
public static boolean isDisposed(Disposable d) { //判断Disposable类型的变量的引用是否为DISPOSED //就可以判断这个连接器是否中断 return d == DISPOSED; }
public static boolean dispose(AtomicReference field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { //把field设置为DISPOSED current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } … }
可以看到DisposableHelper是个枚举类,并且只有一个值DISPOSED。dispose方法就是把一个原子引用的field设为DISPOSED,这就是中断状态。而isDisposed()就是根据这个标志来判断是否中断的。
再回过头来看CreateEmiiter类的onNext这些方法
@Override public void onNext(T t) { //省略无关代码
if (!isDisposed()) { //如果没有dispose(),才会调用onNext() observer.onNext(t); } }
@Override public void onError(Throwable t) { if (!tryOnError(t)) { //如果dispose()了,会调用到这里,即最终会崩溃 RxJavaPlugins.onError(t); } }
@Override public boolean tryOnError(Throwable t) { //省略无关代码 if (!isDisposed()) { try { //如果没有dispose(),才会调用onError() observer.onError(t); } finally { //onError()之后会dispose() dispose(); } //如果没有dispose(),返回true return true; } //如果dispose()了,返回false return false; }
@Override public void onComplete() { if (!isDisposed()) { try { //如果没有dispose(),才会调用onComplete() observer.onComplete(); } finally { //onComplete()之后会dispose() dispose(); } } }
很容易得出,
- 如果没有dispose,observer的onNext才会被调用
- onError与onComplete方法互斥,只能其中一个调用到,因为调用其中一个,就会把连接切断,dispose
- 先onError后onComplete中是onComplete不会被调用,反过来的话,就会崩溃,因为onError中抛出了异常,实际上,dispose了后调用onError都会崩
再看一下操作符Map
public final Observable map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, “mapper is null”); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource source, Function<? super T, ? extends U> function) { super(source); this.function = function; }
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; }
@Override public void onNext(T t) { if (done) { return; }
if (sourceMode != NONE) { downstream.onNext(null); return; }
U v;
try { v = ObjectHelper.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); }
… } }
可以看到,操作符其实和上面分析的特殊情况下的一样的,这里就省略分析了。
##三.Rxjava线程切换 我们一般是这么使用的
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onComplete(); } }).map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return integer+1; } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { … });
通过subscribeOn来切换上游线程,observeOn来切换下游线程。
那么在源码中,是怎么的呢?
subscribeOn源码分析 Schedulers.io()
subscribeOn类型有好几种,这里就随便选择了Schedulers.io()来分析,别的其实都差不多的,分析了一个就行了。
@NonNull public static Scheduler io() { //又是hook,就相当于IO return RxJavaPlugins.onIoScheduler(IO); }
public final class Schedulers { … @NonNull static final Scheduler IO;
…
static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
static { //又是hook,就相当于new IOTask IO = RxJavaPlugins.initIoScheduler(new IOTask()); … }
… static final class IOTask implements Callable { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } } }
可以看到,最后这里就相当于new IoScheduler,先不看它的具体实现。
subscribeOn
我们继续看subscribeOn的源码
public final Observable subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, “scheduler is null”); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler)); }
可以看到和前面一样,就相当于返回new ObservableSubscribeOn
public final class ObservableSubscribeOn extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { super(source); this.scheduler = scheduler; }
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver parent = new SubscribeOnObserver(observer);
observer.onSubscribe(parent);
//外层的parent.setDisposable是为了创建连接器,以便以后切断等控制的,可以只看里面 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } … }
它的构造就是把source和scheduler两个都保存一下,在后面要用到的。
接下来我们来看订阅过程,虽然这里是线程切换,但是其实它也只是个操作符,和我们前面分析的是一样的,订阅过程和上面也是一样的,所以我们可以知道,当订阅发生后,ObservableSubscribeOn的subscribeActual方法就会被调用。
同样的,subscribeActual方法中,它把我们自定义的下游观察者observer包装成了SubscribeOnObserver对象,然后调用observer的onSubscribe方法,可以看到,目前为止,还没有发生任何的线程相关的东西,所以observer的onSubscribe()方法是运行在当前线程中的,那我们重点来看一下parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));方法。
我们先来看一下SubscribeTask类
//是ObservableSubscribeOn的内部类,实现runnable接口,看到这,我们嗅到了线程的味道 final class SubscribeTask implements Runnable { private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) { this.parent = parent; }
@Override public void run() { //这是的source就是我们自定义的Observable对象,就是ObservableCreate source.subscribe(parent); } }
可以看到,这个类非常简单,实现了Runnable接口,在run方法中调用source.subscribe(parent);,这是个链式调用,会一层一层调用上去。
再来看scheduler.scheduleDirect
这是线程切换的核心部分了,一定要仔细看
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
//run就是SubscribeTask public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //createWorker在Schedule类中是个抽象方法,所以实现是在子类中 //所以这个方法就是在IOSchedule中实现的 //worker中可以执行runnabale final Worker w = createWorker();
//实际上decoratedRun还是个run对象,也就是SubscribeTask final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//runnable和worker包装成一个DisposeTask DisposeTask task = new DisposeTask(decoratedRun, w);
//Worker执行这个Task w.schedule(task, delay, unit);
return task; }
上面的代码注释已经写得非常详细了,scheduleDirect方法就是,new一个worker,然后使用这个worker来执行task线程。
再看一下IoIoScheduler中,createWorker以及shedule的过程
public Worker createWorker() { //new一个EventLoopWorker并传一个worker的缓存池进去 return new EventLoopWorker(pool.get()); }
static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); //从缓存worker池中取一个worker出来 this.threadWorker = pool.get(); }
…
@NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don’t schedule, we are unsubscribed return EmptyDisposable.INSTANCE; }
//Runnable交给threadWorker去执行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
注意的是,不同的Scheduler类会有不同的Worker实现,因为Scheduler类最终都是交由worker来执行调度的,不过分析起来差别不大。
接下来我们看worker的缓存池操作
static final class CachedWorkerPool implements Runnable { …
ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { //缓冲池不为空,就从缓存池中取一个threadWorker ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } }
// No cached worker found, so create a new one. //为空就一个并返回去 ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } … }
再看worker的执行代码threadWorker.scheduleActual
代码跟进,会发现实现在它的父类NewThreadWorker中
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) { //在构造中创建一个ScheduledExecutorService对象 //可以通过它来使用线程池 executor = SchedulerPoolFactory.create(threadFactory); } … public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //这是decoratedRun就相当于run Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将decoratedRun包装成一个新对象ScheduledRunnable ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) { if (!parent.add(sr)) { return sr; } }
Future<?> f; try { if (delayTime <= 0) { //线程池中立即执行ScheduledRunnable f = executor.submit((Callable)sr); } else { //线程池中延迟执行ScheduledRunnable f = executor.schedule((Callable)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { … }
return sr; } … }
这里的executor就是使用线程池来执行任务,最终subscribeTask的run方法会在线程池中被执行,即上游的Observable的subscribe方法会在IO线程中调用了。
小结
- Observer的onSubscribe方法运行在当前线程中,因为源码中并没有线程切换
- 如果设置了subscribeOn(指定线程),那么Observable中的subscribe方法将会运行在指定线程中。
- 当多个subscribeOn调用时,因为从源码可知,线程的切换是从下往上的,最后也就是链式调用的第一个切换过程,才是有效的切换
observeOn源码分析
.observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers.mainThread() 同样的,我们先看AndroidSchedulers.mainThread()的源码
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); }
这一段代码相信如果是看了上面的源码分析的话,一眼就能看出来,其实就相当于new HandlerScheduler(new Handler(Looper.getMainLooper()), false);,把一个主线程的Handler包装进了HandlerScheduler中。
observeOn 然后我们继续看observeOn的源码
public final Observable observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, “scheduler is null”); ObjectHelper.verifyPositive(bufferSize, “bufferSize”); return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize)); }
通过源码也可以知道,这里相当于直接new ObservableObserveOn
public final class ObservableObserveOn extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }
@Override protected void subscribeActual(Observer<? super T> observer) { //判断是否是当前线程 if (scheduler instanceof TrampolineScheduler) { //是当前线程的话,直接调用下游的subscribe方法 //也就是调用下一个Observable的subscibe方法 source.subscribe(observer); } else { //创建worker //本例中的schedule为HandlerScheduler Scheduler.Worker w = scheduler.createWorker();
//这里和上面分析有点类似,会将worker包装到ObserveOnObserver中 //注意:source.subscribe没有涉及到worker,所以还是在之间设置的线程中执行 source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } } … }
首先,判断是否已经在要切换的线程上了,如果是的话,那么直接调用。如果不是,那么使用HandlerScheduler包装一下worker,然后通过worker来把下游的事件进行切换,直接上游订阅,不做线程操作。
我们来看ObserveOnObserver类的源码
static final class ObserveOnObserver extends BasicIntQueueDisposable implements Observer, Runnable { …
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.downstream = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; }
@Override public void onNext(T t) { if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { //将信息存入队列中 queue.offer(t); } schedule(); } …
void schedule() { if (getAndIncrement() == 0) { //在这里调用 worker.schedule(this); } }
void drainNormal() { int missed = 1;
//存储消息的队列 final SimpleQueue q = queue; //这里的downstram实际就是下游的observer final Observer<? super T> a = downstream;
for (;😉 { if (checkTerminated(done, q.isEmpty(), a)) { return; }
for (;😉 { boolean d = done; T v;
try { //从队列中取出消息 v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null;
if (checkTerminated(d, empty, a)) { return; }
if (empty) { break; }
//调用下游的Observer的onNext a.onNext(v); }
missed = addAndGet(-missed); if (missed == 0) { break; } } }
@Override public void run() { //outputFused默认是false if (outputFused) { al() { int missed = 1;
//存储消息的队列 final SimpleQueue q = queue; //这里的downstram实际就是下游的observer final Observer<? super T> a = downstream;
for (;😉 { if (checkTerminated(done, q.isEmpty(), a)) { return; }
for (;😉 { boolean d = done; T v;
try { //从队列中取出消息 v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null;
if (checkTerminated(d, empty, a)) { return; }
if (empty) { break; }
//调用下游的Observer的onNext a.onNext(v); }
missed = addAndGet(-missed); if (missed == 0) { break; } } }
@Override public void run() { //outputFused默认是false if (outputFused) {
|