Overview of Content
本文介紹 Kotlin 協程中的特點類:Channel
、Select
與 Actor
。
首先,我們探討 Channel 的基礎使用,深入了解生產者 / 消費者模式、中斷任務、以及 Channel 特點中的緩衝機制。接著,深入研究 ReceiveChannel 的 Pipe 機制,介紹如何使用它來傳遞結果。
在第二部分,我們研究 Kotlin 協程中的另一強大特性:Select 表達。透過 Select,我們能夠以更靈活的方式處理多個耗時任務的操作,並返回最快速的結果。
最後,我們介紹 Actor 表達,這是一種高級抽象,使得協程間的異步通信更加方便。通過閱讀本文,您將深入理解 Kotlin 協程中這些強大的異步通信機制,並學會如何在實際應用中運用它們。
Kotlin Channel 介紹
Kotlin 的 Channel 機制 生產者 & 消費者 機制 ,這與 Java 中的 BlockingQueue 類似
機制 | 入隊函數 | 出隊函數 | 等待方式 | 補充 |
---|---|---|---|---|
Kotlin Channel | send | receive | 掛起(執行序讓出 CPU, 協程則繼續給其他地方使用) | 可主動關閉 |
Java BlockingQueue | put | take | 堵塞(執行序讓出 CPU) | - |
● 掛起 & 堵塞 差異 ?
其實兩者差異不大,都是讓出 CPU 的時間片,但是在 API 的設計上確有不同的語意:
- 堵塞是 讓出 CPU 時間,並在滿足條件後「被動喚醒」,如果條件不滿足就一直堵塞;如同
Object#wait()
方法
- 掛起是 讓出 CPU 時間,並在一定時間後「主動喚醒」去檢查條件是否滿足;如同
Thread#sleep()
方法
生產者 / 消費者模式:Channel 基礎使用
● 以下做一個經典的生產者消費者(Consumer
/Supplier
)模式,這個模式下就是分為兩個腳色,一個負責生產資料,一個負責消耗資料;
● 在這裡我們透過 Channel 類的基本使用來實現生產者/消費者;範例如下
fun main() {
runBlocking {
println("Thread: ${Thread.currentThread().name}")
testChannel(this)
}
println("Main finish")
}
suspend fun testChannel(scope : CoroutineScope) {
val channel = Channel<Int>()
scope.launch(Dispatchers.Default) {
repeat(5) {
delay(100) // 生產者休眠 100ms
println("Channel sender, data: $it, time: ${Date().time % 10_000}")
// supplier
// send 是一個 suspend 函數
channel.send(it)
}
}
repeat(5) {
delay(150) // 消費者休眠 150ms
// consumer
// receive 是一個 suspend 函數
println("Channel receive: ${channel.receive()}, time: ${Date().time % 10_000}")
}
}
以下範例中可以體現出
BlockingQueue
的堵塞功能 (讓消費者、生產者休眠不同的時間)
生產者 / 消費者模式:Channel 中斷任務
● 這裡我們仍實做一個生產者/消費者模式,不過這裡我們在執行到一半時中斷 Channel,突顯出 Channel 可中斷任務的特點;範例如下…
fun main() {
val channel = Channel<Int>()
runBlocking {
println("Thread: ${Thread.currentThread().name}")
this.launch(Dispatchers.Default) {
repeat(5) {
delay(100)
println("Channel sender, data: $it, time: ${Date().time % 10_000}")
// supplier
// send 是一個 suspend 函數
channel.send(it)
}
}
this.launch(Dispatchers.Default) {
repeat(5) {
if (it == 2) {
// 中斷 Channel
channel.cancel()
}
delay(150)
// consumer
// receive 是一個 suspend 函數
println("Channel receive: ${channel.receive()}, time: ${ java.util.Date().time % 10_000}}")
}
}
delay(1_000)
}
println("Main finish")
}
Channel 特點:緩衝
● Channel 類的另一特點在於 可以設定緩衝區大小;在這裡我們可以做一個簡單的例子,這個緩衝區相當於座位的概念,當座位滿時程可自然只能等待座位空出後才可以進入,否則就只能站著
fun main() : Unit = runBlocking {
val channel = Channel<Int>(capacity = 2)
launch(coroutineContext) {
repeat(4) {
delay(50)
channel.send(it)
println("Channel sending: $it")
}
}
launch(coroutineContext) {
repeat(4) {
delay(500)
println("--- Channel receive: ${channel.receive()}")
}
}
delay(3000)
}
以下範例中,當緩衝區大小滿的時候
send
函數就會被掛起
可以看到 send
函數在 Buffer 滿了之後就無法再放入,導致 send
函數掛起等待空間空閑次才能再放入
ReceiveChannel Pipe 機制:傳遞結果
● Pipe 是 Linux 中的進程通訊機制之一,可以將上一個處理的結果傳遞給下一個使用;而 Coroutine
也有設計相同的效果
Pipe 機制也可以通過 OOP 設計中的 責任鏈模式 達成
● Coroutine 使用 produce
來產生一個 ReceiveChannel
物件,這個物件可以用來接收 SendChannel
類透過 send
發出的運算結果
接下來的範例中,我們來將計算的步驟透過 ReceiveChannel
串接,計算的步驟串接概念如下圖
val coroutine = CoroutineScope(Job())
// 將結果透過 produce 傳出
fun produce1() = coroutine.produce(Dispatchers.Default) {
// 上下文為 ProducerScope,而 ProducerScope 就繼承自 SendChannel
repeat(3) {
// 傳輸結果
send(it)
}
}
// 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數的平方」
fun produce2(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) {
// 透過 ReceiveChannel 接收結果
rec.toList().forEach {
// 傳輸結果
send(it * it)
}
}
// 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數 + 1」
fun produce3(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) {
for (x in rec) {
// 傳輸結果
send(x + 1)
}
}
fun main() = runBlocking {
val num = produce1()
val squ = produce2(num)
val add = produce3(squ)
// 從最後一個 ReceiveChannel 消耗結果
add.consumeEach {
println("add it: $it") // 消耗結果
}
num.cancel()
squ.cancel()
add.cancel()
delay(1000)
}
上面範例運行的過程如下(請由上至下去看)
produce\number | 0 | 1 | 2 | 3 | 4 |
---|---|---|---|---|---|
produce1 (傳送數) | 0 | 1 | 2 | 3 | 4 |
produce2 (平方) | 0 | 1 | 4 | 9 | 16 |
produce3 (加一) | 1 | 2 | 5 | 10 | 17 |
被消費(consume)過後的元素就不會往下傳遞
Kotlin Select 表達
select 可以用來等待多個 suspend function
可以用來併發多個 suspend
函數,並可取得其中一個先回覆的任務作為結果
Select 使用範例
● 接下來的範例中,我們創建兩個 ReceiveChannel
,再透過 Select 來選中第一個返回的結果作為函數的結果!
val coroutine = CoroutineScope(Job())
fun produce1() = coroutine.produce<String>(Dispatchers.Default) {
while (true) {
delay(400)
send("Kyle")
}
}
fun produce2() = coroutine.produce<String>(Dispatchers.Default) {
while (true) {
delay(200)
send("Pan")
}
}
suspend fun selectSample(channel1: ReceiveChannel<String>, channel2: ReceiveChannel<String>) : String {
// select 函數會堵塞
val sRes = select<String> {
channel1.onReceive {
"channel 1 -> $it"
}
channel2.onReceive {
"channel 2 -> $it"
}
}
println("Select finish")
return sRes
}
fun main() : Unit = runBlocking {
val produce1 = produce1()
val produce2 = produce2()
// 重複呼叫 4 次,看誰先處理完就會做為結果
repeat(5) {
println("------Final result: ${selectSample(produce1, produce2)}")
}
delay(3000)
produce1.cancel()
produce2.cancel()
}
Kotlin Actor 表達
actor 其本身就是一個協程,它的內部包含 channel 成員,所有我們可以透過對 actor 發送訊息,讓它傳遞訊息,也可以用來作為不同的協程通訊
Actor 使用範例
A. actor 返回的類型是 SendChannel<E>
,主要可以用它來發送資料,而它並不像 channel 一樣有 receive
函數
B. actor 的最後一個 Lambda 參數的上下文為 ActorScope,而它的內部有 channel
成員
fun main(): Unit = runBlocking {
val actor = actor<Int>(coroutineContext) { // 上下文為 ActorScope
var sum = 0
// channel 是 ActorScope 中的成員
for (i in channel) {
sum += i
println("actor sum: $sum")
}
}
repeat(5) {
actor.send(it)
}
actor.close()
}
更多的 Kotlin 語言相關文章
在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!
Kotlin 語言基礎
● Kotlin 語言基礎:想要建立堅實的 Kotlin 基礎?以下這些文章將帶你深入探索 Kotlin 的關鍵基礎和概念,幫你打造更堅固的 Kotlin 語言基礎
Kotlin 特性、特點
● Kotlin 特性、特點:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用
Kotlin 進階:協程、響應式、異步
● Kotlin 進階:協程、響應式、異步:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章