此文协程特指在Android平台上的kotlin协程实现,基于1.5.2版本kotlin。
协程基础知识
简单过一遍协程的基础类图:
- Job:协程的唯一标识,用来控制协程的生命周期(new、active、completing、completed、cancelling、cancelled)
- CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined)
- CoroutineName:指定协程的名称,默认为coroutine,调试用
- CoroutineExceptionHandle: 指定协程的异常处理器,用来处理未捕获的异常
- CombineContext: CoroutineContext通过CombineContext与+、-的运算符重载,实现了左链表结构的组织,同时CoroutineContext自身也可以根据重载的[]get方法,实现集合与map的特性。
协程启动
最常使用的协程构建器无外乎四种:
- runblocking:普通阻塞式函数,直接用GlobalScope执行阻塞式协程,一般只会用在单元测试中:
fun test() = runBlocking {
}
小tips:单元测试完成后,自定义的打印在测试结果的最底部。
- withContext:不创建协程,直接在当前协程上执行代码块,并等待执行完毕,返回结果。一般应用于如主线程需要等待异步线程上传完图片后,再进行下一步操作等,需要等待切换线程后作业的操作。
fun testWithContext() = runBlocking {
var str = "ori"
str = withContext(Dispatchers.Default) {
delay(1000)
"1000ori"
}
println(str)
}
在实现方面,withContext会将给定的CoroutineContext与当前协程的CoroutineContext结合,因此可以做到诸如替换CoroutineDispatcher等实现线程切换的操作。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
val oldContext = uCont.context
val newContext = oldContext + context
...
}
- lanch:开启新的子协程,执行不阻塞当前线程的作业,不返回结果。
fun testLaunch1() = runBlocking {
var str = "ori"
launch {
delay(1000)
str = "1000ori"
}
println(str)
}
ori
可以发现,launch只是开启了新的子协程,但是父协程直接继续执行,如果需要等待子协程完成,则需要使用join:
fun testLaunch2() = runBlocking {
var str = "ori"
val c = launch {
delay(1000)
str = "1000ori"
}
logTime(str)
c.join()
logTime(str)
}
var lastTime = 0L
fun logTime(str: String) {
if (lastTime == 0L) {
println("0-$str")
} else {
println("${System.currentTimeMillis() - lastTime}-$str")
}
lastTime = System.currentTimeMillis()
}
0-ori
1015-1000ori
join会立即挂起协程,等待子协程执行完成。注意,如果仅仅是需要等待执行操作完毕的作业,直接使用withContext,而不是使用launch{…}.join()。
- async:开启新的子协程,通过Deffer返回作业结果。相较于launch,async更进一步,用于异步执行耗时作业,并且需要返回值(如网络请求、数据库读写、文件读写),在执行完毕通过 await() 函数获取返回值。
fun testAsync() = runBlocking {
var str = "ori"
val c = async {
delay(1000)
"1000ori"
}
logTime(str)
str = c.await()
logTime(str)
}
0-ori
1013-1000ori
启动模式
下面重点来看下launch函数定义:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
其中 1.参数一CoroutineContext可以设置 CoroutineDispatcher 协程运行的线程调度器(默认是Dispatchers.Default)等协程上下文 2.参数二CoroutineStart指定了协程启动模式,其中
- DEFAULT:默认模式,根据CoroutineContext立即调度协程(使用Dispatcher定义的线程),多数情况用这个就行了
- LAZY:懒加载模式,使用join()、await()等时才开始调度
- UNDISPATCHED,不常用,立即在当前线程调度,直到遇见第一个挂起函数后,由CoroutineContext定义的Dispatcher调度协程
var str = "ori"
var startTime = 0L
fun testCoroutineStart() = runBlocking {
logTime()
var str = "ori"
val c = launch(
context = Dispatchers.IO + CoroutineName("TEST"),
start = CoroutineStart.UNDISPATCHED
) {
logTime("launch1")
delay(1000)
str = "1000ori"
logTime("launch2")
}
logTime("main1")
c.join()
logTime("main2")
}
@ExperimentalStdlibApi
private suspend fun logTime(tag: String? = null) {
if (startTime == 0L) {
System.out.format(
"%5s |%10s |%15s |%30s |%10s |%10s %n",
"Time",
"Tag",
"CoroutineName",
"Dispatcher",
"Thread",
"Msg"
)
startTime = System.currentTimeMillis()
} else {
System.out.format(
"%5s |%10s |%15s |%30s |%10s |%10s %n",
"${System.currentTimeMillis() - startTime}",
tag,
"${coroutineContext[CoroutineName]?.name}",
"${coroutineContext[CoroutineDispatcher]}",
"${Thread.currentThread().id}",
msg
)
}
}
Time | Tag | CoroutineName | Dispatcher | Thread | Msg
2 | launch1 | TEST | Dispatchers.IO | 11 | ori
7 | main1 | null | BlockingEventLoop@3cc8b7e6 | 11 | ori
1027 | launch2 | TEST | Dispatchers.IO | 15 | 1000ori
1032 | main2 | null | BlockingEventLoop@3cc8b7e6 | 11 | 1000ori
可以发现,在delay之前子协程是直接在原线程执行的,delay时交由Dispatchers.IO调度线程,这一点与Dispatchers.Unconfined类似,但是挂起点resume后,UNDISPATCHED会使协程交由 CoroutineContext的CoroutineDispatche调度。 附带个CoroutineDispatche的例子:
- ATOMIC,试验性,一般不使用,类似于DEFAULT,但是此协程在被执行前无法被取消。
协程生命周期
launch返回Job对象: Job控制协程的生命周期,其有三个状态变量:
- isActive 当Job处于活动状态时为true,如果Job已经开始,但还没有完成、也没有取消或者失败,则是处于active状态
- isCompleted 当Job由于任何原因完成时为true,已取消、已失败和已完成Job都是被视为完成状态
- isCancelled 当Job由于任何原因被取消时为true,无论是通过显式调用cancel或这因为它已经失败亦或者它的子或父被取消,都是被视为已退出状态
dispatched
complete
finish
cancel/fail
cancel/fail
finish
new
active<isActive=true, isCompleted=false, isCancelled=false>
completing, wait for child<isActive=true, isCompleted=false, isCancelled=false>
completed<isActive=false, isCompleted=true, isCancelled=false>
cancelling, wait for child<isActive=true, isCompleted=false, isCancelled=false>
canceled<isActive=false, isCompleted=true, isCancelled=true>
由于协程是结构化的,因此在completing和cancelling时,会等待所有子协程完成。
协程取消
协程取消一般使用cancel()或cancelAndJoin()函数:
fun testCancel() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
while (i < 5) {
println("num ${i++}")
delay(500)
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
num 0
num 1
num 2
try cancel
end
一段协程代码必须协作才能被取消,所有kotlinx.coroutines包中的挂起函数都是可被取消的。协程取消时,会检查子协程的取消,并在取消时抛出CancellationException,CancellationException被默认处理,不会引发协程抛出异常。 然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的:
fun testCancelCpu() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) {
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
num 0
num 1
num 2
try cancel
num 3
num 4
end
可以看出,在cancelAndJoin()之后,由于while还在不断占用CPU,所以还是会继续执行完毕(类似线程的cancel),针对这种情况,可以使用
- 使用Job.isActived或ensureActive() 判断
fun testCancelCpu1() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) {
if (!isActive) {
return@launch
}
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
num 0
num 1
num 2
try cancel
end
- yeild(),放弃的意思,表现为暂时让出执行权,函数定义为:
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
context.ensureActive()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (cont.dispatcher.isDispatchNeeded(context)) {
cont.dispatchYield(context, Unit)
} else {
val yieldContext = YieldContext()
cont.dispatchYield(context + yieldContext, Unit)
if (yieldContext.dispatcherWasUnconfined) {
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
}
}
COROUTINE_SUSPENDED
}
说人话就是挂起当前任务(Job),释放此线程,让其他正在等待的任务公平的竞争获得执行权。 由于yield是个suspend函数,所以肯定也可以感知到cancel()被执行,进而实现协程取消:
fun testCancelCpu1() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) {
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
yield()
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
num 0
num 1
num 2
try cancel
end
在协程取消,需要释放文件、数据库等资源时,可以在finaly中释放:
fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
println("finally release stream")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
reading from stream
try cancel
finally release stream
end
对于实现了Closeable接口的类,如各种Stream、Buffer等,可以直接使用.use{}实现自动在finally中调用close()方法。
public inline fun <T : Closeable?, R> T.use(block: (T) -> R): R {
...
}
fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
FileInputStream(File("build.gradle")).use {
println("reading from stream")
delay(3000)
println("reading end")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
reading from stream
try cancel
end
特别注意,在finally中,调用挂起函数会直接抛出 CancellationException,因为挂起函数都是可取消的:
fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
println("finally release stream")
delay(2000)
println("release end")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
reading from stream
try cancel
finally release stream
end
如果确实需要在finally中执行挂起,可以使用withContext(NonCancellable) {}执行:
fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
withContext(NonCancellable) {
println("finally release stream")
delay(2000)
println("release end")
}
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
reading from stream
try cancel
finally release stream
release end
end
此外,还可以使用withTimeout执行指定超时时间的等待作业,如果不希望超时后会抛出超时异常,可以使用withTimeoutOrNull在超时时返回null。
参考资料
揭秘kotlin协程中的CoroutineContext kotlin学习
|