一.MutableStateFlow接口的实现
1.MutableStateFlow方法
????在Kotlin协程:StateFlow的设计与使用中,讲到可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> =
StateFlowImpl(value ?: NULL)
...
@JvmField
@SharedImmutable
internal val NULL = Symbol("NULL")
????在MutableStateFlow方法中,根据参数value创建并返回一个类型为StateFlowImpl的对象,如果参数value为空,则传入对应的标识NULL。
二.StateFlowImpl类
????StateFlowImpl类是MutableStateFlow接口的核心实现,它的继承关系与SharedFlowImpl类的继承关系类似,如下图所示: 
- AbstractSharedFlow类:提供了对订阅者进行管理的方法。
- CancellableFlow接口:用于标记StateFlowImpl类型的Flow对象是可取消的。
- MutableStateFlow接口:表示StateFlowImpl类型的Flow对象是一个单数据更新的热流。
- FusibleFlow接口:表示StateFlowImpl类型的Flow对象是可融合的。
1.发射数据的管理
????在StateFlowImpl中,当前的数据被保存在名为_state的全局变量中,_state表示StateFlowImpl类型对象的状态,当前代码如下:
private class StateFlowImpl<T>(
initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialState)
...
}
????除此之外,StateFlowImpl不对其他的数据做缓存。
2.订阅者的管理
????由于StateFlowImpl类与SharedFLowImpl类都继承自AbstractSharedFlow类,因此二者订阅者管理的核心逻辑相同,这里不再赘述,详情可参考Kotlin协程:MutableSharedFlow的实现原理。
????唯一不同的地方在,在SharedFlowImpl类中,订阅者数组中存储的对象类型为SharedFlowSlot,而在StateFlowImpl类中,订阅者数组存储的对象类型为StateFlowSlot。
1)StateFlowSlot类
????StateFlowSlot类与SharedFlowSlot类类似,都继承自AbstractSharedFlowSlot类。但相比于SharedFlowImpl类型的对象,StateFlowImpl类型的对象是有状态的。
????在StateFlowSlot类中,有一个名为_state的全局变量,代码如下:
@SharedImmutable
private val NONE = Symbol("NONE")
@SharedImmutable
private val PENDING = Symbol("PENDING")
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
private val _state = atomic<Any?>(null)
...
}
????根据_state保存对象的不同,可以确定StateFlowSlot类型的对象的状态。StateFlowSlot类型的对象共有四种状态:
-
null:如果_state保存的对象为空,表示当前StateFlowSlot类型的对象没有被任何订阅者使用。 -
NONE:如果_state保存的对象为NONE标识,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,但既没有挂起,也没有在处理当前的数据。 -
PENDING:如果_state保存的对象为PENDING标识,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,并且将开始处理当前的数据。 -
CancellableContinuationImpl<Unit>:如果_state保存的对象为续体,表示当前StateFlowSlot类型的对象已经被对应的订阅者使用,但是订阅者已处理完当前的数据,所在的协程已被挂起,等待新的数据到来。
a)订阅者状态的管理
????在StateFlowSlot类中,重写了AbstractSharedFlowSlot类的allocateLocked方法与freeLocked方法,顶用两个方法会对订阅者的初始状态和最终状态进行改变,代码如下:
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
if (_state.value != null) return false
_state.value = NONE
return true
}
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
_state.value = null
return EMPTY_RESUMES
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
????为了实现上述对订阅者状态的管理,在StateFlowSlot类中,还额外提供了三个方法用于实现对订阅者的状态的切换,代码如下:
@Suppress("UNCHECKED_CAST")
fun makePending() {
_state.loop { state ->
when {
state == null -> return
state === PENDING -> return
state === NONE -> {
if (_state.compareAndSet(state, PENDING)) return
}
else -> {
if (_state.compareAndSet(state, NONE)) {
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
assert { state !is CancellableContinuationImpl<*> }
return state === PENDING
}
@Suppress("UNCHECKED_CAST")
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
assert { _state.value !is CancellableContinuationImpl<*> }
if (_state.compareAndSet(NONE, cont)) return@sc
assert { _state.value === PENDING }
cont.resume(Unit)
}
3.数据的接收
????当调用StateFlow类型对象的collect方法,会触发订阅过程,接收emit方法发送的数据,这部分在 StateFlowImpl中实现,代码如下:
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null
while (true) {
val newState = _state.value
collectorJob?.ensureActive()
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
????在上述代码中,假设当前订阅者处于PENGDING状态,并在处理数据后,通过takePending方法,将自身状态修改为NONE,由于之前为PENGDING状态,因此不会执行awaitPending方法进行挂起。因此进行了第二次循环,而在第二次调用takePending方法之前,如果数据没有更新,则订阅者将一直处于NONE状态,当再次调用takePending方法时,会调用awaitPending方法,将订阅者所在协程挂起。
4.数据的发射
????在StateFlowImpl类中,当需要发射数据时,可以调用emit方法、tryEmit方法、compareAndSet方法,代码如下:
override fun tryEmit(value: T): Boolean {
this.value = value
return true
}
override suspend fun emit(value: T) {
this.value = value
}
override fun compareAndSet(expect: T, update: T): Boolean =
updateState(expect ?: NULL, update ?: NULL)
????compareAndSet方法内部调用updateState方法对数据进行更新,而emit方法与tryEmit方法内部通过value属性对数据进行更新,代码如下:
@Suppress("UNCHECKED_CAST")
public override var value: T
get() = NULL.unbox(_state.value)
set(value) { updateState(null, value ?: NULL) }
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
????可以发现,无论是通过emit方法、tryEmit方法还是compareAndSet方法,最终都是通过updateState方法实现数据的更新,代码如下:
private var sequence = 0
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
var curSlots: Array<StateFlowSlot?>? = this.slots
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false
if (oldState == newState) return true
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return true
}
curSlots = slots
}
while (true) {
curSlots?.forEach {
it?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return true
}
curSequence = sequence
curSlots = slots
}
}
}
5.新订阅者获取缓存数据
????当新订阅者出现时,StateFlow会将当前最新的数据发送给订阅者。可以通过调用StateFlowImpl类重写的常量replayCache获取当前最新的数据,代码如下:
override val replayCache: List<T>
get() = listOf(value)
????在StateFlow中,清除replayCache是无效的,因为StateFlow中必须持有一个数据,因此调用 resetReplayCache方法会抛出异常,代码如下:
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
6.热流的融合
????SharedFlowImpl类实现了FusibleFlow接口,重写了其中的fuse方法,代码如下:
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseStateFlow(context, capacity, onBufferOverflow)
...
internal fun <T> StateFlow<T>.fuseStateFlow(
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
): Flow<T> {
assert { capacity != Channel.CONFLATED }
if ((capacity in 0..1 || capacity == Channel.BUFFERED) && onBufferOverflow == BufferOverflow.DROP_OLDEST) {
return this
}
return fuseSharedFlow(context, capacity, onBufferOverflow)
}
7.只读热流
????调用MutableStateFlow方法,可以得到一个类型为MutableStateFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅接收,也可以调用它的emit方法来发射数据。但大多数的时候,我们需要统一数据的发射过程,因此需要对外暴露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接暴露MutableStateFlow类型的对象。
????根据上面代码的介绍,订阅的过程实际上是对数据的获取,而发射的过程实际上是数据的修改,因此如果一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。
????事实上,在Kotlin协程:StateFlow的设计与使用分析接口的时候可以发现,MutableStateFlow接口继承了MutableSharedFlow接口,MutableSharedFlow接口继承了FlowCollector接口,emit方法定义在FlowCollector中。StateFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableStateFlow接口指向的对象转换为StateFlow接口指向的对象就可以将读写流转换为只读流。
????在代码中,对MutableStateFlow类型的对象调用asStateFlow方法恰好可以实现将读写流转换为只读流,代码如下:
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
ReadonlyStateFlow(this)
private class ReadonlyStateFlow<T>(
flow: StateFlow<T>
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseStateFlow(context, capacity, onBufferOverflow)
}
|