探索 Kotlin Flow:基本使用、RxJava 對比、背壓機制 | Flow 細節

探索 Kotlin Flow:基本使用、RxJava 對比、背壓機制 | Flow 細節

Overview of Content

本文深入研究 Kotlin Flow,涵蓋了流的基本使用、與 RxJava 的對比、背壓機制以及其他豐富的操作

首先,我們介紹了 Flow 的基本使用,包括如何創建、特點、切換執行緒、取消和關閉流等

接著,我們深入比較了 Flow 與 RxJava 中的冷流和熱流概念;在背壓機制部分,我們討論了 Flow 實現 BUFFER 策略、LATEST 策略的具體方式

最後,我們探討了 Flow 的其他操作,包括轉換、限制取用、計算結果、合併操作符以及 Nest Flow 的應用,透過本文的閱讀,您將深入瞭解 Kotlin Flow 的豐富功能、細節,並學會在實際應用中基礎運用。

寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀


Flow 基本使用、介紹

我們在 Java 中常用的響應式編程模型就是 RxJava;而 Flow 就是 Koltin 結合 Coroutine 響應式編程的產物

響應式編程:簡單來想就是依照上一步反應而觸發不同的行為編程;最常見的就是 API 請求,由 API 請求的結果來驅動 APP 內的邏輯,讓它們之間產生對應的反應(像是很多的監聽者)

Flow 使用起來類似於 RxJava,其函數的對應如無下

功能FlowRxJava
對流發送資料emit()onNext()
接收資料collect()subscribe()

Flow 是一個介面


// Flow 源碼

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

flow 基本使用

Flow 可以返回(或是說發射 emit)多個異步計算的結果並用 collect 接收結果;FlowBuilder 有幾種使用方式,常用的方式如下

A. 使用頂層函數 flow{ }:可以直接使用 flow{ } 創建 SafeFlow


// 源碼

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

flow{ } 的使用範例如下


// 範例

suspend fun sampleUse01() = coroutineScope {
    flow<Int> {
        for (i in 1..5) {
            val res = measureTimeMillis {
                delay(1000L * i)
                emit(i)
            }
            println("Use time: $res")
        }
    }.collect {
        println("res: $it")
    }
}

B. 使用頂層函數 flowOf{ }:其實內部就是透過 flow 並呼叫 emit 函數


// 源碼

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

flowOf{ } 使用範例如下


// 範例

suspend fun sampleUse02() = coroutineScope {

    flowOf("A", "B", "C", "D")
        .onEach {    // onEach 每次發送前執行
            delay(100)
        }
        .collect {
            println("it each: $it")
        }
}

C. 使用頂層函數 asFlow:它是一個 Iterable 類的拓展泛型函數,同樣透過 emit 發送


// 源碼

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

asFlow 的使用範例如下


// 範例

suspend fun sampleUse03() = coroutineScope {

    listOf(1.1, 2.2, 3.3, 4.4, 5.5).asFlow()
        .onEach {    // onEach 每次發送前執行
            delay(100)
        }
        .collect {
            println("it each: $it")
        }
}

D. 使用頂層函數 channelFlow,跟上面的 差別在於,它透過 channel 的 send 發送(Channel 的特點是可以與異步產生掛勾)


// 源碼

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
    ChannelFlowBuilder(block)

channelFlow 的使用範例如下


// 範例

suspend fun sampleUse04() = coroutineScope {

    channelFlow {
        for (i in 1..5) {
            val res = measureTimeMillis {
                delay(1000L * i)
                send(i)
            }
            println("Use time: $res")
        }
    }.collect {
        println("channel res: $it")
    }
}

Channel Flow 特點

● Channel Flow 與一般 Flow 兩個都是 生產者跟消費者模型,但是還是有差異,其差異如下

Flow 種類是否同步特點
Flow非掛起(非阻塞,CPU 沒有讓出資源)
ChannelFlow否、異步可切換上下文(不同 Coroutine 間通訊)

一般的 Flow 是「同步」的方式傳遞訊息,必須等待 collect 處理完畢才可以進行下一次 emit 行為


suspend fun normalFlow_Sync() = coroutineScope {

    val totalTime = measureTimeMillis {
        flow {          // 與使用 `flowOf` 則相同
            for (it in 1 .. 5) {        // 1 ~ 5
                delay(100)
                println("Hello, emit now: $it")
                emit(it)
            }
        }.collect{
            delay(500)
            println("World, I get normal flow item: $it")
        }
    }
    
    // 
    println("Flow with sync, total use time: $totalTime")
}

概念圖如下,每次都必須傳輸(耗時 100ms)、收集(耗時 500ms)結束才往下一個事件走

graph TB 1 --> |100ms + 500ms| 2 --> |100ms + 500ms| 3 --> |100ms + 500ms| 4 --> |100ms + 500ms| 5

B. ChannelFlow 使用範例

ChannelFlow 是「異步」傳送訊息,不需等待 collect 處理完就可以繼續 send 訊息(collect 仍需要等待)


suspend fun channelFlow_Async() = coroutineScope {
    val totalTime = measureTimeMillis {
        channelFlow {
            for (it in 1 .. 5) {
                delay(100)
                println("Hello, send now: $it")
                send(it)        // 可以一直發送,不用等待
            }
        }.collect{
            delay(500)        // 這裡必須等待
            println("World, I get channel flow item: $it")
        }
    }

    println("Flow with async, total use time: $totalTime")
}

Flow 切換 Thread - flowOn

● 在 Flow 中要切換 Thread 就需要使用到 flowOn 關鍵字,flowOn 影響到的範圍是在 collect 之前

類似 RxJava 中切換 Thread 的 observeOnsubscribeOn 方法


suspend fun flowSwitchThread() = coroutineScope {
    flow{
        for (i in 1..3) {
            println("---Flow thread: ${Thread.currentThread().name}=$i")
            emit(i)
        }
    }
    .map {
        println("------Map thread: ${Thread.currentThread().name}=$it")
        it * it
    }
    .flowOn(Dispatchers.IO)        // 讓 flow、map 運作在 IO 上
    .collect {
        // collect 仍運作在 main
        println("Collect thread: ${Thread.currentThread().name}=$it")
    }
}

以下範例將 flowmap 運作在 IO 執行序,collect接收在 Main 執行序接收!

● 我們在使用 Coroutine 時是使用 withContext 函數做切換,但 在 Flow 中不可以使用 withContext 切換

Flow#collect 運作在哪個執行序是由 flow 在哪個執行序啟動為準(以下指定運作 Flow 的執行序)


suspend fun flowSwitchThread2() = coroutineScope {

    withContext(newSingleThreadContext("HelloWorld")) {
        flow{
            for (i in 1..3) {
                println("---Flow thread: ${Thread.currentThread().name}=$i")
                emit(i)
            }
        }
        .map {
            println("------Map thread: ${Thread.currentThread().name}=$it")
            it * it
        }
        .flowOn(Dispatchers.IO)
        .collect {
            // 切換到 HelloWorld Thread
            println("Collect thread: ${Thread.currentThread().name}=$it")
        }
    }
}

flowOn影響上游的操作,所以一個 Flow 中可以多次切換 flowOn 去指定業務邏輯要在哪個不同 Thread 的運作;請注意以下 Thread 切換操作


suspend fun flowSwitchThread3() = coroutineScope {
    val customerDispatcher = newSingleThreadContext("HelloWorld")

    flow{
        for (i in 1..3) {
            println("---Flow thread: ${Thread.currentThread().name}=$i")
            emit(i)
        }
    }
    .map {    // map 1
        println("------Map on customerDispatcher: ${Thread.currentThread().name}=$it")
        it * it
    }
    .flowOn(customerDispatcher)        // 影響上游操作: 影響 flow、map 1 兩個操作
    .map {    // map 2
        println("------Map on IO: ${Thread.currentThread().name}=$it")
        it * it
    }
    .flowOn(Dispatchers.IO)            // 只影響 map 2 操作
    .collect {
        println("Collect thread: ${Thread.currentThread().name}=$it")
    }
}

取消、關閉 Flow 流

● Flow 是 可以被關閉、取消,範例如下…


fun main() : Unit = runBlocking {

    withTimeoutOrNull(2000) {
        flow{
            for (i in 1..5) {
                delay(400)
                emit(i)
                println("---Flow=$i")
            }
        }.collect {
            delay(400)
            println("Collect get: $it")
        }
    }

    println("Main done.")

}

監聽 Flow 流

● 在 Flow 的運行中可以知道 Flow 的開始 onStart、結束 onCompletion

類似 RxJava 中的 do 函數


fun main() : Unit = runBlocking {

    flow {
        for (i in 1..5) {
            emit(i)
            println("---Flow=$i")
        }
    }.onStart {
        println("onStart ~~")
    }.onCompletion {
        println("onCompletion ~~")
    }.collect {
        println("Collect get: $it")
    }

}

Flow 的協程 & Sequences 的同步

● Flow 是按照順序執行,而 Sequences 也可以達到相同效果,但兩者仍有差異

順序類發送數據的方法差異
Flowemit同步,但是發送資料時內部是使用協程的機制,並不堵塞執行序
Sequencesyield同步,內部 不支援 suspend function(會堵塞執行序)

A. Flow 非同步

以下範例中,Flow 在發送 (emit) 後資料後,會把 MainThread 讓出 (delay 函數) 讓其他需要 MainThread 的函數去使用

以下的測試將只會在 MainThread 執行

測試的目的是為了確認 Flow 在執行 suspend function 時是否會堵塞 MainThread,如果會堵塞則以下的 launch Lambda 將無法被運行


suspend fun flowUse() = coroutineScope {

    launch {
        for(i in 1 .. 5) {
            delay(100)
            println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
        }
    }

    flow {
        for (i in 1 .. 5) {
            delay(100)
            emit(i)      
            println("flow item=${i}, Thread: ${Thread.currentThread().name}")
        }
    }.collect {
        println("collect get=(${it}), Thread: ${Thread.currentThread().name}")
    }

    println("Done")
}

從結果來看 flow 是不對堵塞 CPU 的

● 由上圖結果可知,flow 在運行 suspend function 時不會堵塞 MainThread,它會讓出 Thread 的使用權給其他需要的函數使用

從這裡我們也可以看出 flow 是使用了「協程, Coroutine」技術,才可以達到不堵塞單一執行序,而執行異步的行為!

B. Sequences 同步

sequences 會堵塞 Mainthread 的使用(正確點來說是,當前運行的 Thread) 直到它執行完畢

以下案例,同樣的程式,不過我們將 flow 換成 sequence,來觀察它是否會堵塞 MainThread 的執行


suspend fun sequencesUse() = coroutineScope {

    launch {
        // sequences 會堵占 main 的使用
        for(i in 1 .. 5) {
            delay(100)
            println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
        }
    }

    sequence {
        for (i in 1 .. 5) {
            Thread.sleep(100)
            yield(i)        // 等同 emit 的意思
            println("sequence item=${i}, Thread: ${Thread.currentThread().name}")
        }
    }.forEach {
        println("forEach get=(${it}), Thread: ${Thread.currentThread().name}")
    }

    println("Done")
}

可以看到 Sequence 佔用了當前 MainThread 的執行

● 在特別強調一次:

sequences 堵塞的是當前執行序,而不是只堵塞 Mainthread


// 創建指定 Thread 測試
fun main() : Unit = runBlocking(newSingleThreadContext("Hello")) {          // 堵塞當前 thread
    sequencesUse()
}

監看 Flow 結束:就算異常也要看到結束

● Flow 是一個 Suspend function,如果需要在 Flow 結束時 (不管是正常結束、還是拋出錯誤),通知使用者進行操作

這裡只說明如何判斷 Flow 的結束,並 不包含 Flow 的錯誤捕捉

● 可以通過以下兩種方式

A. imperative:用一個大的 try/finally 包裹,透過 finally 通知使用者 Flow 已經完成


suspend fun flowTryFinally() = coroutineScope {
    try {
        flowOf(1, 2, 3)
            .map {
                it * it
            }
            .collect {
                if (it == 4) {
                    throw Exception("Test throw. $it")
                }
            }
    } finally {
        println("Try finally flow finish.")
    }
}

fun main() : Unit = runBlocking {
    flowTryFinally()

    // 不會執行到
    println("Main finish.")
}

沒有 catch 的話就不能捕捉 Exception,只能通過 finally 知道最終結果

B. declatative:透過 onCompletion 函數,就可以達到上面 finally 的相同效果!


suspend fun flowOnCompletion() = coroutineScope {
    flowOf(1, 2, 3)
        .map {
            it * it
        }
        .onCompletion {
            println("onCompletion flow finish.")
        }
        .collect {
            if (it == 9) {
                throw Exception("Test throw. $it")
            }
        }
}


fun main() : Unit = runBlocking {
//    flowTryFinally()
    flowOnCompletion()

    // 不會執行到   
    println("Main finish.")
}

Flow 異常處理 / 重試

● 上面我們說了 Flow 的結束,但並沒有說如何處理異常;而這邊我們就特別來說說 Flow 是如何處理異常的;Flow 處理異常有兩種方案

A. 使用傳統的 try/catch 處理


suspend fun traditionTryCatch() {
    try {
        flowOf(1, 2, 3)
            .map {
                it * it
            }
            .onCompletion {
                println("onCompletion flow finish.")
            }
            .collect {
                if (it == 9) {
                    throw Exception("Test throw. $it")
                }
            }
    } catch (e : Exception) {
        println("flow get exception: $e")
    }
}

fun main() : Unit = runBlocking {
    traditionTryCatch()

    println("Main finish")
}

B. 使用 catch 操作符:catch 操作符可以捕捉 上游 的操作錯誤


suspend fun flowCatch() {
    flowOf(1, 2, 3)
        .map {
            if (it == 3) {
                throw Exception("Test throw. $it")
            }
            it * it
        }
        .catch {
            // 捕捉上游
            println("flow get exception: $it")
        }
        .onCompletion {
            // 不影響下游
            println("onCompletion flow finish. e: $it")
        }
        .collect {
            println("Flow catch: $it.")
        }
}

● 何謂「上游錯誤 」? 就是在設定 catch 操作符之前的錯誤


suspend fun flowCatch2() {
    flowOf(1, 2, 3)
        .catch {
            // 這時就無法捕捉 map 中的錯誤
            println("flow get exception: $it")
        }
        .map {
            if (it == 3) {
                throw Exception("Test throw. $it")
            }
            it * it
        }
        .onCompletion {
            println("onCompletion flow finish. e: $it")
        }
        .collect {
            println("Flow catch: $it.")
        }
}

由此我們也 可以知道 catch 是無法處理 collect 中的錯誤的

● Flow 在發生錯誤時可以透過 retryretryWhen 操作符對上游做重試

retry 重試次數是都是從 1 開始計算;retryWhen 重試次數是都是從 0 開始通知

A. retry 操作符可以透過 return Boolean 來決定是否再次重試;Return true 代表重試,否則不重試


suspend fun flowCatchRetry() {
    flowOf(1, 2, 3)
        .onEach {
            println("Current number=($it)")
            if (it == 2) {
                throw Exception("Test throw=($it)")
            }
        }
        .retry(2) {    // 重試兩次
            if (it.message == "Test throw=(2)") {
                println("Handle exception.")
                return@retry true
            }
            false
        }
        .onCompletion {
            println("onCompletion flow finish.")
        }
        .collect {
            if (it == 9) {
                throw Exception("Test throw. $it")
            }
        }
}

reference link

B. retryWhen 可以達到跟上述一樣的效果;retryWhen 會不斷地重試,並且它多了一個當前重試次數給使用者判斷


suspend fun flowCatchRetryWhen() {
    flowOf(1, 2, 3)
        .onEach {
            println("Current number=($it)")
            if (it == 2) {
                throw Exception("Test throw=($it)")
            }
        }
        .retryWhen {e, times ->
            println("Current try times=($times), e=($e)")
            if(e is Exception) {
                // 同 `retry`,返回 true 代表同意重試
                
                return@retryWhen times < 2        
            }
            false
        }
        .onCompletion {
            println("onCompletion flow finish.")
        }
        .collect {
            if (it == 9) {
                throw Exception("Test throw. $it")
            }
        }
}


Flow & RxJava 對比

Koltin 協程可以透過一些類的組合來達到等同於 RxJava 的效果,如下表

RxJavaCorotines
Single<T>Defered<T>
Maybe<T>Defered<T>
CompletableJob
Observable<T>Channel<T>Flow<T>
Flowable<T>Channel<T>Flow<T>

Cold Stream 冷流:Flow

● Cold Stream 如同上面的範例一樣,在呼叫 Flow#collect 後才開始運行 Flow 的流程

如同 RxJava 使用 subscribe 函數才開始運行

A. 實驗一:延遲 collect,觀察是否會少收到 Flow 的資料


@Test
fun testColdStream() {
    var flow: Flow<String>? = null

    CoroutineScope(Dispatchers.IO).launch {
        println("start flow: ${Thread.currentThread().name}")
        flow = flowOf("A", "B", "C", "D", "E", "F", "G")
                .onEach {    // onEach 每次發送前執行
                    delay(100)
                    println("${Thread.currentThread().name}, send: $it")
                }
    }

    println("start")
    runBlocking {
        // 延遲 `collect`
        delay(300)

        flow!!.collect {
            println("${Thread.currentThread().name}, receive: $it")
        }
    }
    println("done")
}

從結果可以看出來,我其實已經延遲 collect,但是 Flow 仍是等到有人 collect 才發送資料

image

B. 實驗二:創建兩個 collect,並延遲 collect,觀察兩個是否會少收到 Flow 的資料


@Test
fun testColdStream() {
    var flow: Flow<String>? = null

    CoroutineScope(Dispatchers.IO).launch {
        println("start flow: ${Thread.currentThread().name}")
        flow = flowOf("A", "B", "C", "D", "E", "F", "G")
                .onEach {    // onEach 每次發送前執行
                    delay(100)
                    println("${Thread.currentThread().name}, send: $it")
                }
    }

    println("start")
    runBlocking {
        delay(300)

        flow!!.collect {
            println("1111 ${Thread.currentThread().name}, receive: $it")
        }
        delay(100)

        flow!!.collect {
            println("2222 ${Thread.currentThread().name}, receive: $it")
        }
    }
    println("done")
}

從結果可以看到,第二個 collect 仍會完整地收到全部的資料

image

Hot Stream 熱流:MutableSharedFlow

Hot Stream 如同直播,數據要即時擷取否則過了就不會再見到;Kotlin 則可以透過 MutableSharedFlow 的協助來到相同的效果


@Test
fun testHotStream() {
    // 創建發射器,熱流發射器
    val flow: MutableSharedFlow<String> = MutableSharedFlow()

    CoroutineScope(Dispatchers.IO).launch {
        println("start flow: ${Thread.currentThread().name}")
        
        flowOf("A", "B", "C", "D", "E", "F", "G")
                .onEach {
                    delay(100)
                    println("${Thread.currentThread().name}, send: $it")
                    flow.emit(it)
                }
                .onCompletion {
                    println("Flow completed")
                }
                .launchIn(this) // 啟動 Flow,這裡是使用 launchIn
    }

    println("start")
    runBlocking {
        delay(300)

        // 只會收到當下的數據!
        flow.collect {
            println("${Thread.currentThread().name}, receive: $it")
        }
    }
    println("done")
}

image


Backpressure 背壓

首先先來介紹一下 何謂 Backpressure ? 我們知道 Flow 就像是一個生產者消費者模型,而 Backpressure 的情況則是 生產者的產量遠遠大於消費者

Backpressure 產生後如果沒有策略處理則可能導致應用崩潰

RxJava 有對 Backpressure 有相對應的策略,反映在 Flow 中也有相同的策略,請見下表

RxJavaFlow說明
BUFFERbuffer()Buffer 用來儲存尚未處理的數據,沒有固定 Buffer 大小,有可能導致 OOM
DROP-Flow 緩衝池滿了,則拋棄準備進入緩衝池的新數據
LATESTconflate()行為同 DROP,但 LETEST 會強制將最後一個數據放入緩衝池中

Flow 實現 BUFFER 策略

● 首先先來看看沒有 Buffer 時,Flow 面對消費者/生產者時間差,會有甚麼反應


suspend fun flowWithoutBuffer() = coroutineScope {

    fun curTime() = System.currentTimeMillis()

    var startTimeStamp : Long = 0

    flowOf(1, 2, 3, 4, 5)
        .onStart {
            startTimeStamp = curTime()
        }.onEach {
            println("Supplier $it (${curTime() - startTimeStamp} ms).")
        }
        .collect {
            // 一個一個處理
            println("Consumer $it start(${curTime() - startTimeStamp} ms).")
            delay(500)      // 延緩消費者
            println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
        }
}

如下圖,我們可以看到沒有 Buffer 機制,那 flow 則要等待 collect 處理完,才能發送下一個 emit 數據

● 以下使用 flow 來達成 RxJava BUFFER 的功能 (重點其實就是加了一個 buffer 函數)


suspend fun flowWithBuffer() = coroutineScope {

    fun curTime() = System.currentTimeMillis()

    var startTimeStamp : Long = 0

    flowOf(1, 2, 3, 4, 5)
        .onStart {
            startTimeStamp = curTime()
        }.onEach {
            println("Supplier $it (${curTime() - startTimeStamp} ms).")
        }
        .buffer()           // 不限定則 Buffer 無限大
        .collect {
            // 一個一個處理
            println("Consumer $it start(${curTime() - startTimeStamp} ms).")
            delay(500)      // 延緩消費者
            println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
        }
}

故意延遲 Comsumer 消耗,但這些仍在 Buffer 中,就不必等待 collect 處理完就可以 emit 下一個數據

● Flow Buffer 如果沒有給定 capacity 限制則無限大,假設有給定 capacity 數值,則須加設預設的 2 個容量


suspend fun flowBuffer_2() {

    fun curTime() = System.currentTimeMillis()

    var startTimeStamp : Long = 0

    flowOf(1, 2, 3, 4, 5)
        .onStart {
            startTimeStamp = curTime()
        }.onEach {
            println("Supplier $it (${curTime() - startTimeStamp} ms).")
        }
        .buffer(1)       // 它預設有 2 個 capacity
        .collect {
            // 一個一個處理
            println("Consumer $it start(${curTime() - startTimeStamp} ms).")
            delay(500)      // 延緩消費者
            println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
        }

}

可以看到 buffer 明明設定為 1,不過 flow 在 emit 時直到 3 才真正堵塞,由此可見 buffer 預設有 2 個 capacity

BUFFER 策略:異步併發

● Flow 使用 buffer 操作就可以達到併發的效果 (如果 Buffer 尚未滿的情況下)

A. 首先我們知道一般 非 Channel 的 Flow 是一個同步操作,必須要等待 collect 操作完才可以執行下一個步驟;


suspend fun flowNoBuffer() {

    val uesTimes = measureTimeMillis {
        flowOf(1, 2, 3, 4, 5)
            .onEach {
                delay(100)
            }
            .collect {
                delay(500)
                println("$it")
            }
    }

    println("Without buffer=($uesTimes ms)")

}

B. 這時如果多了 buffer 情況就會如同「ChannelFlow」,不需等待 collect 結束就可以執行下一個操作;可以達到類似 ChannelFlow 的效果


suspend fun flowBufferAsChannel() {

    val uesTimes = measureTimeMillis {
        flowOf(1, 2, 3, 4, 5)
            .onEach {
                delay(100)
            }
            .buffer()
            .collect {
                delay(500)
                println("$it")
            }
    }

    println("Without buffer=($uesTimes ms)")

}

Flow 實現 LATEST 策略

● 使用 Flow 實現 RxJava 中的 LATEST (LATEST 的特色是會保存最後一個數據,我們就檢查最後一個數據是否有被保存)


suspend fun flowLatest() {

    fun curTime() = System.currentTimeMillis()

    var startTimeStamp : Long = 0

    flowOf(1, 2, 3, 4, 5)
        .onStart {
            startTimeStamp = curTime()
        }.onEach {
            println("Supplier $it (${curTime() - startTimeStamp} ms).")
        }
        .conflate()           // LATEST 策略
        .collect {
            // 一個一個處理
            println("Consumer $it start(${curTime() - startTimeStamp} ms).")
            delay(500)      // 延緩消費者
            println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
        }

}


Flow 其他更多操作

轉換 transform

● 在 transform 操作符中幾個特點:可以多次 emit 數據、emit 數據沒有限制

A. 多次 emit


suspend fun transformMultiEmit() = coroutineScope {

    val startTimeStamp = System.currentTimeMillis()

    (1..3).asFlow()
        .transform {
            println("Transform --- $it")
            emit(it * 2)            // 多次 emit
            delay(100)
            emit(it * 4)
        }
        .collect{
            println("Collect($it), time=(${System.currentTimeMillis() - startTimeStamp})")
        }
}

B. emit 數據沒有限制:一般 Flow 是有限制 emit 數據類型,其類型必須與 Flow 相同,而 transform 內的 emit 則沒有限制


suspend fun transformEmitOtherType() = coroutineScope {

    val startTimeStamp = System.currentTimeMillis()

    (1..3).asFlow()
        .transform {
            println("Transform --- $it")
            emit(it)            // 多次 emit
            delay(100)
            emit("Hello: $it")
        }
        .collect{
            println("Collect=($it), time=(${System.currentTimeMillis() - startTimeStamp})")
        }
}

限制取用 take

● 一般的 Flow 在發射數據 (emit) 時都沒有限制,這時 如果你要限制數據的發射接收數量,就可以使用 take 操作符


fun main() : Unit = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .take(3)     // 限制接收數量
        .collect {
            println("Flow with take=($it)")
        }

    println("Main finish.")
}

Flow 計算結果

● Flow 除了 emit 以外我們還可以透過兩個操作符來聚集所有 Flow 的結果

A. reduce 操作符:獲取上一次的結果,返回下一個結果


suspend fun flowReduce() = coroutineScope {

    val res = (1..5)
        .asFlow()
        .reduce {lastValue, curValue ->
            println("lastValue=($lastValue), curValue=($curValue)")

            lastValue + curValue
        }

    println("Reduce=($res)")
}

B. fold 操作符:跟 reduce 操作符很像,不過它可以設定初始值,透過初始值開始計算


suspend fun flowFold() = coroutineScope {

    val res = (1..5)
        .asFlow()
        // 初始值設定為 3,從 3 開始計算
        .fold(3) {lastValue, curValue ->
            println("lastValue=($lastValue), curValue=($curValue)")

            lastValue + curValue
        }

    println("Reduce=($res)")
}

合併操作符

Flow 也有合併操作符,可以合併兩個不同的 Flow

A. zip 操作符:兩個不同的 Flow


suspend fun flowZip() = coroutineScope {

    val flowA = flowOf(1, 2, 3, 4, 5)
    val flowB = flowOf("A", "B", "C", "D", "E")

    flowA.zip(flowB) {
        a, b ->
            val tmp = "Zip flowA=($a), flowB=($b)"
            println(tmp)
            tmp
    }.collect {
        println("Collect=($it)")
    }
}

● 當兩個 Flow 的數據量不同時,會以最少數據量的 Flow 為準


suspend fun flowZip2() = coroutineScope {

    // 數量差異
    val flowA = flowOf(1, 2, 3, 4)
    val flowB = flowOf("A", "B", "C", "D", "E")

    flowA.zip(flowB) {
            a, b ->
        val tmp = "Zip flowA=($a), flowB=($b)"
        println(tmp)
        tmp
    }.collect {
        println("Collect=($it)")
    }
}

B. combine 操作符:與 zip 類似 (但不同);當兩個 Flow 數量不同時,不足處會取用最後一個數據來合併


suspend fun flowCombine() = coroutineScope {

    val flowA = flowOf(1, 2)
    val flowB = flowOf("A", "B", "C", "D", "E")

    flowA.combine(flowB) {
        a, b ->
            val tmp = "Combine flowA=($a), flowB=($b)"
            println(tmp)
            tmp
    }.collect {
        println("Collect=($it)")
    }
}

● 上面兩個操作符會將 Flow 合併成一個 Flow (算是合併、並聯);操作符 flatMerge 則是 串聯 Flow,將數據會合成一個流


suspend fun flowConcat() = coroutineScope {

    val flowA = flowOf(1, 2, 3, 4)
    val flowB = flowOf("A", "B", "C", "D", "E")

    flowOf(flowA, flowB)
        .flattenConcat()
        .collect {
            println("Collect=($it)")
        }
}

Nest Flow: Flat 鋪平

● 如果有使用到巢狀 (Nest) Flow 並控制其行為的狀況就可以使用 Flat 相關操作符

Flat 相關操作符特色
flatMapConcat等待 Flat 內部完成才通知 Collect
flatMapMerge併發操作,不會等待 Flat 內部就直接通知 Collect
flatMapLatest當有第二次數據發送 (emit) 時,就會停止 Collect 接收

A. flatMapConcat 操作符:等待 Flat 內部完成才通知 Collect


suspend fun flatConcat() {
    var startTime : Long = 0

    (1..5)
        .asFlow()
        .onStart { startTime = System.currentTimeMillis() }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
        }
}

B. flatMapMerge 操作符:併發操作,不會等待 Flat 內部就直接通知 Collect


suspend fun flatMerge() {
    var startTime : Long = 0

    (1..5)
        .asFlow()
        .onStart { startTime = System.currentTimeMillis() }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
        }

}

C. flatMapLatest 操作符:當有第二次數據發送 (emit) 時,就會停止 Collect 接收


suspend fun flatLatest() {
    var startTime : Long = 0

    (1..5)
        .asFlow()
        .onStart { startTime = System.currentTimeMillis() }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
        }

}


更多的 Kotlin 語言相關文章

在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!

Kotlin 特性、特點

Kotlin 特性、特點:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用

Kotlin 進階:協程、響應式、異步

Kotlin 進階:協程、響應式、異步:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章

Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

發表迴響