應用 Kotlin 協程:對比 Thread、創建協程、任務掛起 | Dispatcher、CoroutineContext、CoroutineScope

應用 Kotlin 協程:對比 Thread、創建協程、任務掛起 | Dispatcher、CoroutineContext、CoroutineScope

Overview of Content

本文深入探討 Kotlin 協程的多個應用方面,旨在提供讀者對協程應用機制的全面理解。

首先,我們對比了 Kotlin 協程與傳統 Thread 機制,強調協程的關鍵類和概念。

接著,我們探討了協程的創建方式,包括使用 launch 函數啟動新的 CoroutineScopeasync 函數實現可延遲啟動的協程以及延遲啟動協程的方法。

第三部份,我們深入研究了協程的掛起函數,包括 delay 函數實現狀態暫停、yield 讓出執行資源、以及使用 withContextcoroutineScope 進行協程切換。

第四部份,我們探討了 Coroutine Dispatcher 的使用,包括切換協程調度器和 Unconfined 特性。

第五部分,我們深入研究了 CoroutineContext,包括父子協程關係和 MultiCoroutineContext 的應用。

最後,我們深入了解了 CoroutineScope 協程作用域,包括處理協程異常、異常傳遞、捕捉異常以及使用 supervisorScopeCoroutineExceptionHandler 等機制。通過閱讀本文,您將深入瞭解 Kotlin 協程的核心概念,並學會在實際應用中靈活運用。


Coroutine 對比 Thread 機制

協程 Coroutine 是輕量級的執行序,它並不會綁定特定執行序(有可能掛起前在 A Thread, 恢復時轉為 B Thread 上下文)

Coroutine 與一般 Thread 比起來:

A. 普通 Thread 是透過 CPU 發出中斷信號時做上下文切換,而 Coroutine 則是靠程式自身實現(通常會有 Library 可以用),不需要 CPU 控制(跟硬體沒關係)

B. 普通 Thread 使用「堵塞」機制,Coroutine 使用「掛起」機制

機制行為特色CPU 使用概念
堵塞讓出 CPU 時間,並在滿足條件後「被動喚醒」,如果條件不滿足就一直堵塞;Thread#wait() 概念釋放 CPU
掛起讓出 CPU 時間,並在一定時間後「主動喚醒」去檢查條件是否滿足仍佔用 CPU

Kotlin Coroutine 重點類、重點概念

Kotlin Coroutine 重點抽象類

Coroutine 相關類說明補充
CoroutineDispather協程調度器;決定 Coroutine 內的任務要在哪個 Thread 中運行!-
CoroutineContext協程上下文;所有 Corotine 都要在 CoroutineContext 範圍內包含一個默認的 CoroutineDispather
CoroutineScope協程作用域;CoroutineScope 會在具有生命週期上的實體實現(GlobalScope 則是 top-level 是整個程式的生命週期)它包含了一個 CoroutineContext
graph TB; subgraph CoroutineScope subgraph CoroutineContext CoroutineDispather end end

Kotlin Coroutine 任務管理類

Coroutine 任務管理類說明補充
Job任務執行的過程被封裝成 Job,之後這個 Job 會交由 CoroutineDispather 執行Job 具有簡單的生命週期,可被執行 & 取消 …,其中也有父子類關係,父 Job 可以管控子 Job
DeferredJob 的拓展類;可以讓 Job 有返回值Deferred 拓展 Job

Kotlin Coroutine 概念 & 關鍵字

Coroutine 概念 & 關鍵字說明補充
suspend (關鍵字)表明該函數被執行時會被掛起!(掛起,但不是放 CPU)suspend 關鍵字只能被使用在 CoroutineContext
suspend point (概念)每個掛起的函數都稱為 suspend pointIDE 中標示圖案 
Continuation(概念)兩個 suspend point 之間都稱為 Continuation兩個 suspend point 之間的程式是運行在外部的 CoroutineScope
graph TB; subgraph 最外層 CoroutineScope subgraph suspend point 1 執行掛起行為_1 end Continuation subgraph suspend point 2 執行掛起行為_2 end end

Kotlin Coroutine 創建

Coroutine 的創建有多種方式,以下我們就來介紹解個 Coroutine 創建的方式

launch 函數:啟動新 CoroutineScope

launch 創建 Coroutine:它創建一個 CoroutineScope 當作上下文,將其賦予最後一個 Lambda 參數,再 返回 Job 對象(可透過 Job 來管理協程任務)


fun main() {
    // 返回 Job
    val job : Job = GlobalScope.launch {    // 上下文為 CoroutineScope
        println("Global with Job - start")

        delay(1000)

        println("Global with Job - end")
    }

    Thread.sleep(500)

    job.cancel()        // 直接取消協程

    println("Main finish")
}

async 函數:創建可延遲啟動的協程

async 創建 Coroutine:它同樣會創建 CoroutineScope 賦予最後一個 Lambda 參數作為上下文,再 返回 Deferred 對象

而它與 launch 函數的不同點在於,launch 在呼叫時就會啟動協程,而 async 可以在需要的時候再啟動協程


suspend fun main() {    // await 需配合 suspend 使用
    // 返回 Job
    val deferred = GlobalScope.async {    // 上下文為 CoroutineScope
        println("Global with Job - start")

        delay(1000)

        println("Global with Job - end")
    }
    
    // 故意延遲 500ms 
    delay(500)

    val startStamp = System.currentTimeMillis()
    deferred.await()        // 等待協程 (需要 suspend)
//    deferred.cancel()     // 可取消

    println("Main finish: use time: ${System.currentTimeMillis() - startStamp}")
}

runBlocking 函數:runBlocking 會堵塞當前 Thread 直到任務結束,較多使用在測試上

延遲啟動 Coroutine

Coroutine 預設在創建後會自動啟動任務;這其中可以透過 start 參數(launch, async 都可以)來控制 Coroutine 的啟動方式

以下為 launch 源碼


public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    // 預設使用 CoroutineStart.DEFAULT
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

● 我們可以透過設定 start 參數為 CoroutineStart.LAZY 可以達到 懶加載功能(它就不會自動啟動),之後只有在呼叫 startjoinawait 後才會開始任務


fun defaultStart() {
    val startTime = System.currentTimeMillis()

    val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
        println("defaultStart - launch - start: ${System.currentTimeMillis() - startTime}")

        delay(1000)

        println("defaultStart - launch - finish: ${System.currentTimeMillis() - startTime}")
    }

    Thread.sleep(1000)

    println("Call start: ${System.currentTimeMillis() - startTime}")

    job.start()
}

fun main() {
    defaultStart()

    Thread.sleep(5000)
}


Coroutine 掛起函數

Kotlin 中使用 suspend 關鍵字來標記一個掛起函數,在該函數執行完之前,不會執行下一行程式 (類似一種「同步」操作)

Kotlin 的掛起函數是採用 「CPS, Continuation Passing Style」 與「狀態機」實現,保證會執行完掛起函數後,再往下執行

delay 函數:狀態暫停

delay 函數會將當前 CoroutineScope 中的 狀態進行暫停,它類似於 Thread#sleep 方法,但 delay 它其實暫停的是協程不是執行序(仍尚未放開 CPU 的使用權!)

以下我們透過 launch 函數來啟動一個新的 CoroutineScope,來觀察 delay 到底會不會暫停 Thread(同時我們也可以觀察到 launch 會不創建一個新的執行序)


fun main() = runBlocking {
    
    fun printThreadInfo(order: Int) =
        println("$order. current thread: ${Thread.currentThread().name}")

    printThreadInfo(1)

    GlobalScope.launch {
        // 內外不同 Thread!  所以不會被暫停
        printThreadInfo(3)

        delay(1000)

        printThreadInfo(4)
    }

    printThreadInfo(2)

    Thread.sleep(2000)

    printThreadInfo(5)
}

從下圖結果中,我們可以看到 launch 創建了一個新執行序,並且 delay 函數並不會影響到外部執行序,而是影響到 Coroutine 創建的執行序(請觀察執行順序)

yield 讓出執行資源

● 在 Java 中 Thread#yield 方法代表的意思是讓當前 Thread 讓出資源,並給其他 Thread 進行搶奪這個資源的使用權 (有可能其他 Thread 或自身搶到)

Thread#yied 通常使用的較少,因為它的可控性較低

● Koltin 中 yield 函數也是差不多的意思,不過差異點在於:協程讓出資源後會將當前協程分發到 CoroutineDispatcher 的對列中做等待,等其他協程執行完才會執行自身協程(也就是讓出,並向後排隊的概念)

以下開啟兩個協程來互讓 (yield) 資源,觀察它們讓出後排序的行為,是否會跟 Thread#yield 一樣隨機亂序


fun main() {
    runBlocking {

        fun printThreadInfo(order: Int) =
            println("$order. current thread: ${Thread.currentThread().name}")

        val job1 = launch {
            printThreadInfo(1)
            yield()
            printThreadInfo(3)
            yield()
            printThreadInfo(5)
        }

        val job2 = launch {
            printThreadInfo(2)
            yield()
            printThreadInfo(4)
            yield()
            printThreadInfo(6)
        }

        printThreadInfo(0)

        job1.join()
        job2.join()
    }
}

從下圖解果中,我們可以看出協程的 yield 更加的可控

● CoroutineScope 的重要性、特性:CoroutineScope 會有關到協程的執行影響範圍;我們將上面的例子稍加修改,用不同的 CoroutineScope 去呼叫 yield,我們可以觀察到 yield 就不會等待 (因為上下文已經不同)


fun main() {
    runBlocking {

        fun printThreadInfo(order: Int) =
            println("$order. current thread: ${Thread.currentThread().name}")

        val job1 = GlobalScope.launch {
            // 啟動新 CoroutineScope
            printThreadInfo(1)
            yield()
            printThreadInfo(3)
            yield()
            printThreadInfo(5)
        }

        val job2 = GlobalScope.launch {
            // 啟動新 CoroutineScope
            printThreadInfo(2)
            yield()
            printThreadInfo(4)
            yield()
            printThreadInfo(6)
        }

        printThreadInfo(0)

        job1.join()
        job2.join()
    }
}

runBlocking 內的 launch 會繼承上層的 CoroutineScope,所以輸出的就是 Thread 就是 MainThread (因為內部的協程在同一個 CoroutineContext 中)

withContext:等待指定的協程、NonCancelable

協程中的 withContext 用於切換該協程要在哪個 ContextDispatcher 中運行,並且它也是一個掛起操作 (它也可以返回一個值,如同 runBlocking)


fun main() : Unit = runBlocking {

    launch {
        println("In launch scope: ${Thread.currentThread().name}")

        withContext(Dispatchers.Default) {      // 切換 Dispatcher
            println("withContext(Default): ${Thread.currentThread().name}")
        }

        println("---")

        withContext(Dispatchers.Unconfined) {
            println("withContext(Unconfined): ${Thread.currentThread().name}")
        }
    }

    println("Outer scope: ${Thread.currentThread().name}")

}

不同的 Dispatcher 差異

Dispatcher 種類啟動協程的 Thread執行協程時使用的 Thread
Default被調用的 Thread協程的公用執行序池
IO被調用的 Thread協程的 IO 密集操作執行序池
Unconfined被調用的 Thread默認運行在當前協程的執行序,但如果碰到第一個暫停點(suspend point)後,它會運行在 任意執行序中

withContext 預設是可以取消的,但我們也可以配合 使用 NonCancelable (它是一個 CoroutineContext) 來讓該協程不可被取消


fun main() : Unit = runBlocking {

    val job = launch {
        println("In launch scope: ${Thread.currentThread().name}")

        // 加入 NonCancelable
        withContext(Dispatchers.Default + NonCancellable) {      // 切換 Dispatcher
            delay(200)
            println("withContext(Default): ${Thread.currentThread().name}")
        }
    }

    delay(100)
    job.cancelAndJoin()     // 將 cancel 立刻加入任務

    println("Outer scope: ${Thread.currentThread().name}")

}

可以看到任務在 100ms 後被取消,但是實際上還是有輸出

coroutineScope 關鍵字

coroutineScope 同樣採用父協程的 CoroutineContext,並且它與 withContext 不同,它無法設定其他的 CoroutineDispatcher


fun main() {

    fun printThreadInfo() =
        println("current thread: ${Thread.currentThread().name}")

    GlobalScope.launch {
        val res1 = withContext(Dispatchers.Default) {
            printThreadInfo()

            delay(100)

            10
        }

        val res2 = coroutineScope {
            printThreadInfo()

            delay(100)

            20
        }

        println("res: ${res1 + res2}")

    }

    Thread.sleep(1_000)

}


Coroutine Dispatcher

調度器代表的是該協程任務會在哪個執行序中運行

切換 Coroutine 調度器

● 以下創建多個不同的協程,並設定不同調度器,在觀察其運行時是使用哪個執行序執行


fun main() : Unit = runBlocking {

    fun threadInfo(msg: String) = println("$msg, working in ${Thread.currentThread().name}")

    // 使用父協程設定
    launch {
        threadInfo("`Default`")
    }

    launch(coroutineContext) {
        threadInfo("`Parent Context`")
    }

    // 使用預設 Dispatcher
    launch(Dispatchers.Unconfined) {
        threadInfo("`Unconfined`")
    }

    launch(Dispatchers.IO) {
        threadInfo("`IO`")
    }

    launch(Dispatchers.Default) {
        threadInfo("`Default`")
    }

    // 自己創建 Thread (也可以使用 newFixedThreadPoolContext)
    launch(newSingleThreadContext("Custom_Thread")) {
        threadInfo("`Custom Thread`")
    }

}

Unconfined 特性:碰到暫停我就換

● Coroutine 預設有提供我們以下幾種 Dispatcher

Dispatcher 種類啟動協程的 Thread執行協程時使用的 Thread
Default被調用的 Thread協程的公用執行序池
IO被調用的 Thread協程的 IO 密集操作執行序池
Unconfined被調用的 Thread默認運行在當前協程的執行序,但如果碰到第一個暫停點(suspend point)後,它會運行在 任意執行序中

fun main() : Unit = runBlocking {

    fun threadInfo() = println("Working in ${Thread.currentThread().name}")

    launch (Dispatchers.Unconfined) {
        // 預設與啟動 Dispatcher 的相同
        threadInfo()

        // 直到遇到第一個 suspend 函數
        delay(300)

        // 切換任意 Thread 運行
        threadInfo()
    }

}

從結果中我們可以看到,尚未碰到暫停點時,它就運行在呼叫者的執行序,當碰到暫停點後,就跑去其他執行序運作!


CoroutineContext

父子協程 - 關係

● 可以透過 CoroutineContext 來「區分協程之間的關係」,只有在相同 CoroutineContext 中的協程才會有父子關係… 範例如下


fun main() {
    val parentContext = GlobalScope.launch {

        // 使用外層的 CoroutineContext
        val childContext = launch(coroutineContext) {
            println("My context same as parent - start.")

            delay(1000)

            println("My context same as parent - end.")     // 沒有輸出
        }

        // 創建一個新的 CoroutineContext
        val diffContext = GlobalScope.launch {
            println("I have own context - start.")

            delay(1000)

            println("I have own context - end.")            // 正常輸出
        }

        childContext.join()
        diffContext.join()
    }

    Thread.sleep(500)
    parentContext.cancel()
    Thread.sleep(1500)
}

從下圖結果中我們可以看出

● 相同 CoroutineContext 之下的程式,會等待同一個 CoroutineContext 下的任務都執行完成 (parentContext 會等待 childContext

● 不同的 CoroutineContext 則不會等待(parentContext 不會等待 diffContext

MultiCoroutineContext:讓 Coroutine 產生關聯

● 協程支援 MultiCoroutineContext,可以透過 + 符號來添加 CoroutineContext (有複寫這個 Operator);透過 MultiCoroutineContext 可以讓無關係的 CoroutineContext 產生關係

A. 首先我們先看兩個完全不同的 CoroutineContext 執行的結果,看看兩者是否會有連結關係;

如果兩個 Coroutine 有連結關係的話,會等待內部 Coroutine 否則則不會等待


fun main() {

    // 外部 Coroutine
    val parentContext = GlobalScope.launch {

        // 內部 Coroutine
        val diffContext = GlobalScope.launch(coroutineContext) {
            println("I have own context - start.")

            delay(1000)

            println("I have own context - end.")            // 不輸出
        }

        diffContext.join()
    }

    Thread.sleep(500)
    parentContext.cancel()
    Thread.sleep(1500)
}

可以觀察到無關係的 diffContext 生命週期與 parentContext 同步 !

reference link

B. 再來,我們透過 MultiCoroutineContext 的方式,將多個 Coroutine 產生生關聯

以下,透過創建一個 Job 來管理多個不同的 CoroutineContext (同樣透過 + 號添加)


fun main() {
    val mrgJob = Job()

    GlobalScope.launch(Dispatchers.Default + mrgJob) {
        delay(100)
        println("Dispatchers.Default + mrgJob")
    }

    GlobalScope.launch(Dispatchers.IO + mrgJob) {
        delay(200)
        println("Dispatchers.IO + mrgJob")
    }

    GlobalScope.launch(Dispatchers.Unconfined + mrgJob) {
        delay(300)
        println("Dispatchers.Unconfined + mrgJob")              // 不會執行到
    }

    Thread.sleep(200)
    // 中途關閉 Coroutine
    mrgJob.cancel()
    Thread.sleep(500)
}

只要管理的 Job 被關閉後,所有相關的任務都會被關閉


CoroutineScope 協程作用域

上面使用到的 launchasync 都是 CoroutineScope 的拓展函數;而 GlobalScope 則是 CoroutineScope 的實現類

GlobalScope 是 top-level 函數,所以沒有綁定 Job 對象,其生命週期跟整個應用程式存亡

CoroutineScope 發生異常

● 首先我們先啟動一個 Coroutine,並在內部拋出異常,並且外部用 try/catch 包裹,來觀察是否可以抓取到異常


// 以下程式會拋出異常

fun failToCatch() {

    try {
        GlobalScope.launch {
            delay(100)

            throw Exception("inner coroutine throw exception")
        }
    } catch (e : Exception) {
        println("Catch Exception Success")
    }

}

fun main() = runBlocking {

    failToCatch()

    delay(1000)

    println("Main finish")

}

下圖中,我們可以看到在 Coroutine 中拋出異常,可以發現 外部的 try/catch 是捕捉不到

異常傳遞:父攜程接收

● 這裡有個要注意的點,就是「協程的異常是會傳遞的」,子協程發生異常會向父協程拋出,子協程自身並捕捉不到


fun failToCatchChild() {

    val handler = CoroutineExceptionHandler {
            context, throwable -> println("get exception: $throwable")
    }

    GlobalScope.launch {
        delay(100)

        launch(handler) {    // 自身捕捉不到異常
            throw Exception("inner coroutine throw exception of child")
        }

    }

}

fun main() = runBlocking {

    failToCatchChild()

    delay(1000)

    println("Main finish")

}

捕捉 CoroutineScope 異常:CoroutineExceptionHandler

● 要捕捉協程中的異常可以使用 CoroutineExceptionHandler,將它設定成 CoroutineContext,就可以捕捉協程異常


fun successToCatch() {

    val handler = CoroutineExceptionHandler {
            context, throwable -> println("get exception: $throwable")
    }

    GlobalScope.launch(handler) {
        delay(100)

        throw Exception("inner coroutine throw exception")
    }

}

fun main() = runBlocking {

    successToCatch()

    delay(1000)

    println("Main finish")

}

supervisorScope 自己捕捉異常

● coroutineScope 它不能捕捉到子協程的異常


fun catchSupervisorChild1() {
    val handler = CoroutineExceptionHandler {
            context, throwable -> println("get exception: $throwable")
    }

    GlobalScope.launch/*(handler)*/ {    // 放在這裡可以捕捉到
        delay(100)

        coroutineScope {    
            launch(handler) {            // 無法捕捉
                throw Exception("inner coroutine throw exception of supervisorScope's child")
            }
        }

    }

}

supervisorScope 比較特別,它會將異常「交由子協程自己處理」,所以這時候子協程才能捕捉到異常


fun catchSupervisorChild2() {
    val handler = CoroutineExceptionHandler {
            context, throwable -> println("get exception: $throwable")
    }

    GlobalScope.launch {
        delay(100)

        supervisorScope {       // supervisorScope 子協程需要自己處理異常
            launch(handler) {
                throw Exception("inner coroutine throw exception of supervisorScope's child")
            }
        }

    }

}

CoroutineExceptionHandler 無法捕捉 async / withContext

● 如果協程使用 async / withContext,而在其中拋出錯誤,CoroutineExceptionHandler 是無法捕捉到的,必須自己用 Try/Catch 包裹


fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, e ->
        if(e is Exception) {
            println("Throwable is exception: $e")
        }
        println("Get exception: ${e.message}")
    }

    val superJob = CoroutineScope(SupervisorJob() + handler)    // coroutine 中的任務 “可以” 繼續下去

    superJob.launch {
        delay(100)
        println("Job - 1")
    }
    superJob.launch {
        delay(200)
        println("Job - 2")
    }

//    superJob.async {              // CoroutineExceptionHandler 無法捕捉 async 拋出的錯誤
//        delay(250)
//        println("Job - 2.5")
//        throw Exception("Throw it from job 2.5")
//    }.await()

    withContext(superJob.coroutineContext) {// CoroutineExceptionHandler 無法捕捉 withContext 拋出的錯誤
        delay(270)
        println("Job - 2.7")
        throw Exception("Throw it from job 2.7")
    }

    // ------------------------------------ 這下面都收不到
    superJob.launch {
        delay(300)
        println("Job - 3")
    }

    println("Main finish")
}

必須手動捕捉其錯誤! 雖然可以捕捉,但 發生錯誤之後該協程就不會再繼續後面的任務了


try {
    withContext(superJob.coroutineContext) {// CoroutineExceptionHandler 無法捕捉 withContext 拋出的錯誤
        delay(270)
        println("Job - 2.7")
        throw Exception("Throw it from job 2.7")
    }
} catch (e :Exception) {
    println("withContext occur exception. $e")
}


更多的 Kotlin 語言相關文章

在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!

Kotlin 特性、特點

Kotlin 特性、特點:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用

Kotlin 進階:協程、響應式、異步

Kotlin 進階:協程、響應式、異步:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章

Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

發表迴響