本文参考自Jetbrains Kotlin官方文档

在我看来,Kotlin可以作为Java的强化版,在引入现代化语法的同时,还完美兼容了Java丰富的生态

而协程作为Kotlin最重要的语言特性,自然不可忽视,他进一步完善了异步模型,让异步编程更加简单,优雅

由于项目需要,重新复习一下Kotlin协程并查缺补漏,并在此记录一下

协程基础

基本协程程序

1
2
3
4
5
6
7
8
fun main() {
    GlobalScope.launch { // 在后台启动一个新协程,并继续执行之后的代码
        delay(1000L) // 非阻塞式地延迟一秒
        println("World!") // 延迟结束后打印
    }
    println("Hello,") //主线程继续执行,不受协程 delay 所影响
    Thread.sleep(2000L) // 主线程阻塞式睡眠2秒,以此来保证JVM存活
}

在本段代码中,GlocalScope.launch代码块中的协程作用域为全局作用域,只要程序还在运行该协程就可以一直运行

开启协程后线程不会堵塞,即使协程没有执行完毕县城也会结束,因此要在协程结束后阻塞主线程等待协程执行完毕

delay()是一个挂起函数,挂起函数会暂停当前协程,并等待一段时间后恢复,不阻塞线程

Thread.sleep()是阻塞式函数,会阻塞当前线程,并等待一段时间后继续执行

一个线程上可以运行多个协程,挂起函数不会阻塞线程,只会将当前线程调整为执行另外一个协程,所以线程不会因为协程延时而阻塞

桥接阻塞与非阻塞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
fun main() { 
    GlobalScope.launch { // 启动一个新协程
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    runBlocking {
        delay(2000L)
    } 
}

runBlocking()可以将一个协程转换为阻塞式,阻塞式代码会阻塞当前线程,直到协程执行完毕

1
2
3
4
5
6
7
8
fun main() = runBlocking<Unit> {
    GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    delay(2000L)
}

也可以将整个函数体转换为阻塞式,阻塞式代码会阻塞当前协程,直到协程执行完毕

等待作业

1
2
3
4
5
6
7
8
fun main() = runBlocking {
    val job = GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join()
}

阻塞线程等待作业完成通常是不合理的,可以通过join()等待协程完成

join()是一个挂起函数,必须在协程或挂起函数作用域中运行

结构化并发

GlobalScope.launch()会创建一个顶级协程,生命周期跟随全局,如果不处理会一直运行到程序停止,因此我们可以选择结构化并发在特定范围启动协程

1
2
3
4
5
6
7
fun main() = runBlocking {
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}

runBlocking()中使用launch()启动一个线程,线程在其所属作用域中不会结束,launch()函数饮食持有和runBlocking()相同的协程作用域

作用域构造器

coroutineScope()可以声明一个协程作用域,直到所有启动的协程完成后才结束

coroutineScope()是一个挂起函数,而runBlocking()是一个阻塞式函数,两者都会阻塞代码块执行,前者不会阻塞线程,而后者会阻塞线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
fun main() = runBlocking {
    launch { 
        delay(200L)
        println("Task from runBlocking")
    }
    coroutineScope {
        launch {
            delay(500L) 
            println("Task from nested launch")
        }
        delay(100L)
        println("Task from coroutine scope")
    }
    println("Coroutine scope is over")
}

提取函数并重构

1
2
3
4
5
6
7
8
9
fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

这种模式可以提取出函数,让代码更清晰,但是提取后的函数不包括在当前作用域中调用的构建器,因此有两种比较好的解决办法用来传递构建器

构建器参数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
fun main() = runBlocking {
    launch { doWorld(this) }
    println("Hello,")
}

fun doWorld(scope: CoroutineScope) {
    scope.launch {
        delay(1000L)
        println("World!")
    }
}

拓展函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

fun CoroutineScope.doWorld() {
    launch {
        delay(1000L)
        println("World!")
    }
}

协程的轻量型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
var i = 0

fun main() = runBlocking {
    repeat(100_000) {
        launch {
            i++
            print(i)
            delay(1000L)
        }
    }
}

这里同时启动100000个协程,但是每次只打印一个,说明协程是轻量级的,不会阻塞线程

但是这里有个小bug,因为协程是并发的,因为多个携程都在争夺控制权,可能导致数据出错,可以使用kotlin提供的原子操作解决

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
var i = AtomicInteger(0)

fun main() = runBlocking {
    repeat(100_000) {
        launch {
            i.incrementAndGet() // 原子递增操作
            print(i.get())
            delay(1000L)
        }
    }
}

全局协程类似于守护线程

1
2
3
4
5
6
7
8
9
fun main() = runBlocking {
    GlobalScope.launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)
}

GlobalScope.launch()生命周期依附于全局作用域,如果进程结束随即消亡

取消与超时

取消携程执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel()
    job.join()
    println("main: Now I can quit.")
}

通过cancel()取消协程,join()等待协程执行完毕,也可以使用cancelAndJoin()同时执行取消和等待

取消操作是协作完成的

kotlin中所有的挂起函数都是可以取消的,挂起函数的原理是在运行时检查是否有取消操作,但是如果协程正在执行计算任务且未检查是否处于取消状态则无法取消协程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = System.currentTimeMillis()
        var i = 0
        while (i < 5) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

使计算操作可取消

可以在挂起函数中检查是否处于取消状态,也可以使用yield()挂起函数让出协程控制权

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) {//检查是否处于取消状态
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = System.currentTimeMillis()
        var i = 0
        while (i < 5) {
            yield()//让出协程控制权
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

用finally关闭资源

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
    //sampleStart
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("job: I'm running finally")
        }
    }
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

join()等待协程执行完毕并释放资源后,再继续执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

执行不可取消的代码块

通过withContext()函数与NonCancellable上下文包裹代码块设置为不可取消,即使协程处于取消状态,代码块仍然可以执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() = runBlocking {
    //sampleStart
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    //sampleEnd    
}

超时

1
2
3
4
5
6
7
8
fun main() = runBlocking {
    withTimeout(1300L) {//超时
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

上方代码会抛出TimeoutCancellationException异常,可以通过try catch捕获

但是如果不想抛出异常,可以使用withTimeoutOrNull(),若超时,直接返回null

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
fun main() = runBlocking {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done"
    }
    println("Result is $result")
}

组合和挂起函数

默认顺序

在协程中代码和常规代码一样,默认按照顺序执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking<Unit> {
    //sampleStart
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 29
}

async并发

async类似于launch,但是async返回一个Deferred对象,可以通过await()获取到返回值,并且可以等待执行完成,而launch返回Job对象,无法获取到返回值,并且不能等待执行完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 29
}

运行时间几乎减半,因为两个函数是同时运行的,总时间几乎取决于耗时最长的任务

惰性启动 async

可以执行start参数为CoroutineStart.lazy,这样只有主动调用await()或者start()才会执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun main() = runBlocking<Unit> {
    //sampleStart
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // some computation
        one.start() // start the first one
        two.start() // start the second one
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
    //sampleEnd    
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

如果直接在println()中调用await()会导致挂起函数顺序执行,因为await()会等待程序完成,所以必须事先启动协程

异步风格函数

有时可能会需要在非协程上下文中执行挂起函数,可以将挂起函数包裹在GlobalScope.async()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() {
    val time = measureTimeMillis {
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
//sampleEnd

fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

但是这种情况是不推荐的,由于GlobalScope生命周期跟随整个程序,如果挂起函数被取消,XXXAsync()函数仍然会继续执行

基于Async实现结构化并发

可以使用coroutineScope()搭配async实现结构化并发,如果coroutineScope()发生异常,则内部所有协程都将被取消

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() = runBlocking<Unit> {
    //sampleStart
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}")
    }
    println("Completed in $time ms")
    //sampleEnd    
}

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

协程取消也是按照协程层次结构进行传播

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // Emulates very long computation
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> {
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

取消顺序为two -> failedConcurrentSum -> one -> runBlocking

协程上下文和调度器

协程总是运行在某个CoroutineContext表示的某个上下文中,上下文包括多种元素,主要的元素是协程作业job和调度器Dispatche

调度器和线程

协程上下文包含一个协程调度器,协程调度器用于确定协程运行与哪个线程或线程池甚至无限制运行

所有协程构造器都可以接受一个可选参数,用于显式指定调度器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
fun main() = runBlocking<Unit> {
    launch { // 继承外部上下文
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // 未定义协程
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // 默认线程池
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // 自定义线程
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

当协程构造器不指定调度器时,会默认继承外部协程的调度器

使用协程构造器启动时默认指定的调度器就是(Dispatchers.default),因此launch(Dispatchers.default){...}GlobalScope.launch{...}是使用相同的调度器

未定义调度器Unconfined

Dispatchers.Unconfined调度器是一个特殊的调度器,他仅仅运行到第一个挂起点,如delay(),挂起后会恢复到调用者线程的协程上下文

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
fun main() = runBlocking<Unit> {
    //sampleStart
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}

输出结果为

1
2
3
4
Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

由此可见,当未指定调度器执行到挂起点后,如果恢复,则运行在调用者线程

线程与协程调试

启动时指定-Dkotlinx.coroutines.debug开启调试模式,在调试器中可以看到协程的堆栈信息

首先配置虚拟机全局变量,加上-Dkotlinx.coroutines.debug

然后在启动配置虚拟机选项中添加-Dkotlinx.coroutines.debug=on

执行launch(newSingleThreadContext("MyOwnThread")) {...}时,在调试器中可以看到协程的堆栈信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
    //sampleStart
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
    //sampleEnd    
}

输出结果

1
2
3
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

线程间的切换

withContext可以主动更改协程上下文,ues函数会自动关闭线程释放资源

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
fun main() {
    //sampleStart
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
    //sampleEnd
}

打印结果为

1
2
3
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

上下文中的job

1
2
3
4
5
fun main() = runBlocking<Unit> {
    //sampleStart
    println("My job is ${coroutineContext[Job]}")
    //sampleEnd    
}

CoroutineScopeisActive属性只是coroutineContext[Job]?.isActive == true的一种简便写法

子协程

当一个协程在另一个协程作用域中启动时,会通过CoroutineScope.coroutineContext继承其上下文,新的协程的job也会成为父协程job的子job

当父协程被取消时,子协程也会被取消

但是基于GlobalScope启动的协程,不会继承父协程的job,不受父协程作用域独立运作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() = runBlocking<Unit> {
    //sampleStart
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs, one with GlobalScope
        GlobalScope.launch {
            println("job1: I run in GlobalScope and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
    //sampleEnd
}

父协程的职责

父协程会等待所有子协程完成,无需通过join或者await来等待子协程完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
fun main() = runBlocking<Unit> {
    //sampleStart
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch  {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
    //sampleEnd
}

输出结果

1
2
3
4
5
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

协程命名

需要开启-Dkotlinx.coroutines.debug

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
    //sampleStart
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
    //sampleEnd    
}

组合上下文元素

1
2
3
4
5
6
7
fun main() = runBlocking<Unit> {
    //sampleStart
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
    //sampleEnd    
}

构建协程作用域

通过CoroutineScope构建协程作用域及其拓展函数实现

1
2
3
4
5
6
7
8
class Activity {
    private val mainScope = MainScope()
    
    fun destroy() {
        mainScope.cancel()
    }
    // to be continued ...
}

也可以通过委托实现

1
2
    class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
    // to be continued ...

现在在Activity中,便可以启动协程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    // class Activity continues
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }

调用构建的协程作用域

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun destroy() {
        cancel() // Extension on CoroutineScope
    }
    // to be continued ...

    // class Activity continues
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

fun main() = runBlocking<Unit> {
    //sampleStart
    val activity = Activity()
    activity.doSomething() // run test function
    println("Launched coroutines")
    delay(500L) // delay for half a second
    println("Destroying activity!")
    activity.destroy() // cancels all coroutines
    delay(1000) // visually confirm that they don't work
    //sampleEnd    
}

输出结果

1
2
3
4
Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

只有前两个协程执行,其他协程被destroy取消

线程局部数据

可以通过ThreadLocal实现局部变量的传递,通过threadLocal.asContextElement(value="launch")来设置局部数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main() = runBlocking<Unit> {
    //sampleStart
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    //sampleEnd    
}

输出结果为

1
2
3
4
Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'