IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> Kotlin之SharedFlow和Stateflow -> 正文阅读

[移动开发]Kotlin之SharedFlow和Stateflow

SharedFlow

SharedFlow是一个hot stream. sharedflow有以下特点:

  1. 没有默认值
  2. 可以保持旧值
  3. emit会挂起直到所有的订阅者处理完成
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay: 重新发送给新订阅者之前值的个数

extraBufferCapacity:除了replay缓冲区个数之外的缓冲区的值。当有剩余空间的时候emit就不会挂起

onBufferOverflow:当extraBufferCapacity溢出时的处理。有下面三种处理方式:

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

使用demo来测试这几个参数的作用:

1.测试replay的作用

fun `sharedflow Reply = 1`()= runBlocking{
    var sharedflow = MutableSharedFlow<Int>(replay = 1)
    GlobalScope.launch {
        sharedflow.emit(111)
        sharedflow.emit(222)
    }
    val job1 = GlobalScope.launch {
        delay(5000)
        sharedflow.collect{
            println("first job : $it")
        }
    }

    val job2 = GlobalScope.launch {
        delay(10000)
        sharedflow.collect{
            println("second job : $it")
        }
    }

    job1.join()
    job2.join()
}

结果:

first job : 222
second job : 222

上面的例子可以看出,设置了replay为1(默认为0),job1和job2最后打印的都是“222”

2.测试extraBufferCapacity的作用

fun `sharedflow emit suspend`() = runBlocking{
    var sharedflow = MutableSharedFlow<Int>()

    val job1 = GlobalScope.launch {
        sharedflow.collect{
            //println("${gettime()}:first job: $it")
        }
    }
    val job2 = GlobalScope.launch {
        sharedflow.collect{
            delay(5000)
            //println("${gettime()}:second job: $it")
        }
    }
    val job3 = GlobalScope.launch {
        for (i in 1..5){
            delay(500)
            println("${gettime()}:start--------emit $i")
            sharedflow.emit(i)
            println("${gettime()}:end--------emit $i")
        }
    }
    job1.join()
    job2.join()
    job3.join()
}

结果:

第一个emit没有挂起
11:20:46:699:start--------emit 1
11:20:46:704:end--------emit 1
第二个emit挂起了4s左右
11:20:47:206:start--------emit 2
11:20:51:711:end--------emit 2
第三个emit挂起了6s左右
11:20:52:217:start--------emit 3
11:20:56:716:end--------emit 3
第四个emit挂起了4s左右
11:20:57:219:start--------emit 4
11:21:1:717:end--------emit 4
第五个emit挂起了6s左右
11:21:2:218:start--------emit 5
11:21:6:720:end--------emit 5

代码中job2在collect时候delay了5s,从emit的结果看,从emit 2开始从start emit到end emit中间大概执行了4s多的时间。extraBufferCapacity默认为0

下面再测试一下,将extraBufferCapacity设置为2:

var sharedflow = MutableSharedFlow<Int>(extraBufferCapacity = 2)

测试结果:

emit没有挂起
11:24:47:843:start--------emit 1
11:24:47:848:end--------emit 1
emit没有挂起
11:24:48:354:start--------emit 2
11:24:48:356:end--------emit 2
emit没有挂起
11:24:48:860:start--------emit 3
11:24:48:861:end--------emit 3
emit挂起3s多
11:24:49:365:start--------emit 4
11:24:52:852:end--------emit 4
emit挂起6s多
11:24:53:355:start--------emit 5
11:24:57:857:end--------emit 5

从结果看emit1 到 emit3 的emit没有耗费太多的时间,从emit4开始中间分别耗费了3s,6s。从demo的测试来看当缓冲区满了之后执行了emit执行了挂起操作。

3.测试onBufferOverflow值对sharedflow的影响,修改onBufferOverflow = BufferOverflow.DROP_OLDEST,这个意思是丢弃最先的值

 fun `sharedflow emit suspend`() = runBlocking{
        var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_OLDEST)

        val job1 = GlobalScope.launch {
            sharedflow.collect{
                println("${gettime()}:first job: $it")
            }
        }
        val job2 = GlobalScope.launch {
            sharedflow.collect{
                delay(5000)
                println("${gettime()}:second job: $it")
            }
        }
        val job3 = GlobalScope.launch {
            for (i in 1..5){
                delay(500)
                //println("${gettime()}:start--------emit $i")
                sharedflow.emit(i)
               // println("${gettime()}:end--------emit $i")
            }
        }
        job1.join()
        job2.join()
        job3.join()
    }

运行结果:

1:20:26:627:first job: 1
1:20:27:129:first job: 2
1:20:27:637:first job: 3
1:20:28:140:first job: 4
1:20:28:643:first job: 5
1:20:31:632:second job: 1
1:20:36:639:second job: 5

从上面运行结果看job1执行完成了,job2只输出了第一个值和最后一个值。job2将2,3,4几个值都丢失了。

onBufferOverflow != BufferOverflow.SUSPEND时,replay和extraBufferCapacity不能同时为0(默认值),否则运行时会出现如下错误:

replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
java.lang.IllegalArgumentException: replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
	at kotlinx.coroutines.flow.SharedFlowKt.MutableSharedFlow(SharedFlow.kt:246)
	at com.example.flowdemo.ExampleUnitTest$sharedflow emit suspend$1.invokeSuspend(ExampleUnitTest.kt:165)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.example.flowdemo.ExampleUnitTest.sharedflow emit suspend(ExampleUnitTest.kt:164)

将onBufferOverflow 的值改为DROP_LATEST

var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_LATEST)

运行结果:

1:36:44:978:first job: 1
1:36:45:481:first job: 2
1:36:49:983:second job: 1
1:36:54:989:second job: 2

从结果看只输出去了前两个值,后面的值全部丢掉了。

4.sharedflow是不防抖的,即如果连续放入相同的值,那么每个值collect都会响应一次。

fun `sharedflow shake`()= runBlocking{
    val sharedflow = MutableSharedFlow<String>()
    val job1 = GlobalScope.launch {
        sharedflow.collect {
            println("job---$it")
        }
    }
    val job2 = GlobalScope.launch {
        for (i in 1..5){
            delay(100)
            sharedflow.emit("hello laworks")
        }
    }
    job1.join()
    job2.join()
}

运行结果

job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks

这个和stateflow做一个对比

@Test
fun `stateflow shake`()= runBlocking{
    val stateflow = MutableStateFlow("state init")
    val job1 = GlobalScope.launch {
        stateflow.collect {
            println("job---$it")
        }
    }
    val job2 = GlobalScope.launch {
        for (i in 1..5){
            delay(100)
            stateflow.emit("I'am stateflow")
        }
    }
    job1.join()
    job2.join()
}

运行结果:

job---state init
job---I'am stateflow

从上面对比的结果可以看出stateflow是防抖的。

stateFlow

sharedflow是不防抖的,但是stateflow是防抖的,从上面的例子可以看出,下面是源码的分析:

源码如下:

synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            
            /*********return if equal*********/
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                curSequence++ // make it odd
                sequence = curSequence
            } else {
                // update is already in process, notify it, and return
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true // updated
            }
            curSlots = slots // read current reference to collectors under lock
        }

从源码上可以看出来,当值没有变化的时候就直接return了,不会做任何的处理。这也就是为什么stateflow是防抖的。

下面看看关于stateflow值丢失的问题:

@Test
fun `stateflow suspend`() = runBlocking {
    val stateflow = MutableStateFlow(0)
    val job1 = GlobalScope.launch {
        stateflow.collect{
            println("job1----$it")
        }
    }
    val job2 = GlobalScope.launch {
        stateflow.collect{
            delay(5000)
            println("job2----$it")
        }
    }
    val job3 = GlobalScope.launch {
        for(i in 1..5){
            //delay(500)
            println("${gettime()}:start--------emit $i")
            stateflow.emit(i)
            println("${gettime()}:end--------emit $i")
        }
    }
    job1.join()
    job2.join()
    job3.join()
}

输入的结果如下:

10:3:3:681:start--------emit 1
job1----0
10:3:3:684:end--------emit 1
job1----1
10:3:3:684:start--------emit 2
10:3:3:684:end--------emit 2
10:3:3:684:start--------emit 3
10:3:3:684:end--------emit 3
10:3:3:684:start--------emit 4
10:3:3:684:end--------emit 4
10:3:3:684:start--------emit 5
10:3:3:685:end--------emit 5
job1----5
job2----0
job2----5

结果看job1分别输出了0,1,5. job2分别输出了0和5. 并且emit很快就完成了,这是为什么?为什么job1没有输出所有的值,job2的延迟也没有像sharedflow一样将emit挂起?

首先说stateflow没有buffer的概念,它只能存一个值,所以在有值的时候就会去更新。再看看collect的实现:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

从上面的方法可以看出flow的collect方法会继续调用带参数的collect方法,参数里面的emit方法就是我们外面的实现。

在看看带参数的collect在stateflow里面的实现:

override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        // The loop is arranged so that it starts delivering current value without waiting first
        while (true) {
           ....
            if (oldState == null || oldState != newState) {
            	 //emit is the user action
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            ....
        }
    } finally {
        freeSlot(slot)
    }
}

从代码中可以看出:

1.这个方法里面是一个无限循环

2.这个里面在不断的取值,并和之前的值做对比,如果一样的话那么就不会再调用emit方法了。

在这个方法里面假设collector.emit(NULL.unbox(newState)) 执行的时间很长,那么就有可能出现stateflow的值变化之后没有及时取的情况。

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-02-24 15:24:46  更:2022-02-24 15:26:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 16:01:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码