IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> rxjava背压(1),4年小Android的心路历程 -> 正文阅读

[移动开发]rxjava背压(1),4年小Android的心路历程

这样来看,我的内存就稳定,老铁稳。

那么,试试第二种方法,下游少接收点事件

//定时取样
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);

}
}
}).sample(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});

或者是用过滤操作符,过滤掉一些上游事件

//过滤器 过滤操作
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);

}
}
}).filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 100 == 0;

}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});


背压策略

上面唠唠叨叨说了那么多,基本上也给大家阐明了阻塞形成的原因和解决阻塞的方法,基本策略就是减少发送事件的频率和减少发送事件的数量。

But……

我们手动让上游发送事件的速度满下来貌似是不可取的,你想让上游的速度十多快呢?上游需要等多久呢?

还有……

我们依旧无法知道下游处理事件的能力,无法很好地处理阻塞的事件。

当然,你们肯定会说RxJava2.0不是很好地支了背压了吗?是的,确实比较好地对阻塞做了处理,咱们来看下吧。

在RxJava2.0中官方,推出了Flowable 和Subscriber用来支持背压,同样的去除了Observable对背压的支持,对的就像你上面看到的,Observable不再支持背压了,即使阻塞崩溃也不会抛出MissingBackpressureException

还是上代码看看,Flowable的用法吧。

Flowable.create(FlowableOnSubscribe source, BackpressureStrategy mode)

创建Flowable会默认让传入一个FlowableOnSubscribe和一个BackpressureStrategy,FlowableOnSubscribe很好理解就是一个就是Flowable的一个被观察者源,而BackpressureStrategy就是Flowable提供的背压策略

有哪些策略还是上源码看下吧:

public enum BackpressureStrategy {
/**

  • OnNext events are written without any buffering or dropping.
  • Downstream has to deal with any overflow.
  • Useful when one applies one of the custom-parameter onBackpressureXXX operators.

/
MISSING,
/
*

  • Signals a MissingBackpressureException in case the downstream can’t keep up.
    /
    ERROR,
    /
    *
  • Buffers all onNext values until the downstream consumes it.
    /
    BUFFER,
    /
    *
  • Drops the most recent onNext value if the downstream can’t keep up.
    /
    DROP,
    /
    *
  • Keeps only the latest onNext value, overwriting any previous value if the
  • downstream can’t keep up.
    */
    LATEST
    }

MISSING:
如果流的速度无法保持同步,可能会抛出MissingBackpressureException或IllegalStateException。

BUFFER
上游不断的发出onNext请求,直到下游处理完,也就是和Observable一样了,缓存池无限大,最后直到程序崩溃

ERROR
会在下游跟不上速度时抛出MissingBackpressureException。

DROP
会在下游跟不上速度时把onNext的值丢弃。

LATEST
会一直保留最新的onNext的值,直到被下游消费掉。


先不看上面的策略,我们最起码先看看Flowable怎么用吧

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
Log.d(TAG, “emit 1”);
emitter.onNext(1);
Log.d(TAG, “emit 2”);
emitter.onNext(2);
Log.d(TAG, “emit 3”);
emitter.onNext(3);
Log.d(TAG, “emit complete”);
emitter.onComplete();

}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {

}

@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.d(TAG, “onError”+t);

}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);

}
});

上游 Flowable 构建FlowableEmitter用来发送上游事件,这里的背压策略我们采用ERROR,下游方法中出现了一个和原来

@Override
public void onSubscribe(Subscription s) {

}

Subscription.java

public interface Subscription {

public void request(long n);

public void cancel();

}

这里需要重点说明一下,在Flowable中背压采取拉取响应的方式来进行水流控制,也就是说Subscription是控制上下游水流的主要方式,一般的,我们需要调用Subscription.request()来传入一个下游目前能处理的事件的数量

那么,我们不传会怎么样?

备注:这里上下游是在不同的线程里进行的,如果在同一个线程里,它也会抛出一个Missi
ngBackpressureException,让你去设置 调用request()方法

咦,我上游发送的事件,下游一个没收到啊

那么也就是说上游不能发射事件,是因为你没有调用request方法,因为你不调用request()上游不知道下游能处理事件的能力啊。

那么,也就是说我必须要调用request方法咯,那么我们就调用一下喽,官方说默认推荐使用Long.MAX_VALUE。

好吧,那么我们来试下吧,加上如下代码。

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //下游处理事件能力值
}

咦,还真正常了啊。

那么,我们设置个2试试?

s.request(2);

也就是说我们下游告诉上游我们能处理2个事件,这样上游就缓存池中取出了2个事件给发送给了下游。这点相比Rxjava1.0可以说是智能了很多,并不会一股脑的抛给下游而是又下游来主动拉取事件。

ERROR

Flowable既然可以跑了,那么咱们就来试试背压吧,我们还是采用BackpressureStrategy.ERROR这个策略,如果下游处理不过来就抛出异常。

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
for (int i = 0;i< 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {

@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, “onSubscribe”);

}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);
}
});

我们,首先让上游发送128个事件,下游不做处理,恩好吧是正常的

现在我们把128改成129,怎么就异常了呢?

好吧,还是看源码吧

纳尼,原来Flowable的缓存池的最大大小是128吧,如果缓存池里有超过128个事件就会抛出异常,提示你去处理这些事件。

MISSING

那么,MISSING和ERROR有什么区别呢?

我们把缓存策略设置为BackpressureStrategy.MISSING试一下

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
for (int i = 0;i< 129; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {

@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, “onSubscribe”);
subscription = s;
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);
}
});

结构还是一样的,不过这次很友好的提示你队列满了

io.reactivex.exceptions.MissingBackpressureException: Queue is full?!

下面是MISSING策略的备注:

/**

  • OnNext events are written without any buffering or dropping.
  • Downstream has to deal with any overflow.
  • Useful when one applies one of the custom-parameter onBackpressureXXX operators.

*/

也就是说,这种策略是不丢弃,不缓存的策略,那么我要你也没什么用啊…………

BUFFER

BUFFER是一个无限大的缓存池,也就是说我们可以往里面存储无限多的事件

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
for (int i = 0;i<129 ; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {

@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, “onSubscribe”);

}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);
}
});

但是,如果我们发送无数多的事件,同样要注意内存情况。

DROP和LATEST

首先我们看下Drop

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
Log.d(TAG, "requested: " + emitter.requested());
for (int i = 0; ; i++) {
emitter.onNext(i);

}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {

@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, “onSubscribe”);
subscription = s;

}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);
}
});

我们把对象subscription放到外面,在外面调用request方法(让事件往下面传递,上面说过!!!),看下输出情况。
每次点击界面上的按钮触发下面的操作。

假装有按钮

subscription.request(64);

可以看到,刚进入时打印了当前的request大小,默认为缓存池的大小128.

当我们点击按钮触发subscription.request(64)时,它会从缓存池中取出64个事件发送给下游,当我们呢再次点击,它又取出了64条。但是,当我们第三次点击按钮时,看到了上面令我们诧异的结果。为什么呢?

FLowable内部的默认的水缸大小为128, 因此, 它刚开始肯定会把0-127这128个事件保存起来, 然后丢弃掉其余的事件, 当我们request(64)的时候,下游便会处理掉这64个事件,当第二次请求时把水缸里剩余的64个事件清空, 那么上游水缸中又会重新装进新的128个事件。

也就是说,我先存128个,当这128个被清空后从新再装进128吧,那么中间这个过程中上游发送的事件,下游就给全部丢掉了。

,看下输出情况。
每次点击界面上的按钮触发下面的操作。

假装有按钮

subscription.request(64);

可以看到,刚进入时打印了当前的request大小,默认为缓存池的大小128.

当我们点击按钮触发subscription.request(64)时,它会从缓存池中取出64个事件发送给下游,当我们呢再次点击,它又取出了64条。但是,当我们第三次点击按钮时,看到了上面令我们诧异的结果。为什么呢?

FLowable内部的默认的水缸大小为128, 因此, 它刚开始肯定会把0-127这128个事件保存起来, 然后丢弃掉其余的事件, 当我们request(64)的时候,下游便会处理掉这64个事件,当第二次请求时把水缸里剩余的64个事件清空, 那么上游水缸中又会重新装进新的128个事件。

也就是说,我先存128个,当这128个被清空后从新再装进128吧,那么中间这个过程中上游发送的事件,下游就给全部丢掉了。

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-01-30 19:02:44  更:2022-01-30 19:03:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/28 5:46:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码