flow {}
以下面代码为例,讲解 flow 工作的基本流程:
flow { emit(1) }.collect { println(it) }
首先看一下 flow {} 的源码:
public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)
上面就是以 block 代码块为参数创建了一个 SafeFlow 对象,SafeFlow 实现了 Flow 接口,于是接着看其 collect 方法。
collect
除了一开始贴的实现 Flow 接口调用 collect 方法的方式, Kotlin 还提供了调用 collect 的两个扩展函数,最后都是调用的 fun collect(collector: FlowCollector<T>) 方法:
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) })
于是我们接着上面的示例,看一下 SafeFlow.collect 方法:
private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() { override suspend fun collectSafely(collector: FlowCollector) { collector.block() } }
// collect 方法在父类 AbstractFlow 中 public abstract class AbstractFlow : Flow, CancellableFlow { public final override suspend fun collect(collector: FlowCollector) { val safeCollector = SafeCollector(collector, coroutineContext) try { collectSafely(safeCollector) } finally { safeCollector.releaseIntercepted() } }
public abstract suspend fun collectSafely(collector: FlowCollector) }
可以看到 collect 方法中通过 collector 封装了一个 SafeCollector 对象,并以其为参数执行了 SafeFlow.collectSafely 方法,而 collectSafely 方法只是执行了 block 代码块(collector.block() ),它是一个扩展函数,所以执行的示例代码中的 emit(1) 其实就是调用了 SafeCollector.emit(1), 然后在 SafeCollector 中对 FlowCollector 做了一层安全校验后,最后还是会调用 FlowCo llector.emit 方法,即创建 SafeCollector 时传入的 collector 对象的 emit 方法。这里只关注核心流程,故不贴出具体代码了。
根据上面我们看到的 collect {} 扩展函数的源码,可以知道其 emit 方法其实就是执行 collect {} 中传入的 action 代码块,参数为 emit 发射的值 – 1.
小结:flow {} 方式(或flowOf, asFlow)创建的 Flow 实例是 SafeFlow 类型,其父类是 AbstractFlow 抽象类,当调用其 collect(FlowCollector) 方法时,首先会执行该 Flow 对象传入的 block 代码块,代码块中一般会有 emit 方法发射值,这个 emit 调用的就是 AbstractFlow.emit 方法,在其中做了安全判定后,会接着调用到 collect 中传入的 FlowCollector.emit 方法,对于 collect {} 的情况,emit 方法内部就是执行 collect 传入的 action 代码块。因为它在每次调用 collect 时才去触发发送数据的动作,所以说 Flow 是冷流。
主要流程如下图:
flowOn
学习 flow 一个绕不开的操作符就是 flowOn 了,以下面示例代码为例, flow 需要在协程中使用,下面的 emit(1) 会在 Dispatchers.Default 指定的线程中执行,而 println(it) 会在父协程所在线程中执行:
flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) }
flow {} 的源码在上面已经看过了,就是以 block 代码块为参数创建了一个 SafeFlow 对象,接下来看一下 Flow.flowOn 的逻辑:
public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { // 返回自身 Flow 实例 // 这里我们传入了 Dispatchers.Default, 所以不符合这个条件 context == EmptyCoroutineContext -> this // SafeFlow 不是该类型,因此也不走这个流程,实际上 FusibleFlow 是当连续多次调用 flowOn 后会创建的 Flow 对象 this is FusibleFlow -> fuse(context = context) // 逻辑走到这里 else -> ChannelFlowOperatorImpl(this, context = context) } }
在上面已经对流程注释了一下,因此上述实例代码转换一下即为: SafeFlow.flowOn.collect {} --> ChannelFlowOperatorImpl.collect {} , 这里注意一下创建 ChannelFlowOperatorImpl 对象时传入的两个参数,第一个 this 指的是之前的 SafeFlow 对象,第二个 context 参数即是我们传入的调度器,它是一个协程上下文。
ChannelFlowOperatorImpl.collect 实现在父类 ChannelFlowOperator.collect 中,该方法如果发现传入的 coroutineContext 上下文中没有携带调度器,即我们调用 flowOn 时没有传入 Dispatchers 等调度器,则会直接调用上一层 SafeFlow 的 collect 方法(代码不贴了),否则接着调用父类 ChannelFlow 中的 collect 方法,我们直接看 flowOn 中传入了调度器后的逻辑:
internal abstract class ChannelFlowOperator<S, T>( @JvmField protected val flow: Flow, context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow ) : ChannelFlow(context, capacity, onBufferOverflow) { override suspend fun collect(collector: FlowCollector) { // 判断 coroutineContext 逻辑 // … super.collect(collector) // 调用父类 ChannelFlow 中方法 } }
public abstract class ChannelFlow( // upstream context @JvmField public val context: CoroutineContext, // buffer capacity between upstream and downstream context @JvmField public val capacity: Int, // buffer overflow strategy @JvmField public val onBufferOverflow: BufferOverflow ) : FusibleFlow { override suspend fun collect(collector: FlowCollector): Unit = coroutineScope { collector.emitAll(produceImpl(this)) }
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) }
这里可以看到 ChannelFlowOperatorImpl.collect 最后会走到 collector.emitAll(produceImpl(this)) 生产消费的逻辑,我们分步骤看一下生产和接收的流程。
生产数据
首先看上面 produceImpl 方法:
internal fun CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope.() -> Unit ): ReceiveChannel { val channel = Channel(capacity, onBufferOverflow) val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) coroutine.start(start, coroutine, block) return coroutine }
看到这个方法,是不是很熟悉呢?参考之前的 Kotlin之深入理解协程工作原理 的文章可以知道,这里的 produce 方法其实就是启动了一个新的协程,该协程执行的代码块 block 是传入的 collectToFun 参数,接着找 collectToFun 可以发现它会取 ChannelFlowOperator.collectTo 方法:
// ChannelFlowOperator protected override suspend fun collectTo(scope: ProducerScope) = // flowCollect 方法实现在子类 ChannelFlowOperatorImpl 中 flowCollect(SendingCollector(scope))
// ChannelFlowOperatorImpl internal class ChannelFlowOperatorImpl( flow: Flow, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { override suspend fun flowCollect(collector: FlowCollector) = // 这个 flow 就是上层传入的 SafeFlow 对象 flow.collect(collector) }
根据之前的解析, flow.collect(collector) 中的 flow 是 SafeFlow 对象,其 collect 方法会执行 SafeFlow 中传入的代码块(即flow {} ),这个代码块中调用了 collector.emit(1) 方法(上面代码可以看出此时的 collector 是 SendingCollector 实例),因此我们看看 SendingCollector.emit 方法做了什么:
public class SendingCollector( private val channel: SendChannel ) : FlowCollector { override suspend fun emit(value: T): Unit = channel.send(value) }
.emit 方法做了什么:
public class SendingCollector( private val channel: SendChannel ) : FlowCollector { override suspend fun emit(value: T): Unit = channel.send(value) }
|