1.协程与普通方法任务调度对比
1.执行串行任务
1.普通方式执行
在子线程中执行耗时操作后,通过接口回调来回调结果,当多个任务串行依赖的时候,就会出现 “回调地狱”
object Normal{
private const val TAG = "Normal"
fun startSerialTask(){
request1 { result1 ->
request2(result1,callback = { result2->
request3(result2,callback = { result3->
Log.e(TAG,"get result = $result3 ")
HiExecutor.executeOnMain(Runnable {
Log.e(TAG,"update ui para = $result3 thread = ${Thread.currentThread().name}")
})
})
})
}
Log.e(TAG,"startSerialTask ...")
}
fun request1( callback:(result:String)->Unit){
HiExecutor.execute(runnable = Runnable {
Log.e(TAG,"request1 start...")
Thread.sleep(1000)
Log.e(TAG,"request1 end...")
callback.invoke("request1")
})
}
fun request2(paras: String, callback:(result:String)->Unit){
HiExecutor.execute(runnable = Runnable {
Log.e(TAG,"request2 start...")
Thread.sleep(2000)
Log.e(TAG,"request2 end...")
callback.invoke("request2 + $paras")
})
}
fun request3(paras: String, callback:(result:String)->Unit){
HiExecutor.execute(runnable = Runnable {
Log.e(TAG,"request3 start...")
Thread.sleep(2000)
Log.e(TAG,"request3 end...")
callback.invoke("request3 + $paras")
})
}
}
日志结果:
2.协程的方式执行
1.场景描述:现在有三个运行在子线程任务依赖关系:a --> b --> c,执行完后更新 UI
object CoroutineSample1 {
private const val TAG = "CoroutineSample1"
fun startSerialScene() {
GlobalScope.launch(Dispatchers.IO) {
Log.e(TAG, "coroutine is running thread = ${Thread.currentThread().name}")
val request1 = request1()
val request2 = request2(request1)
val request3 = request3(request2)
Log.e(TAG, "coroutine is end , result = $request3")
updateUI(request3)
}
Log.e(TAG, "coroutine has launched ")
}
fun updateUI(paras: String){
GlobalScope.launch(Dispatchers.Main) {
Log.e(TAG,"updateUI paras = $paras -- thread = ${Thread.currentThread().name}")
}
}
suspend fun request1(): String {
val threadName = Thread.currentThread().name
Log.e(TAG, "request1 start...$threadName")
delay(1000)
Log.e(TAG, "request1 end...")
return "request1"
}
suspend fun request2(paras: String): String {
val threadName = Thread.currentThread().name
Log.e(TAG, "request2 start... $threadName")
delay(2000)
Log.e(TAG, "request2 end...")
return "request2 + $paras"
}
suspend fun request3(paras: String): String {
val threadName = Thread.currentThread().name
Log.e(TAG, "request3 start... $threadName ")
delay(3000)
Log.e(TAG, "request3 end... ")
return "request3 + $paras"
}
}
执行结果日志: 2.直接在 UI 线程执行三个串行任务后更新 UI
fun startSerialScene2() {
GlobalScope.launch(Dispatchers.Main) {
Log.e(TAG, "coroutine is running thread = ${Thread.currentThread().name}")
val request1 = request1()
val request2 = request2(request1)
val request3 = request3(request2)
Log.e(TAG, "coroutine is end , result = $request3")
}
Log.e(TAG, "coroutine has launched ")
}
日志结果:
2.并行依赖任务
场景描述:在子线程中 request1() --》并行请求 request1() 和 request2() 的结果,最终执行更行 UI 的操作
1.普通的方法执行
普通方法进行多线程的并发控制,可以借助并发工具类 CountDownLatch 来控制,以拿到多个线程的并发结果之后更新UI
object NormalSample2 {
private const val TAG = "NormalSample2"
val countDownLatch = CountDownLatch(2)
fun startAsyncTask() {
Log.e(TAG,"startAsyncTask init... ")
request1 { result1 ->
var result2 = ""
var result3 = ""
request2(result1,callback = { request2->
result2 = request2
countDownLatch.countDown()
})
request3(result1,callback = { request3->
result3 += request3
countDownLatch.countDown()
})
countDownLatch.await()
Log.e(TAG,"startAsyncTask end... ")
updateUI(result2,result3)
}
Log.e(TAG,"startAsyncTask start... ")
}
.....
}
日志结果:
2.协程执行
object CoroutineSample2 {
private const val TAG = "CoroutineSample2"
fun startAsyncScene2() {
GlobalScope.launch(Dispatchers.IO) {
Log.e(TAG, "coroutine is running")
val result1 = request1()
val deferred2 = GlobalScope.async { request2(result1) }
val deferred3 = GlobalScope.async { request3(result1) }
updateUI(deferred2.await(), deferred3.await())
}
Log.e(TAG, "coroutine has started")
}
.....
}
日志结果: 从 logcat 日志可以看到:request2() 和 request3() 同时执行,并且都执行完之后才执行 updateUI 方法。
2.协程挂机与恢复的原理
1.反编译 suspend 方法源码分析
协程的挂起本质上是方法的挂机,协程的恢复本质上也是方法的恢复。
? 但是为什么当前方法已经调用了,却还能被挂起,然后又能自动恢复呢?普通同步方法一旦执行完 return 之后,继续向下执行了,而异步方法则需要通过 callback 来回调异步方法的执行结果。
? 在 kotlin 中写挂起函数的时候并没有传入任何 callback 接口,它是如何使得异步的方法以同步的方式拿到返回结果的呢?
object CoroutineScene2 {
private val TAG: String = "CoroutineScene2"
suspend fun request1(): String {
val request2 = request2()
return "result from request1 $request2";
}
suspend fun request2(): String {
delay(2 * 1000)
Log.e(TAG, "request2 completed")
return "result from request2"
}
}
1.将 CoroutineScene2.kt 代码进行反编译之后
public final class CoroutineScene2 {
private static final String TAG = "CoroutineScene2";
@NotNull
public static final CoroutineScene2 INSTANCE;
@Nullable
public final Object request1(@NotNull Continuation var1) {
Object $continuation;
label20: {
if (var1 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var1;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var1) {
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineScene2.this.request1(this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
var10000 = this.request2((Continuation)$continuation);
if (var10000 == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String request2 = (String)var10000;
return "result from request1 " + request2;
}
@Nullable
public final Object request2(@NotNull Continuation var1) {
Object $continuation;
label20: {
if (var1 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var1;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var1) {
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineScene2.this.request2(this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(2000L, (Continuation)$continuation) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
Log.e(TAG, "request2 completed");
return "result from request2";
}
private CoroutineScene2() {
}
static {
CoroutineScene2 var0 = new CoroutineScene2();
INSTANCE = var0;
TAG = "CoroutineScene2";
}
}
? 整个函数的挂起和恢复本质就是前面提到的:return + callback 来将我们的异步方法转成同步方法,从 kotlin 代码是看不出来的。需要反编译之后分析。通过方法的递归调用做到挂起和恢复。
2.用Java协程的挂起与恢复
? 上述 CoroutineScene2.kt 中的方法被 suspend 方法修饰后就变成支持挂起的函数,是否被挂起还需要看其内部是否有挂起函数。下面用 Java 代码来复现一下CoroutineScene2.kt 反编译后的 Java代码的执行流程。这样可以直接通过 Debug 进行调试。
public class CoroutineScene2_decompiled {
private static final String TAG = "CoroutineScene2";
public static final Object request1(Continuation preCallback) {
ContinuationImpl request1Callback;
if (!(preCallback instanceof ContinuationImpl) || (((ContinuationImpl) preCallback).label & Integer.MIN_VALUE) == 0) {
request1Callback = new ContinuationImpl(preCallback) {
@Override
Object invokeSuspend(@NotNull Object resumeResult) {
this.result = resumeResult;
this.label |= Integer.MIN_VALUE;
Log.e(TAG, "request1 has resumed " +Thread.currentThread().getName() );
return request1(this);
}
};
} else {
request1Callback = (ContinuationImpl) preCallback;
}
switch (request1Callback.label) {
case 0: {
Object request2 = request2(request1Callback);
if (request2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
Log.e(TAG, "request1 has suspended " + Thread.currentThread().getName());
return IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
}
Log.e(TAG, "request1 completed");
return "result from request1" + request1Callback.result;
}
public static final Object request2(Continuation preCallback) {
ContinuationImpl request2Callback;
if (!(preCallback instanceof ContinuationImpl) || (((ContinuationImpl) preCallback).label & Integer.MIN_VALUE) == 0) {
request2Callback = new ContinuationImpl(preCallback) {
@Override
Object invokeSuspend(@NotNull Object resumeResult) {
this.result = resumeResult;
this.label |= Integer.MIN_VALUE;
Log.e(TAG, "request2 has resumed " + Thread.currentThread().getName());
return request2(this);
}
};
} else {
request2Callback = (ContinuationImpl) preCallback;
}
switch (request2Callback.label) {
case 0: {
Object delay = DelayKt.delay(2000, request2Callback);
if (delay == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
Log.e(TAG, "request2 has suspended" + Thread.currentThread().getName());
return IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
}
Log.e(TAG, "request2 completed");
return "result from request2";
}
static abstract class ContinuationImpl<T> implements Continuation<T> {
private Continuation preCallback;
int label;
Object result;
public ContinuationImpl(Continuation preCallback) {
this.preCallback = preCallback;
}
@NotNull
@Override
public CoroutineContext getContext() {
return preCallback.getContext();
}
@Override
public void resumeWith(@NotNull Object resumeResult) {
Object suspend = invokeSuspend(resumeResult);
if (suspend == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
return;
}
preCallback.resumeWith(suspend);
}
abstract Object invokeSuspend(@NotNull Object resumeResult);
}
}
在Java方法中调用上述的 reques1() 方法,传入 Continuation 接口对象: 日志结果: ? 从 Java 代码中可以明显感觉到,所谓的挂起与恢复本质上就是一个 return 和 callback 的方式。可以按照上图代码标注的步骤打断点进行调试。能更加清晰的理解这协程挂起与恢复的流程。
|