SharedFlow
SharedFlow是一个hot stream. sharedflow有以下特点:
- 没有默认值
- 可以保持旧值
- emit会挂起直到所有的订阅者处理完成
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
replay: 重新发送给新订阅者之前值的个数
extraBufferCapacity:除了replay缓冲区个数之外的缓冲区的值。当有剩余空间的时候emit就不会挂起
onBufferOverflow:当extraBufferCapacity溢出时的处理。有下面三种处理方式:
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
使用demo来测试这几个参数的作用:
1.测试replay的作用
fun `sharedflow Reply = 1`()= runBlocking{
var sharedflow = MutableSharedFlow<Int>(replay = 1)
GlobalScope.launch {
sharedflow.emit(111)
sharedflow.emit(222)
}
val job1 = GlobalScope.launch {
delay(5000)
sharedflow.collect{
println("first job : $it")
}
}
val job2 = GlobalScope.launch {
delay(10000)
sharedflow.collect{
println("second job : $it")
}
}
job1.join()
job2.join()
}
结果:
first job : 222
second job : 222
上面的例子可以看出,设置了replay为1(默认为0),job1和job2最后打印的都是“222”
2.测试extraBufferCapacity的作用
fun `sharedflow emit suspend`() = runBlocking{
var sharedflow = MutableSharedFlow<Int>()
val job1 = GlobalScope.launch {
sharedflow.collect{
//println("${gettime()}:first job: $it")
}
}
val job2 = GlobalScope.launch {
sharedflow.collect{
delay(5000)
//println("${gettime()}:second job: $it")
}
}
val job3 = GlobalScope.launch {
for (i in 1..5){
delay(500)
println("${gettime()}:start--------emit $i")
sharedflow.emit(i)
println("${gettime()}:end--------emit $i")
}
}
job1.join()
job2.join()
job3.join()
}
结果:
第一个emit没有挂起
11:20:46:699:start--------emit 1
11:20:46:704:end--------emit 1
第二个emit挂起了4s左右
11:20:47:206:start--------emit 2
11:20:51:711:end--------emit 2
第三个emit挂起了6s左右
11:20:52:217:start--------emit 3
11:20:56:716:end--------emit 3
第四个emit挂起了4s左右
11:20:57:219:start--------emit 4
11:21:1:717:end--------emit 4
第五个emit挂起了6s左右
11:21:2:218:start--------emit 5
11:21:6:720:end--------emit 5
代码中job2在collect时候delay了5s,从emit的结果看,从emit 2开始从start emit到end emit中间大概执行了4s多的时间。extraBufferCapacity默认为0
下面再测试一下,将extraBufferCapacity设置为2:
var sharedflow = MutableSharedFlow<Int>(extraBufferCapacity = 2)
测试结果:
emit没有挂起
11:24:47:843:start--------emit 1
11:24:47:848:end--------emit 1
emit没有挂起
11:24:48:354:start--------emit 2
11:24:48:356:end--------emit 2
emit没有挂起
11:24:48:860:start--------emit 3
11:24:48:861:end--------emit 3
emit挂起3s多
11:24:49:365:start--------emit 4
11:24:52:852:end--------emit 4
emit挂起6s多
11:24:53:355:start--------emit 5
11:24:57:857:end--------emit 5
从结果看emit1 到 emit3 的emit没有耗费太多的时间,从emit4开始中间分别耗费了3s,6s。从demo的测试来看当缓冲区满了之后执行了emit执行了挂起操作。
3.测试onBufferOverflow值对sharedflow的影响,修改onBufferOverflow = BufferOverflow.DROP_OLDEST,这个意思是丢弃最先的值
fun `sharedflow emit suspend`() = runBlocking{
var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_OLDEST)
val job1 = GlobalScope.launch {
sharedflow.collect{
println("${gettime()}:first job: $it")
}
}
val job2 = GlobalScope.launch {
sharedflow.collect{
delay(5000)
println("${gettime()}:second job: $it")
}
}
val job3 = GlobalScope.launch {
for (i in 1..5){
delay(500)
//println("${gettime()}:start--------emit $i")
sharedflow.emit(i)
// println("${gettime()}:end--------emit $i")
}
}
job1.join()
job2.join()
job3.join()
}
运行结果:
1:20:26:627:first job: 1
1:20:27:129:first job: 2
1:20:27:637:first job: 3
1:20:28:140:first job: 4
1:20:28:643:first job: 5
1:20:31:632:second job: 1
1:20:36:639:second job: 5
从上面运行结果看job1执行完成了,job2只输出了第一个值和最后一个值。job2将2,3,4几个值都丢失了。
onBufferOverflow != BufferOverflow.SUSPEND时,replay和extraBufferCapacity不能同时为0(默认值),否则运行时会出现如下错误:
replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
java.lang.IllegalArgumentException: replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
at kotlinx.coroutines.flow.SharedFlowKt.MutableSharedFlow(SharedFlow.kt:246)
at com.example.flowdemo.ExampleUnitTest$sharedflow emit suspend$1.invokeSuspend(ExampleUnitTest.kt:165)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at com.example.flowdemo.ExampleUnitTest.sharedflow emit suspend(ExampleUnitTest.kt:164)
将onBufferOverflow 的值改为DROP_LATEST
var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_LATEST)
运行结果:
1:36:44:978:first job: 1
1:36:45:481:first job: 2
1:36:49:983:second job: 1
1:36:54:989:second job: 2
从结果看只输出去了前两个值,后面的值全部丢掉了。
4.sharedflow是不防抖的,即如果连续放入相同的值,那么每个值collect都会响应一次。
fun `sharedflow shake`()= runBlocking{
val sharedflow = MutableSharedFlow<String>()
val job1 = GlobalScope.launch {
sharedflow.collect {
println("job---$it")
}
}
val job2 = GlobalScope.launch {
for (i in 1..5){
delay(100)
sharedflow.emit("hello laworks")
}
}
job1.join()
job2.join()
}
运行结果
job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks
这个和stateflow做一个对比
@Test
fun `stateflow shake`()= runBlocking{
val stateflow = MutableStateFlow("state init")
val job1 = GlobalScope.launch {
stateflow.collect {
println("job---$it")
}
}
val job2 = GlobalScope.launch {
for (i in 1..5){
delay(100)
stateflow.emit("I'am stateflow")
}
}
job1.join()
job2.join()
}
运行结果:
job---state init
job---I'am stateflow
从上面对比的结果可以看出stateflow是防抖的。
stateFlow
sharedflow是不防抖的,但是stateflow是防抖的,从上面的例子可以看出,下面是源码的分析:
源码如下:
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS support
/*********return if equal*********/
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
curSequence++ // make it odd
sequence = curSequence
} else {
// update is already in process, notify it, and return
sequence = curSequence + 2 // change sequence to notify, keep it odd
return true // updated
}
curSlots = slots // read current reference to collectors under lock
}
从源码上可以看出来,当值没有变化的时候就直接return了,不会做任何的处理。这也就是为什么stateflow是防抖的。
下面看看关于stateflow值丢失的问题:
@Test
fun `stateflow suspend`() = runBlocking {
val stateflow = MutableStateFlow(0)
val job1 = GlobalScope.launch {
stateflow.collect{
println("job1----$it")
}
}
val job2 = GlobalScope.launch {
stateflow.collect{
delay(5000)
println("job2----$it")
}
}
val job3 = GlobalScope.launch {
for(i in 1..5){
//delay(500)
println("${gettime()}:start--------emit $i")
stateflow.emit(i)
println("${gettime()}:end--------emit $i")
}
}
job1.join()
job2.join()
job3.join()
}
输入的结果如下:
10:3:3:681:start--------emit 1
job1----0
10:3:3:684:end--------emit 1
job1----1
10:3:3:684:start--------emit 2
10:3:3:684:end--------emit 2
10:3:3:684:start--------emit 3
10:3:3:684:end--------emit 3
10:3:3:684:start--------emit 4
10:3:3:684:end--------emit 4
10:3:3:684:start--------emit 5
10:3:3:685:end--------emit 5
job1----5
job2----0
job2----5
结果看job1分别输出了0,1,5. job2分别输出了0和5. 并且emit很快就完成了,这是为什么?为什么job1没有输出所有的值,job2的延迟也没有像sharedflow一样将emit挂起?
首先说stateflow没有buffer的概念,它只能存一个值,所以在有值的时候就会去更新。再看看collect的实现:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
从上面的方法可以看出flow的collect方法会继续调用带参数的collect方法,参数里面的emit方法就是我们外面的实现。
在看看带参数的collect在stateflow里面的实现:
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
....
if (oldState == null || oldState != newState) {
//emit is the user action
collector.emit(NULL.unbox(newState))
oldState = newState
}
....
}
} finally {
freeSlot(slot)
}
}
从代码中可以看出:
1.这个方法里面是一个无限循环
2.这个里面在不断的取值,并和之前的值做对比,如果一样的话那么就不会再调用emit方法了。
在这个方法里面假设collector.emit(NULL.unbox(newState)) 执行的时间很长,那么就有可能出现stateflow的值变化之后没有及时取的情况。
|