在Kotlin普及之前,RxJava 无疑是Android开发领域中最受欢迎的响应式编程的三方库,而RxJava 在我们日常的Android开发应用的最多的场景就是配合Retrofit 进行网络请求和类似EventBus 的事件订阅(RxBus)。但是到了2017年,随着LiveData 刚一面世,就受到了很大的关注,LiveData 是一个以观察者模式为核心,让界面对变量进行订阅,从而实现自动通知刷新的组件。跟一般的订阅比起来,LiveData 有两大特点:一是他的目标非常直接,直指界面刷新,所以它的数据更新只发生在主线程。二是它借助了另外一个组件Lifecycle 的功能,让它可以只在界面到了前台的时候才通知更新,避免了浪费性能。并且LiveData 相比RxJava 来说也有两大优点:
LiveData 的学习成本比较低LiveData 相比较于RxJava 要轻量级很多
所以在一些简单场景人们逐渐从RxJava 过渡到LiveData ,而一些比较复杂的场景还是使用RxJava ,因为LiveData 的轻量级也决定了它不够强大,不适合一些复杂场景。而随着Kotlin协程库的更新,Flow 诞生了。
Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库。从文档的介绍来看Flow 有点类似 RxJava ,都是一种基于事件的响应式编程。那么接下来我们就看一下Flow 的基本使用。
1.创建Flow
fun simpleFlow(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Collect the flow
simpleFlow().collect { value -> println(value) }
}
通过上面例子可以看到,Flow有以下特征:
2.Flow是冷流
Flow 是一种冷流,Flow 构建器中的代码只有在collect 函数被执行的时候才会运行。这一点与 Channel 正对应:Channel 的发送端并不依赖于接收端。
fun simpleFlow2() = flow<Int> {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow is cold`() = runBlocking<Unit> {
val flow = simpleFlow2()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
3.Flow是具有连续性的流
流的每次单独收集都是按顺序执行的,除非使用了特殊的操作符,从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符
@Test
fun `test flow continuation`() = runBlocking<Unit> {
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string $it"
}.collect {
println("Collect $it")
}
}
4.Flow的构建器
通常情况下有两种方式可以构建一个Flow ,一种是通过flowOf 构建器定义一个发射固定值集的流
flowOf("one","two","three")
.onEach { delay(1000) }
.collect { value ->
println(value)
}
另一种方式是使用.asFlow() 扩展函数可以将各种集合与序列转换为Flow
(1..3).asFlow().collect { value ->
println(value)
}
5.Flow的上下文
Flow 的收集总是在调用协程的上下文中发生的,Flow 的该属性称为上下文保存
fun simpleFlow3() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow3()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
flow{...} 构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)
fun simpleFlow4() = flow<Int> {
withContext(Dispatchers.IO) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
} //Error
flowOn 操作符用于更改流发射的上下文
fun simpleFlow5() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
6.分离 Flow 的消费和触发
我们除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。这样消费的具体操作就不需要与末端操作符放到一起,collect 函数可以放到其他任意位置调用,例如:
fun createFlow() = flow<Int> {
(1..3).forEach {
emit(it)
delay(100)
}
}.onEach { println(it) }
fun main(){
GlobalScope.launch {
createFlow().collect()
}
}
7.Flow的取消
Flow 本身并没有提供取消操作, Flow 的消费依赖于 collect 这样的末端操作符,而它们又必须在协程当中调用,因此 Flow 的取消主要依赖于末端操作符所在的协程的状态。像往常一样,Flow 的收集可以是当流在一个可取消的挂起函数中取消的时候取消。
fun simpleFlow6() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow`() = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect { value -> println(value) }
}
println("Done")
}
8.Flow的取消检测
- 为方便起见,流构建器对每个发射值执行附加的enureActive检测以进行取消,这意味着从
flow{...} 发出的繁忙循环是可以取消的
fun simpleFlow7() = flow<Int> {
for (i in 1..5) {
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
simpleFlow7().collect { value ->
println(value)
if (value == 3) cancel()
}
}
- 出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程出于繁忙循环的情况下,必须明确检测是否取消。
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
println(value)
if (value == 3) cancel()
}
}
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
}
}
9.Flow的背压
只要是响应式编程,就一定会有背压问题,生产者的生产速率高于消费者的处理速率的情况下出现。为了保证数据不丢失,我们也会考虑添加缓存来缓解问题,buffer 的本质是并发运行流中发射元素的代码
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.buffer(50)
.collect { value ->
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms") //1028ms
}
不过,如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压。问题产生的根本原因是生产和消费速率的不匹配,除直接优化消费者的性能以外,我们也可以采取一些取舍的手段。第一种是 conflate ,conflate() 的策略是如果缓存池满了,新数据会覆盖老数据
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.conflate()
.collect { value ->
println("Collected start $value ${Thread.currentThread().name}")
delay(300)
println("Collected end $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms") //770ms
}
第二种是 collectLatest 。顾名思义,只处理最新的数据,这看上去似乎与 conflate 没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。
fun simpleFlow8() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collectLatest { value ->
println("Collected start $value ${Thread.currentThread().name}")
delay(300)
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")//785ms
}
除 collectLatest 之外还有 mapLatest 、flatMapLatest 等等,都是这个作用。
10.Flow的操作符
- 转换操作符(过渡操作符)
- 可以使用操作符转换流,就像使用集合与序列一样
- 过渡操作符应用于上游流,并返回下游流
- 这些操作符也是冷操作符,并且这类操作符本身并不是挂起函数
- 运行速度很快,返回新的转换流的定义
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
@Test
fun `test transform flow operator1`() = runBlocking<Unit> {
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
}
@Test
fun `test transform flow operator2`() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { value -> println(value) }
}
- 限长操作符
fun numbers() = flow<Int> {
emit(1)
emit(2)
emit(3)
}
@Test
fun `test limit length operator`() = runBlocking<Unit> {
numbers().take(2).collect { value -> println(value) }
}
- 末端操作符
末端操作符是在流上用于启动流收集的挂起函数。collect 是最基础的末端操作符,功能与 RxJava 的 subscribe 类似。但还有另外一些更方便使用的末端操作符,大体分为两类:
- 集合类型转换操作,包括
toList 、toSet 等。 - 聚合操作,包括将 Flow 规约到单值的
reduce 、fold 等操作,以及获得单个元素的操作包括 single 、singleOrNull 、first 等。
实际上,识别是否为末端操作符,还有一个简单方法,由于 Flow 的消费端一定需要运行在协程当中,因此末端操作符都是挂起函数。
@Test
fun `test terminal operator`() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
- 组合操作符
就像Kotlin标准库中的Sequence.zip() 扩展函数一样,可以使用zip 操作符组合两个流中的相关值
@Test
fun `test zip`() = runBlocking<Unit> {
val numbs = (1..3).asFlow()
val strs = flowOf("One", "Two", "Three")
numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }
}
combine 虽然也是合并,但是跟 zip 不太一样。
使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并。
fun main() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
flowA.combine(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
}
1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
- 展平操作符
Flow 表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求。然而由于流具有“异步”的性质,因此需要不同的展平模式,为此,存在一系列的展平操作符:
flatMapConcat flatMapMerge flatMapLatest
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")
}
@Test
fun `test map`() = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.map { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
@Test
fun `test flatMapConcat`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
1: Second
2: First
2: Second
3: First
3: Second
@Test
fun `test flatMapMerge`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapMerge { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
2: First
3: First
1: Second
2: Second
3: Second
@Test
fun `test flatMapLatest`() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapLatest { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
1: First
2: First
3: First
3: Second
11.Flow的异常处理
当运算符中的发射器或者代码抛出异常时,通常有一下两个处理方法:
第一种是 try/catch 代码块
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`() = runBlocking<Unit> {
try {
simpleFlow().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
第二种是通过catch() 函数,但是只能捕获上游异常
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { e: Throwable -> println("Caught $e") }
.flowOn(Dispatchers.IO)
.collect { println(it) }
}
异常恢复
@Test
fun `test flow exception2`() = runBlocking<Unit> {
flow {
throw ArithmeticException("Div 0")
emit(1)
}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}.collect { println(it) }
}
12.Flow的完成
当流收集完成时,它可能需要执行一个动作
- 命令式
finally 代码块 onCompletion 声明式处理,onCompletion 用起来比较类似于 try ... catch ... finally 中的 finally ,无论前面是否存在异常,它都会被调用,参数 t 则是前面未捕获的异常。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}.onCompletion { t: Throwable? ->
println("finally.")
}
|