Kotlin 的协程上下文叫做 CoroutineContext,通常用来切换线程池。
CoroutineContext 的应用
launch
launch 的第一个参数 context 是 CoroutineContext,默认值是 EmptyCoroutineContext。
如果需要指定 launch 工作的线程池,就需要指定 CoroutineContext 参数。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
withContext
withContext 用来切换线程执行代码。它的第一个参数是 CoroutineContext,指定线程池。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
}
在 getUserInfoIo 中指定 withContext 的上下文是 Dispatchers.IO。
fun main() = runBlocking {
val user = getUserInfoIo()
logX(user)
}
suspend fun getUserInfoIo(): String {
logX("Before IO Context.")
withContext(Dispatchers.IO) {
logX("In IO Context.")
delay(1000)
}
logX("After IO Context.")
return "BoyCoder"
}
输出如下:
================================
Before IO Context.
Thread:main @coroutine#1, time:1656560405319
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656560405351
================================
================================
After IO Context.
Thread:main @coroutine#1, time:1656560406360
================================
================================
BoyCoder
Thread:main @coroutine#1, time:1656560406361
================================
可以看到在 Thread:DefaultDispatcher-worker-1 线程执行协程1。其他代码在 main 线程。
suspend main
main 函数有 suspend 关键字的版本,可以直接执行挂起函数。
suspend fun main() {
val user = getUserInfoIo()
logX(user)
}
它和 runBlocking 的区别在于 withContext 切换线程后,都执行在 DefaultDispatcher-worker-1 线程。
================================
Before IO Context.
Thread:main, time:1656569681374
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1, time:1656569681440
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-1, time:1656569682449
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-1, time:1656569682449
================================
runBlocking
runBlocking 的第一个参数也是 CoroutineContext,默认为 EmptyCoroutineContext。
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
}
可以给 runBlocking 增加自定义的 CoroutineContext。
fun main() = runBlocking(Dispatchers.IO) {
val user = getUserInfoIo()
logX(user)
}
输出如下:
================================
Before IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570311862
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570311896
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570312903
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570312903
================================
可以看出增加 Dispatchers.IO 后,协程一直执行在 DefaultDispatcher-worker-2 线程。
Dispatchers 调度器
Kotlin 内置的 Dispatchers 有 4 种,他们本质上都继承 CoroutineContext。
- Default:默认调度器,用于 CPU 密集型任务,线程数和核心数一致。
- Main:主线程调度器,只在 Android、Swing 等 UI 平台有用,普通 JVM 工程无法使用。
- UnConfined:无限制调度器,协程可能运行在任意线程上。
- IO:IO 调度器,用于 IO 密集型任务,线程数会多一些,比如 64 个线程
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
Dispatchers.IO
Dispatchers.IO 可能会复用 Dispatchers.Default 的线程。从上面例子可以看出,虽然设置的 Dispatchers.IO,实际是 DefaultDispatcher-worker 线程。
将 runBlocking 的 CoroutineContext 改为 Dispatchers.Default
fun main() = runBlocking(Dispatchers.Default) {
val user = getUserInfoIo()
logX(user)
}
输出结果如下:
================================
Before IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575072324
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575072358
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575073366
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575073367
================================
可以看到 withContext 切换到 IO 后,也用了 DefaultDispatcher-worker-1。这是因为 Dispatchers.Default 被 Dispatchers.IO 复用线程。
自定义 Dispatchers
使用自定义的 executor 然后转换为 Dispatcher 作为 CoroutineContext。
val mySingleDispatcher = Executors.newSingleThreadExecutor {
Thread(it, "mySingleThread").apply {
isDaemon = true
}
}
.asCoroutineDispatcher()
fun main() = runBlocking(mySingleDispatcher) {
val user = getUserInfoIo()
logX(user)
}
输出如下:
================================
Before IO Context.
Thread:mySingleThread @coroutine#1, time:1656575809476
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575809510
================================
================================
After IO Context.
Thread:mySingleThread @coroutine#1, time:1656575810521
================================
================================
BoyCoder
Thread:mySingleThread @coroutine#1, time:1656575810521
================================
只有 In IO Context 运行在 DefaultDispatcher-worker-1,其他代码都运行在自定义的 dispatcher。
Dispatchers.Unconfined
如果 launch 使用默认 context,执行顺序为 1、4、2、3。
fun main() = runBlocking {
logX("Before launch")
launch {
logX("In launch")
delay(1000)
logX("End launch")
}
logX("After launch")
}
输出如下
================================
Before launch
Thread:main @coroutine#1, time:1656576229645
================================
================================
After launch
Thread:main @coroutine#1, time:1656576229677
================================
================================
In launch
Thread:main @coroutine#2, time:1656576229679
================================
================================
End launch
Thread:main @coroutine#2, time:1656576230686
================================
如果 launch 使用 Unconfined,执行顺序是不定的。
fun main() = runBlocking {
logX("Before launch")
launch(Dispatchers.Unconfined) {
logX("In launch")
delay(1000)
logX("End launch")
}
logX("After launch")
}
输出如下
================================
Before launch
Thread:main @coroutine#1, time:1656576759031
================================
================================
In launch
Thread:main @coroutine#2, time:1656576759055
================================
================================
After launch
Thread:main @coroutine#1, time:1656576759060
================================
================================
End launch
Thread:kotlinx.coroutines.DefaultExecutor @coroutine#2, time:1656576760059
================================
执行顺序变为 1、2、4、3。而且标记 3 执行在 DefaultExecutor。
Dispatchers.Unconfined 可能执行在任何线程,不应随意使用 Dispatchers.Unconfined。
CoroutineScope 协程作用域
使用 launch 时,必须有 CoroutineScope 协程作用域,launch 是 CoroutineScope 的扩展函数。
CoroutineScope 的实现很简单,它是接口类,只有 coroutineContext 成员。
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
指定自定义的 CoroutineScope,使用 scope 启动 3 个协程,实现结构化并发。
fun main() = runBlocking {
val scope = CoroutineScope(Job())
scope.launch {
logX("first start")
delay(1000)
logX("first end")
}
scope.launch {
logX("second start")
delay(1000)
logX("second end")
}
scope.launch {
logX("third start")
delay(1000)
logX("third end")
}
delay(500)
scope.cancel()
delay(1000)
}
输出如下
================================
first start
Thread:DefaultDispatcher-worker-2 @coroutine#2, time:1656577491658
================================
================================
third start
Thread:DefaultDispatcher-worker-3 @coroutine#4, time:1656577491663
================================
================================
second start
Thread:DefaultDispatcher-worker-1 @coroutine#3, time:1656577491660
================================
所有 scope 启动的协程都被取消,没有继续执行。
Job 和 Dispatcher
Job
Job 继承了 CoroutineContext.Element。
public interface Job : CoroutineContext.Element {
}
Element 继承 CoroutineContext
@kotlin.SinceKotlin public interface CoroutineContext {
public interface Element : kotlin.coroutines.CoroutineContext {
}
}
因此 Job 本身就是 CoroutineContext。
Dispatchers
Dispatchers 是单例类,内部的有 4 个预置 Dispatchers。
public actual object Dispatchers {
}
以 Default 为例,它是 CoroutineDispatcher 类,继承关系如下: CoroutineDispatcher -> ContinuationInterceptor -> CoroutineContext.Element -> CoroutineContext
因此 Dispatchers 也是 CoroutineContext。
CoroutineContext 的设计
CoroutineContext 的设计方式和 Map 的设计方式很类似。
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext =
public fun minusKey(key: Key<*>): CoroutineContext
- get 相当于 map 的 get
- fold 相当于 map 的 foreach
- plus 相当于 map 的 put
- minusKey 相当于 map 的 remove
利用 CoroutineContext 的接口,可以写出 + 、[] 这种类似集合的操作。因为 CoroutineContext 重载了 plus 操作符,可以用 + 代替 plus;重载 get 操作符,可以用 [] 代替 get。
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
val scope = CoroutineScope(Job() + mySingleDispatcher)
scope.launch {
logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
delay(500)
logX("first end")
}
delay(500)
scope.cancel()
delay(1000)
}
输出如下
================================
true
Thread:mySingleThread @coroutine#2, time:1656584502696
================================
可以看出 CoroutineContext 内部的 dispatchers 就是 mySingleDispatcher。
CoroutineName 协程名称
CoroutineName 可以指定协程名称,它本质也是 CoroutineContext。
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
val scope = CoroutineScope(Job() + mySingleDispatcher)
scope.launch(CoroutineName("MyFirstCoroutine")) {
logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
delay(500)
logX("first end")
}
delay(500)
scope.cancel()
delay(1000)
}
输出如下
================================
true
Thread:mySingleThread @MyFirstCoroutine#2, time:1656584886943
================================
协程名称变为 CoroutineName 定义的 MyFirstCoroutine,#2 是协程 id。
CoroutineExceptionHandler 协程异常处理
CoroutineExceptionHandler 用来捕获协程中的异常,它也是 CoroutineContext。
@OptIn(ExperimentalStdlibApi::class)
suspend fun main() {
val scope = CoroutineScope(Job() + mySingleDispatcher)
val handler = CoroutineExceptionHandler { _, throwable ->
println("catch exception $throwable")
}
val job = scope.launch(handler) {
logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
val str: String? = null
str!!.length
logX("first end")
}
job.join()
}
输出如下
================================
true
Thread:mySingleThread @coroutine#1, time:1656585762125
================================
catch exception java.lang.NullPointerException
故意抛出空指针异常,然后给 CoroutineExceptionHandler 捕获。
挂起函数和 CoroutineContext
suspend 挂起函数能直接访问 coroutineContext。
suspend fun testCoroutineContext() = coroutineContext
fun main() = runBlocking {
print(testCoroutineContext())
}
输出如下
[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@63e2203c, BlockingEventLoop@1efed156]
挂起函数需要在协程或者另外一个挂起函数中运行,因此它也能访问协程的 CoroutineContext。
coroutineContext 是 Continuation.kt 的内部成员。
public suspend inline val coroutineContext: CoroutineContext
get() {
throw NotImplementedError("Implemented as intrinsic")
}
总结
- CoroutineContext 协程上下文和 map 很类似,和 map 一样,它可以添加或者获取 Element。
- Job、Dispatchers、CoroutineName、CoroutineExceptionHandler 本质都是 CoroutineContext。
- CoroutineScope 封装了 CoroutineContext,CoroutineContext 是 CoroutineScope 的成员。
- suspend 挂起函数也和 CoroutineContext 有关。挂起函数要在协程中运行,协程有它的 CoroutineContext,因此挂起函数也能访问 CoroutineContext。
|