深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream

深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream

Overview of Content

在這篇文章中,我們將深入探討 Dart 語言中的併發與異步處理技術。首先,我們會從 Dart 的單執行緒模型開始,證明其運作方式,並介紹如何使用 Isolate 來創建資源隔離的執行緒,在這個過程中,我們會詳細解釋 Isolate 通訊機制,包括 ReceivePort 與 SendPort 的應用,並通過實例證明 Isolate 的併發能力

接下來,我們會解析 Dart 的 Event Loop 機制,探討 EventQueueMicrotaskQueue 的特性與運作方式,並通過實驗驗證其執行順序… 我們還將比較 Dart 與 Kotlin 在事件循環與協程方面的差異,更清楚兩者語言之間對於併發的處理差異

我們還會介紹 Dart 中強大的異步處理工具 Future,深入分析 thenwaitcatchError 函數的用法,讓您輕鬆應對各種異步操作

最後,本文將帶您認識 Stream 的概念,並通過實例展示如何創建 Stream 物件、將 Stream 轉為廣播模式,以及使用 StreamController 動態添加數據。您將學會如何利用 async*yield 生成 stream,甚至使用 Stream 打造簡易的 EventBus 系統

此外,我們還將講解 async/await 關鍵字,讓您更直觀地掌握異步函數的運作原理

參考文章: Stream 參考線上編譯 Dart

寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀

個人程式分享時比較注重「縮排」,所以可能不適合手機的排版閱讀,建議切換至「電腦版」、「平板版」視窗看


Dart 的併發

我們之前有提到 Dart 是單執行緒(線程)的語言,但是單執行緒運作在耗時操作(像是網路請求、IO 操作)是相當不符合要求的,它會照成應用的卡頓…

所以 Dart 使採用併發(Concurrency)的機制,但是其特點與 Java 併發不同,兩者併發的差異如下表~

語言資源隔離性介紹
Java資源共享由於資源共享,所以要注意資源的同步問題
Dart資源隔離,記憶體不共享特性像是 Java 的進程(Process),但是並不是進程,必須把它看成 Java Thread,但是它是記憶體安全模型

● 關於多執行緒其實還有「並行」、「併發」概念,兩者個差異的請點擊 Thread & Process 的差異:併發 & 並行概念 了解

證明 Dart 單執行緒

● 用以下程式用來證明 Dart 是單執行緒


import 'dart:io';

void main() {
  bool finish = false;

  print("ready read file($finish)");

  new File(r"app_entry.dart").readAsString().then((value) {
    finish = true;
    print("already read done($finish)");
  });

  while(!finish) { }    // 會卡在這,在 Event - Loop 會解釋

  // 永遠不會執行到 "read file finish",因為主執行序不會等待 File 的 IO
  print("read file finish");
}

image

我們可以發現以上程式永遠不會結束,這是因為 Dart 的單執行緒…

A. 執行到讀取檔案時,Dart 會把 IO 耗時操作放置到事件循環內等待處理,然後繼續執行後續代碼

B. 執行到 while(!finish) 之後,就進入了無限循環,並且這個循環會一直佔有 CPU 資源,導致沒有機會去處理 IO 事件,這個測試無法結束

sequenceDiagram participant MainThread participant FileIO participant EventLoop MainThread->>MainThread: print("start read file(false)") MainThread->>FileIO: File.readAsString() FileIO-->>MainThread: 返回 Future,往下執行 MainThread->>MainThread: 進入 while(!finish) 循環 MainThread->>MainThread: 無限循環阻塞 Note right of EventLoop: 事件循環被塞住,無法被處理 FileIO->>EventLoop: 讀取文件完成,加入事件隊列 EventLoop-->>MainThread: 回調帶隊列中,無法被執行(因為主執行緒被卡住)

isolate 類:開啟資源隔離執行緒

事件循環(Event-Loop)之後的小節會說明

● 我們前面有提及「資源隔離」,那這裡我們就來使用 Dart 的資源隔離類

要注意它與 Java Thread 最大的不同在資源的隔離不共享 (若需要資源必須透過傳遞)… 在 Java 中,執行序內的資源是不隔離的所以可以共享,而不須透過傳遞

首先 import isolate 類,並使用 isolate#spawn 開啟新的 isolate,然後我們做以下實驗來證明一些事情

開啟新執行序

透過 isolate#spawn 方法可以開啟一個新的執行序


import 'dart:isolate';

int? data;     // int 是類,所有 default 是 null

void main() {
  data = 10;

  // 開新線程,spawn 是泛型
  // 原型 : external static Future<Isolate> spawn<T> (方法, ,{...})
  Isolate.spawn(entryPoint, "Alien");

  print("Finish main isolate, data: $data");   // 判斷是否會排隊進行任務  and  數據是否隔離
}

void entryPoint(String message) { // 可以省去 (message)
  print("In child isolate, Params: $message, data: $data");   // 證明數據是隔離的
}

從執行結果來看我們可以知道

A. spawn 方法確實可以開啟一個新執行序,因為主執行緒正常會先結束,而開啟的執行序在之後才結束

B. isolate#spawn 方法開啟的執行序是預設不守護主執行緒的,因為主執行緒結束後,透過 spawn 開啟的執行緒也不會結束

isolate 類:開啟資源隔離執行緒

證明資源的隔離

要正名資源的隔離也相當容易,讓不同的執行序持有相同資源的引用,如果資源被改變則是資源共享,而資源沒有被改變則是資源不共享!(也就是資源被隔離)


import 'dart:isolate';

int i = -1;

void main() {
  i = 20;

  Isolate.spawn(entryPoint, "Test");    // Java 可想成 new Thread

  print("Main $i");
}

void entryPoint(String msg) {
  print("Isolate $i");
}

從結果可以看到,兩者所取得的資源並不相同,Isolate.spawn 開啟的執行序仍舊是保持初始化的 -1(因為開啟執行緒後執行的 entryPoint 函數,它是取全域的 i 資源)

● 如果是資源共享的情況下,會變怎樣?

資源 i 如果是共享的,那代表主執行緒內將 i 修改為 20,而後 Isolate.spawn 開啟的執行序也會讀取到 i 為 20

證明 isolate 資源的隔離

Isolate 通訊:ReceivePort & SendPort

● 由於 isolate 開啟的執行序,其資源是相互隔離的,若需要傳遞資源則必須透過「某種方法傳遞」,在 Dart 中,這種方法就是 ReceivePort & SendPort

ReceivePort & SendPort,就像是 Android 的 Handler (不同執行緒之間的通訊)

每個 isolate 內都可以設定 ReceivePort & SendPort 物件,可以透過這兩個物件來發送與接收資源,參考以下的概念圖

isolate 與 ReceivePort 物件的關係

● ReceivePort 物件接收資源時,是否會被切換到別的執行緒中?

你在哪個執行緒內創建,就會回傳到該執行緒內中,並不用特別設定

● 使用 ReceivePort & SendPort 的範例如下,該範例會讓主執行緒與 isolate 執行緒通訊

A. 在主執行緒(Main Thread)中創建 ReceivePort 物件,並把 ReceivePort 物件內的 SendPort 物件傳遞給 Isolate 創建出的執行序(這樣 Isolate 就可以透過它發送資源給主執行緒)


import 'dart:isolate';

void main() {

  ReceivePort receivePort = ReceivePort();

  // 創建 isolate 執行緒
  Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);

  // receive 監聽
  // 原型: StreamSubscription listen(void onData(var message), {...});
  receivePort.listen(mainListenIsolate);    // main 監聽 子 isolate 訊息

  print("Main~~~~~~ finish");      // Main 並不會等待 isolate !
}

B. 主執行緒監聽 isolate 執行序發送的訊息


// main isolate 監聽子 isolate

void mainListenIsolate(var message) {
  if(message is SendPort) {

    print("I am Main, I get your send Port");
    message.send("Main get");      // 2.

  } else {
    // 接收子 isolate 訊息
    print("Main get Message from isolate: $message");
  }
}

C. isolate 執行序接收主執行序的 SendPort 物件,並在 isolate 執行緒內創建 ReceivePort 物件,並將其傳給主執行緒(這樣主執行緒就可以透過它傳遞資源給 isolate 執行緒)


// isolate 執行序

void isolateFunc(SendPort sendPort) {
  print('isolate created');
  
  // 創建自己的接收器
  ReceivePort receivePort = ReceivePort();
  // 透過主執行緒給予的 SendPort 物件發送把 isolate 內的發送器(`SendPort`)發送給主執行緒
  sendPort.send(receivePort.sendPort);    

  // 監聽 Main
  receivePort.listen((var message) {
    print("Child Isolate get Message from Main: $message");
  });

  sendPort.send("I am isolate");    // 透過主執行緒給予的 SendPort 物件發送資料給主執行緒
}
主執行序與 isolate 通訊的結果

● 以下做了個有趣的測試,可以用來加強 Dart 是單執行緒的觀念… 一樣是這個小節的範例,在這裡我們在主行緒休眠了 5s 中,那會怎樣呢?


import 'dart:io';
import 'dart:isolate';


void main() {
  ReceivePort receivePort = ReceivePort();

  Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);

  // receive 監聽
  // 原型: StreamSubscription listen(void onData(var message), {...});
  receivePort.listen(mainListenIsolate);    // main 監聽 子 isolate 訊息

  // 添加休眠 5 秒
  sleep(Duration(seconds:5));
  print("Main~~~~~~ finish");      // 證明 Main 並不會等待 isolate !
}
isolate 等待主執行序的訊息

在這個範例中,我們在主執行緒中進行了 5 秒的休眠(sleep)。儘管 Isolate 會在 Isolate.spawn 後立即啟動並執行 isolateFunc 函數,但由於主執行緒被 sleep 阻塞,主執行緒無法立即處理來自 Isolate 的訊息

這意味著雖然 Isolate 已經獨立運行,但主執行緒必須等待 sleep 結束後,才會繼續處理剩餘的程式碼和非同步事件

因此,在主執行緒恢復後,你會看到 Isolate 發送的訊息被處理,並繼續執行剩餘程式碼

主執行序修眠結束後,就可以與 isolate 執行序通訊

● 有些人可能會發現,為什麼這個範例程式不會結束(要自己按 Ctrl+C 才會結束)

而範例程式碼不會結束的原因是因為主執行緒的 ReceivePort 一直在監聽來自 Isolate 的訊息,這是導致程式持續運行的原因

Isolate 併發證明

● 以下程式碼展示了 Dart 中 Isolate 的併發性,它允許我們在獨立的執行緒中執行代碼


import 'dart:isolate';

void main() {

  //  在新的兩個 isolate 中創建兩個任務,這兩個任務做 dowhile
  Isolate.spawn(isoMain1, "start---1");

  Isolate.spawn(isoMain2, "start------2");

  while(true) {}
}

void isoMain1(String str) {
  Future.doWhile(() {
    print(str);
    return true;
  });
}

void isoMain2(String str) {
  Future.doWhile(() {
    print(str);
    return true;
  });
}

如果不是併發程序的話,就會是單一輸出結果

isolate 併發證明,交換執行

認識 Event Loop

Dart Event Loop 與 Android Handler 機制類似(都是以「事件驅動」模式設計),通過從 Loop 中不斷獲取消息

像是 isolate 發送的消息就是通過 Event Loop 處理後,由單執行緒的 Dart 應用接收

graph LR subgraph Dart Event Loop 待處理任務 subgraph queue EventQueue MicrotaskQueue end 待處理任務 --> queue end 耗時任務 --> |給予待處理的任務| 待處理任務 queue -.-> |空閒時| d(dart 執行序處理)

Dart Event Loop 特性:EventQueue、MicrotaskQueue

這個小節我們先撇除併發概念,先單獨來觀察 Dart Event Loop 的特性

● 首先我們要知道,在 Dart 中一個執行緒(線程)會對應一個事件循環(Event-loop

Dart Event Loop 與 Android Handler 不同的是…

A. Android 每個 Thread 都有一個 Event-loop,並對應 單獨的 MessageQueue,再藉由 Handler 處理消息

graph LR MainThread <-.-> Event-loop <-.-> MessageQueue

但是與 Dart 不同的是,Android 天生屬於多執行序

B. Dart 則是一個 Event-Looper 對應兩個 Queue,儲存不同等級的工作,兩個 Queue 的如下表所述

graph LR MainThread <-.-> Event-loop Event-loop <-.-> MessageQueue Event-loop <-.-> MicrotaskQueue
Dart 事件類型介紹特色範例
EventQueue普通事件每次執行完都會檢查 微事件Future.delayed(Duration(seconds: 1));
MicrotaskQueue微事件優先等級最高Future.microtask()

黃色區塊為子執行序,要注意 Main 函數會正常執行,並 併發執行 Event-Loop

每次 Event-loop 在執行 EventQueue 任務前,都會檢查 MicrotaskQueue 是否有需要執行的微任務,若 MicrotaskQueue 有任務要執行會先執行,之後才接續執行 EventQueue 任務

event-loop & queue 檢查順序

Event-loop 的啟動

Event-loop 在 Dart 應用中是自動啟動的,不需要等 main 完全執行完畢才開始運行… 實際上,Event-loop 是一個「持續運行的機制」,從應用啟動之後就開始運行

EventQueue、MicrotaskQueue 驗證執行順序

在 Dart Event Loop 中插入任務的使用範例如下

插入任務至 EventQueue

then 函數是在 Event-Loop 檢查 EventQueue 時才執行,只要 EventQueue 內有任務就會被執行(以下任務是 IO)


import 'dart:io';

void main() {

  new File(r"app_entry.dart").readAsString().then((value) {
    print("執行 Task");
    print(value);
  });

  print("read file finish");
}
任務放入 event-queue

插入任務至 MicrotaskQueue

這裡我們來驗證將任務插入至 MicrotaskQueue 是否會比起把任務插入 EventQueue 還要快被執行

插入任務至 MicrotaskQueue 需要使用 Future.microtask 函數


import 'dart:io';

void main() {

  new File(r"app_entry.dart").readAsString().then((value) {
    print("執行 Task");
  });

  // 原型 : factory Future.microtask(FutureOr<T> computation()) {
  Future.microtask(() {
    print("執行 micro Task");
  });


  print("Main finish");
}

如下結果,我們可以看到 微任務的執行會在一般任務之前

任務放入 micro-queue

Micro 微任務:任務插隊

● 現在來證明 Micro 微任務是可以插隊一般任務的 (每次執行一般 Event 都會檢查有沒有需要執行的微任務),驗證程式如下:

我們在主執行序中創建 ReceivePort 物件並監聽事件,當有事件進入主執行序時「插入微任務」,並在微任務執行時打印,來驗證是否微任務不論順序,都會執行在一般任務之前


import 'dart:isolate';

void main() {
  ReceivePort receivePort = ReceivePort();
  receivePort.listen((message) {
    print(message);

    Future.microtask(() => print("微任務插隊~"));
  });

  // 一般 Event
  receivePort.sendPort.send("傳送 Message 1");
  // 一般 Event
  receivePort.sendPort.send("傳送 Message 2");
  // 一般 Event
  receivePort.sendPort.send("傳送 Message 3");

  print("Main function finish");
}

事件循環與協程:Dart vs. Kotlin

● 個人熟悉的協程是 Kotlin 的智能協程,所以以下透過 Kotlin 協程與 Dart 事件循環做比較

● 以下範例是最一開始我們看的「Dart 單執行緒的證明」,在這裡我們插入協程的概念會如何?

如果我改成在 while 判斷中修眠,那 Dart 事件循環是否可以像是 Kotlin 中提供的協程一樣,在主執行緒空閒時幫我去處理任務呢?


void main() {
  ...

  new File(r"app_entry.dart").readAsString().then((value) {
    // 會不會有機會處理到這裡?
  });

  while(!finish) {

    print("test...");
    sleep(const Duration(seconds: 1));

  }

  ...
}

答案是否定的!就算是這樣修改,程式也不會有機會執行到以上的 IO 任務,因為 Dart 是採用事件循環機制,與 Kotlin 的智能協程 就有本質上的不同

● 兩者個差異關鍵為:

Dart 事件循環:當主執行緒被阻塞時(例如透過 sleep),事件循環無法繼續,非同步任務的回呼也無法執行

Kotlin 協程:Kotlin 的協程能夠在任務之間智慧切換,避免主執行緒阻塞,能夠繼續處理其他任務

所以以效能來說的話 Kotlin 的協程能帶來更高的效能,不過相對的撰寫起來需要更多的概念,複雜度也會更高

結合 Dart 事件循環、執行序

● 首先我們仍要抱有一個觀念 Dart 是單執行序的!那它是怎麼(或是說何時)去執行異步任務的呢?這裡需要再澄清兩個觀念

● 前面小節我們所說的「Dart 事件循環」,是在說明 Dart 的 Event-loop 特性與內部結構,這時與執行序尚未產生關聯

Event-loop 機制與異步任務關聯

Event-loop 是用來掃描其內部的兩個 Queue 事件的機制,而我們撰寫的 異步任務就會放置到 Queue 中

Event-loop 機制與執行序產生關聯

那放置在兩個 Queue 的任務何時被執行呢?

Dart 不會另外起一執行序來執行,仍舊保值單一執行序執行,而這個執行是採用「協作式」的方式執行… 也就是說「異步任務會在主執行序空閒時被執行


認識 Future

在 Dart 庫中隨處可見 Future 物件… 並且 通常異步函數返回的物件就是一個 Future (像是 Isolate.spawn 方法返回就是一個 Future 物件)


// Isolate.dart

// spawn 方法,返回值就是 Future
  external static Future<Isolate> spawn<T>(
      void entryPoint(T message), T message,
      {bool paused: false,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      @Since("2.3") String debugName});

以下介紹幾個 Future 常用的 API,如下表所示

Future API介紹
then(...)當任務執行完畢後
wait(...)等待以上 Future 任務都執行完後再一起做結尾
catchError(...)抓取異步錯誤

then 函數:異步的回調、異步串接處理

異步的回調

在呼叫異步函數時會返回一個 Future 物件,然後我們可以執行 then 函數,這個 then 函數就是異步的回調函數(Callback function),以下是個簡單的使用範例


void main() {
  new File(r"app_entry.dart").readAsString().then((value) {
    print("Read file finish");
  });
}
then 異步回調的結果

異步串接處理

Future#then 函數返回的也是一個 Future 物件,所以可以接續使用

如同 Java 的 Builder 建構者模式,這在做網路 API 請求的時候非常好用

範例如下:


void main() {
  new File(r"app_entry.dart").readAsString().then((value) {
    print("Read file finish");
    return value.length;

  }).then((length) {
    print("File length: $length");

  });
}
then 串接任務

wait 函數:結合異步任務

結合異步任務

透過 Future#wait 函數,我們可以用來等待多個 Future 都執行完後在做統一處理,可以用來做異步任務協作)

範例如下所示


void main() {

  // 異步任務一
  Future readFile = new File(r"app_entry.dart").readAsString().then((value) {
    return value.length;
  });

  // 異步任務二
  Future delay = Future.delayed(const Duration(seconds: 3));

  // 等待兩任務都結束
  Future.wait([readFile, delay]).then((resultArray) {

    print("第一個 Future : ${resultArray[0]}");
    print("第二個 Future : ${resultArray[1] ??= "null"}");

  });

  print("main function finish");

}

如下圖所見,我們會看到 wait 函數確實會等待兩個任務都執行完畢才會被執行

with 結合異步任務

catchError 函數:捕捉異常

捕捉異步任務異常

使用 file#readAsString 切換為異步讀取資源 (該函數返回 Future),這時就可以使用 Future#catchError 函數來捕捉錯誤

以下範例會讀取一個不存在的檔案


import 'dart:io';

void throwError() {
  // readAsString 原型 : Future<String> readAsString({Encoding encoding: utf8});
  new File(r"123.txt").readAsString().then((value) {
    // 讀取不存在文件
    print(value);
      
  }).catchError((e,s) { // catchError 原型 Future<T> catchError(Function onError, {bool test(Object error)});
    
    print("----> Caught an exception: $e");
    print("----> Stack trace: $s");
      
  });
}


void main() {
  throwError();
}

如下圖所見,我們會捕捉到 PathNoFoundException 異常

catchError 捕捉異常

如果沒有捕捉錯誤,則會產生 Unhandled exception 異常,對於 Dart 異常不熟悉的可以看 Dart 的捕捉異常限制 這篇文章

● Future#catchError 也接受捕捉特定類型異常,只要透設定第二個參數就可以指定異常類型,範例如下


import 'dart:io';

void throwError() {
  new File(r"123.txt").readAsString().then((value) {
    // 讀取不存在文件
    print(value);
  }).catchError((e, s) {
    print("----> Caught an exception: $e");
    print("----> Stack trace: $s");

  }, test: (e) => e is PathNotFoundException);
}


void main() {

  throwError();

}
catchError 捕捉指定異常類型

認識 Stream

Stream 表示的也是異步數據,它是 Dart 中處理異步事件「」的 API

它與 Future 的差異在,Future 表示 一次性 任務的返回,而 Stream 可以 分多次 異步任務,這樣的好處有…

A. 可以減少一次性記憶體的消耗量(因為 Stream 可以分批回傳)

B. 響應速度較快;就像是加載較大的網頁時,Stream 可以先回傳已經加載好的部分,這樣可以有比較好的使用體驗(而 Future 則是全部加載完畢後再一次返回)

測試 Stream 與 Future 的不同

● Stream 有監聽函數可以監聽到目前執行到哪,像是 onDoneonErrorresumepause...等等函數可以使用(如下表所示),與 Future 最大不同在於 Stream 不會單次讀取完成

監聽方法說明
onData(必須參數)收到 Data 時會被觸發
onError收到 Error 時觸發
onDone結束 Stream 流時觸發
unsubscribeOnError當第一次收到 onError 時是否取消 Stream 流

以下範例中我們來讀取大一點的檔案(linux-5.6.4.tar.xz),這樣就可以明顯看出兩者個不同


import 'dart:async';
import 'dart:io';

import 'dart:isolate';

// 讀取次數
const fileName = '/Users/user/Downloads/linux-5.6.4.tar.xz';
int times = 0;

void main() {
  Isolate.spawn(isolateFuture, "start read File By Future");

  Isolate.spawn(isolateStream, "start read File By Stream");

  while(true) {}
}

void isolateFuture(String str) {
  print(str);

  new File(fileName).readAsBytes().then((value) {
    print("Future : ${times++}");
  });

  print("Future Finish\n\n");
}

void isolateStream(String str) {
  print(str);

  // 多次
  StreamSubscription<List<int>> listen = new File(fileName).openRead().listen((event) {
    print("Stream : ${times++}");
  });

  // 結束通知閉包,還有 onError 等等...
  listen.onDone(() {
    print("onDone");
  });
  // stream 也可以暫停
  listen.pause();

  listen.resume();

  print("Stream Finish");
}

看到結果知道 Stream 會多次讀取資料,而 Future 是一次性的全部讀取進記憶體中

stream vs. future

創建 Stream 物件

● 來看看幾個比較常見的 Stream 建構方法,如下表所示

Factory 建構功能
Stream.fromFuture當 future 完成時將觸發一個數據或錯誤,然後立即關閉這個流
Stream.fromFutures(多了個 s每個 future 都有自己的數據(onData)或錯誤(onError)事件,當整個 future 完成後,流將會關閉,如果 future 為空,流將會立即關閉
Stream.fromIterable從集合中戶取數據的單訂閱流

A. Stream.fromFuture 範例


import 'dart:async';

void main() {
  Future<String> myFuture = Future.delayed(
      Duration(seconds: 2), 
      () => 'Hello from Future!');

  Stream<String> streamFromFuture = Stream.fromFuture(myFuture);

  streamFromFuture.listen(
    (data) {
      print('Data: $data');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream closed.');
    },
  );
}

B. Stream.fromFutures 範例:跟 Stream.fromFuture 差異不大,只是它會同時管理多個 Future


import 'dart:async';

void main() {
  Future<String> future1 = Future.delayed(Duration(seconds: 1), () => 'First Future');
  Future<String> future2 = Future.delayed(Duration(seconds: 2), () => 'Second Future');
  Future<String> future3 = Future.delayed(Duration(seconds: 3), () => 'Third Future');

  Stream<String> streamFromFutures = Stream.fromFutures([future1, future2, future3]);

  streamFromFutures.listen(
    (data) {
      print('Data: $data');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream closed.');
    },
  );
}

C. Stream.fromIterable 範例


import 'dart:async';

void main() {
  List<int> myIterable = [1, 2, 3, 4, 5];

  Stream<int> streamFromIterable = Stream.fromIterable(myIterable);

  streamFromIterable.listen(
    (data) {
      print('Data: $data');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream closed.');
    },
  );
}

● 如果沒有額外設定,Stream 預設是單監聽模式,如果監聽數量大於一個以上的監聽則會拋出錯誤


void main() {
  var stream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead();
  // stream 流讀取
  stream.listen((List<int> bytes) {
    print("stream listen 1");
  });
  // 錯誤! 只能有一個監聽者
  stream.listen((List<int> bytes){
    print("stream listen 2");
  });
}

Stream 轉廣播模式:多個監聽者

● Stream 預設是單監聽模式,若要讓 Stream 可以讓多個用戶監聽(允許多個用戶監聽),就要將其轉為「廣播模式

透過 Broadcast#asBroadcastStream 方法,就可以將 Stream 轉為可多個監聽者


import 'dart:io';

void main() {

  // Stream 轉為 Broadcast 就可以多個監聽
  var broadcastStream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead().asBroadcastStream();
  broadcastStream.listen((event){
    print("廣播訂閱者 - 1");
  });
  broadcastStream.listen((event){
    print("廣播訂閱者 - 2");
  });

  print("Main Finish");
}

StreamController:可動態添加數據

● StreamController 是流管理器,可透過 factory 建構函數的 broadcast 創建 StreamController,並且它的多訂閱模式是「熱訂閱

我們可以從以下範例了解到 StreamController 如何使用:

A. 普通的 Stream 流是一個「單訂閱的不可變流」,它不可以臨時添加數據


void main() {

  var stream = Stream.fromIterable([1, 2, 3]);
  // 3 秒後添加訂閱者,Timer 延遲 (時間,callback)
  Timer(const Duration(seconds: 3),
          () => stream.listen((Object o) => print("延遲三秒: $o")));

}

B. StreamController 則是一個可以動態添加數據的流,但是它屬於「熱流」(當沒接收到數據就會流失掉)


void main() {

  //創建一個 StreamController
  var streamController = StreamController.broadcast();  // factory 構造器
  // 在以後微任務中獲得事件
  streamController.add("Hello");  // 無法接收,因為尚未有 listener
  //訂閱事件
  streamController.stream.listen((i){
    print("StreamController broadcast: $i");
  });
  streamController.add("World");  // 可以被接收,因為已有 listener
  // 記得關閉流 !!!
  streamController.close();

}

如下圖所見,在 listen 之前的 Hello 數據是無法被 StreamController 監聽到的,只有在 listen 之後的 World 可被監聽到

streamcontroller 熱流監聽

通過 async*-yield 生成 stream

● 使用 async* 需要搭配上 yield 這個關鍵字一起使用,可以讓數據異步返回 (可執行判斷到一半就返回)

範例如下


Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;    // 每接收到一個數據就直接拋出異步訊息 (該拋出訊息不會中斷程式流程)
  }
}

使用方式如下:

A. 使用 await-for 來消費 Stream 流


void main() async {
  // 使用 countStream 函數生成一個 Stream
  await for (int value in countStream(5)) {
    print('Received: $value');
  }

  print('Stream processing completed.');
}
await-for 取得 stream 結果

B. 使用 listen 方法來監聽流


void main() {
  // 使用 countStream 函數生成一個 Stream
  final stream = countStream(5);

  // 訂閱這個 Stream 並監聽數據
  stream.listen((value) {
      print('Received: $value');
    },
    onDone: () {
      print('Stream processing completed.');
    },
  );
}
listene 取得 stream 結果

用 Stream 做簡易 EventBus

● 使用 Dart 的廣播機制創建 Android 的第三方庫 EventBus,其功能類似於「觀察者模式」,主要的行為有 1. 訂閱、2. 通知

● 我們可以透過 StreamController 物件來設定、過濾泛型類型,以 達到指定類型的註冊與通知

範例如下:

EventBus 類實作


import 'dart:async';

class EventBus {

  static EventBus _instance;
  static StreamController _streamController;

  EventBus._internal() {
    _streamController = StreamController.broadcast();
  }

  factory EventBus.getDefault() {
    // 單例模式
    return _instance ??= EventBus._internal();
  }

  StreamSubscription<T> register<T>(void onData(T event)) {
    if(T == dynamic) {  // 未指定類型
      return _streamController.stream.listen(onData);
    } else {
      // 傳送指定類型
      return _streamController.stream.where((type) => type is T)  // 判斷
          .cast<T>()        // 強制轉型
          .listen(onData);
    }
  }

  // 傳送廣播資訊,多廣播
  void post<T>(T msg) {
    _streamController.add(msg);
  }

  // 關閉廣播流
  void close() {
    _streamController.close();
  }

}

使用自製的 EventBus 類


import 'eventBus.dart';

void main() {

  EventBus.getDefault().register((event) {
    print("Receive All Message, from EventBus: $event");
  });

  EventBus.getDefault().register<String> ((event) {
    print("--- Receive String Message, from EventBus: $event");
  });

  EventBus.getDefault().register<int>((event) {
    print("--- --- Receive int Message, from EventBus: $event");
  });

  EventBus.getDefault().register<double>((event) {
    print("--- --- --- Receive double Message, from EventBus: $event");
  });


  EventBus.getDefault().post(1111);
  EventBus.getDefault().post("Hello World");
  EventBus.getDefault().post(765.123);

  EventBus.getDefault().close();
}


async / await 關鍵字

使用 async 關鍵字描述的函數是 異步代碼塊 (概念類似新開執行序)

使用 await 用來描述呼叫的函數,可以用來 等待異步執行的任務完畢

async 異步函數

被 async 描述的函數只能返回 void 或是 Future 類(因為是異步)

使用範例如下:


import 'dart:io';

void main() {
  var string = asyncReadFile();
  string.then((value) => print("Read Finish"));
}

// 異步函數
Future asyncReadFile() async {        // 使用 async 描述
  var file = new File(r"D:\mingw64\build-info.txt").readAsString();

  return file;
}

await 等待函數

await 必須要配合 async 使用,無法單獨使用 await 描述呼叫的函數

await 可以有規律的順序執行,它會等待上一個異步函數結束才往下執行

我們可以透過以下範例了解 await 的功能、好處:

A. 尚未使用 await 描述異步任務,那異步任務的讀取順序將會不同(每次執行都可能是不同的結果)


void noSync() {
  // 讀取順序不同,File1(linux-5.6.4.tar.xz) 較大的會讀得比較慢,File2(build.gradle) 會先讀完
  new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("file1 read finish"));
  new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("file2 read finish"));
}

B. 以往我們可能會使用瘋狂回調的方式,這容易進入回調地獄(callback hell


import 'dart:io';

// 多重回調,容易混亂
void oldStyle() {
  new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) {
    print("then --- file1 read finish");

    new File(r"D:\mingw64\build.gradle").readAsString().then((value) {
      print("then --- file2 read finish");
    });
  });
}

C. 使用 await 關鍵字描述後,就可以擺脫這種回調地獄問題,可以讓函數按照預期順序執行,並且可讀性也高


import 'dart:io';

Future orderReadFile() async {
  // await 異步中的同步
  await new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("async --- file1 read finish"));

  var file2 = await new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("async --- file1 read finish"));

  return file2;
}


更多的 Flutter/Dart 語言相關文章

了解 Flutter 如何在跨平台開發中佔據重要地位,掌握快速上手的技巧與項目建置流程,開啟你的跨平台開發之旅!

探索跨平台與 Flutter 技術的未來:從認識到 Flutter 專案建置 | 3 種跨平台

Dart 語言基礎

探討 Dart 語言:宣告、數據類型、操作符 | 從基礎到應用指南

快速掌握 Dart 語言的核心概念,包括變數宣告、數據類型及操作符,為 Flutter 開發奠定扎實基礎。

Dart 函數與方法、異常處理、引用庫 | Java 比較

深入了解 Dart 的函數與異常處理特性,並與 Java 的處理方式進行比較,幫助你跨語言切換更加順暢。

深入解析 Dart 語言:命名慣例、類特性、建構函數與抽象特性

學習 Dart 類的設計邏輯及命名慣例,深入探索抽象類與 Mixin 的強大應用場景。

深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream

徹底搞懂 Dart 的併發與異步處理,掌握 Isolate 與 Event Loop 的運行機制,助你提升應用效能!

深入 Flutter 框架

深入解析 Flutter Navigator:常見錯誤、解決方法與路由跳轉技巧、動畫

從常見問題到自定義解決方案,學會如何利用 Navigator 實現路由跳轉與流暢動畫效果。

深入理解 Flutter 中的數據共享:從普遍方案到 InheritedWidget | 3 種方案

探討數據共享的最佳實踐,了解 InheritedWidget 等三種主要方案,幫助你優化應用結構。

深入解析 Flutter 三顆樹:Widget、Element 與 RenderObject 完整指南

拆解 Flutter 的內部結構,全面了解 Widget、Element 和 RenderObject 之間的關係,提升你的 Flutter 開發技能!

Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

發表迴響