摘抄自 Kotlin 语言官网文档,为的是列出相关操作符,方便自己后期查看 点我跳转 Kotlin 官网文档 Flow
Flow 是什么?
挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(Flow)的用武之地。
创建Flow
构建中,不允许 withContext 修改上下文
flow{}
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
flowOf(1,2,3)
.asFlow()
集合与序列转换为流
emit
发射值。
collect
收集值,构造器内的代码,直到被收集的时候才会运行。
simple().collect{ it-> println(it) }
打印:
1
2
3
取消
withTimeoutOrNull(250)
250毫秒后超时自动取消。
withTimeoutOrNull(250) {
simple().collect { value -> println(value) }
}
println("Done")
转换
map
转换成我们想要的数据。
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.map { "number: $it" }
.collect { response -> println(response) }
}
打印:
number: 1
number: 2
number: 3
transform
可以发射任意值任意次。
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
限长操作符
take
在流触及相应限制的时候将它的执行取消
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
emit(4)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2)
.collect { value -> println(value) }
}
打印:
1
2
This line will not execute
3
Finally in numbers
末端操作符
toList、toSet
转换为需要的集合
first、single、fold
后续再试验
reduce
将流规约到单个值
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
launchIn
使用 launchIn 替换 collect 我们可以在单独的协程中启动流的收集,这样就可以立即继续进一步执行代码。
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this)
println("Done")
}
打印:
Done
Event: 1
Event: 2
Event: 3
filter
过滤。
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
打印:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
flowOn
用于更改流发射的上下文。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
buffer
发射不需要等待执行的时间。
val time = measureTimeMillis {
simple()
.buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
合并
conflate
只处理当前最新的那一个。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
打印:
1
3
Collected in 758 ms
collectLatest
取消并重新发射最后一个值。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value ->
println("Collecting $value")
delay(300)
println("Done $value")
}
}
println("Collected in $time ms")
}
zip
组合流。
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
}
打印:
1 -> one
2 -> two
3 -> three
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
打印:
1 -> one at 428 ms from start
2 -> two at 828 ms from start
3 -> three at 1230 ms from start
combine
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
打印:
1 -> one at 430 ms from start
2 -> one at 632 ms from start
2 -> two at 831 ms from start
3 -> two at 932 ms from start
3 -> three at 1232 ms from start
flatMapConcat
展平 Flow<Flow<String>>
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapConcat { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
打印:
1: First at 124 ms from start
1: Second at 625 ms from start
2: First at 725 ms from start
2: Second at 1226 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapMerge { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
打印:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
flatMapLatest
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapLatest { requestFlow(it) }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
打印:
1: First at 145 ms from start
2: First at 294 ms from start
3: First at 395 ms from start
3: Second at 896 ms from start
流异常
try/catch
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
打印:
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
打印:
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
catch
可以使用 catch 代码块中的 emit 将异常转换为值发射出去。 仅能捕获上游异常,下游发生的异常会逃逸报错。
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") }
.collect { value -> println(value) }
}
打印:
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
onCompletion
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
打印:
1
2
3
Done
|