IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> Kotlin协程之flow工作原理,一个三非渣本的Android校招秋招之路 -> 正文阅读

[移动开发]Kotlin协程之flow工作原理,一个三非渣本的Android校招秋招之路

flow {}

以下面代码为例,讲解 flow 工作的基本流程:

flow { emit(1) }.collect { println(it) }

首先看一下 flow {} 的源码:

public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

上面就是以 block 代码块为参数创建了一个 SafeFlow 对象,SafeFlow 实现了 Flow 接口,于是接着看其 collect 方法。

collect

除了一开始贴的实现 Flow 接口调用 collect 方法的方式, Kotlin 还提供了调用 collect 的两个扩展函数,最后都是调用的 fun collect(collector: FlowCollector<T>) 方法:

public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)

public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector {
override suspend fun emit(value: T) = action(value)
})

于是我们接着上面的示例,看一下 SafeFlow.collect 方法:

private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() {
override suspend fun collectSafely(collector: FlowCollector) {
collector.block()
}
}

// collect 方法在父类 AbstractFlow 中
public abstract class AbstractFlow : Flow, CancellableFlow {
public final override suspend fun collect(collector: FlowCollector) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}

public abstract suspend fun collectSafely(collector: FlowCollector)
}

可以看到 collect 方法中通过 collector 封装了一个 SafeCollector 对象,并以其为参数执行了 SafeFlow.collectSafely 方法,而 collectSafely 方法只是执行了 block 代码块(collector.block()),它是一个扩展函数,所以执行的示例代码中的 emit(1) 其实就是调用了 SafeCollector.emit(1), 然后在 SafeCollector 中对 FlowCollector 做了一层安全校验后,最后还是会调用 FlowCo
llector.emit 方法,即创建 SafeCollector 时传入的 collector 对象的 emit 方法。这里只关注核心流程,故不贴出具体代码了。

根据上面我们看到的 collect {} 扩展函数的源码,可以知道其 emit 方法其实就是执行 collect {} 中传入的 action 代码块,参数为 emit 发射的值 – 1.

小结:flow {} 方式(或flowOf, asFlow)创建的 Flow 实例是 SafeFlow 类型,其父类是 AbstractFlow 抽象类,当调用其 collect(FlowCollector) 方法时,首先会执行该 Flow 对象传入的 block 代码块,代码块中一般会有 emit 方法发射值,这个 emit 调用的就是 AbstractFlow.emit 方法,在其中做了安全判定后,会接着调用到 collect 中传入的 FlowCollector.emit 方法,对于 collect {} 的情况,emit 方法内部就是执行 collect 传入的 action 代码块。因为它在每次调用 collect 时才去触发发送数据的动作,所以说 Flow 是冷流

主要流程如下图:

flowOn

学习 flow 一个绕不开的操作符就是 flowOn 了,以下面示例代码为例, flow 需要在协程中使用,下面的 emit(1) 会在 Dispatchers.Default 指定的线程中执行,而 println(it) 会在父协程所在线程中执行:

flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) }

flow {} 的源码在上面已经看过了,就是以 block 代码块为参数创建了一个 SafeFlow 对象,接下来看一下 Flow.flowOn 的逻辑:

public fun Flow.flowOn(context: CoroutineContext): Flow {
checkFlowContext(context)
return when {
// 返回自身 Flow 实例
// 这里我们传入了 Dispatchers.Default, 所以不符合这个条件
context == EmptyCoroutineContext -> this
// SafeFlow 不是该类型,因此也不走这个流程,实际上 FusibleFlow 是当连续多次调用 flowOn 后会创建的 Flow 对象
this is FusibleFlow -> fuse(context = context)
// 逻辑走到这里
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

在上面已经对流程注释了一下,因此上述实例代码转换一下即为: SafeFlow.flowOn.collect {} --> ChannelFlowOperatorImpl.collect {}, 这里注意一下创建 ChannelFlowOperatorImpl 对象时传入的两个参数,第一个 this 指的是之前的 SafeFlow 对象,第二个 context 参数即是我们传入的调度器,它是一个协程上下文

ChannelFlowOperatorImpl.collect 实现在父类 ChannelFlowOperator.collect 中,该方法如果发现传入的 coroutineContext 上下文中没有携带调度器,即我们调用 flowOn 时没有传入 Dispatchers 等调度器,则会直接调用上一层 SafeFlow 的 collect 方法(代码不贴了),否则接着调用父类 ChannelFlow 中的 collect 方法,我们直接看 flowOn 中传入了调度器后的逻辑:

internal abstract class ChannelFlowOperator<S, T>(
@JvmField protected val flow: Flow,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : ChannelFlow(context, capacity, onBufferOverflow) {
override suspend fun collect(collector: FlowCollector) {
// 判断 coroutineContext 逻辑
// …
super.collect(collector) // 调用父类 ChannelFlow 中方法
}
}

public abstract class ChannelFlow(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int,
// buffer overflow strategy
@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow {
override suspend fun collect(collector: FlowCollector): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
}

这里可以看到 ChannelFlowOperatorImpl.collect 最后会走到 collector.emitAll(produceImpl(this)) 生产消费的逻辑,我们分步骤看一下生产和接收的流程。

生产数据

首先看上面 produceImpl 方法:

internal fun CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope.() -> Unit
): ReceiveChannel {
val channel = Channel(capacity, onBufferOverflow)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}

看到这个方法,是不是很熟悉呢?参考之前的 Kotlin之深入理解协程工作原理 的文章可以知道,这里的 produce 方法其实就是启动了一个新的协程,该协程执行的代码块 block 是传入的 collectToFun 参数,接着找 collectToFun 可以发现它会取 ChannelFlowOperator.collectTo 方法:

// ChannelFlowOperator
protected override suspend fun collectTo(scope: ProducerScope) =
// flowCollect 方法实现在子类 ChannelFlowOperatorImpl 中
flowCollect(SendingCollector(scope))

// ChannelFlowOperatorImpl
internal class ChannelFlowOperatorImpl(
flow: Flow,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override suspend fun flowCollect(collector: FlowCollector) =
// 这个 flow 就是上层传入的 SafeFlow 对象
flow.collect(collector)
}

根据之前的解析, flow.collect(collector) 中的 flow 是 SafeFlow 对象,其 collect 方法会执行 SafeFlow 中传入的代码块(即flow {}),这个代码块中调用了 collector.emit(1) 方法(上面代码可以看出此时的 collector 是 SendingCollector 实例),因此我们看看 SendingCollector.emit 方法做了什么:

public class SendingCollector(
private val channel: SendChannel
) : FlowCollector {
override suspend fun emit(value: T): Unit = channel.send(value)
}

.emit 方法做了什么:

public class SendingCollector(
private val channel: SendChannel
) : FlowCollector {
override suspend fun emit(value: T): Unit = channel.send(value)
}

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-01-28 12:02:03  更:2022-01-28 12:03:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 12:42:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码