一、冷流Flow
1、基本概念
Flow是一种异步数据流,它按顺序发出值并正常或异常完成。
- 与Rxjava区别:Flow 就是 Kotlin 协程与响应式编程模型结合的产物,与Rxjava非常像,Flow 提供了很多丰富的操作符,例如 map、fliter、count 等等,相比 Rxjava ,Flow 的使用和线程切换更为简单
- 与Sequences区别:每一个Flow其内部是按照顺序执行的,这一点跟Sequences很类似。Flow跟Sequences之间的区别是Flow不会阻塞主线程的运行,而Sequences会阻塞主线程的运行。
- 与热数据流Channel区别:冷数据流Flow与热数据流Channel更好相反,Flow是不消费则不生产
?
2、创建(生产)
方式一,Flow:创建Flow的普通方法,从给定的一个挂起函数创建一个冷数据流。
flow<String> {
emit("Hello")
delay(5000)
emit("World")
}
方式二,flowOf:使用可变数组快速创建flow,类似于listOf()。
flowOf(10, 200, 50, "String")
方式三,asFlow:将其他数据转换成普通的flow,一般是集合向Flow的转换。
val list = arrayListOf(5, 10, 15, 20)
list.asFlow()
(1..5).asFlow()
方式四,channelFlow:支持缓冲通道,线程安全,允许不同的CorotineContext发送事件。
val channelFlow = channelFlow<Int> {
for (i in 1..10) {
delay(100)
send(i)
}
}
?
3、消费
val flow = flow<Int> {
for (i in 1..10) {
delay(1000)
emit(i)
}
}
GlobalScope.launch(Dispatchers.Main) {
flow.collect {
Log.i(TAG, "Collect value is: $it")
}
}
和 RxJava 一样,在创建 Flow 对象的时候我们也需要调用 emit 方法发射数据,collect 方法用来消费收集数据。
- emit(value):收集上游的值并发出。不是线程安全,不应该并发调用。线程安全请使用channelFlow而不是flow。
- collect():接收给定收集器emit()发出的值。它是一个挂起函数,在所在作用域的线程上执行。
flow的代码块只有调用collected()才开始运行,正如 RxJava 创建的 Observables只有调用subscribe()才开始运行一样。如果熟悉 RxJava 的话,则可以理解为collect()对应subscribe(),而emit()对应onNext()。
?
4、线程切换
- 生产端使用flowOn进行线程的指定
- 消费端的线程指定依赖于协程上下文
val flow = flow<Int> {
Log.i(TAG, "Current thread is ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.IO).map {
Log.i(TAG, "Current thread is ${Thread.currentThread().name}")
it * it
}.flowOn(Dispatchers.Main)
GlobalScope.launch(Dispatchers.Main) {
flow.collect {
Log.i(TAG, "Collect value is: $it, thread is ${Thread.currentThread().name}")
}
}
I: Current thread is DefaultDispatcher-worker-2
I: Current thread is main
I: Collect value is: 1, thread is main
I: Current thread is main
I: Collect value is: 4, thread is main
I: Current thread is main
I: Collect value is: 9, thread is main
?
5、异常处理
val flow = flow<Int> {
delay(100)
emit(10)
throw IllegalArgumentException("抛出异常啦")
emit(100)
}.catch { e ->
Log.i(TAG, "收集到异常:${e.message}")
}.onCompletion {
Log.i(TAG, "OnComplete run")
}
GlobalScope.launch {
flow.collect {
Log.i(TAG, "Collect value is:$it")
}
}
I: Collect value is:10
I: 收集到异常:抛出异常啦
I: OnComplete run
注意:catch 函数只能捕获它的上游的异常,未捕获异常会在消费时抛出。
?
6、背压
什么是背压?就是在生产者的生产速率高于消费者的处理速率的情况下出现,发射的量大于消费的量,造成了阻塞,就相当于压力往回走,这就是背压。只要是响应式编程,就一定会有背压问题。处理背压问题有以下三种方式:
- buffer:指定固定容量的缓存
- conflate:当生产者发射数据速度大于消费者的时候,消费者只能拿到生产者最新发射的数据
- collectLatest:新值发送时,取消之前的。如果生产者数据以及发射过来了,消费者还没有把上一个数据处理完,那么直接停止处理上一条数据,直接处理最新的数据。
val flow = flow<Int> {
(1..3).forEach {
delay(100)
emit(it)
}
}.buffer(3)
GlobalScope.launch {
val time = measureTimeMillis {
flow.collect {
delay(300)
Log.i(TAG, "Collect value is:$it")
}
}
Log.i(TAG, "Total time: $time")
}
I: Collect value is:1
I: Collect value is:2
I: Collect value is:3
I: Total time: 1003
需要100毫秒才能发射一个元素;收集器处理一个元素需要300毫秒。那么不加buffer的情况下,三个数据执行完毕需要约1200毫秒。而加了buffer缓冲后,运行更快,只需要1003毫秒。
val flow = flow<Int> {
(1..5).forEach {
delay(100)
emit(it)
}
}.conflate()
GlobalScope.launch {
val time = measureTimeMillis {
flow.collect {
delay(300)
Log.i(TAG, "Collect value is:$it")
}
}
Log.i(TAG, "Total time: $time")
}
I: Collect value is:1
I: Collect value is:3
I: Collect value is:5
I: Total time: 1021
当数字1扔在处理时,数字2和数字3已经产生了,所以数字2被合并,只有最近的数字1(数字3)被交付给收集器。
val flow = flow<Int> {
(1..5).forEach {
delay(100)
emit(it)
}
}
GlobalScope.launch {
val time = measureTimeMillis {
flow.collectLatest {
delay(300)
Log.i(TAG, "Collect value is:$it")
}
}
Log.i(TAG, "Total time: $time")
}
I: Collect value is:5
I: Total time: 841
由于collectLatest的代码需要300毫秒的时间,但是每100毫秒就会发出一个新值,每个数据都还没处理完,就发过来新的,所以就取消了原来的处理操作。
?
7、操作符
基本操作符
Flow 操作符 | 作用 |
---|
map | 转换操作符,将值转换为另一种形式输出 | take | 接收指定个数发出的值 | filter | 过滤操作符,返回只包含与给定规则匹配的原始值的流。 |
末端操作符 做 collect 处理,collect 是最基础的末端操作符。
末端流操作符 | 作用 |
---|
collect | 最基础的收集数据,触发flow的运行 | toCollection | 将结果添加到集合 | launchIn | 在指定作用域直接触发流的执行 | toList | 给定的流收集到 List 集合 | toSet | 给定的流收集到 Set 集合 | reduce | 规约,从第一个元素开始累加值,并将参数应用到当前累加器的值和每个元素。 | fold | 规约,从[初始]值开始累加值,并应用[操作]当前累加器值和每个元素 |
功能性操作符
功能性操作符 | 作用 |
---|
retry | 重试机制 ,当流发生异常时可以重新执行 | cancellable | 接收的的时候判断 协程是否被取消 ,如果已取消,则抛出异常 | debounce | 防抖节流 ,指定时间内的值只接收最新的一个,其他的过滤掉。搜索联想场景适用 |
回调操作符
回调流操作符 | 作用 |
---|
onStart | 在上游流开始之前被调用。 可以发出额外元素,也可以处理其他事情,比如发埋点 | onEach | 在上游向下游发出元素之前调用 | onEmpty | 当流完成却没有发出任何元素时回调,可以用来兜底 |
组合操作符
组合流操作符 | 作用 |
---|
zip | 组合两个流,分别从二者取值,一旦一个流结束了,那整个过程就结束了 | combine | 组合两个流,在经过第一次发射以后,任意方有新数据来的时候就可以发射,另一方有可能是已经发射过的数据 |
展平流操作符 展平流有点类似于 RxJava 中的 flatmap,将你发射出去的数据源转变为另一种数据源。
展平流操作符 | 作用 |
---|
flatMapConcat | 串行处理数据,展开合并成一个流 | flatMapMerge | 并发地收集所有流,并将它们的值合并到单个流中,以便尽快发出值 | flatMapLatest | 一旦发出新流,就取消前一个流的集合 |
|