协程的异步流
为什么要有异步流?
挂起函数可以异步返回单个值, 但如果需要返回多个值怎么办? 此时便可以使用异步流
以前没有异步流的情况下, 我们使用传统方式将会是下面这样:
private fun simple(): List<Int> {
val list = mutableListOf<Int>()
for (i in 1..3) {
TimeUnit.MILLISECONDS.sleep(200)
list.add(i)
}
return list
}
fun main() = simple().forEach{ value -> println(value) }
随着我们不断学习, 发现还有一种比较好用的方式达到异步返回的功能:
private fun simple(): Sequence<Int> = sequence {
for (i in 1..3) {
TimeUnit.MILLISECONDS.sleep(200)
yield(i)
}
}
fun main(): Unit = runBlocking {
simple().forEach { i -> log("$i") }
}
前面说过了, Sequence 的好处非常明显, 不会产生额外的临时集合, 也不会需要等待其他元素再执行下一步骤的函数
对比下两种方式你会发现, List 的方式是等半天一下子同时(几乎)打印, 而 Sequence 的方式是一个一个一个的打印出来
这就涉及集合的两种操作方式了, 一种是一个集合的元素同时完成某个函数比如: filter 函数的过滤效果, 然后过滤后的元素当成一个整体再给新的函数 比如: map 函数操作, 这是一种集合类型的操作
第二种是每个集合的元素独立的, 元素不等待整个集合都执行完 filter 函数, 而是 自己 做完 filter 和 map 最后 forEach 打印出去, 不等待其它元素
现在我们学习了 协程, 发现上面这段代码的 TimeUnit.MILLISECONDS.sleep(200) 其实是阻塞了主线程, 这是不对的, 如果运行 这段代码的线程是 UI 线程呢? 此时 UI 将会在执行 sleep 而阻塞, 用户将会看到app 卡顿, 所以我们需要改造, 借助协程改造
private fun simple() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main(): Unit = runBlocking {
simple().collect { value -> log("$value") }
}
这样的好处在于, 遇到类似于 delay 这种操作时, 主线程不会阻塞, 而是交给后台线程执行
源码:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
会发现参数suspend , 所以我们可以不需要添加额外的suspend
而 flow 使用 emit 添加值, 使用 collect 收集值
flow 是冷流(需要启动开关collect函数)
它跟 sequence 一样, 只有在调用 collect (末端操作) 时才会执行
这和 JavaScript 的 generator 一个德行,yield 函数不会被执行,直到调用了 next 函数才会执行
let i = 1
function *generator() {
for (let i = 0; i < 1000; i++) {
console.log(`zhazha ${i}`)
yield i
}
}
const gen = generator();
console.log(gen.next().value)
console.log(gen.next().value)
控制台:
zhazha 0
0
zhazha 1
1
而 kotlin flow 在“偷懒”方面的表现和 JavaScript generator 一模一样
var i = 0
fun generator() = flow {
repeat(1000) {
println("repeat $it")
emit(it)
}
}
fun main(): Unit = runBlocking {
val gen = generator()
val take = gen.take(3)
take.collect {
println(it)
}
take.collect {
println(it)
}
}
repeat 0
0
repeat 1
1
repeat 2
2
repeat 0
0
repeat 1
1
repeat 2
2
但是在便利方面不及 JavaScript ,我还是比较喜欢 next
flow 取消
超时流取消
private fun simple() = flow<Int> {
for (i in 1..3) {
delay(1000)
log("emit $i")
emit(i)
}
}
fun main(): Unit = runBlocking {
withTimeout(2500) {
simple().collect { value -> log("$value") }
}
}
流取消检测
flow 对每个元素的 emit 都有 ensureActive 检测, 好做取消, 这意味着我们的 flow 中的循环是可以取消的
fun foo() = flow<Int> {
for (i in 1..5) {
log("emit $i")
emit(i)
}
}
fun main(): Unit = runBlocking {
foo().collect { value ->
if (value == 3) {
cancel()
}
log("main $value")
}
}
[main] emit 1
[main] main 1
[main] emit 2
[main] main 2
[main] emit 3
[main] main 3
[main] emit 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@3712b94
我们在 main 函数中取消了 flow , 所以上面控制台打印最后到 emit 4 结束
但为了提高性能, 有些方式申请的 flow 并没有取消检测, 比如 range.asFLow ,
private fun simple02() = (1..20).asFlow()
fun main(): Unit = runBlocking {
simple02().collect {
if (3 == it) {
cancel()
}
log(it)
}
}
[main] 1
[main] 2
[main] 3
[main] 4
[main] ... 省略
[main] 20
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5d76b067
让繁忙的协程可以取消
前面说了, 协程是可以取消的 ,需要使用 ensureActive 判断, 所以我们可以考虑主动给他加上取消检测功能
private fun simple03() = (1..20).asFlow()
fun main(): Unit = runBlocking {
simple03().onEach { currentCoroutineContext().ensureActive() }.collect { value ->
if (value == 3) {
cancel()
}
log(value)
}
}
这样flow 就可以取消了
同时 kotlin 还提供了 cancellable 这种方式添加取消检测:
private fun simple03() = (1..20).asFlow()
fun main(): Unit = runBlocking {
simple03().cancellable().collect { value ->
if (value == 3) {
cancel()
}
log(value)
}
}
[main] 1
[main] 2
[main] 3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4e7dc304
上面那两种方式底层原理都一样的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xLHL9dv9-1657634748567)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/13416afdf59b460c853509188bfba389~tplv-k3u1fbpfcp-watermark.image?)]
flow 的创建方式
flow {} flowOf .asFlow
listOf(1, 2, 3).asFlow().collect { value -> log("$value") }
中间流操作符
- transform
- take
这些操作和 sequence 的方式一样, 有中间操作, 在中间操作时, 流元素不会被操作, 等到末端流操作时才会一起执行掉, 而且是一个元素一个元素单独执行(除特殊函数外)
arrayOf("zhazha", "heihei", "xixi").asFlow().filter { s -> s.startsWith("z") }
.map { value -> value.first().toUpperCase() }.collect { value -> log("$value") }
变换运算符(类似Map + Filter)
fun main(): Unit = runBlocking {
val flow = (1..3).asFlow().transform { value ->
if (value % 2 == 0) {
emit(value)
} else {
emit((value + 1) * 10)
}
}
flow.collect {
println(it)
}
}
在执行流操作之前, 包装下元素
限?操作符
private fun number() = flow<Int> {
try {
emit(1)
emit(2)
log("zhazha")
emit(3)
} finally {
log("finally xixi")
}
}
fun main(): Unit = runBlocking {
val sum = number().take(2).reduce { accumulator, value ->
accumulator + value
}
println(sum)
}
末端流操作符
toList toSet first single reduce fold
总体来说和 List 或者 sequence 差不多
reduce
求和案例:
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
55
Flow是有序的
除非使用了对多个flow进行操作的特殊运算符,否则flow的每个单独集合都是按顺序进行的。collection直接在协程中运行,该协程调用终端操作。此时在默认情况下,不会有新的协程被启动。所有的中间操作将以从上游到下游的顺序处理emit过的value,最后再交给末端操作符操作
请参见以下示例,该示例过滤偶数并将其映射到字符串:
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow上下文
Collection of a flow always happens in the context of the calling coroutine. For example, if there is a simple flow, then the following code runs in the context specified by the author of this code, regardless of the implementation details of the simple flow:
flow的collection过程总是发生在调用协程的上下文中。比如有一个simple flow,然后以下代码在开发者指定的上下文中运行,而不论 simple flow的实现细节如何
withContext(context) {
simple().collect { value ->
println(value)
}
}
simple 流的信息将被 context 上下文保存
因此,在 flow {…} 构建器中的代码运行在相应的 flow collect 提供的上下文中
例如:
考虑 simple 函数打印了调用该函数的 线程 然后 emit 了 三个数字:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
由于,从主线程调用 simple.collect 函数,也在主线程中调用 simple flow 的主体。这是快速运行或异步代码的理想默认形式,它不关心执行的上下文并且不会阻塞调用者。
flow 的 collect 和 emit 必须在同一个协程上下文
flow 不是协程安全的,所以会报错IllegalStateException
前面说过的,flow 借助上下文保存属性,但很多时候我们的 UI-updaing 代码需要执行在 Dispatcher.Main 中,在执行 CPU 计算型代码的时候需要在 Dispatcher.Default 导致上下文变了,这会引起 flow 报错
fun simple(): Flow<Int> = flow {
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100)
emit(i)
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
异常已经给出解决方案了, 使用 flowOn
flowOn 保证协程安全
flowOn 将执行此流的上下文更改为给定的上下文
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-2 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-2 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-2 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
仔细看 flowOn 函数的注释:
改变上下文到flow被执行所给定的上下文中。该函数是可组合的,并且只影响前面没有上下文的操作,该函数是上下文保存的:上下文不会影响到下游操作
例如:
withContext(Dispatchers.Main) {
val singleValue = intFlow
.map { ... }
.flowOn(Dispatchers.IO)
.filter { ... }
.flowOn(Dispatchers.Default)
.single()
}
flowOn 是可以做组合的
withContext(Dispatchers.Main) {
val singleValue = intFlow
.map { ... }
.flowOn(Dispatchers.IO)
.filter { ... }
.flowOn(Dispatchers.Default)
.single()
}
记得看下上面代码中的英文
buffer缓冲
private fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
正常情况下, 我们执行完上面的代码需要 1200ms+ 但是我们可以考虑使用 buffer
前面的 flowOn 就这利用这种方式实现的, 只不过前面的是隐式的, 这里是显示的
private fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().buffer().collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
simple().buffer().collect 这样之后执行时间将会是 1000ms 左右
for (i in 1..3) {
delay(100)
emit(i)
}
他类似于起三个协程, 同时等待 100 之后一起 emit 出去
这样的话, 我们的代码只要等待 3 次 300ms 外加上 一次 100ms 就完事了, 节省了 2 次 100ms 的等待
为了不阻塞主线程, 我们还可以, 让别的线程去等待 delay 函数, 就像下面这样:
private fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
launch {
simple().buffer().collect { value ->
delay(300)
log(value)
}
}
}
println("Collected in $time ms")
}
跳过中间比较耗时的操作
有些情况下我们并不需要知道所有的流操作是否,我们只要知道中间的流操作的部分结果或者部分操作状态的更新,此时我们可以使用 conflate 跳过中间的部分操作
private fun simple02(): Flow<Int> = flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple02()
.conflate()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
main: 1
main: 3
main: 6
main: 9
main: 10
main: Collected in 1778 ms
处理最新的值
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value ->
println("Collecting $value")
delay(300)
println("Done $value")
}
}
println("Collected in $time ms")
}
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 777 ms
组合多个流
Zip
就像在 kotlin 标准库中的 Sequence.zip 扩展函数一样,flows 也有一个 zip 操作,该操作 组合两个 flows 相应的值
fun main(): Unit = runBlocking {
(1..3).asFlow()
.zip(flowOf("one", "two", "three")) { a, b ->
"$a -> $b"
}.collect(::println)
}
1 -> one 2 -> two 3 -> three
Combine
当流代表一个变量或者操作的最新值时,他可能被需要执行一个取决于相对应流最新值的计算,并且每当上游flows发出一个值时,重新计算它。相应的运算符族称为组合。
展平Flow
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main(): Unit = runBlocking {
val flow: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }
flow.collect(::println)
}
注意: Flow<Flow<String>> 没有被展平成一个 Flow
很多情况,我们需要将多个 Flow 合并成一个 Flow,那么有什么方法呢?
flatMapConcat
@OptIn(FlowPreview::class)
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime}")
}
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3W0kWxrr-1657634748569)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/26af81bbb8ae490087db1c8c45750912~tplv-k3u1fbpfcp-watermark.image?)]
可以看到,多个 Flow 被整合成一个 Flow<String>
1: First at 198 ms from start 1: Second at 747 ms from start 2: First at 858 ms from start 2: Second at 1370 ms from start 3: First at 1480 ms from start 3: Second at 1986 ms from start
flatMapMerge 并发组合流
还有一种展平的方式flatMapMerge , 它利用了并发的方式, 将多个流组合成一个流, 但有限制, 合并的流不能超过 DEFAULT_CONCURRENCY , 默认是16个, 我们可以在 jvm 上用 DEFAULT_CONCURRENCY_PROPERTY_NAME 属性进行修改
println(measureTimeMillis {
(1..3).asFlow().onEach { delay(100) }.flatMapMerge(3) { requestFlow(it) }.collect { value -> log(value) }
})
你会发现时间明显变少 前面两种方式分别是 19xxms 18xxms , 而我们这里通常在 9xxms , 少了一半的时间
1: First at 210 ms from start 2: First at 286 ms from start 3: First at 397 ms from start 1: Second at 728 ms from start 2: Second at 789 ms from start 3: Second at 919 ms from start
注意,这里的打印就和上面的flatMapConcat 不相同了
flatMapLatest
与 collectLatest 操作符类似(在"处理最新值" 小节中已经讨论过),也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest 操作符来实现。
@OptIn(ExperimentalCoroutinesApi::class)
fun main(): Unit = runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapLatest { requestFlow(it) }
.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 265 ms from start 2: First at 381 ms from start 3: First at 485 ms from start 3: Second at 996 ms from start
注意,flatMapLatest 在一个新值到来时取消了块中的所有代码 (本示例中的 { requestFlow(it) } )。 这在该特定示例中不会有什么区别,由于调用 requestFlow 自身的速度是很快的,不会发生挂起, 所以不会被取消。然而,如果我们要在块中调用诸如 delay 之类的挂起函数,这将会被表现出来。
流异常
Flow 出现异常的处理方法
collect 函数使用 try 和 catch
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main(): Unit = runBlocking {
try {
simple().collect {
println(it)
check(it <= 1) { "Collected $it" }
}
} catch (e: Exception) {
e.printStackTrace()
println("Caught $e")
}
}
在收集器内部出现了异常,被 try catch 捕获
捕获任何异常
在非末端函数中的异常也会被捕获
fun simple() = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main(): Unit = runBlocking {
try {
simple().collect {
println(it)
}
} catch (e: Exception) {
println("Caught $e")
}
}
流异常的透明性
上面的代码存在一个问题。
从 try、catch 代码块中的 flow 构建器中emit的值是违反异常透明的
换句话说,流的异常不应给交给调用它的地调用方去处理,而是交给flow 自己去解决
透明捕获
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") }
.collect { value -> println(value) }
}
使用这种方式捕获异常只能捕获catch函数前面的异常,如果 collect 报错的话,无法被 catch 函数捕获
如下:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
声明式的异常捕获
我们可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 collect 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须由调用无参的 collect() 来触发
private fun simple() = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main(): Unit = runBlocking {
simple().onEach {
check(it <= 2)
log(it)
}.catch { cause -> log("Cause $cause") }.collect()
}
flow 中 on 开头的函数, 类似于 JavaScript 中的 事件触发, 比如 onClick 就是鼠标点击, OnEach 就是在遍历的时候触发
完成流(命令式和声明式)
命令式
private fun simple() = (1..3).asFlow()
fun main(): Unit = runBlocking {
try {
simple().collect { value -> log(value) }
} finally {
log("Done")
}
}
声明式
private fun simple() = (1..3).asFlow()
fun main(): Unit = runBlocking {
simple().onCompletion { cause ->
log("Done")
}.collect { value -> log(value) }
}
声明式的好处, 在于还可以发现我们的流有没有异常
private fun simple() = flow {
for (i in (1..3)) {
delay(100)
check(i <= 1) { "错误" }
emit(i)
}
}
fun main(): Unit = runBlocking {
simple().onCompletion { cause ->
if (cause != null) {
log("Flow completed with $cause")
}
}.catch { cause ->
log("Caught exception $cause")
}.collect { value -> log(value) }
}
onCompletion 不会处理流的异常,它可以被用于观察是否发生异常
所以不要企图使用 onCompletion 处理异常
指定flow在给定的协程上运行: launchIn
前面的 collect 收集器有个缺点, collect 后面的代码会等待
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect()
println("Done")
}
比如上面你的代码最终会打印出下面的情况:
[main] Event: 1
[main] Event: 2
[main] Event: 3
[main] Done
println("Done") 被延迟了
明明不在流上的操作,却阻塞了 Done
如果不想要 collect 等待的话, 则可以使用 launchIn(coroutineScope)
让任务在单独的协程中执行
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main(): Unit = runBlocking {
events().onEach { event-> log("Event: $event") }
.launchIn(this)
log("Done")
}
[main] Done
[main] Event: 1
[main] Event: 2
[main] Event: 3
Done 最先输出了
咱们还可以把流给其他协程执行, 让他在后台执行
fun main(): Unit = runBlocking {
val job = events().onEach { event -> log("Event: $event") }
.launchIn(CoroutineScope(Dispatchers.Default))
log("Done")
job.join()
}
流取消检测
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
正常情况下, Flow 都会检测取消情况
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sS0vCDXO-1657634748571)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/28a89608252341f99208e7f95d1b4856~tplv-k3u1fbpfcp-watermark.image?)]
在上图剪头的位置(也就是 emit 的时候会检测),会不断的随着循环检测ensureActive
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@cb0ed20
但是出于性能考虑,大多数情况下, flow 默认会关闭 ensureActive 检测
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@9a7504c
能看得出来,打印出了 4 5 最后才报的错
让繁忙的流可取消
有时候,出于某些目的,我们偏偏需要在该取消的时候取消掉我们的 flow,这时可以使用这种方式:
.onEach { currentCoroutineContext().ensureActive() }
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { currentCoroutineContext().ensureActive() }
.collect { value ->
if (value == 3) cancel()
println(value)
}
}
不过我们还能看到这个方法:.cancellable()
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.cancellable()
.collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@59906517
|