Job
使用 launch 函数启动协程后,launch 会返回 job 作为返回值。
可以通过 job 监控协程的生命周期状态,并且控制协程的生命周期。
job 的生命周期状态
定义一个扩展函数 Job.log(),在 log 中打印 isActive、isCancelled、isCompleted 三个状态。然后调用 job.cancel() 取消协程。
fun main() = runBlocking {
val job = launch {
delay(1000L)
}
job.log()
job.cancel()
job.log()
delay(1500L)
}
fun Job.log() {
logX(
"""
isActive = $isActive
isCancelled = $isCancelled
isCompleted = $isCompleted
""".trimIndent()
)
}
fun logX(any: Any?) {
println("""
================================
$any
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
输出如下
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = false
isCancelled = true
isCompleted = false
Thread:main @coroutine#1
================================
Job.log() 用来监控协程的生命周期,包括 isActive、isCancelled、isCompleted。 Job.cancel() 用来操控协程的生命周期,取消协程。
当 launch 执行后,协程进入 Active 状态,isActive 为 true。 调用 cancel 后,协程进入 Cancelled 状态,isCancelled 为 true。但此时协程还没有结束运行,因此 isCompleted 为 false。
如果再 delay(500) 后打印协程状态,可以发现 isCompleted 变为 true。
fun main() = runBlocking {
val job = launch {
delay(1000L)
}
job.log()
job.cancel()
job.log()
delay(500)
job.log()
delay(1500L)
job.log()
}
输出如下
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = false
isCancelled = true
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = false
isCancelled = true
isCompleted = true
Thread:main @coroutine#1
================================
================================
isActive = false
isCancelled = true
isCompleted = true
Thread:main @coroutine#1
================================
需要注意的是,调用 cancel 之后, isCancelled 和 isCompleted 都为 true。被取消的状态也被认为是结束状态,Job 的 Cancelled 和 Completed 都对应了 isCompleted 状态。
launch 懒加载启动
launch 方法的第二个参数 start 可以自定义启动方式。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
start 是 CoroutineStart 类型的枚举值,有 4 个值:
- DEFAULT
- LAZY
- ATOMIC
- UNDISPATCHED
当 start 是 CoroutineStart.LAZY 时,只有真正使用协程才会启动,也就是懒加载模式。
launch 懒加载模式打印协程的生命周期:
fun main() = runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
logX("Coroutine start!")
delay(1000L)
}
delay(500L)
job.log()
job.start()
job.log()
delay(500L)
job.cancel()
delay(500L)
job.log()
delay(2000L)
logX("Process end!")
}
输出结果如下:
================================
isActive = false
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
Coroutine start!
Thread:main @coroutine#2
================================
================================
isActive = false
isCancelled = true
isCompleted = true
Thread:main @coroutine#1
================================
================================
Process end!
Thread:main @coroutine#1
================================
可以看出 launch 方法执行后,isActive 为 false。只有当 job.start() 调用后 isActive 才为 true,协程才会真正执行。
监听 Job 结束
有时候协程内部的执行时间超过了调用者的执行时间,导致外部不清楚协程什么时候执行完毕。
launch 执行 4 秒,main 方法执行 3.6 秒。
fun main() = runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
logX("Coroutine start!")
delay(4000L)
logX("Coroutine end!")
}
delay(500L)
job.log()
job.start()
job.log()
delay(1100L)
job.log()
delay(2000L)
logX("Process End!")
}
输出如下:
================================
isActive = false
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
Coroutine start!
Thread:main @coroutine#2
================================
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
Process End!
Thread:main @coroutine#1
================================
================================
Coroutine end!
Thread:main @coroutine#2
================================
可以看出 main 方法打印 “Process End!” 之后,协程才打印 “Coroutine end!”。而且 main 方法最后的 isCompleted = false。
join 和 invokeOnCompletion
使用 job.invokeOnCompletion() 监听 job 的结束事件。
使用 job.join() 等待协程执行完毕。
fun main() = runBlocking {
suspend fun download () {
val time = (Random.nextDouble() * 1000).toLong()
logX("Delay time = $time")
delay(time)
}
val job = launch(start = CoroutineStart.LAZY) {
logX("Coroutine start!")
download()
logX("Coroutine end!")
}
delay(500L)
job.log()
job.start()
job.log()
job.invokeOnCompletion {
job.log()
}
job.join()
logX("Process end!")
}
输出如下:
================================
isActive = false
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
isActive = true
isCancelled = false
isCompleted = false
Thread:main @coroutine#1
================================
================================
Coroutine start!
Thread:main @coroutine#2
================================
================================
Delay time = 480
Thread:main @coroutine#2
================================
================================
Coroutine end!
Thread:main @coroutine#2
================================
================================
isActive = false
isCancelled = false
isCompleted = true
Thread:main @coroutine#2
================================
================================
Process end!
Thread:main @coroutine#1
================================
可以看出 job 执行完毕后,main 方法才继续执行,打印 “Process end!”。
Job 的源码
public interface Job : CoroutineContext.Element {
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun start(): Boolean
public fun cancel(cause: CancellationException? = null)
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public fun cancel(): Unit = cancel(null)
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public fun cancel(cause: Throwable? = null): Boolean
public val children: Sequence<Job>
public fun attachChild(child: ChildJob): ChildHandle
public suspend fun join()
public val onJoin: SelectClause0
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
Job 提供了 isActive、isCompleted、isCancelled 三个方法用来查询协程状态。
使用 start 和 cancel 来控制协程的启动和取消。
使用 join 来等待协程执行完毕。
使用 invokeOnCompletion 提供结束状态监听。
Deferred
async 方式启动的协程会返回 Deferred 结果。
Deferred 继承自 Job,多了 await 挂起方法。
public interface Deferred<out T> : Job {
public suspend fun await(): T
public val onAwait: SelectClause1<T>
@ExperimentalCoroutinesApi
public fun getCompleted(): T
@ExperimentalCoroutinesApi
public fun getCompletionExceptionOrNull(): Throwable?
}
await
await 方法等待 async 协程的执行结果,但是它不会阻塞所在的线程。
await方法将协程任务挂起,执行完毕后恢复执行。如果线程有其他任务,他们不会被阻塞。
fun main() = runBlocking {
val deferred = async {
logX("Coroutine start.")
delay(1000)
logX("Coroutine end.")
"Coroutine result"
}
val result = deferred.await()
logX("Result = $result")
logX("Process end!")
}
输出如下:
================================
Coroutine start.
Thread:main @coroutine#2
================================
================================
Coroutine end.
Thread:main @coroutine#2
================================
================================
Result = Coroutine result
Thread:main @coroutine#1
================================
================================
Process end!
Thread:main @coroutine#1
================================
结构化并发
Job 可以嵌套在另外一个 Job 中执行,这种带有层级关系的执行方式称为结构化并发。
Job parent child
launch 返回 parentJob,内部同时执行了 3 个 Job,每个 Job 是 parentJob 的子 Job。parentJob 的执行时间取决于是 3 个子 Job 中执行时间最长的 Job,也就是 job3。
fun main() = runBlocking {
val parentJob: Job
var job1 : Job? = null
var job2 : Job? = null
var job3 : Job? = null
parentJob = launch {
job1 = launch {
delay(1000)
}
job2 = launch {
delay(3000)
}
job3 = launch {
delay(5000)
}
}
delay(500)
parentJob.children.forEachIndexed { index, job ->
when(index) {
0 -> logX("job === job1 is ${job === job1}")
1 -> logX("job === job2 is ${job === job2}")
2 -> logX("job === job3 is ${job === job3}")
}
}
parentJob.join()
logX("Process end.")
}
输出如下
================================
job === job1 is true
Thread:main @coroutine#1, time:1652948268970
================================
================================
job === job2 is true
Thread:main @coroutine#1, time:1652948268991
================================
================================
job === job3 is true
Thread:main @coroutine#1, time:1652948268991
================================
================================
Process end.
Thread:main @coroutine#1, time:1652948273469
================================
可以看出子 Job 执行完 5 秒后程序结束,包括了 500ms 的 delay。
通过 Job 的源码可以看出 Job 有两个描述 Job 层级的接口:children 和 attachChild。
children 是一个 Sequence,它是惰性集合,用来遍历每一个子 Job。 attachChild 用来绑定子 Job。
public interface Job : CoroutineContext.Element {
...
public val children: Sequence<Job>
public fun attachChild(child: ChildJob): ChildHandle
}
再举一个例子,使用 cancel 代替 join。可以发现每个子 Job 都没有执行完成。
fun main() = runBlocking {
val parentJob: Job
var job1 : Job? = null
var job2 : Job? = null
var job3 : Job? = null
parentJob = launch {
job1 = launch {
logX("coroutine start.")
delay(1000)
logX("coroutine end.")
}
job2 = launch {
logX("coroutine start.")
delay(3000)
logX("coroutine end.")
}
job3 = launch {
logX("coroutine start.")
delay(5000)
logX("coroutine end.")
}
}
delay(500)
parentJob.children.forEachIndexed { index, job ->
when(index) {
0 -> logX("job === job1 is ${job === job1}")
1 -> logX("job === job2 is ${job === job2}")
2 -> logX("job === job3 is ${job === job3}")
}
}
parentJob.cancel()
logX("Process end.")
}
输出如下
================================
coroutine start.
Thread:main @coroutine#3, time:1652950243357
================================
================================
coroutine start.
Thread:main @coroutine#4, time:1652950243388
================================
================================
coroutine start.
Thread:main @coroutine#5, time:1652950243388
================================
================================
job === job1 is true
Thread:main @coroutine#1, time:1652950243874
================================
================================
job === job2 is true
Thread:main @coroutine#1, time:1652950243874
================================
================================
job === job3 is true
Thread:main @coroutine#1, time:1652950243874
================================
================================
Process end.
Thread:main @coroutine#1, time:1652950243903
================================
因为子 Job 在执行过程中被 cancel,所以 "coroutine end."没有打印。
顺序执行与并发执行
在 runBlocking 里面定义 3 个挂起函数,然后依次获取每个函数的执行结果,打印耗时。
fun main() = runBlocking {
suspend fun getResult1(): String {
delay(1000)
return "Result1"
}
suspend fun getResult2(): String {
delay(1000)
return "Result2"
}
suspend fun getResult3(): String {
delay(1000)
return "Result3"
}
val results = mutableListOf<String>()
val time = measureTimeMillis {
results.add(getResult1())
results.add(getResult2())
results.add(getResult3())
}
println("results:$results")
println("time:$time")
}
输出如下
results:[Result1, Result2, Result3]
time:3012
可以看出 3 个挂起函数是顺序执行的,总耗时 3 秒。
可以使用 async-await 优化上面的代码,并发执行。
fun main() = runBlocking {
suspend fun getResult1(): String {
delay(1000)
return "Result1"
}
suspend fun getResult2(): String {
delay(1000)
return "Result2"
}
suspend fun getResult3(): String {
delay(1000)
return "Result3"
}
val results = mutableListOf<String>()
val time = measureTimeMillis {
val deferred1 = async {
getResult1()
}
val deferred2 = async {
getResult2()
}
val deferred3 = async {
getResult3()
}
results.add(deferred1.await())
results.add(deferred2.await())
results.add(deferred3.await())
}
println("results:$results")
println("time:$time")
}
输出如下:
results:[Result1, Result2, Result3]
time:1016
每个 async 启动的协程是并发执行的,总耗时 1 秒。
思考题
以下代码的输出结果是?
fun main() = runBlocking {
val job1 = launch {
println("job1 start.")
delay(1000)
println("job1 end.")
}
job1.join()
val job2 = launch(job1) {
println("job2 start.")
delay(1000)
println("job2 end.")
}
job2.join()
println("process end.")
}
输出如下
job1 start.
job1 end.
process end.
思考: 因为 job2 的 coroutineContext 是 job1,因为 job1 执行完毕后变为了 isCompleted 执行完毕状态。因此由于 Job 的层级关系,父 Job job1 执行完毕,子 Job job2 直接变为 isCancelled 取消状态,因此 job2 根本不会执行。
总结
- Job 的状态有 isActive、isCompleted、isCancelled,可以使用 start、cancel、join、invokeOnCompletion 控制。
- Deferred 继承 Job,多了 await 等待返回执行结果。
- 协程具有结构化并发的特点,这是它除了挂起-恢复的第二大优势。
|