前言
本来这一篇准备写Jetpack对应的paging的,但在整理资料的时候,发现Kotlin还有Flow未讲解,这个也是一大重点,因此本篇将对Flow进行详解!
方便后续结合Flow与Paging,进行混合讲解!
既然如此!那么Flow是什么呢?
1、认识Flow
1.1 Kotlin Flow 介绍
官方文档给予了一句话简单的介绍:
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
Flow 从文档的介绍来看,它有点类似 RxJava 的 Observable。因为 Observable 也有 Cold 、Hot 之分。
官方的就是太官方了,对于不熟悉的RxJava 的小伙伴,光凭这个概念还是有点云里雾里。
我们还是通过一系列小案例来逐步深入Flow!
1.1 如何表示多个值?
仔细想想:挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?
emmm…
异步返回多个值能从哪些方面入手呢?
我们来看看这几种方案怎么实现的?
1.1.1 集合
fun simpleList(): List<Int> = listOf<Int>(1, 2, 3)
@Test
fun `test multiple values`() {
simpleList().forEach { value -> println(value) }
}
这种方式确实是返回了多个值,但不是异步!
1.1.2 序列
fun simpleSequence(): Sequence<Int> = sequence {
for (i in 1..3) {
yield(i)
}
}
@Test
fun `test multiple values`() {
simpleSequence().forEach { value -> println(value) }
}
这种方式也是返回了多个值,但不是异步!
1.1.3 挂起函数
suspend fun simpleList2(): List<Int> {
delay(1000)
return listOf<Int>(1, 2, 3)
}
@Test
fun `test multiple values2`() = runBlocking<Unit> {
simpleList2().forEach { value -> println(value) }
}
这种方式既返回了多个值,并且也是异步!满足异步返回多个值!
那么Flow方式该怎样呢?
1.1.4 Flow方式
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test multiple values3`() = runBlocking<Unit> {
simpleFlow().collect { value -> println(value) }
}
这里我们看到simpleFlow 方法,里面使用了delay 挂起函数,但这个方法并没有suspend 修饰符,因此该方法并不是挂起函数!可以在任意地方使用!(非协程模块,非挂起模块)
看完了上面的示例,现在总结下:
1.1.5 Flow与其他方式的区别
- 名为flow的Flow类型构建起函数
flow{...} 构建块中的代码可以挂起- 函数simpleFlow不再标有suspend修饰符
- 流使用emit函数发射值
- 流使用collect函数收集值
再来一张图,加深下印象 现在对Flow有了大致的印象,那么它有啥作用呢?
1.2 Flow应用
在Android开发中,文件下载是Flow的一个非常经典的案例
如图所示
当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。(后面将会结合Jetpack对应的paging进行讲解)
1.3 冷流
fun simpleFlow2() = flow<Int> {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow is cold`() = runBlocking<Unit> {
val flow = simpleFlow2()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
运行结果
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
从这个运行结果可以看出:
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
注意:下篇准备讲的channelFlow是热流!
1.4 流的连续性
@Test
fun `test flow continuation`() = runBlocking<Unit> {
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string $it"
}.collect {
println("Collect $it")
}
}
运行结果
Collect string 2
Collect string 4
从这个运行效果可知:
- 流的每次单独收集都是按顺序执行的,除非使用特殊操作符
- 从上游到下游每个过度操作符都会处理每个发射出的值,然后再交给末端操作符
1.5 流构建器
@Test
fun `test flow builder`() = runBlocking<Unit> {
flowOf("one","two","three")
.onEach { delay(1000) }
.collect { value ->
println(value)
}
(1..3).asFlow().collect { value ->
println(value)
}
}
运行效果
one
two
three
1
2
3
从这个运行效果可以得知:可以通过对应的flowOf 与asFlow 构建对应的flow流
1.6 流的上下文
这里将通过几个小案例进行详解
1.6.1 案例一
fun simpleFlow3() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow3()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行效果
Flow started Test worker @coroutine
Collected 1 Test worker @coroutine
Collected 2 Test worker @coroutine
Collected 3 Test worker @coroutine
从这里可以看出,从emit到collect 上下文贯穿了所有,都为同一个上下文。
那如果说,想在emit 时使用另一个上下文该怎样呢?
1.6.2 案例二
fun simpleFlow4() = flow<Int> {
withContext(Dispatchers.Default) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow4()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行效果
Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@3f3e9270, BlockingEventLoop@67bc91f8],
...略
可以发现,这样写直接崩溃了,并不是Flow invariant is violated: ,那么该如何使用呢?
1.6.3 案例三
fun simpleFlow5() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow5()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
运行效果
Flow started DefaultDispatcher-worker-1 @coroutine
Collected 1 Test worker @coroutine
Collected 2 Test worker @coroutine
Collected 3 Test worker @coroutine
从这几个小案例可以总结出
- 流的收集总是在调用协程的上下文中发生,流的该属性成为上下文保存。
flow{...} 构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)flowOn操作符 ,该函数用于更改流发射的上下文
1.7 启动流
1.7.1 案例一
fun events() = (1..3)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
.launchIn(CoroutineScope(Dispatchers.IO))
delay(200)
job.cancelAndJoin()
}
这里我们看到已经取消了collect ,而改为了launchIn ,对应传入了新的上下文作为Flow的保存上下文。
正因为这里传入的非当前上下文所以需要调用job.join() 或者job.cancelAndJoin() ,来等待对应Flow完成对应操作
运行效果
Event: 1 DefaultDispatcher-worker-1 @coroutine
Event: 2 DefaultDispatcher-worker-1 @coroutine
因为这里挂起了两秒就取消了,所以这里并没有打印所有的日志,同时上下文为:DefaultDispatcher
再来看看案例二
1.7.2 案例二
fun events() = (1..3)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
.launchIn(this)
}
运行效果
Event: 1 Test worker @coroutine
Event: 2 Test worker @coroutine
Event: 3 Test worker @coroutine
因为这里Flow传入的当前上下文,因此不需要额外通过其他方式等待执行完成。
1.8 流的取消
1.8.1 被动取消
fun simpleFlow6() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow`() = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect { value -> println(value) }
}
println("Done")
}
运行效果
1
Emitting 1
2
Emitting 2
Done
这里我们看到,使用了withTimeoutOrNull 设置超时的方法,让它在超时情况下取消并停止执行。
不过这个取消都是被动取消,如果主动取消该是怎样呢?
1.8.2 主动取消
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]?.isActive}")
}
}
运行效果
1
cancel check true
2
cancel check true
3
cancel check false
4
cancel check false
5
cancel check false
这里我们看到,主动取消时,对应状态已经变了,但是还是全部执行了!
这是因为这里并没有加入取消检测!
1.8.3 流的取消检测
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]?.isActive}")
}
}
这里我们看到使用了cancellable 方法!
再次运行看看效果
1
cancel check true
2
cancel check true
3
cancel check false
OK,这里可以看到已经成功的取消了!进入下一专题!
1.9 背压
讲这个之前,我们先看比较原始的案例
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collect { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
来看看运行效果
Collected 1 Test worker @coroutine
Emitting 1 Test worker @coroutine
Collected 2 Test worker @coroutine
Emitting 2 Test worker @coroutine
Collected 3 Test worker @coroutine
Emitting 3 Test worker @coroutine
Collected in 1237 ms
这里我们看到,这是非常标准的一个生产者—消费者模式,都是一一对应的。那么加上不同的关键字试试?
1.9.1 buffer(xx)
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.buffer(50)
.collect { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
这里我们看到加入了.buffer(50) !来看看运行效果
Emitting 1 Test worker @coroutine
Emitting 2 Test worker @coroutine
Emitting 3 Test worker @coroutine
Collected 1 Test worker @coroutine
Collected 2 Test worker @coroutine
Collected 3 Test worker @coroutine
Collected in 1108 ms
这里我们看到生产的消息先是全堆在一起,然后集中发送,总耗时也比标准的少了一点。
如图所示
我们暂可理解为,buffer(50) 将对应的传输通道变长了,使传输通道能够装更多的元素。
接下来,继续看下一个操作符!
1.9.2 conflate()
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.conflate()
.collect { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
运行效果
Emitting 1 Test worker @coroutine
Emitting 2 Test worker @coroutine
Emitting 3 Test worker @coroutine
Collected 1 Test worker @coroutine
Collected 3 Test worker @coroutine
Collected in 800 ms
这里我们看到,消费者并非全部处理完对应生产者的元素。接着看下一个!
1.9.3 collectLatest{}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collectLatest { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
这次,将collect 换成了collectLatest ,看其名,感觉像是只处理最后一个元素,来看看运行效果
Emitting 1 Test worker @coroutine
Emitting 2 Test worker @coroutine
Emitting 3 Test worker @coroutine
Collected 3 Test worker @coroutine
Collected in 807 ms
果真如此,该操作符只会处理最后一个元素!最后再来个总结!
1.9.4 背压总结
- buffer(),并发运行流中发射元素的代码;
- conflate(),合并发射项,不对每个值进行处理;
- collectLatest(),取消并重新发射最后一个值
2、操作符
2.1 过渡流操作符
2.1.1 案例一
suspend fun performRequest(request: Int): String {
delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`() = runBlocking<Unit> {
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
}
运行效果
response 1
response 2
response 3
这里我们看到,将数据流转为Map类型,然后依次发送每个元素!
那如果说,想要发送前后还额外想发送自己的自定义元素该怎么办呢?
suspend fun performRequest(request: Int): String {
delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { value -> println(value) }
}
这里我们看到使用了transform ,在对应闭包里使用了emit 发送元素,来看看运行效果
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
从这个运行效果可知:我们可以通过这样的方式,来自定义发射元素!
2.1.2 案例二
fun numbers() = flow<Int> {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
@Test
fun `test limit length operator`() = runBlocking<Unit> {
numbers().take(2).collect { value -> println(value) }
}
来看看运行效果
1
2
Finally in numbers
从这个运行效果可知:当消费者处理元素2时,就将对应的原始流给取消掉了!
2.1.3 总结
过渡流操作符:
- 可以使用操作符转换流,就像使用集合与序列一样;
- 过渡操作符应用于上游流,并返回下游流;
- 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数
- 它运行速度很快,返回新的转换的定义
2.2 末端操作符
末端操作符是在流上用户**启动流收集的挂起函数。**collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
- 转化为各种集合,比如toList与toSet;
- 获取第一个(first)值与确保流发射单个(single)值的操作符
- 使用reduce与fold将流规约到单个值
说了这么多,来看看怎么使用的!
@Test
fun `test terminal operator`() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map {
println("it * it= ${it * it}")
it * it
}
.reduce { a, b ->
println("a=$a,b=$b,a+b=${a + b}")
a + b
}
println(sum)
}
运行效果
it * it= 1
it * it= 4
a=1,b=4,a+b=5
it * it= 9
a=5,b=9,a+b=14
it * it= 16
a=14,b=16,a+b=30
it * it= 25
a=30,b=25,a+b=55
55
一切尽在注释中,虽然有点少!下一个!
2.3 组合多个流
就像Kotlin标准库重的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值!
话不多说,直接开整!
2.3.1 案例一
@Test
fun `test zip`() = runBlocking<Unit> {
val numbs = (1..3).asFlow()
val strs = flowOf("One", "Two", "Three")
numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }
}
运行效果
1 -> One
2 -> Two
3 -> Three
很简单,通过zip 将对应流亚索成一个,然后输出!狠简单,再来个稍微复杂点的!
2.3.2 案例二
@Test
fun `test zip2`() = runBlocking<Unit> {
val numbs = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("One", "Two", "Three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
numbs.zip(strs) { a, b -> "$a -> $b" }.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
哈哈哈,稍微复杂,就是在对应流后面加了额外的挂起等待,就是模拟对应耗时操作
运行效果
1 -> One at 462 ms from start
2 -> Two at 861 ms from start
3 -> Three at 1269 ms from start
因为这里是挂起,并非阻塞,因此numbs 与strs 在合并发射元素时,他们互相不干扰,各做各的。
因此每次发射用的基础时间是以strs 为准,而不是numbs 与strs 这两者之和。 相信读者能够进一步认识挂起与阻塞的区别。
2.4 展平流
流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,所以存在一系列的流展平操作符:
- flatMapConcat 连接模式;
- flatMapMerge 合并模式;
- flatMapLatest 最新展平模式
来看看怎么使用!
2.4.1 flatMapConcat 连接模式
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test flatMapConcat`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行效果
1: First at 147 ms from start
1: Second at 653 ms from start
2: First at 755 ms from start
2: Second at 1256 ms from start
3: First at 1357 ms from start
3: Second at 1859 ms from start
这里我们看到这里将两个流连接了起来,类似于串联模式
2.4.2 flatMapMerge 合并模式
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test flatMapMerge`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapMerge { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行效果
1: First at 166 ms from start
2: First at 261 ms from start
3: First at 366 ms from start
1: Second at 668 ms from start
2: Second at 762 ms from start
3: Second at 871 ms from start
这种模式类似于上面的组合+背压里面的conflate模式,先是生产者将元素全部生成好,随后再通知消费者消费!
2.4.3 flatMapLatest 最新展平模式
@Test
fun `test flatMapLatest`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapLatest { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
运行效果
1: First at 164 ms from start
2: First at 364 ms from start
3: First at 469 ms from start
3: Second at 971 ms from start
这种模式就类似于上面的组合+collectLatest模式。很简单!
3、异常处理
3.1 流的异常处理
当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:
3.1.1 案例一(try/catch块)
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`() = runBlocking<Unit> {
try {
simpleFlow().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
运行效果
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
这个狠简单,下一个!
3.1.2 案例二(catch函数)
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { e: Throwable -> println("Caught $e") }
.flowOn(Dispatchers.IO)
.collect { println(it) }
}
现在我们直接通过catch {} 代码块,来处理对应的异常信息。
运行效果
1
Caught java.lang.ArithmeticException: Div 0
我们看到这样写,所有的逻辑全在catch里面,能不能提出来呢?
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}.flowOn(Dispatchers.IO).collect { println(it) }
}
我们看到,将对应逻辑提出来了,而且这样看无论是代码格式以及阅读性都比之前好了多
来看看运行效果
Caught java.lang.ArithmeticException: Div 0
1
10
3.2 流的完成
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
- 命令式finally块
- onCompletion声明式处理
3.2.1 案例一(finally块)
fun simpleFlow2() = (1..3).asFlow()
@Test
fun `test flow complete in finally`() = runBlocking<Unit> {
try {
simpleFlow2().collect { println(it) }
} finally {
println("Done")
}
}
很简单的代码块,这个就不贴运行效果了哈,不用想最终会打印Done!
3.2.2 案例二(onCompletion)
@Test
fun `test flow complete in onCompletion`() = runBlocking<Unit> {
simpleFlow2()
.onCompletion { println("Done") }
.collect { println(it) }
}
这里在上一个案例基础上去掉了try…finally,并额外加了onCompletion 代码块,里面实现了对应的逻辑
来看看运行效果
1
2
3
Done
很明显和上面一样!
结束语
好了,本篇到这就结束了!相信看到这的小伙伴对Flow异步流有了一个比较清晰的认知!在下一篇中,将会讲解Flow对应的通道-多路复用-并发安全
|