IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> RxJava的Disposable及其工作原理 -> 正文阅读

[移动开发]RxJava的Disposable及其工作原理

一、关于 Disposable

任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏。RxJava2 提供了 Disposable( RxJava1 中是 Subscription),在适当时机取消订阅、截断数据流。当在 Android 中使用时尤其要注意,避免内存泄露。

private CompositeDisposable compositeDisposable = new CompositeDisposable();

@Override 
public void onCreate() {
    compositeDisposable.add(backendApi.loadUser()
      .subscribe(this::displayUser, this::handleError));
}

@Override public void onDestroy() {
    compositeDisposable.clear();
}

上面例子展示了在 Activity 等 LifecycleOwner 中的一般做法:使用 CompositeDisposable 收集所有的 Disposable 句柄,而后在 onDestroy 中调用 clear 统一注销。
clear 最终调用的是各个 Disposable 的 dispose 方法:

public interface Disposable {
	void dispose();
	boolean isDisposed();
}

当然,除了手动调用 dispose,也有一些自动框架可供使用, 如 RxLifecycle 、uber 的 AutoDispose 等, 但最终都要调用到 Disposable 的 dispose() 。

二、dispose 实现原理

Disposable disposable = Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
		for (int i = 1; i <= 3; i++) {
			observableEmitter.onNext(i);
		}
	})
	.takeUntil(integer -> integer < 3)
	.subscribe();
...
...
//在适当的是调用dispose取消订阅
disposable.dispose()

1、Disposable 是一个 Observer

调用 Observable.subscribe(…) 返回的 Disposable 本质是一个 LambdaObserver

public final Disposable subscribe(
    @NonNull Consumer<? super T> onNext,
    @NonNull Consumer<? super Throwable> onError,
    @NonNull Action onComplete) {

	LambdaObserver<T> ls = new LambdaObserver<>(
      onNext, onError, onComplete,
      Functions.emptyConsumer());

	subscribe(ls);

	return ls; //return as a Disposable 
}

LambdaObserver 集众多接口于一身

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable
  • 首先,是一个 Observer,被subscribe()后,通过onNext发射数据;
  • 其次,是一个 Disposable,对外提供 dispose 方法;
  • 最后,通过 AtomicReference,确保 dispose 线程安全的执行
@Override
public void dispose() {
	DisposableHelper.dispose(this);
}

public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}

原子地设置 DISPOSED, 确保 AtomicReference 中的 Disposable 的 dispose 一定被调用,有且仅有一次。

2、onSubscribe 中传递 Disposable
AtomicReference 的 value 是在 Observer.onSubscribe 中被赋值的:

@Override
public void onSubscribe(Disposable d) {
	if (DisposableHelper.setOnce(this, d)) { //设置 value
		try {
			onSubscribe.accept(this);
		} catch (Throwable ex) {
                ...
		}
	}
}

那么 Observer.onSubscribe 又是何时被调用呢?

RxJava 的操作符都是一个 Observable 实现。操作符链式调用的本质就创建 Observable 并通过 subscribe 依次订阅。 subscribe 内部会用 subscribeActual ,这是每个操作符都必须实现的方法。

看一下 Observabel.create 的 subscribeActual :

调用 Observer.onSubscrie(), 将当前 Disposable 作为 parent 传递给下游

protected void subscribeActual(Observer<? super T> observer) {
	CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposable
	observer.onSubscribe(parent); // Observer.onSubscrie()
  
	try {
        source.subscribe(parent);
	} catch (Throwable ex) {
        ...
    }
}

3、Observer 关联上下游
除 create、subscribe 这样的终端操作符以外,大部分的操作符的 Observer 结构如下:


/** The downstream subscriber. */
protected final Observer<? super R> downstream;

/** The upstream subscription. */
protected Disposable upstream;
    
public final void onSubscribe(Disposable d) {
	...
	this.upstream = d;
	downstream.onSubscribe(this);
	...
}

public void dispose() {
	upstream.dispose();
}
  • Observer 持有上下游对象:upstream 和 downstream
  • onSubscribe 向下递归调用
  • dipose 向上递归调用

在链式订阅中,向下游订阅 Observer 的同时,也关联了上游的 Disposable(Observer)

我们在最下端调用 subscribe 时,调用链上的 Observer 会建立上下游关联,当我们在下游调用 dispose 时,最终会递归调用到顶端(create)的 dispose

4、再看takeUntil的例子
根据上述分析,在回顾一下最初 takeUntil 的例子。
前面说过所有的操作符都是 Observable:

  • takeUntil 对应的Observable: ObservableTakeUntilPredicate;
  • create 对应的Observable: ObservableCreate

subscribe 调用链如下:

在这里插入图片描述
随着 onSubscribe 的调用,Disposable 也建立了如下引用链:

在这里插入图片描述
当我们调用 dispose 方法时,通过引用链递会最终调用到 CreateEmitter 的 dispose。
由于 CreateEmitter 将 AtomicReference 的 value 设为 DISPOSED
后续,onNext 中判断状态,当为 DISPOSED 时,数据流停止发射

@Override
public void onNext(T t) {
    if (!isDisposed()) { //是否为DISPOSED
        observer.onNext(t);
    }
}
  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2021-11-27 10:01:16  更:2021-11-27 10:02:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:32:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码