目錄

在 Dart 裡使用 Stream

Written by Lasse Nielsen
April 2013 (updated May 2021)

dart:async 庫中有兩個型別,它們對許多 Dart API 來說都非常重要: StreamFuture。 Future 用於表示單個運算的結果,而 Stream 則表示多個結果的序列。你可以監聽 Stream 以獲取其結果(包括資料和錯誤)或其關閉事件。也可以在 Stream 完成前對其暫停或停止監聽。

但是本篇文章並非闡述 如何使用 Stream,而是向你介紹如何建立 Stream。你可以透過以下幾種方式建立 Stream。

  • 轉換現有的 Stream。

  • 使用 async* 函式建立 Stream。

  • 使用 StreamController 產生 Stream。

本文將向你展示每種方式的程式碼並且會給你一些有用的提示,這些提示可以幫助你正確建立 Stream。

可以查閱 非同步程式設計:使用 Stream 獲取更多關於 Stream 的資訊。

轉換現有的 Stream

我們在建立 Stream 時常見的情形是根據現有 Stream 的事件建立一個新的 Stream。比如你已經有了一個可以提供位元組事件的 Stream,然後你想將該 Stream 變為一個可以提供字串的 Stream,並且該 Stream 中的字串還經過 UTF-8 編碼。對於這種情況,常用的辦法是建立一個新的 Stream 去等待獲取原 Stream 的事件,然後再將新 Stream 中的事件輸出。例如:

/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

你可以使用 Stream 類提供的轉換類方法,比如 map()where()expand()take() 來應對大多數常見的轉換需求。

例如,假設你有一個名為 counterStream 的 Stream,用於每秒列印輸出一個自增的整數。其實現過程可能如下:

var counterStream =
    Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);

你可以使用下面的程式碼區塊速檢視事件:

counterStream.forEach(print); // Print an integer every second, 15 times.

你可以在監聽 Stream 前呼叫一個類別似 map() 的轉換方法來轉換 Stream 的事件。該方法將返回一個新的 Stream。

// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

你可以使用任意其它的變換方法替代 map(),比如類似下面的這些:

.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常而言,使用各種轉換方法足以滿足你簡單的使用需求。但是,如果你需要對轉換進行更多的控制,你可以使用 Stream 類別的 transform() 方法指定一個 StreamTransformer。 Dart 平台庫為許多常見的任務需求提供了 Stream 轉換器。例如下面的程式碼使用了由 dart:convert 庫提供的 utf8.decoderLineSplitter 轉換器。

Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

從零開始建立 Stream

上一小節中我們使用一個現有的 Stream 經過轉換產生新的 Stream。這一小節我們透過非同步產生器 (async*) 函式來完完全全地建立一個 Stream。當非同步產生器函式被呼叫時會建立一個 Stream,而函式體則會在該 Stream 被監聽時開始執行。當函式返回時,Stream 關閉。在函式返回前,你可以使用 yieldyield* 陳述式向該 Stream 提交事件。

下面是一個週期性傳送整數的函式例子:

Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

該函式返回一個 Stream。而函式體會在該 Stream 被監聽時開始執行且以一定的週期間隔在指定的數字範圍內不斷地產生一個遞增數字。如果省略掉 count 引數,那麼迴圈將無休止地執行下去,此時除非取消訂閱,否則 Stream 會不停地產生越來越多的數字。

當監聽器取消時(呼叫由 listen() 方法返回的 StreamSubscription 物件中的 cancel() 方法),如果下一次迴圈體執行到 yield 陳述式,此時該陳述式的作用類似於 return 陳述式。而且任意 finally 陳述式塊在此時執行均會導致函式退出。如果函式嘗試在退出前 yield 一個值,那麼該嘗試將會以失敗告終併產生類似於 return 陳述式的效果。

當函式最終退出時,由 cancel() 方法返回的 Future 完成。如果函式是因為出錯導致退出,則 Future 完成時會攜帶對應的錯誤,否則其會攜帶一個 null

另外,一個更有用的範例是將一個 Future 序列轉換為 Stream 的函式:

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

該函式迴圈向 Future 序列請求一個 Future 並等待該 Future 完成獲取其結果後提交給 Stream。如果某個 Future 因出錯完成,則該錯誤也會提交給 Stream。

在實際應用中,透過 async* 函式從零建構 Stream 的情況比較少見。 async* 函式通常會根據某些資料源來建立 Stream,而這些資料源常常又是另一個 Stream。比如像上述範例中的 Future 序列,其資料往往來自於其它的非同步事件源。然而,在許多情況下,非同步函式過於簡單難以輕鬆地處理多個數據源的場景。而這就是 StreamController 類別的用武之地。

使用 StreamController

如果你 Stream 的事件不僅來自於非同步函式可以遍歷的 Stream 和 Future,還來自於你程式的不同部分,這種情況使用上述兩種方式產生 Stream 就顯得比較困難。面對這種情況,我們可以使用一個 StreamController 來建立和填充 Stream。

StreamController 可以為你產生一個 Stream,並提供在任何時候、任何地方將事件新增到該 Stream 的方法。該 Stream 具有處理監聽器和暫停所需的所有邏輯。控制器物件你可以自行處理而只需返回呼叫者所需的 Stream 即可。

下面的程式碼將為你展示一個簡單的範例(出自 stream_controller_bad.dart),該範例使用 StreamController 來實現上一個範例中的 timedCounter() 函式。儘管該範例有一定的缺陷,但其為你展示了 StreamController 的基本用法。該程式碼將資料直接新增至 StreamController 而不是從 Future 或 Stream 中獲取,並在最後返回 StreamController 中的 Stream。

// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

與前面一樣,你可以像下面這樣使用由 timedCounter() 函式返回的 Stream:

var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() 函式的實現有兩個問題:

  • 它在擁有訂閱者之前就開始產生事件。

  • 即使訂閱者請求暫停,它也會繼續產生事件。

如下一節所示,你可以在建立 StreamController 時透過指定回呼(Callback),比如 onListenonPause 來修復這些問題。

等待訂閱

一般來說,Stream 應該在它產生事件前等待訂閱者,否則事件的產生毫無意義。對 async* 函式而言,它可以自行處理該問題。但是當使用 StreamController 時,因為你可以有比使用 async* 函式更多的控制能力,因此你完全可以無視相關規則自行新增並控制事件。如果一個 Stream 沒有訂閱者,它的 StreamController 會不斷快取事件,這可能會導致記憶體洩露。

將上面範例中使用 Stream 的程式碼更改為如下:

void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

當我們執行上述程式碼時,儘管 Stream 一開始就工作,但最開始的 5 秒內不會有任何東西列印輸出。 5 秒後我們向 Stream 新增監聽器,此時前面的 5 個事件會被同時輸出,因為它們被 StreamController 快取了。

當你建構 StreamController 時,可以為其指定一個 onListen 引數回呼(Callback)用以接收訂閱通知。當 Stream 獲取到它的第一個訂閱者時會觸發呼叫 onListen 回呼(Callback)。同樣地,你也可以指定一個 onCancel 回呼(Callback),該回調則會在控制器丟失它最後一個訂閱者時觸發呼叫。在上述例子中, Timer.periodic() 的呼叫應該移至 onListen 中進行,如下一節所示。

遵循並實現暫停

當監聽器請求暫停時應當避免繼續產生事件。當 Stream 訂閱暫停時,async* 函式可以自動地在一個 yield 陳述式執行時暫停。而 StreamController 則會在暫停時快取事件。如果程式碼在處理事件產生時不考慮暫停功能,則快取的大小可以無限制地增長。而且如果在暫停後監聽器很快又請求停止,那麼在暫停到停止這段時間內所做的快取工作都是浪費的。

為了可以檢視在不支援暫停的時候會發生什麼,我們將上面使用 Stream 的程式碼更改為如下:

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

當五秒鐘的暫停時間結束時,在此期間產生的事件將同時被輸出。出現這種狀況的原因是因為產生 Stream 的源沒有遵循暫停規則,因此其會持續不斷地向向 Stream 中新增事件。進而導致 Stream 快取事件,然後,當 Stream 從暫停中恢復時,它會清空並輸出其快取。

下面程式碼所實現的 timedCounter() 版本(出自 stream_controller.dart)透過使用 StreamController 中的 onListenonPauseonResumeonCancel 回呼(Callback)實現暫停功能。

Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

listenWithPause() 函式中使用上面的這個 timedCounter 函式,執行後你就可以看到當訂閱暫停時列印輸出的計數也會暫停,爾後又可以正確地恢復。

你必須使用全部的回呼(Callback) onListenonCancelonPauseonResume 來通知暫停狀態的變化,否則如果訂閱狀態與暫停狀態在同一時間都改變了,只會有 onListenonCancel 回呼(Callback)會被呼叫。

最後的提示

當你不透過 async* 函式建立 Stream 時,請務必牢記以下幾點:

  • 使用同步控制器時要小心。例如,使用 StreamController(sync: true) 構造方法建立控制器。當你傳送一個事件到一個未暫停的同步控制器(例如:使用 EventSink 中定義的 add()addError()close() 方法),事件立即傳送給所有 Stream 的監聽器。在新增監聽器的程式碼返回之前,決不能呼叫 Stream 監聽器,而在錯誤的事件使用同步控制器會破壞該規則並導致其它正常程式碼執行失敗。因此,你應該避免使用同步控制器。

  • 如果你使用 StreamControlleronListen 回呼(Callback)會在 listen 方法呼叫返回 StreamSubscription 前返回。不要讓 onListen 回呼(Callback)依賴於已經存在的訂閱。例如,在下面的程式碼中,onListen 回呼(Callback)有可能會在 subscription 變數被初始化為一個有效值之前被觸發(同時 處理器 被呼叫)。

    subscription = stream.listen(handler);
  • 當 Stream 的監聽器狀態改變時,由 StreamController 定義的 onListenonPauseonResumeonCancel 回呼(Callback)會被呼叫,該呼叫絕不會發生在事件產生時或在某個狀態變化處理回呼(Callback)的呼叫期間。在這些情況出現時,狀態變化的回呼(Callback)會被延遲,直到上一個回呼(Callback)執行完成。

  • 不要嘗試自己去實現 Stream 介面。否則很容易在事件、回呼(Callback)以及新增和移除監聽器這些操作互動時出現一些難以察覺的錯誤。你應該總是使用一個現有的 Stream(比如由 StreamController 產生的)去實現新 Stream 中 listen 方法的呼叫。

  • 儘管你可以透過擴充 Stream 類並實現 listen 方法來實現更多額外的功能,但一般不建議這麼做,因為這樣會引入一個呼叫者必須考慮的新型別。相反,你可以建立一個(或多個)具有 Stream 的類而不是一個(或多個)Stream。