Kotlin Channel 使用介紹 | Select、Actor | 生產者消費者

Kotlin Channel 使用介紹 | Select、Actor | 生產者消費者

Overview of Content

本文介紹 Kotlin 協程中的特點類:ChannelSelectActor

首先,我們探討 Channel 的基礎使用,深入了解生產者 / 消費者模式、中斷任務、以及 Channel 特點中的緩衝機制。接著,深入研究 ReceiveChannel 的 Pipe 機制,介紹如何使用它來傳遞結果。

在第二部分,我們研究 Kotlin 協程中的另一強大特性:Select 表達。透過 Select,我們能夠以更靈活的方式處理多個耗時任務的操作,並返回最快速的結果。

最後,我們介紹 Actor 表達,這是一種高級抽象,使得協程間的異步通信更加方便。通過閱讀本文,您將深入理解 Kotlin 協程中這些強大的異步通信機制,並學會如何在實際應用中運用它們。


Kotlin Channel 介紹

Kotlin 的 Channel 機制 生產者 & 消費者 機制 ,這與 Java 中的 BlockingQueue 類似

機制入隊函數出隊函數等待方式補充
Kotlin Channelsendreceive掛起(執行序讓出 CPU, 協程則繼續給其他地方使用)可主動關閉
Java BlockingQueueputtake堵塞(執行序讓出 CPU)-

掛起 & 堵塞 差異 ?

其實兩者差異不大,都是讓出 CPU 的時間片,但是在 API 的設計上確有不同的語意:

  • 堵塞是 讓出 CPU 時間,並在滿足條件後「被動喚醒」,如果條件不滿足就一直堵塞;如同 Object#wait() 方法

  • 掛起是 讓出 CPU 時間,並在一定時間後「主動喚醒」去檢查條件是否滿足;如同 Thread#sleep() 方法


生產者 / 消費者模式:Channel 基礎使用

● 以下做一個經典的生產者消費者(Consumer/Supplier)模式,這個模式下就是分為兩個腳色,一個負責生產資料,一個負責消耗資料;

● 在這裡我們透過 Channel 類的基本使用來實現生產者/消費者;範例如下


fun main() {
    runBlocking {
        println("Thread: ${Thread.currentThread().name}")

        testChannel(this)
    }

    println("Main finish")
}

suspend fun testChannel(scope : CoroutineScope) {
    val channel = Channel<Int>()

    scope.launch(Dispatchers.Default) {
        repeat(5) {
            delay(100)        // 生產者休眠 100ms
            println("Channel sender, data: $it, time: ${Date().time % 10_000}")

            // supplier
            // send 是一個 suspend 函數
            channel.send(it)
        }
    }

    repeat(5) {
        delay(150)    // 消費者休眠 150ms

        // consumer
        // receive 是一個 suspend 函數
        println("Channel receive: ${channel.receive()}, time: ${Date().time % 10_000}")
    }
}

以下範例中可以體現出 BlockingQueue 的堵塞功能 (讓消費者、生產者休眠不同的時間)

生產者 / 消費者模式:Channel 中斷任務

● 這裡我們仍實做一個生產者/消費者模式,不過這裡我們在執行到一半時中斷 Channel,突顯出 Channel 可中斷任務的特點;範例如下…


fun main() {
    val channel = Channel<Int>()

    runBlocking {
        println("Thread: ${Thread.currentThread().name}")

        this.launch(Dispatchers.Default) {
            repeat(5) {
                delay(100)
                println("Channel sender, data: $it, time: ${Date().time % 10_000}")

                // supplier
                // send 是一個 suspend 函數
                channel.send(it)
            }
        }

        this.launch(Dispatchers.Default) {
            repeat(5) {

                if (it == 2) {
                    // 中斷 Channel
                    channel.cancel()
                }
                delay(150)

                // consumer
                // receive 是一個 suspend 函數
                println("Channel receive: ${channel.receive()}, time: ${ java.util.Date().time % 10_000}}")
            }
        }

        delay(1_000)
    }

    println("Main finish")
}

Channel 特點:緩衝

● Channel 類的另一特點在於 可以設定緩衝區大小;在這裡我們可以做一個簡單的例子,這個緩衝區相當於座位的概念,當座位滿時程可自然只能等待座位空出後才可以進入,否則就只能站著


fun main() : Unit = runBlocking {

    val channel = Channel<Int>(capacity = 2)

    launch(coroutineContext) {
        repeat(4) {
            delay(50)
            channel.send(it)
            println("Channel sending: $it")
        }
    }

    launch(coroutineContext) {
        repeat(4) {
            delay(500)
            println("--- Channel receive: ${channel.receive()}")
        }
    }

    delay(3000)
}

以下範例中,當緩衝區大小滿的時候 send 函數就會被掛起

可以看到 send 函數在 Buffer 滿了之後就無法再放入,導致 send 函數掛起等待空間空閑次才能再放入

ReceiveChannel Pipe 機制:傳遞結果

Pipe 是 Linux 中的進程通訊機制之一,可以將上一個處理的結果傳遞給下一個使用;而 Coroutine 也有設計相同的效果

Pipe 機制也可以通過 OOP 設計中的 責任鏈模式 達成

Coroutine 使用 produce 來產生一個 ReceiveChannel 物件,這個物件可以用來接收 SendChannel 類透過 send 發出的運算結果

接下來的範例中,我們來將計算的步驟透過 ReceiveChannel 串接,計算的步驟串接概念如下圖

graph LR rc_1(發送要計算的數) rc_2(將數平方) rc_3(將數 + 1) rc_1 --> rc_2 --> rc_3

val coroutine = CoroutineScope(Job())

// 將結果透過 produce 傳出
fun produce1() = coroutine.produce(Dispatchers.Default) {
    // 上下文為 ProducerScope,而 ProducerScope 就繼承自 SendChannel
    repeat(3) {
        // 傳輸結果
        send(it)
    }
}

// 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數的平方」
fun produce2(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) {
    // 透過 ReceiveChannel 接收結果
    rec.toList().forEach {
        // 傳輸結果
        send(it * it)
    }
}

// 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數 + 1」
fun produce3(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) {
    for (x in rec) {
        // 傳輸結果
        send(x + 1)
    }
}

fun main() = runBlocking {
    val num = produce1()
    val squ = produce2(num)
    val add = produce3(squ)

    // 從最後一個 ReceiveChannel 消耗結果
    add.consumeEach {
        println("add it: $it")              // 消耗結果
    }

    num.cancel()
    squ.cancel()
    add.cancel()

    delay(1000)
}

上面範例運行的過程如下(請由上至下去看)

produce\number01234
produce1 (傳送數)01234
produce2 (平方)014916
produce3 (加一)1251017

被消費(consume)過後的元素就不會往下傳遞


Kotlin Select 表達

select 可以用來等待多個 suspend function

可以用來併發多個 suspend 函數,並可取得其中一個先回覆的任務作為結果

Select 使用範例

● 接下來的範例中,我們創建兩個 ReceiveChannel,再透過 Select 來選中第一個返回的結果作為函數的結果!


val coroutine = CoroutineScope(Job())

fun produce1() = coroutine.produce<String>(Dispatchers.Default) {
    while (true) {
        delay(400)
        send("Kyle")
    }
}

fun produce2() = coroutine.produce<String>(Dispatchers.Default) {
    while (true) {
        delay(200)
        send("Pan")
    }
}

suspend fun selectSample(channel1: ReceiveChannel<String>, channel2: ReceiveChannel<String>) : String {
    // select 函數會堵塞
    val sRes = select<String> {
        channel1.onReceive {
            "channel 1 -> $it"
        }

        channel2.onReceive {
            "channel 2 -> $it"
        }
    }

    println("Select finish")

    return sRes
}

fun main() : Unit = runBlocking {

    val produce1 = produce1()
    val produce2 = produce2()

    // 重複呼叫 4 次,看誰先處理完就會做為結果
    repeat(5) {
        println("------Final result: ${selectSample(produce1, produce2)}")
    }

    delay(3000)

    produce1.cancel()
    produce2.cancel()
}

Kotlin Actor 表達

actor 其本身就是一個協程它的內部包含 channel 成員,所有我們可以透過對 actor 發送訊息,讓它傳遞訊息,也可以用來作為不同的協程通訊

Actor 使用範例

A. actor 返回的類型是 SendChannel<E>,主要可以用它來發送資料,而它並不像 channel 一樣有 receive 函數

B. actor 的最後一個 Lambda 參數的上下文為 ActorScope,而它的內部有 channel 成員


fun main(): Unit = runBlocking {

    val actor = actor<Int>(coroutineContext) {     // 上下文為 ActorScope
        var sum = 0

        // channel 是 ActorScope 中的成員
        for (i in channel) {
            sum += i

            println("actor sum: $sum")
        }
    }

    repeat(5) {
        actor.send(it)
    }

    actor.close()
}


更多的 Kotlin 語言相關文章

在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!

Kotlin 特性、特點

Kotlin 特性、特點:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用

Kotlin 進階:協程、響應式、異步

Kotlin 進階:協程、響應式、異步:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章

Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

發表迴響