Flow:异步数据流,上游按顺序发送值,下游接收值。 是以协程为基础,采用响应式编程方式可以连续进行操作的库,类似RxJava ,属于生产者-消费者模式,上游流生产,下游流接收处理
Flow特点: 1,属于冷流,默认上游流代码不被执行,只有在下游流处理时才会执行上游代码,一般是emit()发送,collect{}接收(终端操作符均可接收) 2,链式调用 3,中间操作符:map,filter,take,zip,combine等(数据发送后中间加工一下) 4,终端操作符: collect, single, reduce, toList等(触发上游流代码执行,接收处理)
Flow的创建:
- flowOf() 以固定值创建流
- asFlow() 扩展函数将各种类型(集合,序列等)转换为流,asFlow()有各种类型的扩展
- flow { } 构建任意流,内部调用了emit()函数
- channelFlow { } 构建通道流,内部并发调用了send()函数
- MutableStateFlow and MutableSharedFlow 创建可以直接更新的热流
冷流的体现:
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
listOf(1, 2, 3).asFlow().collect { value -> println(value) }
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
输出:
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
Flow流切换协程:flowOn()
一般流是按照顺序执行的,都是在同一个协程中,上游发送,下游接收。 下游接收流在哪个协程中,上游代码就在哪个协程中。不能手动更改协程上下文,否则就会报错。如果要更改请使用Kotlin提供的Flow扩展函数flowOn()。
另,可以使用launchIn(CoroutineScope(Dispatchers.IO))返回一个新的协程,launchIn的返回值是Job
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
输出:
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
可以多次运用flowOn()方法
flow.flowOn(Dispatchers.Default).map { it }.flowOn(Dispatchers.IO).toList()
Flow异常处理: 1,flow.catch{} 2,try { }catch(e: Throwable){}finally {}
Flow开始和完成: onStart { } onCompletion { } or try { }finally {}
val flow = simple()
flow.flowOn(Dispatchers.Default)
.map { it }
.flowOn(Dispatchers.IO)
.onStart { println("onStart function... ${Thread.currentThread().name}") }
.onCompletion {
println("onCompletion function... ${Thread.currentThread().name}")
throw NullPointerException()
}
.catch { println("catch function... ${Thread.currentThread().name}") }
.collect { value -> println("value = $value")}
输出:
Calling simple function... main
onStart function... main
Flow started DefaultDispatcher-worker-3
value = 1
value = 2
value = 3
value = 4
value = 5
value = 6
onCompletion function... main
catch function... main
Flow取消流: 由于流是以协程为基础,所以Flow的取消和协程的取消相同, 1,可以用withTimeoutOrNull(){},超时取消 or 抛出异常中段执行。 2,如果采用了launchIn()那就是一个全新的协程,可以通过cancel()和join()或者cancelAndJoin()取消协程 3,取消flow所在的协程也是可以的
Flow取消检测机制 Flow 构建器会对每个emit发出的值进行检测,所以当我们的协程比较密集繁忙是,就需要取消这种检测机制。 2种方法: 1,.onEach { currentCoroutineContext().ensureActive() } 2, 提供了cancellable()
withTimeoutOrNull(250) {
simple().collect { value -> println(value) }
}
val flow = simple()
flow.cancellable().map { it }.collect { value ->
if (value == 3) cancel()
println(value)
}
Flow Buffering 缓存:发射速度or处理速度不均衡时,就需要buffer
一般情况,上游生产下游接收处理是同时执行的,但是当上游生产速度超过下游处理速度,就需要设置buffer
BufferOverflow.SUSPEND 溢出,挂起当前协程 BufferOverflow.DROP_OLDEST 溢出,丢掉老数据 BufferOverflow.DROP_LATEST 溢出,丢掉新数据
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {}
flow.map { it }
.buffer(BUFFERED,BufferOverflow.SUSPEND)
Conflation 合并处理,当程序不需要处理每一个值时,可能只关心最近的数据,可以用conflate()跳过中间值,返回最近的值。 中间某些值就漏掉了
confate()就相当于设置了缓存buffer:BufferOverflow.DROP_OLDEST 溢出,丢掉老数据。就是缓存DROP_OLDEST的快捷方式!!
Requests a conflated channel in the Channel(...) factory function.
This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.
public const val CONFLATED: Int = -1
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
collectLatest 只接受最后一条数据
flow.collectLatest { value ->
delay(300)
println("value = $value")
}
Composing multiple flows 多流操作
zip(),combine()
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
val nums = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
Flattening flows 展平流 流里面的流,集合中的集合流 Kotlin提供了一些展平操作符,flatMapConcat,flatMapMerge,flatMapLatest
flatMapConcat,按照顺序将内部流展平为单个流,然后收集,最后返回一个所有的集合流 是map(transform).flattenConcat()的快捷方式
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出: 1: First at 121 ms from start 1: Second at 622 ms from start 2: First at 727 ms from start 2: Second at 1227 ms from start 3: First at 1328 ms from start 3: Second at 1829 ms from start
flatMapMerge:同时收集所有传入流,并将其值合并到单个流中,以便尽快发出值 是map(transform).flattenMerge()的快捷方式
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }.flatMapMerge { requestFlow(it) }.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
输出: 1: First at 136 ms from start 2: First at 231 ms from start 3: First at 333 ms from start 1: Second at 639 ms from start 2: Second at 732 ms from start 3: Second at 833 ms from start
flatMapLatest 发出新流就会取消对上一个流收集,就是会看到多次上游发射,收集只收集最后一个
|