IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> Kotlin-Coroutines-Flow -> 正文阅读

[移动开发]Kotlin-Coroutines-Flow

Flow:异步数据流,上游按顺序发送值,下游接收值。
是以协程为基础,采用响应式编程方式可以连续进行操作的库,类似RxJava ,属于生产者-消费者模式,上游流生产,下游流接收处理

上游流生产发送
中间操作符加工厂
下游流接收处理

Flow特点
1,属于冷流,默认上游流代码不被执行,只有在下游流处理时才会执行上游代码,一般是emit()发送,collect{}接收(终端操作符均可接收)
2,链式调用
3,中间操作符:map,filter,take,zip,combine等(数据发送后中间加工一下)
4,终端操作符: collect, single, reduce, toList等(触发上游流代码执行,接收处理)

Flow的创建

  1. flowOf() 以固定值创建流
  2. asFlow() 扩展函数将各种类型(集合,序列等)转换为流,asFlow()有各种类型的扩展
  3. flow { } 构建任意流,内部调用了emit()函数
  4. channelFlow { } 构建通道流,内部并发调用了send()函数
  5. MutableStateFlow and MutableSharedFlow 创建可以直接更新的热流

冷流的体现

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i) // 发送
    }
}

fun main() = runBlocking {

    listOf(1, 2, 3).asFlow().collect { value -> println(value) }

    println("Calling simple function...")
    val flow = simple() // 不会执行内部代码,没有打印Flow started
    println("Calling collect...")
    flow.collect { value -> println(value) } // 接收处理,才执行流内代码
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
输出:
Calling simple function...
Calling collect...    
Flow started      // 接收处理时,才执行流内代码
1
2
3
Calling collect again...
Flow started
1
2
3

Flow流切换协程:flowOn()

一般流是按照顺序执行的,都是在同一个协程中,上游发送,下游接收。
下游接收流在哪个协程中,上游代码就在哪个协程中。不能手动更改协程上下文,否则就会报错。如果要更改请使用Kotlin提供的Flow扩展函数flowOn()。

另,可以使用launchIn(CoroutineScope(Dispatchers.IO))返回一个新的协程,launchIn的返回值是Job

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}   
输出:
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3   

可以多次运用flowOn()方法

flow.flowOn(Dispatchers.Default).map { it }.flowOn(Dispatchers.IO).toList()

Flow异常处理
1,flow.catch{}
2,try { }catch(e: Throwable){}finally {}

Flow开始和完成
onStart { }
onCompletion { } or try { }finally {}

 	val flow = simple()
    flow.flowOn(Dispatchers.Default)
        .map { it }
        .flowOn(Dispatchers.IO)
        .onStart { println("onStart function... ${Thread.currentThread().name}") }
        .onCompletion {
            println("onCompletion function... ${Thread.currentThread().name}")
            throw NullPointerException()
        }
        .catch { println("catch function... ${Thread.currentThread().name}") }
        .collect { value -> println("value = $value")}
输出:
Calling simple function... main
onStart function... main
Flow started DefaultDispatcher-worker-3
value = 1
value = 2
value = 3
value = 4
value = 5
value = 6
onCompletion function... main
catch function... main

Flow取消流
由于流是以协程为基础,所以Flow的取消和协程的取消相同,
1,可以用withTimeoutOrNull(){},超时取消 or 抛出异常中段执行。
2,如果采用了launchIn()那就是一个全新的协程,可以通过cancel()和join()或者cancelAndJoin()取消协程
3,取消flow所在的协程也是可以的

Flow取消检测机制
Flow 构建器会对每个emit发出的值进行检测,所以当我们的协程比较密集繁忙是,就需要取消这种检测机制。
2种方法:
1,.onEach { currentCoroutineContext().ensureActive() }
2, 提供了cancellable()

 withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    
    val flow = simple() 
    flow.cancellable().map { it }.collect { value ->
        if (value == 3) cancel()
        println(value)
    }

Flow Buffering 缓存:发射速度or处理速度不均衡时,就需要buffer

一般情况,上游生产下游接收处理是同时执行的,但是当上游生产速度超过下游处理速度,就需要设置buffer

BufferOverflow.SUSPEND 溢出,挂起当前协程
BufferOverflow.DROP_OLDEST 溢出,丢掉老数据
BufferOverflow.DROP_LATEST 溢出,丢掉新数据

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {}
flow.map { it }
.buffer(BUFFERED,BufferOverflow.SUSPEND)

Conflation
合并处理,当程序不需要处理每一个值时,可能只关心最近的数据,可以用conflate()跳过中间值,返回最近的值。 中间某些值就漏掉了

confate()就相当于设置了缓存buffer:BufferOverflow.DROP_OLDEST 溢出,丢掉老数据。就是缓存DROP_OLDEST的快捷方式!!


Requests a conflated channel in the Channel(...) factory function. 
This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.
public const val CONFLATED: Int = -1

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

collectLatest 只接受最后一条数据

flow.collectLatest { value ->
        delay(300)
        println("value = $value")
    }

Composing multiple flows 多流操作

zip(),combine()

val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Flattening flows 展平流
流里面的流,集合中的集合流
Kotlin提供了一些展平操作符,flatMapConcat,flatMapMerge,flatMapLatest

flatMapConcat,按照顺序将内部流展平为单个流,然后收集,最后返回一个所有的集合流
是map(transform).flattenConcat()的快捷方式

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

输出:
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge:同时收集所有传入流,并将其值合并到单个流中,以便尽快发出值
是map(transform).flattenMerge()的快捷方式

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)
val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }.flatMapMerge { requestFlow(it) }.collect { value -> // collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }

输出:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

flatMapLatest 发出新流就会取消对上一个流收集,就是会看到多次上游发射,收集只收集最后一个

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:41:17  更:2022-03-10 22:44:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 18:47:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码