概念
挂起函数是单个的异步操作只能返回一个值,Flow用于表达连续的多个异步操作,可以返回多个值。
- 冷流:即懒加载,Flow构建器中的代码直到流被最终消费的时候才运行,所以是响应式编程(也叫声明式)。
- 一些名词:构建→上游(前面的流)→中间消费者/当前操作→下游(后面的流)→消费。
- 中间消费:上流和下流之间可以插入中间消费者,拦截上流数据经过任意处理后再转发给下流,因为Folw是冷流所以多个中间消费者可以看作是几串待执行的调用链。
- 有序:元素遵循先进先出原则,是一个接一个操作完,而不是统一操作再进行下一步。
- 协作取消:消费的取消只能在可取消的挂起函数中挂起的时候取消。
- 默认情况下,上流和下流都是在同一线程中进行,可以通过 flowOn 改变上流执行线程而不会影响下流。
- 背压:消费操作对数据的处理都被包含在 suspend 修饰的 Lambda 中,因此在协程中调用能轻松切换线程,也正是由于挂起恢复的特性会阻塞上流数据产生的速度不会对下流产生背压。
冷流 | 无消费时,不产生数据。与订阅者是一对一关系,多个订阅者彼此之间独立,数据是完整重新发送。 | 热流 | 无消费时,也会产生数据。与订阅者是一对多关系,多个订阅者彼此之间共享,数据会被修改。 |
Flow | 冷流。 | SharedFlow | 热流。无需初始值。 | StateFlow | 热流。需要初始值。继承自ShareeFlow。类比LiveData。 始终有值,值唯一,多个观察者数据共享,只会提供最新值。 |
中间操作符 | 执行一些操作,不会立即执行,返回的还是Flow。 | 末端操作符 | 最终操作,会触发流的执行,返回的是结果。 |
Flow
构建
flow | public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 创建Flow的基本方法,需要手动使用 emit?发射单个值,使用 emitAll 发射一个Flow(类似 list.addAll() )。 | flowOf | public fun <T> flowOf(vararg elements: T): Flow<T> = flow { ? ? for (element in elements) { emit(element) } } 快速创建固定值集的Flow(类似 listOf() )。 | asFlow | public fun IntRange.asFlow(): Flow<Int> = flow { ? ? forEach { value -> emit(value) } } 将其他数据转换为Flow(集合、序列)。 | callbackFlow | public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block) 将函数改造成Flow。 | emptyFlow | public fun <T> emptyFlow(): Flow<T> = EmptyFlow 返回一个空的Flow。 | channelFlow | public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = ChannelFlowBuilder(block) 允许内部切换线程的Flow。 |
val flow1 = flow {
emit(5)
for (i in 1..3) {
emit(i)
}
emitAll(flowOf(1, 2, 3))
}
val flow2 = flowOf(1, 2, 3)
val flow3 = listOf(1, 2, 3).asFlow()
fun method(): Flow<Int> = callbackFlow { }
val flow4 = emptyFlow<Int>()
val flow5 = channelFlow<Int> { }
搜索场景使用debounce防抖,网络请求使用retry,组件通信使用SharedFlow, 数据合并使用combine等操作符。
中间操作
过滤出
filter | public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> ? ? if (predicate(value)) return@transform emit(value) } 保留符合条件的元素。 | filterNot | public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> ? ? if (!predicate(value)) return@transform emit(value) } 保留不符合条件的元素。 | filterNotNull | public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value -> ? ? if (value != null) return@transform emit(value) } 保留不为null的元素。 | filterIsInstance | public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R> 保留对应类型的元素(类型填到泛型里面)。 |
线程切换
flowOn | public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>? Flow默认执行在消费所在的线程。flowOn使上游执行在指定的线程上,不会影响下游。上游还有 flowOn 的时候,只影响他们之间的那些操作。下游默认还是执行在消费所在的线程。 |
背压?
数据的消费速度赶不上生产速度。
- 默认情况:生产和消费是同步的,而且元素是一个接一个处理不是同时,生产完耗时会包含每一次消费耗时。
- buffer:相当于一个蓄水池,设置缓冲区大小,先生产一些元素缓存起来。
- BufferOverflow.SUSPEND模式:默认模式,缓冲区满了就和默认情况一样等待,消费一个再生产一个。
- BufferOverflow.DROP_OLDEST模式:缓冲区满了就丢弃还没消费掉的旧元素,存入新生产的元素。
- BufferOverflow.DROP_LATEST模式:缓冲区满了就丢弃后来新生产的元素,直到有空位了再往里存又生产的(不是丢弃掉的那些)。
buffer | public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>? 参数capacity是缓冲区大小,参数onBufferOverflow是对缓冲区满后新生产元素的处理模式。 | conflate | public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) 会丢弃中间元素,只有首尾元素会被保留。相当于 buffer(0 , BufferOverflow.DROP_OLDEST)。 |
转换
transform transformLatest transformWhile | 可以控制流速(相较于map) | 只对最后一个元素消费。 | 为true继续消费,为false后续元素都丢弃。 | map mapLatest mapNotNull | | | |
val flow = (1..5)asFlow()
flow.transform{
delay(1000)
emit(it * 10)
}.collect{
println("transform:$it")
}
flow.transformLatest{
emit(it * 100)
}.collect{
println("transformLast:$it")
}
flow.transformWhile{
it != 3
}.collect{
println("transformWhile:$it")
}
截取
take takeWhile | public fun <T> Flow<T>.take(count: Int): Flow<T>? 获取几个元素,丢弃剩下的。 | public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> 满足条件就获取元素,只要碰到不满足条件的元素就丢弃剩下全部元素(即便剩下的里面有满足条件的),第一个元素就不满足会全部丢弃。 | drop dropWhile | public fun <T> Flow<T>.drop(count: Int): Flow<T> 丢弃几个元素,获取剩下的。 | public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> 满足条件就丢弃元素,只要碰到不满足条件的元素就获取剩下全部元素(即便剩下的里面有满足条件的),第一个元素就不满足会全部获取。 |
val flow = (1..5).asFlow()
flow.take(3).collect { print(",$it") } //打印:1,2,3
flow.takeWhile { it > 3 }.collect { print(",$it") } //打印:无内容
flow.drop(3).collect { print(",$it") } //打印:4,5
flow.dropWhile { it == 3 }.collect { print(",$it") } //打印:1,2,3,4,5
合并
merge | public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge() 将两个Flow中的元素连接起来。 | zip | public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> 将两个Flow中同索引的元素根据条件合并成一个元素,最终元素个数为数量少的那个(多的元素丢弃)。 |
val flow1 = flowOf(1, 2, 3, 4, 5)
val flow2 = flowOf('a', 'b', 'c')
merge(flow1, flow2).collect { print(",$it") } //打印:1,2,3,4,5,a,b,c
flow1.zip(flow2) { a, b -> "[$a$b]" }.collect { print(it) } //打印:[1a][2b][3c]
展平多维元素
flattenConcat flattenMerge | public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow 将Flow中的多维元素都展平然后全部连接起来。 | public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> 将Flow中的多维元素都展平然后全部连接起来,可以设置并发数。 |
val flow1 = flowOf(1, 2, 3, 4, 5)
val flow2 = flowOf('a', 'b', 'c', 'd', 'e', 'f')
val flow3 = flowOf(flow1, flow2)
flow3.flattenMerge(2).collect { print(",$it") } //打印:1,2,3,4,5,a,b,c,d,e,f
flow3.flattenConcat().collect { print(",$it") } //打印:1,2,3,4,5,a,b,c,d,e,f
组合操作
flatMapContact flatMapMerge flatMapLatest | public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenConcat() 流中流,外流元素执行完内流操作,才轮到下一个外流元素。 | public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenMerge(concurrency) 流中流,内流元素执行完外流操作,才轮到下一个内流元素。 | public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> = transformLatest { emitAll(transform(it)) } 流中流,外流发射新元素,上一个元素没操作完就会被取消。 |
val flow = (1..3).asFlow()
flow.flatMapConcat {
flow {
emit("$it:前")
delay(500)
emit("$it:后")
}
}.collect { print("[$it]") } //打印:[1:前][1:后][2:前][2:后][3:前][3:后]
println()
flow.flatMapMerge {
flow {
emit("$it:前")
delay(500)
emit("$it:后")
}
}.collect { print("[$it]") } //打印:[1:前][2:前][3:前][1:后][2:后][3:后]
println()
flow.flatMapLatest {
flow {
emit("$it:前")
delay(500)
emit("$it:后")
}
}.collect { print("[$it]") } //打印:[1:前][2:前][3:前][3:后]
重试
retry | 重试机制 ,当流发生异常时可以重新执行。retryWhen 的简化版。 | retryWhen | 有条件的进行重试 ,lambda 中有两个参数: 一个是 异常原因,一个是当前重试的 index (从0开始)。lambda 的返回值 为 Boolean ,true则继续重试 ,false 则结束重试。 |
回调
onStart onCompletion onEach onEmpty onSubion | 在上游流开始之前被调用。可以发出额外元素,也可以处理其他事情,比如发埋点。 | 在流取消或者结束时调用。可以执行发送元素,发埋点等操作。 | 在上游向下游发出元素之前调用。 | 当流完成却没有发出任何元素时回调。可以用来兜底.。 | SharedFlow 专属操作符 (StateFlow是SharedFlow 的一种特殊实现)。 |
变换
asStateFlow asShareFlow receiveAsFlow consumeAsFlow | 将?MutableStateFlow转换为 StateFlow ,就是变成不可变的。常用在对外暴露属性时使用。 | 将?MutableSharedFlow转换为 SharedFlow ,就是变成不可变的。常用在对外暴露属性时使用。 | 将Channel 转换为Flow ,可以有多个观察者,但不是多播,可能会轮流收到值。 | 将Channel 转换为Flow ,但不能多个观察者(会crash)! | withIndex | 将结果包装成IndexedValue类型。 | scan | 和 fold 相似,区别是fold 返回的是最终结果,scan返回的是个flow ,会把初始值和每一步的操作结果发送出去。 | produceIn | 转换为ReceiveChannel?, 不常用。 | runningFold | 区别于 fold ,就是返回一个新流,将每步的结果发射出去。 | runningReduce | 区别于 reduce ,就是返回一个新流,将每步的结果发射出去。 | shareIn | 将普通flow 转化为 SharedFlow , 其有两个参数: scope:?CoroutineScope开始共享的协程范围。 started:?SharingStarted?控制何时开始和停止共享的策略。 | stateIn | 将普通flow 转化为 StateFlow 。其有三个参数: scope- 开始共享的协程范围。 started- 控制何时开始和停止共享的策略。 initialValue- 状态流的初始值。 |
过滤
debounce | 防抖节流 ,指定时间内的值只接收最新的一个,其他的过滤掉。搜索联想场景适用。 | sample | 采样 。给定一个时间周期,仅获取周期内最新发出的值。 | distinctUntilChanged distinctUntilChangedBy | 过滤用,distinctUntilChangedBy的简化调用 。连续两个值一样,则跳过发送。 | 去重操作符,判断连续的两个值是否重复,可以选择是否丢弃重复值。 |
组合
conbine | 组合每个流最新发出的值。 组合两个流,经过一次emic后,任一乙方有新数据就可以emic。 | conbineTransform | 顾名思义?combine+?transform |
功能性
cancellable | 接收的的时候判断 协程是否被取消 ,如果已取消,则抛出异常。 | catch | 对此操作符之前的流异常进行捕获 ,对此操作符之后的流无影响。 |
中间消费
所有的中间消费者都定义成 Flow 的扩展方法,而且都会返回一个新建的下游流(当下流被消费的时候,立马消费上流,收集到上流数据后传递给下流。),这样做是为了让不同的中间消费者可以方便地通过链式调用串联在一起。
transform | 拦截并转发。 | onEach | 通过 transform 构建了一个下游流,转发每一个上游流数据前又做了一件额外的事情 | map | 通过 transform 构建了一个下游流,拿到上游流数据时先将其进行了transform 变换,然后再转发出去。 | onStrat | 通过 unsafeFlow 构建了一个下游流套上游流,在收集动作(所有数据发射之前)之前做了一件额外的事。 | onCompletion | 通过 unsafeFlow 构建了一个下游流套上游流,在收集数据之后再执行动作。 |
注意:链式调用中出现多个onStart { action }时,后出现的 action 会先执行,因为后续 onStart 构建的下游流包在了上游 onStart 的外面,并且 action 会在收集上游流数据之前执行。而这个结论却不能沿用到onCompletion { action },虽然 onCompletion 构建的下游流也包裹在上游 onCompletion 外面,但是 action 总是在收集上游流之后执行。
最终消费
收集
collect | public suspend fun collect(collector: FlowCollector<T>) 收集Flow中的元素。 | collectIndexed | public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit | conflateLast | public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) = mapLatest(action).buffer(0).collect() 只收集最后一个元素。 |
线程切换
launchIn | public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch { collect() } 让Flow在指定的其它协程作用域中消费,而不是当前协程中。指定该协程作用域的上下文也就相当于切换了线程,记得接着调用join()。 |
flowOf(1,2,3)
.launchIn(CoroutineScope(Dispatchers.IO)) //相当于切换了线程
.join()
累计
fold | public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S 对元素挨个进行指定运算(1和2计算后,结果和3运算,结果再和4运算...) | reduce | public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R): R 带初始值,对元素挨个进行指定运算(初始值和1计算后,结果和2计算,击鼓再和4运算...) |
val flow = (1..5).asFlow()
println(flow.reduce({ a, b -> a + b })) //打印:15
println(flow.fold(2, { a, b -> a + b })) //打印:17
toCollection toList toSet | 将结果添加到集合。 | 将结果转换为List | 将结果转换为Set。 | last lastOrNull first firstOrNull single singleOrNull | 返回流 发出 的最后一个值 ,如果为空会抛异常。 | 返回流 发出 的最后一个值 ,可以为空。 | 返回流 发出 的第一个值 ,如果为空会抛异常。 | 返回流 发出 的第一个值 ,可以为空。 | 接收流发送的第一个值 ,区别于first,如果为空或者发了不止一个值,则都会报错。 | 接收流发送的第一个值 ,可以为空 ,发出多值的话除第一个,后面均被置为null。 | count | 返回流发送值的个数。类似list.size,注:sharedFlow无效(无意义) |
异常处理
SharedFlow
StateFlow
|