Overview of Content
本文深入研究 Kotlin Flow,涵蓋了流的基本使用、與 RxJava 的對比、背壓機制以及其他豐富的操作
首先,我們介紹了 Flow 的基本使用,包括如何創建、特點、切換執行緒、取消和關閉流等
接著,我們深入比較了 Flow 與 RxJava 中的冷流和熱流概念;在背壓機制部分,我們討論了 Flow 實現 BUFFER 策略、LATEST 策略的具體方式
最後,我們探討了 Flow 的其他操作,包括轉換、限制取用、計算結果、合併操作符以及 Nest Flow 的應用,透過本文的閱讀,您將深入瞭解 Kotlin Flow 的豐富功能、細節,並學會在實際應用中基礎運用。
寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀
Flow 基本使用、介紹
我們在 Java 中常用的響應式編程模型就是 RxJava;而 Flow 就是 Koltin 結合 Coroutine 響應式編程的產物
響應式編程:簡單來想就是依照上一步反應而觸發不同的行為編程;最常見的就是 API 請求,由 API 請求的結果來驅動 APP 內的邏輯,讓它們之間產生對應的反應(像是很多的監聽者)
Flow 使用起來類似於 RxJava,其函數的對應如無下
功能 | Flow | RxJava |
---|---|---|
對流發送資料 | emit() | onNext() |
接收資料 | collect() | subscribe() |
Flow 是一個介面
// Flow 源碼
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
flow 基本使用
● Flow 可以返回(或是說發射 emit
)多個異步計算的結果,並用 collect
接收結果;FlowBuilder 有幾種使用方式,常用的方式如下
A. 使用頂層函數 flow{ }
:可以直接使用 flow{ }
創建 SafeFlow
// 源碼
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
flow{ }
的使用範例如下:
// 範例
suspend fun sampleUse01() = coroutineScope {
flow<Int> {
for (i in 1..5) {
val res = measureTimeMillis {
delay(1000L * i)
emit(i)
}
println("Use time: $res")
}
}.collect {
println("res: $it")
}
}
B. 使用頂層函數 flowOf{ }
:其實內部就是透過 flow 並呼叫 emit
函數
// 源碼
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
flowOf{ }
使用範例如下:
// 範例
suspend fun sampleUse02() = coroutineScope {
flowOf("A", "B", "C", "D")
.onEach { // onEach 每次發送前執行
delay(100)
}
.collect {
println("it each: $it")
}
}
C. 使用頂層函數 asFlow
:它是一個 Iterable
類的拓展泛型函數,同樣透過 emit
發送
// 源碼
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
asFlow
的使用範例如下:
// 範例
suspend fun sampleUse03() = coroutineScope {
listOf(1.1, 2.2, 3.3, 4.4, 5.5).asFlow()
.onEach { // onEach 每次發送前執行
delay(100)
}
.collect {
println("it each: $it")
}
}
D. 使用頂層函數 channelFlow
,跟上面的 差別在於,它透過 channel 的 send 發送(Channel 的特點是可以與異步產生掛勾)
// 源碼
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
channelFlow
的使用範例如下:
// 範例
suspend fun sampleUse04() = coroutineScope {
channelFlow {
for (i in 1..5) {
val res = measureTimeMillis {
delay(1000L * i)
send(i)
}
println("Use time: $res")
}
}.collect {
println("channel res: $it")
}
}
Channel Flow 特點
● Channel Flow 與一般 Flow 兩個都是 生產者跟消費者模型,但是還是有差異,其差異如下
Flow 種類 | 是否同步 | 特點 |
---|---|---|
Flow | 是 | 非掛起(非阻塞,CPU 沒有讓出資源) |
ChannelFlow | 否、異步 | 可切換上下文(不同 Coroutine 間通訊) |
一般的 Flow 是「同步」的方式傳遞訊息,必須等待 collect
處理完畢才可以進行下一次 emit
行為
suspend fun normalFlow_Sync() = coroutineScope {
val totalTime = measureTimeMillis {
flow { // 與使用 `flowOf` 則相同
for (it in 1 .. 5) { // 1 ~ 5
delay(100)
println("Hello, emit now: $it")
emit(it)
}
}.collect{
delay(500)
println("World, I get normal flow item: $it")
}
}
//
println("Flow with sync, total use time: $totalTime")
}
概念圖如下,每次都必須傳輸(耗時 100ms)、收集(耗時 500ms)結束才往下一個事件走
B. ChannelFlow 使用範例:
ChannelFlow 是「異步」傳送訊息,不需等待 collect
處理完就可以繼續 send
訊息(collect 仍需要等待)
suspend fun channelFlow_Async() = coroutineScope {
val totalTime = measureTimeMillis {
channelFlow {
for (it in 1 .. 5) {
delay(100)
println("Hello, send now: $it")
send(it) // 可以一直發送,不用等待
}
}.collect{
delay(500) // 這裡必須等待
println("World, I get channel flow item: $it")
}
}
println("Flow with async, total use time: $totalTime")
}
Flow 切換 Thread - flowOn
● 在 Flow 中要切換 Thread 就需要使用到 flowOn
關鍵字,flowOn
影響到的範圍是在 collect 之前
類似 RxJava 中切換 Thread 的
observeOn
、subscribeOn
方法
suspend fun flowSwitchThread() = coroutineScope {
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map {
println("------Map thread: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO) // 讓 flow、map 運作在 IO 上
.collect {
// collect 仍運作在 main
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
以下範例將
flow
、map
運作在 IO 執行序,collect
接收在 Main 執行序接收!
● 我們在使用 Coroutine 時是使用
withContext
函數做切換,但 在 Flow 中不可以使用withContext
切換!
● Flow#collect
運作在哪個執行序是由 flow 在哪個執行序啟動為準(以下指定運作 Flow 的執行序)
suspend fun flowSwitchThread2() = coroutineScope {
withContext(newSingleThreadContext("HelloWorld")) {
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map {
println("------Map thread: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO)
.collect {
// 切換到 HelloWorld Thread
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
}
● flowOn
是 影響上游的操作,所以一個 Flow 中可以多次切換 flowOn 去指定業務邏輯要在哪個不同 Thread 的運作;請注意以下 Thread 切換操作
suspend fun flowSwitchThread3() = coroutineScope {
val customerDispatcher = newSingleThreadContext("HelloWorld")
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map { // map 1
println("------Map on customerDispatcher: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(customerDispatcher) // 影響上游操作: 影響 flow、map 1 兩個操作
.map { // map 2
println("------Map on IO: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO) // 只影響 map 2 操作
.collect {
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
取消、關閉 Flow 流
● Flow 是 可以被關閉、取消,範例如下…
fun main() : Unit = runBlocking {
withTimeoutOrNull(2000) {
flow{
for (i in 1..5) {
delay(400)
emit(i)
println("---Flow=$i")
}
}.collect {
delay(400)
println("Collect get: $it")
}
}
println("Main done.")
}
監聽 Flow 流
● 在 Flow 的運行中可以知道 Flow 的開始 onStart
、結束 onCompletion
類似 RxJava 中的
do
函數
fun main() : Unit = runBlocking {
flow {
for (i in 1..5) {
emit(i)
println("---Flow=$i")
}
}.onStart {
println("onStart ~~")
}.onCompletion {
println("onCompletion ~~")
}.collect {
println("Collect get: $it")
}
}
Flow 的協程 & Sequences 的同步
● Flow 是按照順序執行,而 Sequences 也可以達到相同效果,但兩者仍有差異
順序類 | 發送數據的方法 | 差異 |
---|---|---|
Flow | emit | 同步,但是發送資料時內部是使用協程的機制,並不堵塞執行序 |
Sequences | yield | 同步,內部 不支援 suspend function(會堵塞執行序) |
A. Flow 非同步:
以下範例中,Flow 在發送 (emit) 後資料後,會把 MainThread 讓出 (delay
函數) 讓其他需要 MainThread 的函數去使用
以下的測試將只會在 MainThread 執行
測試的目的是為了確認 Flow
在執行 suspend function
時是否會堵塞 MainThread,如果會堵塞則以下的 launch
Lambda 將無法被運行
suspend fun flowUse() = coroutineScope {
launch {
for(i in 1 .. 5) {
delay(100)
println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
}
}
flow {
for (i in 1 .. 5) {
delay(100)
emit(i)
println("flow item=${i}, Thread: ${Thread.currentThread().name}")
}
}.collect {
println("collect get=(${it}), Thread: ${Thread.currentThread().name}")
}
println("Done")
}
從結果來看
flow
是不對堵塞 CPU 的
● 由上圖結果可知,
flow
在運行suspend function
時不會堵塞 MainThread,它會讓出 Thread 的使用權給其他需要的函數使用從這裡我們也可以看出
flow
是使用了「協程, Coroutine」技術,才可以達到不堵塞單一執行序,而執行異步的行為!
B. Sequences 同步:
sequences 會堵塞 Mainthread 的使用(正確點來說是,當前運行的 Thread) 直到它執行完畢
以下案例,同樣的程式,不過我們將 flow
換成 sequence
,來觀察它是否會堵塞 MainThread 的執行
suspend fun sequencesUse() = coroutineScope {
launch {
// sequences 會堵占 main 的使用
for(i in 1 .. 5) {
delay(100)
println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
}
}
sequence {
for (i in 1 .. 5) {
Thread.sleep(100)
yield(i) // 等同 emit 的意思
println("sequence item=${i}, Thread: ${Thread.currentThread().name}")
}
}.forEach {
println("forEach get=(${it}), Thread: ${Thread.currentThread().name}")
}
println("Done")
}
可以看到 Sequence 佔用了當前 MainThread 的執行
● 在特別強調一次:
sequences 堵塞的是當前執行序,而不是只堵塞 Mainthread
// 創建指定 Thread 測試 fun main() : Unit = runBlocking(newSingleThreadContext("Hello")) { // 堵塞當前 thread sequencesUse() }
監看 Flow 結束:就算異常也要看到結束
● Flow 是一個 Suspend function,如果需要在 Flow 結束時 (不管是正常結束、還是拋出錯誤),通知使用者進行操作
這裡只說明如何判斷 Flow 的結束,並 不包含 Flow 的錯誤捕捉
● 可以通過以下兩種方式
A. imperative:用一個大的 try/finally 包裹,透過 finally 通知使用者 Flow 已經完成;
suspend fun flowTryFinally() = coroutineScope {
try {
flowOf(1, 2, 3)
.map {
it * it
}
.collect {
if (it == 4) {
throw Exception("Test throw. $it")
}
}
} finally {
println("Try finally flow finish.")
}
}
fun main() : Unit = runBlocking {
flowTryFinally()
// 不會執行到
println("Main finish.")
}
沒有 catch 的話就不能捕捉 Exception,只能通過 finally 知道最終結果
B. declatative:透過 onCompletion
函數,就可以達到上面 finally
的相同效果!
suspend fun flowOnCompletion() = coroutineScope {
flowOf(1, 2, 3)
.map {
it * it
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
fun main() : Unit = runBlocking {
// flowTryFinally()
flowOnCompletion()
// 不會執行到
println("Main finish.")
}
Flow 異常處理 / 重試
● 上面我們說了 Flow 的結束,但並沒有說如何處理異常;而這邊我們就特別來說說 Flow 是如何處理異常的;Flow 處理異常有兩種方案
A. 使用傳統的 try/catch
處理
suspend fun traditionTryCatch() {
try {
flowOf(1, 2, 3)
.map {
it * it
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
} catch (e : Exception) {
println("flow get exception: $e")
}
}
fun main() : Unit = runBlocking {
traditionTryCatch()
println("Main finish")
}
B. 使用 catch
操作符:catch 操作符可以捕捉 上游 的操作錯誤
suspend fun flowCatch() {
flowOf(1, 2, 3)
.map {
if (it == 3) {
throw Exception("Test throw. $it")
}
it * it
}
.catch {
// 捕捉上游
println("flow get exception: $it")
}
.onCompletion {
// 不影響下游
println("onCompletion flow finish. e: $it")
}
.collect {
println("Flow catch: $it.")
}
}
● 何謂「上游錯誤 」? 就是在設定
catch
操作符之前的錯誤suspend fun flowCatch2() { flowOf(1, 2, 3) .catch { // 這時就無法捕捉 map 中的錯誤 println("flow get exception: $it") } .map { if (it == 3) { throw Exception("Test throw. $it") } it * it } .onCompletion { println("onCompletion flow finish. e: $it") } .collect { println("Flow catch: $it.") } }
由此我們也 可以知道
catch
是無法處理collect
中的錯誤的
● Flow 在發生錯誤時可以透過 retry
、retryWhen
操作符對上游做重試
●
retry
重試次數是都是從 1 開始計算;retryWhen
重試次數是都是從 0 開始通知
A. retry
操作符可以透過 return Boolean 來決定是否再次重試;Return true 代表重試,否則不重試
suspend fun flowCatchRetry() {
flowOf(1, 2, 3)
.onEach {
println("Current number=($it)")
if (it == 2) {
throw Exception("Test throw=($it)")
}
}
.retry(2) { // 重試兩次
if (it.message == "Test throw=(2)") {
println("Handle exception.")
return@retry true
}
false
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
B. retryWhen
可以達到跟上述一樣的效果;retryWhen
會不斷地重試,並且它多了一個當前重試次數給使用者判斷
suspend fun flowCatchRetryWhen() {
flowOf(1, 2, 3)
.onEach {
println("Current number=($it)")
if (it == 2) {
throw Exception("Test throw=($it)")
}
}
.retryWhen {e, times ->
println("Current try times=($times), e=($e)")
if(e is Exception) {
// 同 `retry`,返回 true 代表同意重試
return@retryWhen times < 2
}
false
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
Flow & RxJava 對比
Koltin 協程可以透過一些類的組合來達到等同於 RxJava 的效果,如下表
RxJava | Corotines |
---|---|
Single<T> | Defered<T> |
Maybe<T> | Defered<T> |
Completable | Job |
Observable<T> | Channel<T> 、Flow<T> |
Flowable<T> | Channel<T> 、Flow<T> |
Cold Stream 冷流:Flow
● Cold Stream 如同上面的範例一樣,在呼叫 Flow#collect
後才開始運行 Flow 的流程
如同 RxJava 使用 subscribe 函數才開始運行
A. 實驗一:延遲 collect
,觀察是否會少收到 Flow 的資料
@Test
fun testColdStream() {
var flow: Flow<String>? = null
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flow = flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach { // onEach 每次發送前執行
delay(100)
println("${Thread.currentThread().name}, send: $it")
}
}
println("start")
runBlocking {
// 延遲 `collect`
delay(300)
flow!!.collect {
println("${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
從結果可以看出來,我其實已經延遲
collect
,但是 Flow 仍是等到有人collect
才發送資料
B. 實驗二:創建兩個 collect
,並延遲 collect
,觀察兩個是否會少收到 Flow 的資料
@Test
fun testColdStream() {
var flow: Flow<String>? = null
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flow = flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach { // onEach 每次發送前執行
delay(100)
println("${Thread.currentThread().name}, send: $it")
}
}
println("start")
runBlocking {
delay(300)
flow!!.collect {
println("1111 ${Thread.currentThread().name}, receive: $it")
}
delay(100)
flow!!.collect {
println("2222 ${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
從結果可以看到,第二個
collect
仍會完整地收到全部的資料
Hot Stream 熱流:MutableSharedFlow
● Hot Stream 如同直播,數據要即時擷取否則過了就不會再見到;Kotlin 則可以透過 MutableSharedFlow
的協助來到相同的效果
@Test
fun testHotStream() {
// 創建發射器,熱流發射器
val flow: MutableSharedFlow<String> = MutableSharedFlow()
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach {
delay(100)
println("${Thread.currentThread().name}, send: $it")
flow.emit(it)
}
.onCompletion {
println("Flow completed")
}
.launchIn(this) // 啟動 Flow,這裡是使用 launchIn
}
println("start")
runBlocking {
delay(300)
// 只會收到當下的數據!
flow.collect {
println("${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
Backpressure 背壓
首先先來介紹一下 何謂 Backpressure ? 我們知道 Flow 就像是一個生產者消費者模型,而 Backpressure 的情況則是 生產者的產量遠遠大於消費者
Backpressure 產生後如果沒有策略處理則可能導致應用崩潰
RxJava 有對 Backpressure 有相對應的策略,反映在 Flow 中也有相同的策略,請見下表
RxJava | Flow | 說明 |
---|---|---|
BUFFER | buffer() | Buffer 用來儲存尚未處理的數據,沒有固定 Buffer 大小,有可能導致 OOM |
DROP | - | Flow 緩衝池滿了,則拋棄準備進入緩衝池的新數據 |
LATEST | conflate() | 行為同 DROP ,但 LETEST 會強制將最後一個數據放入緩衝池中 |
Flow 實現 BUFFER 策略
● 首先先來看看沒有 Buffer 時,Flow 面對消費者/生產者時間差,會有甚麼反應
suspend fun flowWithoutBuffer() = coroutineScope {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
如下圖,我們可以看到沒有 Buffer 機制,那 flow 則要等待 collect
處理完,才能發送下一個 emit
數據
● 以下使用 flow 來達成 RxJava BUFFER
的功能 (重點其實就是加了一個 buffer
函數)
suspend fun flowWithBuffer() = coroutineScope {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.buffer() // 不限定則 Buffer 無限大
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
故意延遲 Comsumer 消耗,但這些仍在 Buffer 中,就不必等待 collect
處理完就可以 emit 下一個數據
● Flow Buffer 如果沒有給定 capacity
限制則無限大,假設有給定 capacity
數值,則須加設預設的 2 個容量
suspend fun flowBuffer_2() {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.buffer(1) // 它預設有 2 個 capacity
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
可以看到 buffer 明明設定為 1,不過 flow 在 emit 時直到 3 才真正堵塞,由此可見 buffer
預設有 2 個 capacity
BUFFER 策略:異步併發
● Flow 使用 buffer
操作就可以達到併發的效果 (如果 Buffer 尚未滿的情況下)
A. 首先我們知道一般 非 Channel 的 Flow 是一個同步操作,必須要等待 collect 操作完才可以執行下一個步驟;
suspend fun flowNoBuffer() {
val uesTimes = measureTimeMillis {
flowOf(1, 2, 3, 4, 5)
.onEach {
delay(100)
}
.collect {
delay(500)
println("$it")
}
}
println("Without buffer=($uesTimes ms)")
}
B. 這時如果多了 buffer
情況就會如同「ChannelFlow」,不需等待 collect 結束就可以執行下一個操作;可以達到類似 ChannelFlow 的效果
suspend fun flowBufferAsChannel() {
val uesTimes = measureTimeMillis {
flowOf(1, 2, 3, 4, 5)
.onEach {
delay(100)
}
.buffer()
.collect {
delay(500)
println("$it")
}
}
println("Without buffer=($uesTimes ms)")
}
Flow 實現 LATEST 策略
● 使用 Flow 實現 RxJava 中的 LATEST
(LATEST
的特色是會保存最後一個數據,我們就檢查最後一個數據是否有被保存)
suspend fun flowLatest() {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.conflate() // LATEST 策略
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
Flow 其他更多操作
轉換 transform
● 在 transform
操作符中幾個特點:可以多次 emit 數據、emit 數據沒有限制
A. 多次 emit
suspend fun transformMultiEmit() = coroutineScope {
val startTimeStamp = System.currentTimeMillis()
(1..3).asFlow()
.transform {
println("Transform --- $it")
emit(it * 2) // 多次 emit
delay(100)
emit(it * 4)
}
.collect{
println("Collect($it), time=(${System.currentTimeMillis() - startTimeStamp})")
}
}
B. emit 數據沒有限制:一般 Flow 是有限制 emit 數據類型,其類型必須與 Flow 相同,而 transform
內的 emit 則沒有限制
suspend fun transformEmitOtherType() = coroutineScope {
val startTimeStamp = System.currentTimeMillis()
(1..3).asFlow()
.transform {
println("Transform --- $it")
emit(it) // 多次 emit
delay(100)
emit("Hello: $it")
}
.collect{
println("Collect=($it), time=(${System.currentTimeMillis() - startTimeStamp})")
}
}
限制取用 take
● 一般的 Flow 在發射數據 (emit) 時都沒有限制,這時 如果你要限制數據的發射接收數量,就可以使用 take
操作符
fun main() : Unit = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 限制接收數量
.collect {
println("Flow with take=($it)")
}
println("Main finish.")
}
Flow 計算結果
● Flow 除了 emit 以外我們還可以透過兩個操作符來聚集所有 Flow 的結果
A. reduce
操作符:獲取上一次的結果,返回下一個結果
suspend fun flowReduce() = coroutineScope {
val res = (1..5)
.asFlow()
.reduce {lastValue, curValue ->
println("lastValue=($lastValue), curValue=($curValue)")
lastValue + curValue
}
println("Reduce=($res)")
}
B. fold
操作符:跟 reduce
操作符很像,不過它可以設定初始值,透過初始值開始計算
suspend fun flowFold() = coroutineScope {
val res = (1..5)
.asFlow()
// 初始值設定為 3,從 3 開始計算
.fold(3) {lastValue, curValue ->
println("lastValue=($lastValue), curValue=($curValue)")
lastValue + curValue
}
println("Reduce=($res)")
}
合併操作符
● Flow 也有合併操作符,可以合併兩個不同的 Flow
A. zip
操作符:兩個不同的 Flow
suspend fun flowZip() = coroutineScope {
val flowA = flowOf(1, 2, 3, 4, 5)
val flowB = flowOf("A", "B", "C", "D", "E")
flowA.zip(flowB) {
a, b ->
val tmp = "Zip flowA=($a), flowB=($b)"
println(tmp)
tmp
}.collect {
println("Collect=($it)")
}
}
● 當兩個 Flow 的數據量不同時,會以最少數據量的 Flow 為準
suspend fun flowZip2() = coroutineScope { // 數量差異 val flowA = flowOf(1, 2, 3, 4) val flowB = flowOf("A", "B", "C", "D", "E") flowA.zip(flowB) { a, b -> val tmp = "Zip flowA=($a), flowB=($b)" println(tmp) tmp }.collect { println("Collect=($it)") } }
B. combine
操作符:與 zip
類似 (但不同);當兩個 Flow 數量不同時,不足處會取用最後一個數據來合併
suspend fun flowCombine() = coroutineScope {
val flowA = flowOf(1, 2)
val flowB = flowOf("A", "B", "C", "D", "E")
flowA.combine(flowB) {
a, b ->
val tmp = "Combine flowA=($a), flowB=($b)"
println(tmp)
tmp
}.collect {
println("Collect=($it)")
}
}
● 上面兩個操作符會將 Flow 合併成一個 Flow (算是合併、並聯);操作符 flatMerge
則是 串聯 Flow,將數據會合成一個流
suspend fun flowConcat() = coroutineScope {
val flowA = flowOf(1, 2, 3, 4)
val flowB = flowOf("A", "B", "C", "D", "E")
flowOf(flowA, flowB)
.flattenConcat()
.collect {
println("Collect=($it)")
}
}
Nest Flow: Flat 鋪平
● 如果有使用到巢狀 (Nest) Flow 並控制其行為的狀況就可以使用 Flat 相關操作符
Flat 相關操作符 | 特色 |
---|---|
flatMapConcat | 等待 Flat 內部完成才通知 Collect |
flatMapMerge | 併發操作,不會等待 Flat 內部就直接通知 Collect |
flatMapLatest | 當有第二次數據發送 (emit) 時,就會停止 Collect 接收 |
A. flatMapConcat
操作符:等待 Flat 內部完成才通知 Collect
suspend fun flatConcat() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapConcat {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
B. flatMapMerge
操作符:併發操作,不會等待 Flat 內部就直接通知 Collect
suspend fun flatMerge() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapMerge {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
C. flatMapLatest
操作符:當有第二次數據發送 (emit) 時,就會停止 Collect 接收
suspend fun flatLatest() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapLatest {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
更多的 Kotlin 語言相關文章
在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!
Kotlin 語言基礎
● Kotlin 語言基礎:想要建立堅實的 Kotlin 基礎?以下這些文章將帶你深入探索 Kotlin 的關鍵基礎和概念,幫你打造更堅固的 Kotlin 語言基礎
Kotlin 特性、特點
● Kotlin 特性、特點:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用
Kotlin 進階:協程、響應式、異步
● Kotlin 進階:協程、響應式、異步:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章