Overview of Content
在這篇文章中,我們將深入探討 Dart 語言中的併發與異步處理技術。首先,我們會從 Dart 的單執行緒模型開始,證明其運作方式,並介紹如何使用 Isolate 來創建資源隔離的執行緒,在這個過程中,我們會詳細解釋 Isolate 通訊機制,包括 ReceivePort 與 SendPort 的應用,並通過實例證明 Isolate 的併發能力
接下來,我們會解析 Dart 的 Event Loop 機制,探討 EventQueue 和 MicrotaskQueue 的特性與運作方式,並通過實驗驗證其執行順序… 我們還將比較 Dart 與 Kotlin 在事件循環與協程方面的差異,更清楚兩者語言之間對於併發的處理差異
我們還會介紹 Dart 中強大的異步處理工具 Future,深入分析 then、wait 和 catchError 函數的用法,讓您輕鬆應對各種異步操作
最後,本文將帶您認識 Stream 的概念,並通過實例展示如何創建 Stream 物件、將 Stream 轉為廣播模式,以及使用 StreamController 動態添加數據。您將學會如何利用 async* 與 yield 生成 stream,甚至使用 Stream 打造簡易的 EventBus 系統
此外,我們還將講解 async/await 關鍵字,讓您更直觀地掌握異步函數的運作原理
寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀
個人程式分享時比較注重「縮排」,所以可能不適合手機的排版閱讀,建議切換至「電腦版」、「平板版」視窗看
Dart 的併發
我們之前有提到 Dart 是單執行緒(線程)的語言,但是單執行緒運作在耗時操作(像是網路請求、IO 操作)是相當不符合要求的,它會照成應用的卡頓…
所以 Dart 使採用併發(Concurrency)的機制,但是其特點與 Java 併發不同,兩者併發的差異如下表~
| 語言 | 資源隔離性 | 介紹 |
|---|---|---|
| Java | 資源共享 | 由於資源共享,所以要注意資源的同步問題 |
| Dart | 資源隔離,記憶體不共享 | 特性像是 Java 的進程(Process),但是並不是進程,必須把它看成 Java Thread,但是它是記憶體安全模型 |
● 關於多執行緒其實還有「並行」、「併發」概念,兩者個差異的請點擊 Thread & Process 的差異:併發 & 並行概念 了解
證明 Dart 單執行緒
● 用以下程式用來證明 Dart 是單執行緒
import 'dart:io';
void main() {
bool finish = false;
print("ready read file($finish)");
new File(r"app_entry.dart").readAsString().then((value) {
finish = true;
print("already read done($finish)");
});
while(!finish) { } // 會卡在這,在 Event - Loop 會解釋
// 永遠不會執行到 "read file finish",因為主執行序不會等待 File 的 IO
print("read file finish");
}
我們可以發現以上程式永遠不會結束,這是因為 Dart 的單執行緒…
A. 執行到讀取檔案時,Dart 會把 IO 耗時操作放置到事件循環內等待處理,然後繼續執行後續代碼
B. 執行到 while(!finish) 之後,就進入了無限循環,並且這個循環會一直佔有 CPU 資源,導致沒有機會去處理 IO 事件,這個測試無法結束
isolate 類:開啟資源隔離執行緒
事件循環(
Event-Loop)之後的小節會說明
● 我們前面有提及「資源隔離」,那這裡我們就來使用 Dart 的資源隔離類
要注意它與 Java Thread 最大的不同在資源的隔離不共享 (若需要資源必須透過傳遞)… 在 Java 中,執行序內的資源是不隔離的所以可以共享,而不須透過傳遞
首先 import isolate 類,並使用 isolate#spawn 開啟新的 isolate,然後我們做以下實驗來證明一些事情
● 開啟新執行序:
透過 isolate#spawn 方法可以開啟一個新的執行序
import 'dart:isolate';
int? data; // int 是類,所有 default 是 null
void main() {
data = 10;
// 開新線程,spawn 是泛型
// 原型 : external static Future<Isolate> spawn<T> (方法, ,{...})
Isolate.spawn(entryPoint, "Alien");
print("Finish main isolate, data: $data"); // 判斷是否會排隊進行任務 and 數據是否隔離
}
void entryPoint(String message) { // 可以省去 (message)
print("In child isolate, Params: $message, data: $data"); // 證明數據是隔離的
}
從執行結果來看我們可以知道
A. spawn 方法確實可以開啟一個新執行序,因為主執行緒正常會先結束,而開啟的執行序在之後才結束
B. isolate#spawn 方法開啟的執行序是預設不守護主執行緒的,因為主執行緒結束後,透過 spawn 開啟的執行緒也不會結束
isolate 類:開啟資源隔離執行緒
● 證明資源的隔離:
要正名資源的隔離也相當容易,讓不同的執行序持有相同資源的引用,如果資源被改變則是資源共享,而資源沒有被改變則是資源不共享!(也就是資源被隔離)
import 'dart:isolate';
int i = -1;
void main() {
i = 20;
Isolate.spawn(entryPoint, "Test"); // Java 可想成 new Thread
print("Main $i");
}
void entryPoint(String msg) {
print("Isolate $i");
}
從結果可以看到,兩者所取得的資源並不相同,Isolate.spawn 開啟的執行序仍舊是保持初始化的 -1(因為開啟執行緒後執行的 entryPoint 函數,它是取全域的 i 資源)
● 如果是資源共享的情況下,會變怎樣?
資源
i如果是共享的,那代表主執行緒內將i修改為 20,而後Isolate.spawn開啟的執行序也會讀取到i為 20
證明 isolate 資源的隔離
Isolate 通訊:ReceivePort & SendPort
● 由於 isolate 開啟的執行序,其資源是相互隔離的,若需要傳遞資源則必須透過「某種方法傳遞」,在 Dart 中,這種方法就是 ReceivePort & SendPort
ReceivePort & SendPort,就像是 Android 的 Handler (不同執行緒之間的通訊)
每個 isolate 內都可以設定 ReceivePort & SendPort 物件,可以透過這兩個物件來發送與接收資源,參考以下的概念圖
isolate 與 ReceivePort 物件的關係
● ReceivePort 物件接收資源時,是否會被切換到別的執行緒中?
你在哪個執行緒內創建,就會回傳到該執行緒內中,並不用特別設定
● 使用 ReceivePort & SendPort 的範例如下,該範例會讓主執行緒與 isolate 執行緒通訊
A. 在主執行緒(Main Thread)中創建 ReceivePort 物件,並把 ReceivePort 物件內的 SendPort 物件傳遞給 Isolate 創建出的執行序(這樣 Isolate 就可以透過它發送資源給主執行緒)
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
// 創建 isolate 執行緒
Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);
// receive 監聽
// 原型: StreamSubscription listen(void onData(var message), {...});
receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息
print("Main~~~~~~ finish"); // Main 並不會等待 isolate !
}
B. 主執行緒監聽 isolate 執行序發送的訊息
// main isolate 監聽子 isolate
void mainListenIsolate(var message) {
if(message is SendPort) {
print("I am Main, I get your send Port");
message.send("Main get"); // 2.
} else {
// 接收子 isolate 訊息
print("Main get Message from isolate: $message");
}
}
C. isolate 執行序接收主執行序的 SendPort 物件,並在 isolate 執行緒內創建 ReceivePort 物件,並將其傳給主執行緒(這樣主執行緒就可以透過它傳遞資源給 isolate 執行緒)
// isolate 執行序
void isolateFunc(SendPort sendPort) {
print('isolate created');
// 創建自己的接收器
ReceivePort receivePort = ReceivePort();
// 透過主執行緒給予的 SendPort 物件發送把 isolate 內的發送器(`SendPort`)發送給主執行緒
sendPort.send(receivePort.sendPort);
// 監聽 Main
receivePort.listen((var message) {
print("Child Isolate get Message from Main: $message");
});
sendPort.send("I am isolate"); // 透過主執行緒給予的 SendPort 物件發送資料給主執行緒
}
主執行序與 isolate 通訊的結果
● 以下做了個有趣的測試,可以用來加強 Dart 是單執行緒的觀念… 一樣是這個小節的範例,在這裡我們在主行緒休眠了 5s 中,那會怎樣呢?
import 'dart:io'; import 'dart:isolate'; void main() { ReceivePort receivePort = ReceivePort(); Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort); // receive 監聽 // 原型: StreamSubscription listen(void onData(var message), {...}); receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息 // 添加休眠 5 秒 sleep(Duration(seconds:5)); print("Main~~~~~~ finish"); // 證明 Main 並不會等待 isolate ! }isolate 等待主執行序的訊息 在這個範例中,我們在主執行緒中進行了 5 秒的休眠(
sleep)。儘管 Isolate 會在Isolate.spawn後立即啟動並執行isolateFunc函數,但由於主執行緒被 sleep 阻塞,主執行緒無法立即處理來自 Isolate 的訊息這意味著雖然 Isolate 已經獨立運行,但主執行緒必須等待 sleep 結束後,才會繼續處理剩餘的程式碼和非同步事件
因此,在主執行緒恢復後,你會看到 Isolate 發送的訊息被處理,並繼續執行剩餘程式碼
主執行序修眠結束後,就可以與 isolate 執行序通訊
● 有些人可能會發現,為什麼這個範例程式不會結束(要自己按
Ctrl+C才會結束)而範例程式碼不會結束的原因是因為主執行緒的
ReceivePort一直在監聽來自 Isolate 的訊息,這是導致程式持續運行的原因
Isolate 併發證明
● 以下程式碼展示了 Dart 中 Isolate 的併發性,它允許我們在獨立的執行緒中執行代碼
import 'dart:isolate';
void main() {
// 在新的兩個 isolate 中創建兩個任務,這兩個任務做 dowhile
Isolate.spawn(isoMain1, "start---1");
Isolate.spawn(isoMain2, "start------2");
while(true) {}
}
void isoMain1(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
void isoMain2(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
如果不是併發程序的話,就會是單一輸出結果
isolate 併發證明,交換執行
認識 Event Loop
Dart Event Loop 與 Android Handler 機制類似(都是以「事件驅動」模式設計),通過從 Loop 中不斷獲取消息
像是 isolate 發送的消息就是通過 Event Loop 處理後,由單執行緒的 Dart 應用接收
Dart Event Loop 特性:EventQueue、MicrotaskQueue
這個小節我們先撇除併發概念,先單獨來觀察
Dart Event Loop的特性
● 首先我們要知道,在 Dart 中一個執行緒(線程)會對應一個事件循環(Event-loop)
● Dart Event Loop 與 Android Handler 不同的是…
A. Android 每個 Thread 都有一個 Event-loop,並對應 單獨的 MessageQueue,再藉由 Handler 處理消息
但是與 Dart 不同的是,Android 天生屬於多執行序
B. Dart 則是一個 Event-Looper 對應兩個 Queue,儲存不同等級的工作,兩個 Queue 的如下表所述
| Dart 事件類型 | 介紹 | 特色 | 範例 |
|---|---|---|---|
| EventQueue | 普通事件 | 每次執行完都會檢查 微事件 | Future.delayed(Duration(seconds: 1)); |
| MicrotaskQueue | 微事件 | 優先等級最高 | Future.microtask() |
黃色區塊為子執行序,要注意 Main 函數會正常執行,並 併發執行 Event-Loop
每次 Event-loop 在執行
EventQueue任務前,都會檢查MicrotaskQueue是否有需要執行的微任務,若MicrotaskQueue有任務要執行會先執行,之後才接續執行EventQueue任務event-loop & queue 檢查順序
●
Event-loop的啟動:Event-loop 在 Dart 應用中是自動啟動的,不需要等 main 完全執行完畢才開始運行… 實際上,Event-loop 是一個「持續運行的機制」,從應用啟動之後就開始運行
EventQueue、MicrotaskQueue 驗證執行順序
● 在 Dart Event Loop 中插入任務的使用範例如下
● 插入任務至 EventQueue:
then 函數是在 Event-Loop 檢查 EventQueue 時才執行,只要 EventQueue 內有任務就會被執行(以下任務是 IO)
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
print(value);
});
print("read file finish");
}
任務放入 event-queue
● 插入任務至 MicrotaskQueue:
這裡我們來驗證將任務插入至 MicrotaskQueue 是否會比起把任務插入 EventQueue 還要快被執行
插入任務至 MicrotaskQueue 需要使用 Future.microtask 函數
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
});
// 原型 : factory Future.microtask(FutureOr<T> computation()) {
Future.microtask(() {
print("執行 micro Task");
});
print("Main finish");
}
如下結果,我們可以看到 微任務的執行會在一般任務之前!
任務放入 micro-queue
Micro 微任務:任務插隊
● 現在來證明 Micro 微任務是可以插隊一般任務的 (每次執行一般 Event 都會檢查有沒有需要執行的微任務),驗證程式如下:
我們在主執行序中創建 ReceivePort 物件並監聽事件,當有事件進入主執行序時「插入微任務」,並在微任務執行時打印,來驗證是否微任務不論順序,都會執行在一般任務之前
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
receivePort.listen((message) {
print(message);
Future.microtask(() => print("微任務插隊~"));
});
// 一般 Event
receivePort.sendPort.send("傳送 Message 1");
// 一般 Event
receivePort.sendPort.send("傳送 Message 2");
// 一般 Event
receivePort.sendPort.send("傳送 Message 3");
print("Main function finish");
}
事件循環與協程:Dart vs. Kotlin
● 個人熟悉的協程是 Kotlin 的智能協程,所以以下透過 Kotlin 協程與 Dart 事件循環做比較
● 以下範例是最一開始我們看的「Dart 單執行緒的證明」,在這裡我們插入協程的概念會如何?
如果我改成在 while 判斷中修眠,那 Dart 事件循環是否可以像是 Kotlin 中提供的協程一樣,在主執行緒空閒時幫我去處理任務呢?
void main() {
...
new File(r"app_entry.dart").readAsString().then((value) {
// 會不會有機會處理到這裡?
});
while(!finish) {
print("test...");
sleep(const Duration(seconds: 1));
}
...
}
答案是否定的!就算是這樣修改,程式也不會有機會執行到以上的 IO 任務,因為 Dart 是採用事件循環機制,與 Kotlin 的智能協程 就有本質上的不同
● 兩者個差異關鍵為:
● Dart 事件循環:當主執行緒被阻塞時(例如透過 sleep),事件循環無法繼續,非同步任務的回呼也無法執行
● Kotlin 協程:Kotlin 的協程能夠在任務之間智慧切換,避免主執行緒阻塞,能夠繼續處理其他任務
所以以效能來說的話 Kotlin 的協程能帶來更高的效能,不過相對的撰寫起來需要更多的概念,複雜度也會更高
結合 Dart 事件循環、執行序
● 首先我們仍要抱有一個觀念 Dart 是單執行序的!那它是怎麼(或是說何時)去執行異步任務的呢?這裡需要再澄清兩個觀念
● 前面小節我們所說的「Dart 事件循環」,是在說明 Dart 的 Event-loop 特性與內部結構,這時與執行序尚未產生關聯
● Event-loop 機制與異步任務關聯:
Event-loop 是用來掃描其內部的兩個 Queue 事件的機制,而我們撰寫的 異步任務就會放置到 Queue 中
● Event-loop 機制與執行序產生關聯
那放置在兩個 Queue 的任務何時被執行呢?
Dart 不會另外起一執行序來執行,仍舊保值單一執行序執行,而這個執行是採用「協作式」的方式執行… 也就是說「異步任務會在主執行序空閒時被執行」
認識 Future
在 Dart 庫中隨處可見 Future 物件… 並且 通常異步函數返回的物件就是一個 Future (像是 Isolate.spawn 方法返回就是一個 Future 物件)
// Isolate.dart
// spawn 方法,返回值就是 Future
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
@Since("2.3") String debugName});
以下介紹幾個 Future 常用的 API,如下表所示
| Future API | 介紹 |
|---|---|
| then(...) | 當任務執行完畢後 |
| wait(...) | 等待以上 Future 任務都執行完後再一起做結尾 |
| catchError(...) | 抓取異步錯誤 |
then 函數:異步的回調、異步串接處理
● 異步的回調:
在呼叫異步函數時會返回一個 Future 物件,然後我們可以執行 then 函數,這個 then 函數就是異步的回調函數(Callback function),以下是個簡單的使用範例
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
});
}
then 異步回調的結果
● 異步串接處理:
Future#then 函數返回的也是一個 Future 物件,所以可以接續使用
如同 Java 的 Builder 建構者模式,這在做網路 API 請求的時候非常好用
範例如下:
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
return value.length;
}).then((length) {
print("File length: $length");
});
}
then 串接任務
wait 函數:結合異步任務
● 結合異步任務:
透過 Future#wait 函數,我們可以用來等待多個 Future 都執行完後在做統一處理,可以用來做異步任務協作)
範例如下所示
void main() {
// 異步任務一
Future readFile = new File(r"app_entry.dart").readAsString().then((value) {
return value.length;
});
// 異步任務二
Future delay = Future.delayed(const Duration(seconds: 3));
// 等待兩任務都結束
Future.wait([readFile, delay]).then((resultArray) {
print("第一個 Future : ${resultArray[0]}");
print("第二個 Future : ${resultArray[1] ??= "null"}");
});
print("main function finish");
}
如下圖所見,我們會看到 wait 函數確實會等待兩個任務都執行完畢才會被執行
with 結合異步任務
catchError 函數:捕捉異常
● 捕捉異步任務異常:
使用 file#readAsString 切換為異步讀取資源 (該函數返回 Future),這時就可以使用 Future#catchError 函數來捕捉錯誤
以下範例會讀取一個不存在的檔案
import 'dart:io';
void throwError() {
// readAsString 原型 : Future<String> readAsString({Encoding encoding: utf8});
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e,s) { // catchError 原型 Future<T> catchError(Function onError, {bool test(Object error)});
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
});
}
void main() {
throwError();
}
如下圖所見,我們會捕捉到 PathNoFoundException 異常
catchError 捕捉異常
如果沒有捕捉錯誤,則會產生
Unhandled exception異常,對於 Dart 異常不熟悉的可以看 Dart 的捕捉異常限制 這篇文章
● Future#catchError 也接受捕捉特定類型異常,只要透設定第二個參數就可以指定異常類型,範例如下
import 'dart:io';
void throwError() {
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e, s) {
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
}, test: (e) => e is PathNotFoundException);
}
void main() {
throwError();
}
catchError 捕捉指定異常類型
認識 Stream
Stream 表示的也是異步數據,它是 Dart 中處理異步事件「流」的 API
它與 Future 的差異在,Future 表示 一次性 任務的返回,而 Stream 可以 分多次 異步任務,這樣的好處有…
A. 可以減少一次性記憶體的消耗量(因為 Stream 可以分批回傳)
B. 響應速度較快;就像是加載較大的網頁時,Stream 可以先回傳已經加載好的部分,這樣可以有比較好的使用體驗(而 Future 則是全部加載完畢後再一次返回)
測試 Stream 與 Future 的不同
● Stream 有監聽函數可以監聽到目前執行到哪,像是 onDone、onError、resume、pause...等等函數可以使用(如下表所示),與 Future 最大不同在於 Stream 不會單次讀取完成
| 監聽方法 | 說明 |
|---|---|
| onData(必須參數) | 收到 Data 時會被觸發 |
| onError | 收到 Error 時觸發 |
| onDone | 結束 Stream 流時觸發 |
| unsubscribeOnError | 當第一次收到 onError 時是否取消 Stream 流 |
以下範例中我們來讀取大一點的檔案(linux-5.6.4.tar.xz),這樣就可以明顯看出兩者個不同
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
// 讀取次數
const fileName = '/Users/user/Downloads/linux-5.6.4.tar.xz';
int times = 0;
void main() {
Isolate.spawn(isolateFuture, "start read File By Future");
Isolate.spawn(isolateStream, "start read File By Stream");
while(true) {}
}
void isolateFuture(String str) {
print(str);
new File(fileName).readAsBytes().then((value) {
print("Future : ${times++}");
});
print("Future Finish\n\n");
}
void isolateStream(String str) {
print(str);
// 多次
StreamSubscription<List<int>> listen = new File(fileName).openRead().listen((event) {
print("Stream : ${times++}");
});
// 結束通知閉包,還有 onError 等等...
listen.onDone(() {
print("onDone");
});
// stream 也可以暫停
listen.pause();
listen.resume();
print("Stream Finish");
}
看到結果知道 Stream 會多次讀取資料,而 Future 是一次性的全部讀取進記憶體中
stream vs. future
創建 Stream 物件
● 來看看幾個比較常見的 Stream 建構方法,如下表所示
| Factory 建構 | 功能 |
|---|---|
| Stream.fromFuture | 當 future 完成時將觸發一個數據或錯誤,然後立即關閉這個流 |
Stream.fromFutures(多了個 s) | 每個 future 都有自己的數據(onData)或錯誤(onError)事件,當整個 future 完成後,流將會關閉,如果 future 為空,流將會立即關閉 |
| Stream.fromIterable | 從集合中戶取數據的單訂閱流 |
A. Stream.fromFuture 範例:
import 'dart:async';
void main() {
Future<String> myFuture = Future.delayed(
Duration(seconds: 2),
() => 'Hello from Future!');
Stream<String> streamFromFuture = Stream.fromFuture(myFuture);
streamFromFuture.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
B. Stream.fromFutures 範例:跟 Stream.fromFuture 差異不大,只是它會同時管理多個 Future
import 'dart:async';
void main() {
Future<String> future1 = Future.delayed(Duration(seconds: 1), () => 'First Future');
Future<String> future2 = Future.delayed(Duration(seconds: 2), () => 'Second Future');
Future<String> future3 = Future.delayed(Duration(seconds: 3), () => 'Third Future');
Stream<String> streamFromFutures = Stream.fromFutures([future1, future2, future3]);
streamFromFutures.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
C. Stream.fromIterable 範例:
import 'dart:async';
void main() {
List<int> myIterable = [1, 2, 3, 4, 5];
Stream<int> streamFromIterable = Stream.fromIterable(myIterable);
streamFromIterable.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
● 如果沒有額外設定,Stream 預設是單監聽模式,如果監聽數量大於一個以上的監聽則會拋出錯誤
void main() {
var stream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead();
// stream 流讀取
stream.listen((List<int> bytes) {
print("stream listen 1");
});
// 錯誤! 只能有一個監聽者
stream.listen((List<int> bytes){
print("stream listen 2");
});
}
Stream 轉廣播模式:多個監聽者
● Stream 預設是單監聽模式,若要讓 Stream 可以讓多個用戶監聽(允許多個用戶監聽),就要將其轉為「廣播模式」
透過 Broadcast#asBroadcastStream 方法,就可以將 Stream 轉為可多個監聽者
import 'dart:io';
void main() {
// Stream 轉為 Broadcast 就可以多個監聽
var broadcastStream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead().asBroadcastStream();
broadcastStream.listen((event){
print("廣播訂閱者 - 1");
});
broadcastStream.listen((event){
print("廣播訂閱者 - 2");
});
print("Main Finish");
}
StreamController:可動態添加數據
● StreamController 是流管理器,可透過 factory 建構函數的 broadcast 創建 StreamController,並且它的多訂閱模式是「熱訂閱」
我們可以從以下範例了解到 StreamController 如何使用:
A. 普通的 Stream 流是一個「單訂閱的不可變流」,它不可以臨時添加數據
void main() {
var stream = Stream.fromIterable([1, 2, 3]);
// 3 秒後添加訂閱者,Timer 延遲 (時間,callback)
Timer(const Duration(seconds: 3),
() => stream.listen((Object o) => print("延遲三秒: $o")));
}
B. StreamController 則是一個可以動態添加數據的流,但是它屬於「熱流」(當沒接收到數據就會流失掉)
void main() {
//創建一個 StreamController
var streamController = StreamController.broadcast(); // factory 構造器
// 在以後微任務中獲得事件
streamController.add("Hello"); // 無法接收,因為尚未有 listener
//訂閱事件
streamController.stream.listen((i){
print("StreamController broadcast: $i");
});
streamController.add("World"); // 可以被接收,因為已有 listener
// 記得關閉流 !!!
streamController.close();
}
如下圖所見,在 listen 之前的 Hello 數據是無法被 StreamController 監聽到的,只有在 listen 之後的 World 可被監聽到
streamcontroller 熱流監聽
通過 async*-yield 生成 stream
● 使用 async* 需要搭配上 yield 這個關鍵字一起使用,可以讓數據異步返回 (可執行判斷到一半就返回)
範例如下
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i; // 每接收到一個數據就直接拋出異步訊息 (該拋出訊息不會中斷程式流程)
}
}
使用方式如下:
A. 使用 await-for 來消費 Stream 流
void main() async {
// 使用 countStream 函數生成一個 Stream
await for (int value in countStream(5)) {
print('Received: $value');
}
print('Stream processing completed.');
}
await-for 取得 stream 結果
B. 使用 listen 方法來監聽流
void main() {
// 使用 countStream 函數生成一個 Stream
final stream = countStream(5);
// 訂閱這個 Stream 並監聽數據
stream.listen((value) {
print('Received: $value');
},
onDone: () {
print('Stream processing completed.');
},
);
}
listene 取得 stream 結果
用 Stream 做簡易 EventBus
● 使用 Dart 的廣播機制創建 Android 的第三方庫 EventBus,其功能類似於「觀察者模式」,主要的行為有 1. 訂閱、2. 通知
● 我們可以透過 StreamController 物件來設定、過濾泛型類型,以 達到指定類型的註冊與通知
範例如下:
● EventBus 類實作:
import 'dart:async';
class EventBus {
static EventBus _instance;
static StreamController _streamController;
EventBus._internal() {
_streamController = StreamController.broadcast();
}
factory EventBus.getDefault() {
// 單例模式
return _instance ??= EventBus._internal();
}
StreamSubscription<T> register<T>(void onData(T event)) {
if(T == dynamic) { // 未指定類型
return _streamController.stream.listen(onData);
} else {
// 傳送指定類型
return _streamController.stream.where((type) => type is T) // 判斷
.cast<T>() // 強制轉型
.listen(onData);
}
}
// 傳送廣播資訊,多廣播
void post<T>(T msg) {
_streamController.add(msg);
}
// 關閉廣播流
void close() {
_streamController.close();
}
}
● 使用自製的 EventBus 類:
import 'eventBus.dart';
void main() {
EventBus.getDefault().register((event) {
print("Receive All Message, from EventBus: $event");
});
EventBus.getDefault().register<String> ((event) {
print("--- Receive String Message, from EventBus: $event");
});
EventBus.getDefault().register<int>((event) {
print("--- --- Receive int Message, from EventBus: $event");
});
EventBus.getDefault().register<double>((event) {
print("--- --- --- Receive double Message, from EventBus: $event");
});
EventBus.getDefault().post(1111);
EventBus.getDefault().post("Hello World");
EventBus.getDefault().post(765.123);
EventBus.getDefault().close();
}
async / await 關鍵字
使用 async 關鍵字描述的函數是 異步代碼塊 (概念類似新開執行序)
使用 await 用來描述呼叫的函數,可以用來 等待異步執行的任務完畢
async 異步函數
● 被 async 描述的函數只能返回 void 或是 Future 類(因為是異步)
使用範例如下:
import 'dart:io';
void main() {
var string = asyncReadFile();
string.then((value) => print("Read Finish"));
}
// 異步函數
Future asyncReadFile() async { // 使用 async 描述
var file = new File(r"D:\mingw64\build-info.txt").readAsString();
return file;
}
await 等待函數
● await 必須要配合 async 使用,無法單獨使用 await 描述呼叫的函數
● await 可以有規律的順序執行,它會等待上一個異步函數結束才往下執行
我們可以透過以下範例了解 await 的功能、好處:
A. 尚未使用 await 描述異步任務,那異步任務的讀取順序將會不同(每次執行都可能是不同的結果)
void noSync() {
// 讀取順序不同,File1(linux-5.6.4.tar.xz) 較大的會讀得比較慢,File2(build.gradle) 會先讀完
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("file1 read finish"));
new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("file2 read finish"));
}
B. 以往我們可能會使用瘋狂回調的方式,這容易進入回調地獄(callback hell)
import 'dart:io';
// 多重回調,容易混亂
void oldStyle() {
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) {
print("then --- file1 read finish");
new File(r"D:\mingw64\build.gradle").readAsString().then((value) {
print("then --- file2 read finish");
});
});
}
C. 使用 await 關鍵字描述後,就可以擺脫這種回調地獄問題,可以讓函數按照預期順序執行,並且可讀性也高
import 'dart:io';
Future orderReadFile() async {
// await 異步中的同步
await new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("async --- file1 read finish"));
var file2 = await new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("async --- file1 read finish"));
return file2;
}
更多的 Flutter/Dart 語言相關文章
了解 Flutter 如何在跨平台開發中佔據重要地位,掌握快速上手的技巧與項目建置流程,開啟你的跨平台開發之旅!
探索跨平台與 Flutter 技術的未來:從認識到 Flutter 專案建置 | 3 種跨平台
Dart 語言基礎
● 探討 Dart 語言:宣告、數據類型、操作符 | 從基礎到應用指南
快速掌握 Dart 語言的核心概念,包括變數宣告、數據類型及操作符,為 Flutter 開發奠定扎實基礎。
● Dart 函數與方法、異常處理、引用庫 | Java 比較
深入了解 Dart 的函數與異常處理特性,並與 Java 的處理方式進行比較,幫助你跨語言切換更加順暢。
● 深入解析 Dart 語言:命名慣例、類特性、建構函數與抽象特性
學習 Dart 類的設計邏輯及命名慣例,深入探索抽象類與 Mixin 的強大應用場景。
● 深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream
徹底搞懂 Dart 的併發與異步處理,掌握 Isolate 與 Event Loop 的運行機制,助你提升應用效能!
深入 Flutter 框架
● 深入解析 Flutter Navigator:常見錯誤、解決方法與路由跳轉技巧、動畫
從常見問題到自定義解決方案,學會如何利用 Navigator 實現路由跳轉與流暢動畫效果。
● 深入理解 Flutter 中的數據共享:從普遍方案到 InheritedWidget | 3 種方案
探討數據共享的最佳實踐,了解 InheritedWidget 等三種主要方案,幫助你優化應用結構。
● 深入解析 Flutter 三顆樹:Widget、Element 與 RenderObject 完整指南
拆解 Flutter 的內部結構,全面了解 Widget、Element 和 RenderObject 之間的關係,提升你的 Flutter 開發技能!



























