目錄

非同步程式設計:使用 stream

本章的重點

  • Stream 提供一個非同步的資料序列。

  • 資料序列包括使用者產生的事件和從檔案讀取的資料。

  • 你可以使用 Stream API 中的 listen() 方法和 await for 關鍵字來處理一個 Stream。

  • 當出現錯誤時,Stream 提供一種處理錯誤的方式。

  • Stream 有兩種型別:Single-Subscription 和 Broadcast。

FutureStream 類是 Dart 非同步程式設計的核心。

Future 表示一個不會立即完成的計算過程。與普通函式直接返回結果不同的是非同步函式返回一個將會包含結果的 Future。該 Future 會在結果準備好時通知呼叫者。

Stream 是一系列非同步事件的序列。其類似於一個非同步的 Iterable,不同的是當你向 Iterable 獲取下一個事件時它會立即給你,但是 Stream 則不會立即給你而是在它準備好時告訴你。

接收 Stream 事件

Stream 可以透過許多方式建立,這個話題我們會在另一篇文章詳述,而這些所有的建立方式都可以相同的方式在程式碼中使用:像使用 for迴圈 迭代一個 Iterable 一樣,我們可以使用 非同步 for迴圈 (通常我們直接稱之為 await for)來迭代 Stream 中的事件。例如:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

該程式碼只是簡單地接收整型事件流中的每一個事件並將它們相加,然後返回(被 Future 包裹)相加後的整型值。當迴圈體結束時,函式會暫停直到下一個事件到達或 Stream 完成。

內部使用 await for 迴圈的函式需要使用 async 關鍵字標記。

下面的範例中使用了 async* 函式產生一個簡單的整型 Stream 來測試上一個程式碼片段:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

錯誤事件

當 Stream 再也沒有需要處理的事件時會變為完成狀態,與此同時,呼叫者可以像接收到新事件回呼(Callback)那樣接收 Stream 完成的事件回呼(Callback)。當使用 await for 迴圈讀取事件時,迴圈會在 Stream 完成時停止。

有時在 Stream 完成前會出現錯誤;比如從遠端伺服器獲取檔案時出現網路請求失敗,或者建立事件時出現 bug,儘管錯誤總是會有可能存在,但它出現時應該告知使用者。

Stream 可以像提供資料事件那樣提供錯誤事件。大多數 Stream 會在第一次錯誤出現後停止,但其也可以提供多次錯誤並可以在在出現錯誤後繼續提供資料事件。在本篇文件中我們只討論 Stream 最多出現並提供一次錯誤事件的情況。

當使用 await for 讀取 Stream 時,如果出現錯誤,則由迴圈陳述式丟擲,同時迴圈結束。你可以使用 try-catch 陳述式捕獲錯誤。下面的範例會在迴圈迭代到引數值等於 4 時丟擲一個錯誤:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

Stream 的使用

Stream 類中包含了許多像 Iterable 類中一樣的輔助方法幫助你實現一些常用的操作。例如,你可以使用 Stream API 中的 lastWhere() 方法從 Stream 中找出最後一個正整數。

Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

Stream 的兩種型別

Stream 有兩種型別。

Single-Subscription 型別的 Stream

最常見的型別是一個 Stream 只包含了某個眾多事件序列的一個。而這些事件需要按順序提供並且不能丟失。當你讀取一個檔案或接收一個網頁請求時就需要使用這種型別的 Stream。

這種 Stream 只能設定一次監聽。重複設定則會丟失原來的事件,而導致你所監聽到的剩餘其它事件毫無意義。當你開始監聽時,資料將以塊的形式提供和獲取。

Broadcast 型別的 Stream

另一種流是針對單個訊息的,這種流可以一次處理一個訊息。例如可以將其用於瀏覽器的滑鼠事件。

你可以在任何時候監聽這種 Stream,且在此之後你可以獲取到任何觸發的事件。這種流可以在同一時間設定多個不同的監聽器同時監聽,同時你也可以在取消上一個訂閱後再次對其發起監聽。

處理 Stream 的方法

下面這些 Stream<T> 類中的方法可以對 Stream 進行處理並返回結果:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

上述所有的方法,除了 drain() and pipe() 方法外,都在 Iterable 類中有對應的相似方法。如果你在非同步函式中使用了 await for 迴圈(或者只是在另一個方法中使用),那麼使用上述的這些方法將會更加容易。例如,一些程式碼實現大概是這樣的:

Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(上述程式碼只是個簡單的範例,實際的實現邏輯可能要稍微複雜一點。)

修改 Stream 的方法

下面的方法可以對原始的 Stream 進行處理並返回新的 Stream。當呼叫了這些方法後,設定在原始 Stream 上的監聽器會先監聽被轉換後的新 Stream,待新的 Stream 處理完成後才會轉而回去監聽原始的 Stream。

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

Iterable 類中也有一些將一個 iterable 轉換為另一個 iterable 的方法,上述的這些方法與 Iterable 類中的這些方法相似。如果你在非同步函式中使用了 await for 迴圈,那麼使用上述的這些方法將會更加容易。

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

asyncExpand()asyncMap() 方法與 expand()map() 方法類似,不同的是前兩者允許將一個非同步函式作為函式引數。 Iterable 中沒有與 distinct() 類似的方法,但是在不久的將來可能會加上。

Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最後這三個方法比較特殊。它們用於處理 await for 迴圈不能處理的錯誤:當迴圈執行過程中出現錯誤時,該迴圈會結束同時取消 Stream 上的訂閱且不能恢復。你可以使用 handleError() 方法在 await for 迴圈中使用 Stream 前將相關錯誤移除。

Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

transform() 方法

transform() 方法並不只是用於處理錯誤;它更是一個通用的 Stream “map 對映”。通常而言,一個 “map 對映”會為每一個輸入事件設定一個值。但是對於 I/O Stream 而言,它可能會使用多個輸入事件來產生一個輸出事件。這時候使用 StreamTransformer 就可以做到這一點。例如像 Utf8Decoder 這樣的解碼器就是一個變換器。一個變換器只需要實現一個 bind() 方法,其可透過 async 函式輕鬆實現。

讀取和解碼檔案

下面的程式碼範例讀取一個檔案並在其 Stream 上執行了兩次變換。第一次轉換是將檔案資料轉換成 UTF-8 編碼格式,然後將轉換後的資料變換成一個 LineSplitter 執行。檔案中除了 # 開頭的行外其它的行都會被打印出來。

import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen() 方法

最後一個重要的方法是 listen()。這是一個“底層”方法,其它所有的 Stream 方法都根據 listen() 方法定義。

StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

你只需繼承 Stream 類並實現 listen() 方法來建立一個 Stream 型別的子類別。 Stream 類中所有其它的方法都依賴於對 listen() 方法的呼叫。

listen() 方法可以讓你對一個 Stream 進行監聽。在你對一個 Stream 進行監聽前,它只不過是個惰性物件,該物件描述了你想檢視的事件。當你對其進行監聽後,其會返回一個 StreamSubscription 物件,該物件用以表示一個生產事件的活躍的 Stream。這與 Iterable 物件的實現方式類似,不同的是 Iterable 物件可返回迭代器並可以進行真實的迭代操作。

Stream 允許你暫停、繼續甚至完全取消一個訂閱。你也可以為其設定一個回呼(Callback),該回調會在每一個數據事件、錯誤事件以及 Stream 自身關閉時通知呼叫者。

其它資源資訊

可以閱讀下面的文件獲取更多關於在 Dart 中使用 Stream 和非同步程式設計的資訊: