前言:只有在那崎岖的小路上不畏艰险奋勇攀登的人,才有希望达到光辉的顶点。 ——马克思
前言
经过前面两篇协程的学习,我相信大家对协程的使用已经非常熟悉了。本着知其然更要知其之所以然的心态,很想知道它里面是怎么可以让异步代码同步化的?协程它是如何实现线程的调度的?协程的挂起和恢复本质是什么?今天在这里一一为大家解答。
整个Kotlin 协程学习分为三部曲,本文是第三篇:
Kotlin 协程实战进阶(一、筑基篇)
Kotlin 协程实战进阶(二、进阶篇)
Kotlin 协程实战进阶(三、原理篇)
(本文需要前面两篇文章协程的知识点作为基础)
本文大纲
协程最核心的就是挂起与恢复,但是这两个名称在一定程度上面迷惑了我们,因为这两个名词并不能够让我们在源码上面和它的实现原理有清晰的认知。
协程的挂起本质上是方法的挂起,而方法的挂起本质上是 return ,协程的恢复本质上方法的恢复,而恢复的本质是 callback 回调。
但是我们在Kotlin协程源码里面看不到 return 和 callback 回调的,其实这些都是kotlin编译器帮我们做了,单单看kotlin的源码是看不出所以然的,需要反编译成Java文件,才能看到本质之处。
通过AS的工具栏中 Tools ->kotlin ->show kotlin ByteCode ,得到的是java字节码,需要再点击Decompile 按钮反编译成java源码:
一、协程主要结构
1.suspend fun
再来复习一下挂起函数:
suspend 是 Kotlin 协程最核心的关键字;- 使用
suspend 关键字修饰的函数叫作挂起函数 ,挂起函数 只能在协程体内或者在其他挂起函数 内调用; - 被关键字
suspend 修饰的方法在编译阶段,编译器会修改方法的签名,包括返回值,修饰符,入参,方法体实现。
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User
将上面的挂起函数反编译:
@GET("users/{login}")
@Nullable
Object getUserSuspend(@Path("login") @NotNull String var1, @NotNull Continuation var2);
- 反编译后你会发现多了一个
Continuation 参数(它就是 callback ),也就是说调用挂起函数的时候需要传递一个Continuation ,只是传递这个参数是由编译器悄悄传,而不是我们传递的。这就是挂起函数为什么只能在协程或者其他挂起函数中执行,因为只有挂起函数或者协程中才有Continuation 。 - 但是编译器怎么判断哪些方法需要
callback 呢?就是通过 suspend 关键字来区分的。suspend 修饰的方法会在编译期间被Kotlin编译器做特殊处理,编译器会认为一旦一个方法增加 suspend 关键字,有可能会导致协程暂停往下执行,所以此时会给方法传递要给 Continuation 。等方法执行完成后,通过 Continuation 回调回去,从而让协程恢复,继续往下执行。 - 它还把返回值
User 改成了 Object 。
2.Continuation
Continuation 是 Kotlin 协程中非常重要的一个概念,它表示一个挂起点之后的延续操作 。
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
Continuation 有一个 resumeWith 函数可以接收 Result 类型的参数。在结果成功获取时,调用resumeWith(Result.success(value)) 或者调用拓展函数resume(value) ;出现异常时,调用resumeWith(Result.failure(exception)) 或者调用拓展函数resumeWithException(exception) ,这就是 Continuation 的恢复调用。
Continuation 类似于网络请求回调Callback ,也是一个请求成功或失败的回调:
public interface Callback {
void onFailure(Call call, IOException e);
void onResponse(Call call, Response response) throws IOException;
}
3.SuspendLambda
suspend{} 其实就是协程的本体,它是协程真正执行的逻辑,会创建一个SuspendLambda 类,它是Continuation 的实现类。
二、协程的创建流程
1.协程的创建
标准库给我们提供的创建协程最原始的api:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
协程创建后会有两个 Contiunation ,需要分清楚:
completion :?????表示协程本体,协程执行完需要一个 Continuation 实例在恢复时调用;Contiunation<Unit> : 它是创建出来的协程的载体,(suspend () -> T) 函数会被传给该实例作为协程的实际执行体。
这两个Contiunation 是不同的东西。传进来的 completion 实际上是协程的本体,协程执行完需要一个 Contiunation 回调执行的,所以它叫 completion ;还有一个返回的 Contiunation ,它就是协程创建出来的载体,当它里面所有的resume都执行完成之后就会调用上面的 completion 的resumeWith() 方法恢复协程。
2.协程的作用域
在Androidx的Activity中模拟创建网络请求,通过这个例子来深挖协程的原理:
lifecycleScope.launch(Dispatchers.IO) {
val result = requestUserInfo()
tvName.text = result
}
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
跟进lifecycleScope 源码:
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
······
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
newScope.register()
return newScope
}
}
作用域也是其实就是为协程定义的作用范围,为了确保所有的协程都会被追踪,Kotlin 不允许在没有使用CoroutineScope 的情况下启动新的协程。 lifecycleScope通过lifecycle ,SupervisorJob() ,Dispatchers.Main 创建一个LifecycleCoroutineScopeImpl ,它是一个关联宿主生命周期的作用域。CoroutineScope 绑定到这个LifecycleOwner 的Lifecycle 。当宿主被销毁时,这个作用域也被取消。
3.协程的启动
进入launch() :
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
参数 block: suspend CoroutineSope.() -> Unit 表示协程代码,实际上就是闭包代码块。 这里面做了三件事:
- 根据context参数创建一个新的协程上下文
CoroutineContext ; - 创建
Coroutine ,如果启动模式为Lazy 则创建LazyStandaloneCoroutine ,否则创建StandaloneCoroutine ; coroutine.start() 启动协程。
newCoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
为新协程创建上下文。通过 + 将作用域的上下文 coroutineContext 与传入的上下文 context 合并为新的上下文。它在没有指定其他调度器或 ContinuationInterceptor 时则默认使用 Dispatchers.Default 。
StandaloneCoroutine
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
public abstract class AbstractCoroutine<in T>(
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
······
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
}
如果不指定启动模式,则默认使用 CoroutineStart.DEFAULT ,创建一个独立协程 StandaloneCoroutine ,而 StandaloneCoroutine 继承了 AbstractCoroutine 类,并重写了父类的 handleJobException() 方法。AbstractCoroutine 用于在协程构建器中实现协程的抽象基类, 实现了 Continuation 、 Job 和 CoroutineScope 等接口。所以 AbstractCoroutine 本身也是一个 Continuation 。
coroutine.start()
start(block, receiver, this)
从上面的源码中,协程启动 coroutine.start() 方法是 AbstractCoroutine 类中实现的,这里涉及到运算符重载,而后该方法实际上会调用 CoroutineStart#invoke() 方法 ,并把代码块和接收者、completion 等参数传到 CoroutineStart 中。
public enum class CoroutineStart {
fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit
}
}
使用此协程策略将相应的块 [block] 作为协程启动。这里的 [block] 就是协程里面执行的代码块。
- block: ???协程里面执行的代码块;
- receiver: ??接收者;
- completion: ?协程的本体,协程执行完需要一个
Continuation 实例在恢复时调用。
在上面 AbstractCoroutine 我们看到 completion 传递的是 this ,也就是 AbstractCoroutine 自己,也就是 Coroutine 协程本身。所以这个 completion 就是协程本体。 (这就是Continuation 三层包装的第一层包装)
接着进入 startCoroutineCancellable() ,可以以可取消的方式启动协程,以便在等待调度时取消协程:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit))
}
这里主要做了三件事:
- 创建一个新的
Continuation 。 - 给
Continuation 加上 ContinuationInterceptor 拦截器,也是线程调度的关键。 resumeCancellableWith 最终调用 continuation.resumeWith(result) 执行协程。
4.创建Continuation<Unit>
createCoroutineUnintercepted() 每次调用此函数时,都会创建一个新的可暂停计算实例。 通过返回的 Continuation<Unit> 实例上调用 resumeWith(Unit) 开始执行创建的协程。
#IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
重点:创建并返回一个未拦截的Continuation,它就是协程的载体。(suspend () -> T) 函数会被传给该实例作为协程的实际执行体。这个 Continuation 封装了协程的代码运行逻辑和恢复接口,下面会讲到。
因为this就是(suspend () -> T) ,SuspendLambda 又是 BaseContinuationImpl 的实现类,则执行 create() 方法创建协程载体:
abstract class BaseContinuationImpl {
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
}
create() 是 BaseContinuationImpl 类中的一个公开方法。那么是谁实现了这个方法呢? 看看 SuspendLambda 与 BaseContinuationImpl 与 Continuation 之间的关系讲解。
5.SuspendLambda及其父类
上面提到 suspend{} 就是 (suspend R.() -> T) ,它是协程真正需要执行的逻辑,传入的lambda表达式被编译成了继承 SuspendLambda 的子类,SuspendLambda 是 Continuation 的实现类。
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this)
else
super.toString()
}
而SuspendLambda 继承自 ContinuationImpl :
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
) : BaseContinuationImpl(completion) {
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() { ··· }
}
ContinuationImpl 又继承自BaseContinuationImpl ,SuspendLambda 的 resume() 方法的具体实现为 BaseContinuationImpl 的 resumeWith() 方法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
}
这里主要做了四件事:
- 调用
invokeSuspend 方法,执行协程的真正运算逻辑,并返回一个结果; - 如果
outcome 是 COROUTINE_SUSPENDED ,代码块里面执行了挂起方法,则继续挂起; - 如果
completion 是 BaseContinuationImpl ,内部还有suspend方法,则会进入循环递归,继续执行挂起; - 如果
completion 不是 BaseContinuationImpl ,则实际调用父类 AbstractCoroutine 的 resumeWith 方法。
接下来再来看 AbstractCoroutine 的 resumeWith 实现:
public abstract class AbstractCoroutine<in T>(
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
其中一类 completion 是 BaseContinuationImpl ,每个实例就代表一个suspend方法状态机。resumeWith() 封装了协程的运算逻辑,用以协程的启动和恢复;而另一类 completion 是 AbstractCoroutine ,主要是负责维护协程的状态和管理,它的resumeWith 则是完成协程,恢复调用者协程。
其继承关系为: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation 。
因此 create() 方法创建的 Continuation 是一个 SuspendLambda 对象。 回到上面的 createCoroutineUnintercepted() 方法:
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
其实这段代码是在JVM平台中找到的,在 IntrinsicsJvm.kt 类中,但是在 Android 源码中是这样子的:
fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<kotlin.Unit> { }
compiled code 就已经提示里面的代码是编译后的代码。
前面也提到了,在协程源码里面是看不到完整的协程原理的,有一部分代码是kotlin编译器处理的,所以在研究协程的运行流程时,单单看kotlin的源码是看不出本质的,需要反编译成Java文件,看看反编译之后的代码被修改成什么样子了。
6.Function 的创建
将协程模拟网络请求的代码反编译:
fun getData() {
lifecycleScope.launch {
val result = requestUserInfo()
tvName.text = result
}
}
反编译后的代码如下(代码有删减),你会发现发生了巨大的变化,而这些工作都是kotlin编译器帮我们完成的:
public final void getData() {
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000 = requestUserInfo(this);
String result = (String)var10000;
CoroutinesActivity.this.getTvName().setText((CharSequence)result);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
lifecycleScope.launch {} 在反编译后增加了 CoroutineScope ,CoroutineContext ,CoroutineStart ,Function2 ,3,Object 等参数,这些都是kotlin编译器帮我们做了。这里就是最顶层的completion处理协程挂起与恢复的地方。 这里一旦恢复了,那么说明整个协程恢复了。
这里创建了一个Function2 ,里面有三个重要的方法:
- invokeSuspend(result):??里面执行
luanch{} 里面的代码,所以它执行了协程真正的运行逻辑; - create(value, completion):通过传递的
completion 参数创建一个 Function2 并返回,实际是一个Continuation ; - invoke(var1, var2):????复写了
Funtion.invoke() 方法,通过传递过来的参数链式调用create().invokeSuspend() 。
-
从上面知道 (suspend R.() -> T) 是 BaseContinuationImpl 的实现类,所以会走 onCreate() 方法创建 Continuation , 通过 completion 参数创建一个新的 Function2 ,作为Continuation 返回,这就是创建出来的协程载体;(这就是Continuation 三层包装的第二层包装) -
然后调用 resumeWith() 启动协程,那么就会执行BaseContinuationImpl 的 resumeWith() 方法,此时就会执行 invokeSuspend() 方法,执行协程真正的运行逻辑。
协程的创建流程如下:
三、 协程的挂起与恢复
协程工作的核心就是它内部的状态机,invokeSuspend() 函数。 requestUserInfo() 方法是一个挂起函数,这里通过反编译它来阐述协程状态机的原理,逆向剖析协程的挂起和恢复。
1.方法的挂起
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
反编译后的代码如下(代码有删减),同样发现发生了巨大的变化,而这些工作都是kotlin编译器帮我们完成的:
public final Object requestUserInfo(@NotNull Continuation completion) {
Object continuation = new ContinuationImpl(completion) {
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this);
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
上面主要步骤为:
- 函数返回值由
String 变成 Object ,函数没有入参的编译后也增加了 Continuation 参数。原本需要我们做的 callback ,现在编译器帮我们完成了。 - 根据
completion 创建了一个 ContinuationImpl ,复写了 invokeSuspend() 方法,在这个方法里面它又调用了 requestUserInfo() 方法,这里又调用了一次自己(是不是很神奇),并且把 continuation 传递进去。 - 在 switch 语句中,
label 的默认初始值为 0,第一次会进入 case 0 分支,delay() 是一个挂起函数,传入上面的 continuation 参数,会有一个 Object 类型的返回值。这个结果要么是COROUTINE_SUSPENDED ,否则就是真实结果。 DelayKt.delay(2000, continuation) 的返回结果如果是 COROUTINE_SUSPENDED , 则直接 return ,那么方法执行就被结束了,方法就被挂起了。
这就是挂起的真正原理。所以函数即便被 suspend 修饰了,但是也未必会挂起。需要里面的代码编译后有返回值为 COROUTINE_SUSPENDED 这样的标记位才可以,所以程序执行到 case 0 的时候就 return 了。那就意味着方法被暂停了,那么协程也被暂停了。所以说协成的挂起实际上是方法的挂起,方法的挂起本质是 return。
2.COROUTINE_SUSPENDED
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
在 var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED() 中, COROUTINE_SUSPENDED 就是一个枚举常量,表示协程已经挂起,并且不会立即返回任何结果。那么 DelayKt.delay() 返回值是 COROUTINE_SUSPENDED 就被 return 了。
跟进 DelayKT 看看 COROUTINE_SUSPENDED 是如何被获取的: 找到 DelayKT 类(注意:不是Delay.kt 哈,别搞错了),Decomplie to java 反编译成java源码:
public final class DelayKt {
@Nullable
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
Object var10000 = cancellable$iv.getResult();
return var10000;
}
}
}
可以看到 DelayKt.delay() 增加了 Object 返回值,并且追加了一个 completion 参数,这个返回值是 var10000 ,它是在 cancellable$iv.getResult() 得到的:
@PublishedApi
internal fun getResult(): Any? {
setupCancellation()
if (trySuspend()) return COROUTINE_SUSPENDED
return getSuccessfulResult(state)
}
trySuspend() 尝试把方法挂起,如果返回 true 则返回 COROUTINE_SUSPENDED :
private val _decision = atomic(UNDECIDED)
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
trySuspend() 里面循环遍历了 _decision 的值, _decision 初始值为 UNDECIDED ,那么第一次会进入UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true 分支,这里就返回true了,并且把当前状态更改为 SUSPENDED ,代表着它已经被挂起了。
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
这个方法的状态会在它恢复的时候调用 tryResume() 把状态更改为 RESUMED 。这就是决策状态机:
那么 trySuspend() 返回 true 则 getResult() 返回了 COROUTINE_SUSPENDED 枚举常量,那么 DelayKt.delay() 就返回了 COROUTINE_SUSPENDED ,所以下面的判断条件就会满足,会直接return。delay() 方法是一个真真正正的挂起函数,能够导致协程被暂停。
所以 requestUserInfo() 方法在 delay(2000) 被暂停了,在协程中调用,那么协程也就暂停了,后面的结果 result form userInfo 也没有被返回。所以这就是被 suspend 修饰的函数不一定能导致协程被挂起,还需要里面的实现经过编译之后有返回值并且为 COROUTINE_SUSPENDED 才可以。
3.方法的恢复
继续回到 requestUserInfo() 分析恢复的原理:
@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
Object continuation = new ContinuationImpl(completion) {
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this);
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
- 因为
delay() 是 io 操作,在2000毫米后就会通过传递给它的 continuation 回调回来。 - 回调到
ContinuationImpl 这个类里面的 resumeWith() 方法,会再次调用 invokeSuspend() 方法,进而再次调用 requestUserInfo() 方法。 - 它又会进入switch语句,由于第一次在
case 0 时把 label = 1 赋值为1,所以这次会进入 case 1 分支,并且返回了结果result form userInfo 。 - 并且
requestUserInfo() 的返回值作为 invokeSuspend() 的返回值返回。重新被执行的时候就代表着方法被恢复了。
那么 invokeSuspend() 方法是怎么被触发回调的呢?它拿到返回值有什么用呢?
上面提到 ContinuationImpl 继承自 BaseContinuationImpl ,而它又实现了 continuation 接口并且复写了 resumeWith() 方法,里面就调用了 val outcome = invokeSuspend(param) 方法。(源码有删减)
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome == COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
实际上任何一个挂起函数它在恢复的时候都会调到 BaseContinuationImpl 的 resumeWith() 方法里面。
-
一但 invokeSuspend() 方法被执行,那么 requestUserInfo() 又会再次被调用, invokeSuspend() 就会拿到 requestUserInfo() 的返回值,在 ContinuationImpl 里面根据 val outcome = invokeSuspend() 的返回值来判断我们的 requestUserInfo() 方法恢复了之后的操作。 -
如果 outcome 是 COROUTINE_SUSPENDED 常量,说明你即使被恢复了,执行了一下, if (outcome == COROUTINE_SUSPENDED) return 但是立马又被挂起了,所以又 return 了。 -
如果本次恢复 outcome 是一个正常的结果,就会走到 completion.resumeWith(outcome) ,当前被挂起的方法已经被执行完了,实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法,那么协程就恢复了。
我们知道 requestUserInfo() 肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion 参数),requestUserInfo() 方法恢复完了就会让协程completion.resumeWith() 去恢复,所以说协程的恢复本质上是方法的恢复。
这是在android studio当中通过反编译kotlin源码来分析协程挂起与恢复的流程。流程图如下:
4.在协程中运行的挂起与恢复
那么 requestUserInfo() 方法在协程里面执行的整个挂起和恢复流程是怎么样的呢?
fun getData() {
lifecycleScope.launch {
val result = requestUserInfo()
tvName.text = result
}
}
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
反编译代码:
public final void getData() {
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
var10000 = requestUserInfo(this);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
}
String result = (String)var10000;
CoroutinesActivity.this.getTvName().setText((CharSequence)result);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
Object continuation = new ContinuationImpl(completion) {
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this);
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
- 可以看到协程里面反编译后的代码和
requestUserInfo() 方法反编译后的代码类似,Function2 里面也复写了invokeSuspend() 方法,状态机也类似, - 在
case 0 处判断 requestUserInfo() 返回值是否为COROUTINE_SUSPENDED , 如果是则挂起协程。我们在上面分析知道, requestUserInfo() 第一次返回的值是COROUTINE_SUSPENDED ,所以 requestUserInfo() 被挂起了,协程也被挂起了。所以说协程的挂起实际上是方法的挂起。 - 协程恢复的原理也和
requestUserInfo() 恢复的原理大致相同。在调用 requestUserInfo(this) 的时候把 Continuation 传递了进去。 - 那么
requestUserInfo() 函数2000毫秒后在恢复时将结果通过 invokeSuspend() 回调给上一层 completion 的 resumeWith() 里面,那么协程的 invokeSuspend(result) 就是被回调。 - 通过状态机流转执行之前挂起逻辑之后的代码。此时
lable = 1 进入 case 1 赋值给 var10000 ,然后执行剩下的代码。 所以 requestUserInfo() 方法恢复后,调用它的协程也跟着恢复了,所以说协程的恢复本质上是方法的恢复。
四、协程的调度
1.协程拦截
协程的线程调度是通过拦截器实现的,回到前面的 startCoroutineCancellable :
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R,
completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit))
}
看看 intercepted() 的具体实现:
//使用[ContinuationInterceptor]拦截Continuation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
拦截器在每次(恢复)执行协程体的时候都会拦截协程本体SuspendLambda 。interceptContinuation() 方法中拦截了一个Continuation<T> 并且再返回一个Continuation<T> ,拦截到Continuation 后就可以做一些事情,比如线程切换等。
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
而 interceptContinuation() 方法的实现是在 CoroutineDispatcher 中,它是所有协程调度程序实现扩展的基类:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
注意:[block] 是一个 Runnable 类型。
如果传递了协程调度器,那么协程中的闭包代码块就决定了所运行的线程环境,CoroutineDispatcher 有三个重要的方法:
- isDispatchNeeded():协程的启动需不需要分发到别的线程上面去。
- dispatch(): ?????将可运行块的执行分派到给定上下文中的另一个线程上,由子类去实现具体的调度。
- interceptContinuation:拦截协程本体,包装成一个
DispatchedContinuation 。
拦截协程本体,包装成一个 DispatchedContinuation ,它在执行任务的时候会通过 needDispatch() 来判断本次协程启动需不需要分发到别的线程上面,如果返回了true,那么就会调用子类的 dispatch(runnable) 方法,来完成协程的本次启动工作,如果返回false,就会由 CoroutineDispatcher 在当前线程立刻执行。
2.协程分发
在截获的 Continuation 上调用resume(Unit) 保证协程和完成的执行都发生在由 ContinuationInterceptor 建立的调用上下文中。而拦截后的 continuation 被 DispatchedContinuation 包装了一层:(这就是Continuation 三层包装中的第三层包装)
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
override val delegate: Continuation<T>
get() = this
inline fun resumeCancellable(value: T) {
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
override fun resumeWith(result: Result<T>) {
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
continuation.resumeWith(result)
}
}
}
DispatchedContinuation 拦截了协程的启动和恢复,分别是resumeCancellable(Unit) 和重写的resumeWith(Result) 。
当需要分发时,就调用 dispatcher 的 dispatch(context, this) 方法,this是一个 DispatchedTask :
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
withContinuationContext(continuation, delegate.countOrElement) {
continuation.resume(getSuccessfulResult(state))
}
}
}
DispatchedTask 实际上是一个Runnable 。
- 当需要线程调度时,则在调度后会调用
DispatchedContinuation.continuation.resumeWith() 来启动协程,其中 continuation 是 SuspendLambda 实例; - 当不需要线程调度时,则直接调用
continuation.resumeWith() 来直接启动协程。
也就是说对创建的 Continuation 的 resumeWith() 增加拦截操作,拦截协程的运行操作:
分别分析一下四种调度模式的具体实现:
3.Dispatchers.Unconfined
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
}
}
Dispatchers.Unconfined :对应的是Unconfined ,它里面的isDispatchNeeded() 返回的是false,不限于任何特定线程的协程调度程序。那么它的父类ContinuationInterceptor 就不会把本次任务的调度交给子类来执行,而是由父类在当前线程立刻执行。
4.Dispatchers.Main
Dispatchers.Main 继承自 MainCoroutineDispatcher 通过 MainDispatcherLoader.dispatcher 实现调度器:
public actual object Dispatchers {
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}
MainDispatcherLoader 通过工厂模式创建 MainCoroutineDispatcher :
internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
val factories = FastServiceLoader.loadMainDispatcherFactory()
return factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
}
}
public fun tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
createMissingDispatcher(cause, hintOnError())
}
MainDispatcherFactory 是一个接口,通过实现类来创建Dispatcher :
public interface MainDispatcherFactory {
//子类实现
public fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
}
internal class AndroidDispatcherFactory : MainDispatcherFactory {
//由AndroidDispatcherFactory创建HandlerContext,可以看到它是一个主线程调度器
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
//···
}
我们看到了AndroidDispatcherFactory , Looper.getMainLooper() ,Main 等关键字,毫无疑问这就是主线程调度器:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
}
它们三者的继承关系: HandlerContext ->HandlerDispatcher ->MainCoroutineDispatcher() ->CoroutineDispatcher 。
它里面的isDispatchNeeded() 返回的是true,当协程启动的时候则由HandlerContext 来分发,而它里面的分发工作是通过 handler.post(runnable) 分发给主线程来完成的。在恢复的时候也是通过Dispatchers.Main 这个调度器来恢复。当完成任务之后就会通过HandlerDispatcher 把协程中的代码再次切换到主线程。
5.Dispatchers.IO
public actual object Dispatchers {
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
DefaultScheduler 协程调度器的默认调度器,是一个线程调度器,执行阻塞任务,此调度程序与Dispatcher.Default 调度程序共享线程。
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
}
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
override val executor: Executor
get() = this
override fun execute(command: Runnable) = dispatch(command, false)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
queue.add(taskToSchedule)
}
}
}
dispatcher.dispatchWithContext() 立即分发任务由ExperimentalCoroutineDispatcher 实现:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
coroutineScheduler.dispatch(block, context, tailDispatch)
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
ExperimentalCoroutineDispatcher 将任务分发到coroutineScheduler.dispatch() 实现。 CoroutineScheduler 就是一个线程池Executor。
internal class CoroutineScheduler(
val corePoolSize: Int,
val maxPoolSize: Int,
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
val task = createTask(block, taskContext)
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
throw RejectedExecutionException("$schedulerName was terminated")
}
}
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
上面代码主要做了以下几件事:
- 首先是通过
Runnable 构建了一个Task ,这个Task 其实也是实现了Runnable 接口; - 将当前线程取出来转换成
Worker ,这个Worker 是继承自Thread 的一个类; - 将
task 提交到本地队列中; - 如果
task 提交到本地队列的过程中没有成功,那么会添加到全局队列中; - 创建
Worker 线程,并开始执行任务。
class Worker private constructor() : Thread() {
val localQueue: WorkQueue = WorkQueue()
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
if (task != null) {
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
}
}
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
}
fun runSafely(task: Task) {
try {
task.run()
}
}
run方法直接调用的runWorker() ,在里面是一个while循环,不断从队列中取Task 来执行,调用task.run() 。
- 从本地队列或者全局队列中取出
Task 。 - 执行这个task,最终其实就是调用这个
Runnable 的run方法。
也就是说,在Worker 这个线程中,执行了这个Runnable 的run方法。还记得这个Runnable 是谁么?它就是上面我们看过的DispatchedTask ,这里的run方法执行的就是协程任务,那这块具体的run方法的实现逻辑,我们应该到DispatchedTask 中去找。
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
withContinuationContext(continuation, delegate.countOrElement) {
continuation.resume(getSuccessfulResult(state))
}
}
}
run方法执行continuation.resume 恢复协程执行。最后通过executor.execute() 启动线程池。
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
}
6.Dispatchers.Default
如果不指定调度器,则会默认 DefaultScheduler ,它实际和Dispatchers.IO 是同一个线程调度器,这个是线程调度器:
public actual object Dispatchers {
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
如果指定了调度器则使用 CommonPool ,表示共享线程的公共池作为计算密集型任务的协程调度程序。
internal object CommonPool : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = pool ?: getOrCreatePoolSync()
override fun dispatch(context: CoroutineContext, block: Runnable) {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
}
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
CommonPool 中也是创建了一个固定大小的线程池,dispatch() 通过execute() 执行协程任务。
总结如下:
类型 | 调度器实现类 | 说明 |
---|
Dispatchers.Main | HandlerContext | 它里面的isDispatchNeeded() 返回的是true,当协程启动的时候则由HandlerDispatcher来分发,而它里面的分发工作是通过 handler.post(runnable) 来完成的。 | Dispatchers.IO | DefaultScheduler | 它是线程调度器,它里面的isDispatchNeeded() 返回的是true,而它调度任务的时候是通过 executors.execute(runnable) 来执行runnable任务。也就是把协程中的代码块运行到IO线程。 | Dispatchers.Default | DefaultScheduler,CommonPool | 如果不指定调度器,则会默认DefaultScheduler,它实际和Dispatchers.IO 是同一个线程调度器;如果指定调度器,则是CommonPool共享线程池。isDispatchNeeded()都是true,通过 executors.execute(runnable) 来执行runnable任务。 | Dispatchers.Unconfined | Unconfined | 它里面的isDispatchNeeded() 返回的是false,那么它的父类ContinuationInterceptor 就不会把本次任务的调度交给子类来执行,而是由父类在当前线程立刻执行。 |
五、总结
1.协程的三层包装
通过一步步的分析,慢慢发现协程其实有三层包装:
-
常用的launch 和async 返回的Job 、Deferred ,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自AbstractCoroutine ,它是协程的第一层包装。 -
第二层包装是编译器生成的 SuspendLambda 的子类,封装了协程的真正运算逻辑,继承自BaseContinuationImpl ,包含了第一层包装,其中completion 就是协程的第一层包装。 -
第三层包装是协程的线程调度时的DispatchedContinuation ,封装了线程调度逻辑,包含了协程的第二层包装。
三层包装都实现了Continuation 接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。
2.协程的挂起与恢复原理
- 在研究协程原理时需要反编译成Java文件,才能看到本质之处。因为有一部分代码是kotlin编译器生成的,在协程源码里是看不出来的。
- 每个挂起点对应于一个case分支(状态机),每调用一次
label 加1;label 的默认初始值为 0,第一次会进入 case 0 分支,挂起函数在返回 COROUTINE_SUSPENDED 时直接 return ,那么方法执行就被结束了,方法就被挂起了。 - 协程体内的代码都是通过
continuation.resumeWith() 调用;获取到真实结果后,回调到 ContinuationImpl 这个类里面的 resumeWith() 方法,会再次调用 invokeSuspend(result) 方法,进入状态机case分支,返回真实结果,方法恢复后,接着恢复协程。 - 所以说,协程的挂起本质上是方法的挂起,而方法的挂起本质上是
return ,协程的恢复本质上方法的恢复,而恢复的本质是 callback 回调。
3.协程的调度原理
- 拦截器在每次(恢复)执行协程体的时候都会拦截协程本体
SuspendLambda ,然后会通过协程分发器的 interceptContinuation() 方法拦截了一个Continuation<T> 并且再返回一个Continuation<T> 。 - 把拦截的代码块封装为任务
DispatchedContinuation ,会通过 CoroutineDispatcher 的 needDispatch() 来判断需不需要分发,由子类的 dispatch(runnable) 方法来实现协程的本次调度工作。
4.协程面试常见问题
协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个任务之间更好协作,能够以同步的方式完成异步工作,将异步代码像同步代码一样直观。
协程就像轻量级的线程,协程是依赖于线程,一个线程中可以创建多个协程。协程挂起时不会阻塞线程。线程进程都是同步机制,而协程则是异步。
根据创建协程指定调度器HandlerContext ,DefaultScheduler ,UnconfinedDispatcher 来执行任务,以解决协程中的代码运行在那个线程上。HandlerContext 通过handler.post(runnable) 分发到主线程,DefaultScheduler 本质是通过excutor.excute(runnable) 分发到IO线程。
协程的本质是编译时return+callback,只不过在调度任务时提供了能够运行在IO线程的调度器和主线程的调度器。把协程称为线程框架不够准确。
多任务并发流程控制场景,流程控制比较简单,不会涉及线程阻塞和唤醒,性能比Java并发控制手段高。
点关注,不迷路
好了各位,以上就是这篇文章的全部内容了,很感谢您阅读这篇文章。我是suming,感谢支持和认可,您的点赞就是我创作的最大动力。山水有相逢,我们下篇文章见!
本人水平有限,文章难免会有错误,请批评指正,不胜感激 !
Kotlin协程学习三部曲:
参考链接:
希望我们能成为朋友,在 Github、掘金 上一起分享知识,一起共勉!Keep Moving!
|