Overview of Content
在這篇文章中,我們將深入探討 Dart 語言中的併發與異步處理技術。首先,我們會從 Dart 的單執行緒模型開始,證明其運作方式,並介紹如何使用 Isolate 來創建資源隔離的執行緒,在這個過程中,我們會詳細解釋 Isolate 通訊機制,包括 ReceivePort 與 SendPort 的應用,並通過實例證明 Isolate 的併發能力
接下來,我們會解析 Dart 的 Event Loop 機制,探討 EventQueue
和 MicrotaskQueue
的特性與運作方式,並通過實驗驗證其執行順序… 我們還將比較 Dart 與 Kotlin 在事件循環與協程方面的差異,更清楚兩者語言之間對於併發的處理差異
我們還會介紹 Dart 中強大的異步處理工具 Future,深入分析 then
、wait
和 catchError
函數的用法,讓您輕鬆應對各種異步操作
最後,本文將帶您認識 Stream 的概念,並通過實例展示如何創建 Stream 物件、將 Stream 轉為廣播模式,以及使用 StreamController 動態添加數據。您將學會如何利用 async*
與 yield
生成 stream,甚至使用 Stream 打造簡易的 EventBus
系統
此外,我們還將講解 async/await 關鍵字,讓您更直觀地掌握異步函數的運作原理
寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀
個人程式分享時比較注重「縮排」,所以可能不適合手機的排版閱讀,建議切換至「電腦版」、「平板版」視窗看
Dart 的併發
我們之前有提到 Dart 是單執行緒(線程)的語言,但是單執行緒運作在耗時操作(像是網路請求、IO 操作)是相當不符合要求的,它會照成應用的卡頓…
所以 Dart 使採用併發(Concurrency
)的機制,但是其特點與 Java 併發不同,兩者併發的差異如下表~
語言 | 資源隔離性 | 介紹 |
---|---|---|
Java | 資源共享 | 由於資源共享,所以要注意資源的同步問題 |
Dart | 資源隔離,記憶體不共享 | 特性像是 Java 的進程(Process ),但是並不是進程,必須把它看成 Java Thread,但是它是記憶體安全模型 |
● 關於多執行緒其實還有「並行」、「併發」概念,兩者個差異的請點擊 Thread & Process 的差異:併發 & 並行概念 了解
證明 Dart 單執行緒
● 用以下程式用來證明 Dart 是單執行緒
import 'dart:io';
void main() {
bool finish = false;
print("ready read file($finish)");
new File(r"app_entry.dart").readAsString().then((value) {
finish = true;
print("already read done($finish)");
});
while(!finish) { } // 會卡在這,在 Event - Loop 會解釋
// 永遠不會執行到 "read file finish",因為主執行序不會等待 File 的 IO
print("read file finish");
}
我們可以發現以上程式永遠不會結束,這是因為 Dart 的單執行緒…
A. 執行到讀取檔案時,Dart 會把 IO 耗時操作放置到事件循環內等待處理,然後繼續執行後續代碼
B. 執行到 while(!finish)
之後,就進入了無限循環,並且這個循環會一直佔有 CPU 資源,導致沒有機會去處理 IO 事件,這個測試無法結束
isolate 類:開啟資源隔離執行緒
事件循環(
Event-Loop
)之後的小節會說明
● 我們前面有提及「資源隔離」,那這裡我們就來使用 Dart 的資源隔離類
要注意它與 Java Thread 最大的不同在資源的隔離不共享 (若需要資源必須透過傳遞)… 在 Java 中,執行序內的資源是不隔離的所以可以共享,而不須透過傳遞
首先 import isolate
類,並使用 isolate#spawn
開啟新的 isolate,然後我們做以下實驗來證明一些事情
● 開啟新執行序:
透過 isolate#spawn
方法可以開啟一個新的執行序
import 'dart:isolate';
int? data; // int 是類,所有 default 是 null
void main() {
data = 10;
// 開新線程,spawn 是泛型
// 原型 : external static Future<Isolate> spawn<T> (方法, ,{...})
Isolate.spawn(entryPoint, "Alien");
print("Finish main isolate, data: $data"); // 判斷是否會排隊進行任務 and 數據是否隔離
}
void entryPoint(String message) { // 可以省去 (message)
print("In child isolate, Params: $message, data: $data"); // 證明數據是隔離的
}
從執行結果來看我們可以知道
A. spawn
方法確實可以開啟一個新執行序,因為主執行緒正常會先結束,而開啟的執行序在之後才結束
B. isolate#spawn
方法開啟的執行序是預設不守護主執行緒的,因為主執行緒結束後,透過 spawn
開啟的執行緒也不會結束
isolate 類:開啟資源隔離執行緒
● 證明資源的隔離:
要正名資源的隔離也相當容易,讓不同的執行序持有相同資源的引用,如果資源被改變則是資源共享,而資源沒有被改變則是資源不共享!(也就是資源被隔離)
import 'dart:isolate';
int i = -1;
void main() {
i = 20;
Isolate.spawn(entryPoint, "Test"); // Java 可想成 new Thread
print("Main $i");
}
void entryPoint(String msg) {
print("Isolate $i");
}
從結果可以看到,兩者所取得的資源並不相同,Isolate.spawn
開啟的執行序仍舊是保持初始化的 -1(因為開啟執行緒後執行的 entryPoint
函數,它是取全域的 i
資源)
● 如果是資源共享的情況下,會變怎樣?
資源
i
如果是共享的,那代表主執行緒內將i
修改為 20,而後Isolate.spawn
開啟的執行序也會讀取到i
為 20
證明 isolate 資源的隔離
Isolate 通訊:ReceivePort & SendPort
● 由於 isolate 開啟的執行序,其資源是相互隔離的,若需要傳遞資源則必須透過「某種方法傳遞」,在 Dart 中,這種方法就是 ReceivePort
& SendPort
ReceivePort & SendPort,就像是 Android 的 Handler (不同執行緒之間的通訊)
每個 isolate 內都可以設定 ReceivePort & SendPort 物件,可以透過這兩個物件來發送與接收資源,參考以下的概念圖
isolate 與 ReceivePort 物件的關係
● ReceivePort 物件接收資源時,是否會被切換到別的執行緒中?
你在哪個執行緒內創建,就會回傳到該執行緒內中,並不用特別設定
● 使用 ReceivePort & SendPort 的範例如下,該範例會讓主執行緒與 isolate 執行緒通訊
A. 在主執行緒(Main Thread
)中創建 ReceivePort 物件,並把 ReceivePort 物件內的 SendPort 物件傳遞給 Isolate 創建出的執行序(這樣 Isolate 就可以透過它發送資源給主執行緒)
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
// 創建 isolate 執行緒
Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);
// receive 監聽
// 原型: StreamSubscription listen(void onData(var message), {...});
receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息
print("Main~~~~~~ finish"); // Main 並不會等待 isolate !
}
B. 主執行緒監聽 isolate 執行序發送的訊息
// main isolate 監聽子 isolate
void mainListenIsolate(var message) {
if(message is SendPort) {
print("I am Main, I get your send Port");
message.send("Main get"); // 2.
} else {
// 接收子 isolate 訊息
print("Main get Message from isolate: $message");
}
}
C. isolate 執行序接收主執行序的 SendPort
物件,並在 isolate 執行緒內創建 ReceivePort 物件,並將其傳給主執行緒(這樣主執行緒就可以透過它傳遞資源給 isolate 執行緒)
// isolate 執行序
void isolateFunc(SendPort sendPort) {
print('isolate created');
// 創建自己的接收器
ReceivePort receivePort = ReceivePort();
// 透過主執行緒給予的 SendPort 物件發送把 isolate 內的發送器(`SendPort`)發送給主執行緒
sendPort.send(receivePort.sendPort);
// 監聽 Main
receivePort.listen((var message) {
print("Child Isolate get Message from Main: $message");
});
sendPort.send("I am isolate"); // 透過主執行緒給予的 SendPort 物件發送資料給主執行緒
}
主執行序與 isolate 通訊的結果
● 以下做了個有趣的測試,可以用來加強 Dart 是單執行緒的觀念… 一樣是這個小節的範例,在這裡我們在主行緒休眠了 5s 中,那會怎樣呢?
import 'dart:io'; import 'dart:isolate'; void main() { ReceivePort receivePort = ReceivePort(); Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort); // receive 監聽 // 原型: StreamSubscription listen(void onData(var message), {...}); receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息 // 添加休眠 5 秒 sleep(Duration(seconds:5)); print("Main~~~~~~ finish"); // 證明 Main 並不會等待 isolate ! }
isolate 等待主執行序的訊息 在這個範例中,我們在主執行緒中進行了 5 秒的休眠(
sleep
)。儘管 Isolate 會在Isolate.spawn
後立即啟動並執行isolateFunc
函數,但由於主執行緒被 sleep 阻塞,主執行緒無法立即處理來自 Isolate 的訊息這意味著雖然 Isolate 已經獨立運行,但主執行緒必須等待 sleep 結束後,才會繼續處理剩餘的程式碼和非同步事件
因此,在主執行緒恢復後,你會看到 Isolate 發送的訊息被處理,並繼續執行剩餘程式碼
主執行序修眠結束後,就可以與 isolate 執行序通訊
● 有些人可能會發現,為什麼這個範例程式不會結束(要自己按
Ctrl+C
才會結束)而範例程式碼不會結束的原因是因為主執行緒的
ReceivePort
一直在監聽來自 Isolate 的訊息,這是導致程式持續運行的原因
Isolate 併發證明
● 以下程式碼展示了 Dart 中 Isolate 的併發性,它允許我們在獨立的執行緒中執行代碼
import 'dart:isolate';
void main() {
// 在新的兩個 isolate 中創建兩個任務,這兩個任務做 dowhile
Isolate.spawn(isoMain1, "start---1");
Isolate.spawn(isoMain2, "start------2");
while(true) {}
}
void isoMain1(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
void isoMain2(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
如果不是併發程序的話,就會是單一輸出結果
isolate 併發證明,交換執行
認識 Event Loop
Dart Event Loop 與 Android Handler 機制類似(都是以「事件驅動」模式設計),通過從 Loop 中不斷獲取消息
像是 isolate 發送的消息就是通過 Event Loop 處理後,由單執行緒的 Dart 應用接收
Dart Event Loop 特性:EventQueue、MicrotaskQueue
這個小節我們先撇除併發概念,先單獨來觀察
Dart Event Loop
的特性
● 首先我們要知道,在 Dart 中一個執行緒(線程)會對應一個事件循環(Event-loop
)
● Dart Event Loop
與 Android Handler 不同的是…
A. Android 每個 Thread 都有一個 Event-loop
,並對應 單獨的 MessageQueue,再藉由 Handler 處理消息
但是與 Dart 不同的是,Android 天生屬於多執行序
B. Dart 則是一個 Event-Looper 對應兩個 Queue,儲存不同等級的工作,兩個 Queue 的如下表所述
Dart 事件類型 | 介紹 | 特色 | 範例 |
---|---|---|---|
EventQueue | 普通事件 | 每次執行完都會檢查 微事件 | Future.delayed(Duration(seconds: 1)); |
MicrotaskQueue | 微事件 | 優先等級最高 | Future.microtask() |
黃色區塊為子執行序,要注意 Main 函數會正常執行,並 併發執行 Event-Loop
每次 Event-loop 在執行
EventQueue
任務前,都會檢查MicrotaskQueue
是否有需要執行的微任務,若MicrotaskQueue
有任務要執行會先執行,之後才接續執行EventQueue
任務event-loop & queue 檢查順序
●
Event-loop
的啟動:Event-loop 在 Dart 應用中是自動啟動的,不需要等 main 完全執行完畢才開始運行… 實際上,Event-loop 是一個「持續運行的機制」,從應用啟動之後就開始運行
EventQueue、MicrotaskQueue 驗證執行順序
● 在 Dart Event Loop 中插入任務的使用範例如下
● 插入任務至 EventQueue
:
then
函數是在 Event-Loop 檢查 EventQueue 時才執行,只要 EventQueue 內有任務就會被執行(以下任務是 IO)
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
print(value);
});
print("read file finish");
}
任務放入 event-queue
● 插入任務至 MicrotaskQueue
:
這裡我們來驗證將任務插入至 MicrotaskQueue
是否會比起把任務插入 EventQueue
還要快被執行
插入任務至 MicrotaskQueue
需要使用 Future.microtask
函數
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
});
// 原型 : factory Future.microtask(FutureOr<T> computation()) {
Future.microtask(() {
print("執行 micro Task");
});
print("Main finish");
}
如下結果,我們可以看到 微任務的執行會在一般任務之前!
任務放入 micro-queue
Micro 微任務:任務插隊
● 現在來證明 Micro 微任務是可以插隊一般任務的 (每次執行一般 Event 都會檢查有沒有需要執行的微任務),驗證程式如下:
我們在主執行序中創建 ReceivePort
物件並監聽事件,當有事件進入主執行序時「插入微任務」,並在微任務執行時打印,來驗證是否微任務不論順序,都會執行在一般任務之前
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
receivePort.listen((message) {
print(message);
Future.microtask(() => print("微任務插隊~"));
});
// 一般 Event
receivePort.sendPort.send("傳送 Message 1");
// 一般 Event
receivePort.sendPort.send("傳送 Message 2");
// 一般 Event
receivePort.sendPort.send("傳送 Message 3");
print("Main function finish");
}
事件循環與協程:Dart vs. Kotlin
● 個人熟悉的協程是 Kotlin 的智能協程,所以以下透過 Kotlin 協程與 Dart 事件循環做比較
● 以下範例是最一開始我們看的「Dart 單執行緒的證明」,在這裡我們插入協程的概念會如何?
如果我改成在 while
判斷中修眠,那 Dart 事件循環是否可以像是 Kotlin 中提供的協程一樣,在主執行緒空閒時幫我去處理任務呢?
void main() {
...
new File(r"app_entry.dart").readAsString().then((value) {
// 會不會有機會處理到這裡?
});
while(!finish) {
print("test...");
sleep(const Duration(seconds: 1));
}
...
}
答案是否定的!就算是這樣修改,程式也不會有機會執行到以上的 IO 任務,因為 Dart 是採用事件循環機制,與 Kotlin 的智能協程 就有本質上的不同
● 兩者個差異關鍵為:
● Dart 事件循環:當主執行緒被阻塞時(例如透過 sleep
),事件循環無法繼續,非同步任務的回呼也無法執行
● Kotlin 協程:Kotlin 的協程能夠在任務之間智慧切換,避免主執行緒阻塞,能夠繼續處理其他任務
所以以效能來說的話 Kotlin 的協程能帶來更高的效能,不過相對的撰寫起來需要更多的概念,複雜度也會更高
結合 Dart 事件循環、執行序
● 首先我們仍要抱有一個觀念 Dart 是單執行序的!那它是怎麼(或是說何時)去執行異步任務的呢?這裡需要再澄清兩個觀念
● 前面小節我們所說的「Dart 事件循環」,是在說明 Dart 的 Event-loop
特性與內部結構,這時與執行序尚未產生關聯
● Event-loop
機制與異步任務關聯:
Event-loop
是用來掃描其內部的兩個 Queue 事件的機制,而我們撰寫的 異步任務就會放置到 Queue 中
● Event-loop
機制與執行序產生關聯
那放置在兩個 Queue 的任務何時被執行呢?
Dart 不會另外起一執行序來執行,仍舊保值單一執行序執行,而這個執行是採用「協作式」的方式執行… 也就是說「異步任務會在主執行序空閒時被執行」
認識 Future
在 Dart 庫中隨處可見 Future 物件… 並且 通常異步函數返回的物件就是一個 Future (像是 Isolate.spawn
方法返回就是一個 Future 物件)
// Isolate.dart
// spawn 方法,返回值就是 Future
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
@Since("2.3") String debugName});
以下介紹幾個 Future 常用的 API,如下表所示
Future API | 介紹 |
---|---|
then(...) | 當任務執行完畢後 |
wait(...) | 等待以上 Future 任務都執行完後再一起做結尾 |
catchError(...) | 抓取異步錯誤 |
then 函數:異步的回調、異步串接處理
● 異步的回調:
在呼叫異步函數時會返回一個 Future
物件,然後我們可以執行 then
函數,這個 then
函數就是異步的回調函數(Callback function
),以下是個簡單的使用範例
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
});
}
then 異步回調的結果
● 異步串接處理:
Future#then
函數返回的也是一個 Future
物件,所以可以接續使用
如同 Java 的 Builder 建構者模式,這在做網路 API 請求的時候非常好用
範例如下:
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
return value.length;
}).then((length) {
print("File length: $length");
});
}
then 串接任務
wait 函數:結合異步任務
● 結合異步任務:
透過 Future#wait
函數,我們可以用來等待多個 Future 都執行完後在做統一處理,可以用來做異步任務協作)
範例如下所示
void main() {
// 異步任務一
Future readFile = new File(r"app_entry.dart").readAsString().then((value) {
return value.length;
});
// 異步任務二
Future delay = Future.delayed(const Duration(seconds: 3));
// 等待兩任務都結束
Future.wait([readFile, delay]).then((resultArray) {
print("第一個 Future : ${resultArray[0]}");
print("第二個 Future : ${resultArray[1] ??= "null"}");
});
print("main function finish");
}
如下圖所見,我們會看到 wait
函數確實會等待兩個任務都執行完畢才會被執行
with 結合異步任務
catchError 函數:捕捉異常
● 捕捉異步任務異常:
使用 file#readAsString
切換為異步讀取資源 (該函數返回 Future),這時就可以使用 Future#catchError
函數來捕捉錯誤
以下範例會讀取一個不存在的檔案
import 'dart:io';
void throwError() {
// readAsString 原型 : Future<String> readAsString({Encoding encoding: utf8});
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e,s) { // catchError 原型 Future<T> catchError(Function onError, {bool test(Object error)});
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
});
}
void main() {
throwError();
}
如下圖所見,我們會捕捉到 PathNoFoundException
異常
catchError 捕捉異常
如果沒有捕捉錯誤,則會產生
Unhandled exception
異常,對於 Dart 異常不熟悉的可以看 Dart 的捕捉異常限制 這篇文章
● Future#catchError
也接受捕捉特定類型異常,只要透設定第二個參數就可以指定異常類型,範例如下
import 'dart:io';
void throwError() {
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e, s) {
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
}, test: (e) => e is PathNotFoundException);
}
void main() {
throwError();
}
catchError 捕捉指定異常類型
認識 Stream
Stream 表示的也是異步數據,它是 Dart 中處理異步事件「流」的 API
它與 Future 的差異在,Future 表示 一次性 任務的返回,而 Stream 可以 分多次 異步任務,這樣的好處有…
A. 可以減少一次性記憶體的消耗量(因為 Stream 可以分批回傳)
B. 響應速度較快;就像是加載較大的網頁時,Stream 可以先回傳已經加載好的部分,這樣可以有比較好的使用體驗(而 Future 則是全部加載完畢後再一次返回)
測試 Stream 與 Future 的不同
● Stream 有監聽函數可以監聽到目前執行到哪,像是 onDone
、onError
、resume
、pause
...等等函數可以使用(如下表所示),與 Future 最大不同在於 Stream 不會單次讀取完成
監聽方法 | 說明 |
---|---|
onData(必須參數) | 收到 Data 時會被觸發 |
onError | 收到 Error 時觸發 |
onDone | 結束 Stream 流時觸發 |
unsubscribeOnError | 當第一次收到 onError 時是否取消 Stream 流 |
以下範例中我們來讀取大一點的檔案(linux-5.6.4.tar.xz
),這樣就可以明顯看出兩者個不同
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
// 讀取次數
const fileName = '/Users/user/Downloads/linux-5.6.4.tar.xz';
int times = 0;
void main() {
Isolate.spawn(isolateFuture, "start read File By Future");
Isolate.spawn(isolateStream, "start read File By Stream");
while(true) {}
}
void isolateFuture(String str) {
print(str);
new File(fileName).readAsBytes().then((value) {
print("Future : ${times++}");
});
print("Future Finish\n\n");
}
void isolateStream(String str) {
print(str);
// 多次
StreamSubscription<List<int>> listen = new File(fileName).openRead().listen((event) {
print("Stream : ${times++}");
});
// 結束通知閉包,還有 onError 等等...
listen.onDone(() {
print("onDone");
});
// stream 也可以暫停
listen.pause();
listen.resume();
print("Stream Finish");
}
看到結果知道 Stream 會多次讀取資料,而 Future 是一次性的全部讀取進記憶體中
stream vs. future
創建 Stream 物件
● 來看看幾個比較常見的 Stream 建構方法,如下表所示
Factory 建構 | 功能 |
---|---|
Stream.fromFuture | 當 future 完成時將觸發一個數據或錯誤,然後立即關閉這個流 |
Stream.fromFutures(多了個 s ) | 每個 future 都有自己的數據(onData )或錯誤(onError )事件,當整個 future 完成後,流將會關閉,如果 future 為空,流將會立即關閉 |
Stream.fromIterable | 從集合中戶取數據的單訂閱流 |
A. Stream.fromFuture
範例:
import 'dart:async';
void main() {
Future<String> myFuture = Future.delayed(
Duration(seconds: 2),
() => 'Hello from Future!');
Stream<String> streamFromFuture = Stream.fromFuture(myFuture);
streamFromFuture.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
B. Stream.fromFutures
範例:跟 Stream.fromFuture
差異不大,只是它會同時管理多個 Future
import 'dart:async';
void main() {
Future<String> future1 = Future.delayed(Duration(seconds: 1), () => 'First Future');
Future<String> future2 = Future.delayed(Duration(seconds: 2), () => 'Second Future');
Future<String> future3 = Future.delayed(Duration(seconds: 3), () => 'Third Future');
Stream<String> streamFromFutures = Stream.fromFutures([future1, future2, future3]);
streamFromFutures.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
C. Stream.fromIterable
範例:
import 'dart:async';
void main() {
List<int> myIterable = [1, 2, 3, 4, 5];
Stream<int> streamFromIterable = Stream.fromIterable(myIterable);
streamFromIterable.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
● 如果沒有額外設定,Stream 預設是單監聽模式,如果監聽數量大於一個以上的監聽則會拋出錯誤
void main() {
var stream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead();
// stream 流讀取
stream.listen((List<int> bytes) {
print("stream listen 1");
});
// 錯誤! 只能有一個監聽者
stream.listen((List<int> bytes){
print("stream listen 2");
});
}
Stream 轉廣播模式:多個監聽者
● Stream 預設是單監聽模式,若要讓 Stream 可以讓多個用戶監聽(允許多個用戶監聽),就要將其轉為「廣播模式」
透過 Broadcast#asBroadcastStream
方法,就可以將 Stream 轉為可多個監聽者
import 'dart:io';
void main() {
// Stream 轉為 Broadcast 就可以多個監聽
var broadcastStream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead().asBroadcastStream();
broadcastStream.listen((event){
print("廣播訂閱者 - 1");
});
broadcastStream.listen((event){
print("廣播訂閱者 - 2");
});
print("Main Finish");
}
StreamController:可動態添加數據
● StreamController 是流管理器,可透過 factory 建構函數的 broadcast
創建 StreamController,並且它的多訂閱模式是「熱訂閱」
我們可以從以下範例了解到 StreamController
如何使用:
A. 普通的 Stream 流是一個「單訂閱的不可變流」,它不可以臨時添加數據
void main() {
var stream = Stream.fromIterable([1, 2, 3]);
// 3 秒後添加訂閱者,Timer 延遲 (時間,callback)
Timer(const Duration(seconds: 3),
() => stream.listen((Object o) => print("延遲三秒: $o")));
}
B. StreamController 則是一個可以動態添加數據的流,但是它屬於「熱流」(當沒接收到數據就會流失掉)
void main() {
//創建一個 StreamController
var streamController = StreamController.broadcast(); // factory 構造器
// 在以後微任務中獲得事件
streamController.add("Hello"); // 無法接收,因為尚未有 listener
//訂閱事件
streamController.stream.listen((i){
print("StreamController broadcast: $i");
});
streamController.add("World"); // 可以被接收,因為已有 listener
// 記得關閉流 !!!
streamController.close();
}
如下圖所見,在 listen
之前的 Hello
數據是無法被 StreamController 監聽到的,只有在 listen
之後的 World
可被監聽到
streamcontroller 熱流監聽
通過 async*-yield
生成 stream
● 使用 async* 需要搭配上 yield
這個關鍵字一起使用,可以讓數據異步返回 (可執行判斷到一半就返回)
範例如下
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i; // 每接收到一個數據就直接拋出異步訊息 (該拋出訊息不會中斷程式流程)
}
}
使用方式如下:
A. 使用 await-for
來消費 Stream 流
void main() async {
// 使用 countStream 函數生成一個 Stream
await for (int value in countStream(5)) {
print('Received: $value');
}
print('Stream processing completed.');
}
await-for 取得 stream 結果
B. 使用 listen
方法來監聽流
void main() {
// 使用 countStream 函數生成一個 Stream
final stream = countStream(5);
// 訂閱這個 Stream 並監聽數據
stream.listen((value) {
print('Received: $value');
},
onDone: () {
print('Stream processing completed.');
},
);
}
listene 取得 stream 結果
用 Stream 做簡易 EventBus
● 使用 Dart 的廣播機制創建 Android 的第三方庫 EventBus
,其功能類似於「觀察者模式」,主要的行為有 1. 訂閱、2. 通知
● 我們可以透過 StreamController
物件來設定、過濾泛型類型,以 達到指定類型的註冊與通知
範例如下:
● EventBus 類實作:
import 'dart:async';
class EventBus {
static EventBus _instance;
static StreamController _streamController;
EventBus._internal() {
_streamController = StreamController.broadcast();
}
factory EventBus.getDefault() {
// 單例模式
return _instance ??= EventBus._internal();
}
StreamSubscription<T> register<T>(void onData(T event)) {
if(T == dynamic) { // 未指定類型
return _streamController.stream.listen(onData);
} else {
// 傳送指定類型
return _streamController.stream.where((type) => type is T) // 判斷
.cast<T>() // 強制轉型
.listen(onData);
}
}
// 傳送廣播資訊,多廣播
void post<T>(T msg) {
_streamController.add(msg);
}
// 關閉廣播流
void close() {
_streamController.close();
}
}
● 使用自製的 EventBus 類:
import 'eventBus.dart';
void main() {
EventBus.getDefault().register((event) {
print("Receive All Message, from EventBus: $event");
});
EventBus.getDefault().register<String> ((event) {
print("--- Receive String Message, from EventBus: $event");
});
EventBus.getDefault().register<int>((event) {
print("--- --- Receive int Message, from EventBus: $event");
});
EventBus.getDefault().register<double>((event) {
print("--- --- --- Receive double Message, from EventBus: $event");
});
EventBus.getDefault().post(1111);
EventBus.getDefault().post("Hello World");
EventBus.getDefault().post(765.123);
EventBus.getDefault().close();
}
async / await 關鍵字
使用 async
關鍵字描述的函數是 異步代碼塊 (概念類似新開執行序)
使用 await
用來描述呼叫的函數,可以用來 等待異步執行的任務完畢
async 異步函數
● 被 async 描述的函數只能返回 void
或是 Future
類(因為是異步)
使用範例如下:
import 'dart:io';
void main() {
var string = asyncReadFile();
string.then((value) => print("Read Finish"));
}
// 異步函數
Future asyncReadFile() async { // 使用 async 描述
var file = new File(r"D:\mingw64\build-info.txt").readAsString();
return file;
}
await 等待函數
● await 必須要配合 async 使用,無法單獨使用 await
描述呼叫的函數
● await 可以有規律的順序執行,它會等待上一個異步函數結束才往下執行
我們可以透過以下範例了解 await
的功能、好處:
A. 尚未使用 await
描述異步任務,那異步任務的讀取順序將會不同(每次執行都可能是不同的結果)
void noSync() {
// 讀取順序不同,File1(linux-5.6.4.tar.xz) 較大的會讀得比較慢,File2(build.gradle) 會先讀完
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("file1 read finish"));
new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("file2 read finish"));
}
B. 以往我們可能會使用瘋狂回調的方式,這容易進入回調地獄(callback hell
)
import 'dart:io';
// 多重回調,容易混亂
void oldStyle() {
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) {
print("then --- file1 read finish");
new File(r"D:\mingw64\build.gradle").readAsString().then((value) {
print("then --- file2 read finish");
});
});
}
C. 使用 await
關鍵字描述後,就可以擺脫這種回調地獄問題,可以讓函數按照預期順序執行,並且可讀性也高
import 'dart:io';
Future orderReadFile() async {
// await 異步中的同步
await new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("async --- file1 read finish"));
var file2 = await new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("async --- file1 read finish"));
return file2;
}
更多的 Flutter/Dart 語言相關文章
了解 Flutter 如何在跨平台開發中佔據重要地位,掌握快速上手的技巧與項目建置流程,開啟你的跨平台開發之旅!
探索跨平台與 Flutter 技術的未來:從認識到 Flutter 專案建置 | 3 種跨平台
Dart 語言基礎
● 探討 Dart 語言:宣告、數據類型、操作符 | 從基礎到應用指南
快速掌握 Dart 語言的核心概念,包括變數宣告、數據類型及操作符,為 Flutter 開發奠定扎實基礎。
● Dart 函數與方法、異常處理、引用庫 | Java 比較
深入了解 Dart 的函數與異常處理特性,並與 Java 的處理方式進行比較,幫助你跨語言切換更加順暢。
● 深入解析 Dart 語言:命名慣例、類特性、建構函數與抽象特性
學習 Dart 類的設計邏輯及命名慣例,深入探索抽象類與 Mixin 的強大應用場景。
● 深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream
徹底搞懂 Dart 的併發與異步處理,掌握 Isolate 與 Event Loop 的運行機制,助你提升應用效能!
深入 Flutter 框架
● 深入解析 Flutter Navigator:常見錯誤、解決方法與路由跳轉技巧、動畫
從常見問題到自定義解決方案,學會如何利用 Navigator 實現路由跳轉與流暢動畫效果。
● 深入理解 Flutter 中的數據共享:從普遍方案到 InheritedWidget | 3 種方案
探討數據共享的最佳實踐,了解 InheritedWidget 等三種主要方案,幫助你優化應用結構。
● 深入解析 Flutter 三顆樹:Widget、Element 與 RenderObject 完整指南
拆解 Flutter 的內部結構,全面了解 Widget、Element 和 RenderObject 之間的關係,提升你的 Flutter 開發技能!