1,概述
协程是一个轻量级的线程,将调度从系统线程切换拿到用户态,在一定程度上减少了线程切换开销。
2,简单实例
导入依赖:
dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2"
}
(1)基本使用
//主协程
fun main() = runBlocking {
//开启一个新协程
val job = launch {
delay(1000)
println("task-1")
}
println("task-0")
job.join()//等待job执行完毕,如果此处不join,会退出主协程
}
(2)开启多个协程
//开启10万次,
repeat(100_000){
launch {
println("task-t")
}
}
(3)async
//定义挂机函数suspend,此定义后可以运行在协程中
suspend fun task1(): Int {
delay(200)
return 0
}
suspend fun task2(): Int {
delay(300)
return 1
}
//主协程
fun main() = runBlocking {
val result1 = async { task1() }
val result2 = async { task2() }
println("wait task result: ${result1.await()} ${result2.await()}")
}
(4)withContext
//主协程
fun main() = runBlocking {
//指定协程运行的线程,
//Main IO Default Unconfined 四个默认值
withContext(Dispatchers.Unconfined) {
println("Unconfined Dispatchers")
}
}
(5)协程回调
job.invokeOnCompletion {
println("task-1 done!")
}
3,协程通信
(1)channel
suspend fun main(){
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (true){
delay(100)
channel.send(i++)
}
}
val consumer = GlobalScope.launch {
while (true){
println(channel.receive())
}
}
producer.join()
consumer.join()
}
上述代码中构造了两个协程producer和consumer,由于没有为它们明确指定调度器,所以它们的调度器都是默认的,在Java平台上就是基于线程池实现的Default。它们可以运行在不同的线程上,也可以运行在同一个线程上。
?(2)channel迭代
下例中,iterator.hasNext()是挂起函数,在判断是否有下一个元素的时候就需要去Channel中读取元素了。
suspend fun main() {
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (i <= 5) {
delay(100)
println("send before")
channel.send(i++)
println("send after")
}
//关闭通道
channel.close()
}
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()) {
println(iterator.next())
}
println("exit consumer")
}
producer.join()
consumer.join()
}
send、receiver也是挂起函数,可以通过输出了解,
send before
send after
0
send before
send after
1
send before
send after
2
send before
send after
3
send before
send after
4
send before
send after
5
exit consumer
(3)生产-消费API[实验阶段]
suspend fun main() {
//创建生产者
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
repeat(5) {
delay(100)
send(1)
}
}
//接收数据
GlobalScope.launch {
val iterator = receiveChannel.iterator()
while (iterator.hasNext()) {
println(iterator.next())
}
}
//创建消费者
val sendChannel: SendChannel<Int> = GlobalScope.actor {
while (true) {
println(receive())
}
}
// 发生数据
sendChannel.send(0)
}
(4)BroadcastChannel
发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然,多个接收端不存在互斥行为。
suspend fun main() {
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
GlobalScope.launch {
List(3) {
delay(100)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) {
GlobalScope.launch {
//订阅该广播channel
val receiveChannel = broadcastChannel.openSubscription()
//下列for循环,是重写了iterator方法,实质是while循环逻辑
//val iterator = iterator()
//while (iterator.hasNext()) {
// println(iterator.next())
//}
for (i in receiveChannel) {
println("[#$it] received: $i")
}
}
}.joinAll()//创建的三个协程全部join
}
输出如下,三个协程输出无顺序,
[#1] received: 0
[#0] received: 0
[#2] received: 0
[#1] received: 1
[#2] received: 1
[#0] received: 1
[#1] received: 2
[#2] received: 2
[#0] received: 2
除了直接创建以外,我们也可以用前面定义的普通Channel进行转换
suspend fun main() {
val channel = Channel<Int>()
val broadcast = channel.broadcast(3)
List(3) {
GlobalScope.launch {
delay(100) //注意,如果不delay,没有订阅者,send发送的信息会被丢弃
broadcast.send(it)
}
}
List(3){
GlobalScope.launch {
val receiveChannel = broadcast.openSubscription()
for (i in receiveChannel) {
println("[#$it] received: $i")
}
}
}.joinAll()
}
(5)Flow
随着RxJava的流行,响应式编程模型逐步深入人心。Flow就是Kotlin协程与响应式编程模型结合的产物。
fun main() = runBlocking {
val intFlow = flow<Int> {
(1..3).forEach {
emit(it)
println("emit:" + Thread.currentThread().name)
delay(100)
}
}
GlobalScope.launch {
// 手动设置调度器
intFlow.flowOn(Dispatchers.IO).collect {
println("Thread->${Thread.currentThread().name} $it")
}
}.join()
}
|