可迭代串流 (Iterable Streams)#

穩定性:1 - 實驗性

node:stream/iter 模組提供了一種基於可迭代物件 (iterables) 的串流 API,而非基於事件驅動的 Readable/Writable/Transform 類別階層,也非 Web Streams 的 ReadableStream/WritableStream/TransformStream 介面。

此模組僅在啟用 --experimental-stream-iter CLI 旗標時可用。

串流表示為 AsyncIterable<Uint8Array[]>(非同步)或 Iterable<Uint8Array[]>(同步)。沒有需要繼承的基底類別——任何實作可迭代協定的物件皆可參與。轉換則是單純的函式或具有 transform 方法的物件。

資料以批次 (batches)(每次迭代為 Uint8Array[])流動,以攤提非同步操作的成本。

import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Compress and decompress a string
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'
const { from, pull, text } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Compress and decompress a string
  const compressed = pull(from('Hello, world!'), compressGzip());
  const result = await text(pull(compressed, decompressGzip()));
  console.log(result); // 'Hello, world!'
}

run().catch(console.error);
import { open } from 'node:fs/promises';
import { text, pipeTo } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Read a file, compress, write to another file
const src = await open('input.txt', 'r');
const dst = await open('output.gz', 'w');
await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
await src.close();

// Read it back
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
const { open } = require('node:fs/promises');
const { text, pipeTo } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Read a file, compress, write to another file
  const src = await open('input.txt', 'r');
  const dst = await open('output.gz', 'w');
  await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
  await src.close();

  // Read it back
  const gz = await open('output.gz', 'r');
  console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
}

run().catch(console.error);

概念#

位元組串流#

此 API 中的所有資料皆表示為 Uint8Array 位元組。當字串傳遞給 from()push()pipeTo() 時,會自動進行 UTF-8 編碼。這消除了編碼方面的歧義,並實現了串流與原生程式碼之間的零複製傳輸。

批次處理#

每次迭代會產出一個批次 (batch)——即 Uint8Array 區塊的陣列 (Uint8Array[])。批次處理將 await 和 Promise 建立的成本攤提至多個區塊中。若消費者一次處理一個區塊,只需迭代內部陣列即可。

for await (const batch of source) {
  for (const chunk of batch) {
    handle(chunk);
  }
}
async function run() {
  for await (const batch of source) {
    for (const chunk of batch) {
      handle(chunk);
    }
  }
}

轉換#

轉換分為兩種形式:

  • 無狀態 (Stateless) —— 函式 (chunks, options) => result,每個批次呼叫一次。接收 Uint8Array[](或作為清除訊號的 null)與 options 物件。回傳 Uint8Array[]null 或區塊的可迭代物件。

  • 有狀態 (Stateful) —— 物件 { transform(source, options) },其中 transform 是一個產生器(同步或非同步),接收整個上游可迭代物件與 options 物件,並產出輸出。此形式用於壓縮、加密以及任何需要跨批次緩衝的轉換。

兩種形式都會接收包含以下屬性的 options 參數:

  • options.signal <AbortSignal> 當管線取消、發生錯誤或消費者停止讀取時觸發的 AbortSignal。轉換可以檢查 signal.aborted 或監聽 'abort' 事件來執行早期清理。

清除訊號 (null) 會在來源結束後發送,讓轉換有機會發出結尾資料(例如壓縮頁尾)。

// Stateless: uppercase transform
const upper = (chunks) => {
  if (chunks === null) return null; // flush
  return chunks.map((c) => new TextEncoder().encode(
    new TextDecoder().decode(c).toUpperCase(),
  ));
};

// Stateful: line splitter
const lines = {
  transform: async function*(source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};

拉取與推送#

本 API 支援兩種模型:

  • 拉取 (Pull) —— 資料按需流動。pull()pullSync() 建立延遲載入的管線,僅在消費者迭代時才從來源讀取。

  • 推送 (Push) —— 資料被明確寫入。push() 建立一個帶有反壓 (backpressure) 的寫入器/可讀取對。寫入器將資料推入;可讀取端作為非同步可迭代物件進行消費。

反壓 (Backpressure)#

拉取串流具有天然的反壓機制——消費者驅動步調,因此讀取速度永遠不會超過消費者的處理速度。推送串流則需要明確的反壓,因為生產者與消費者是獨立運作的。push()broadcast()share() 上的 highWaterMarkbackpressure 選項控制此運作方式。

雙緩衝區模型#

推送串流使用兩部分的緩衝系統。將其想像成一個水桶(槽位),透過一條軟管(待處理寫入)注入水,並配有一個在水桶滿時關閉的浮球閥:

                          highWaterMark (e.g., 3)
                                 |
    Producer                     v
       |                    +---------+
       v                    |         |
  [ write() ] ----+    +--->| slots   |---> Consumer pulls
  [ write() ]     |    |    | (bucket)|     for await (...)
  [ write() ]     v    |    +---------+
              +--------+         ^
              | pending|         |
              | writes |    float valve
              | (hose) |    (backpressure)
              +--------+
                   ^
                   |
          'strict' mode limits this too!
  • 槽位 (水桶) —— 準備好給消費者使用的資料,上限為 highWaterMark。當消費者拉取時,它會一次將所有槽位排空至單一批次中。

  • 待處理寫入 (軟管) —— 等待槽位空間的寫入。在消費者排空後,待處理寫入會提升至現已清空的槽位中,其 Promise 隨之結算。

各策略如何使用這些緩衝區:

策略 槽位限制 待處理寫入限制
'strict' highWaterMark highWaterMark
'block' highWaterMark 無限制
'drop-oldest' highWaterMark 不適用 (永不等待)
'drop-newest' highWaterMark 不適用 (永不等待)
嚴格模式 (預設)#

嚴格模式會攔截「發後即忘」(fire-and-forget) 的模式,即生產者在未 await 的情況下呼叫 write(),這會導致無限制的記憶體增長。它將槽位緩衝區和待處理寫入佇列皆限制為 highWaterMark

如果您正確地 await 每個寫入,則一次最多只能有一個待處理寫入,因此永遠不會觸發待處理寫入限制。未 await 的寫入會累積在待處理佇列中,並在溢出時拋出錯誤。

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({ highWaterMark: 16 });

// Consumer must run concurrently -- without it, the first write
// that fills the buffer blocks the producer forever.
const consuming = text(readable);

// GOOD: awaited writes. The producer waits for the consumer to
// make room when the buffer is full.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({ highWaterMark: 16 });

  // Consumer must run concurrently -- without it, the first write
  // that fills the buffer blocks the producer forever.
  const consuming = text(readable);

  // GOOD: awaited writes. The producer waits for the consumer to
  // make room when the buffer is full.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);

忘記 await 最終會導致拋出錯誤。

// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
  writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"
阻塞 (Block)#

阻塞模式將槽位上限設為 highWaterMark,但對待處理寫入佇列沒有限制。已 await 的寫入會阻塞,直到消費者騰出空間,這與嚴格模式相同。區別在於未 await 的寫入會默默地永遠排隊,而不是拋出錯誤——如果生產者忘記 await,這可能導致記憶體洩漏。

這是現有 Node.js 傳統串流和 Web Streams 的預設模式。當您能控制生產者並確信其正確使用 await,或是從這些 API 遷移程式碼時使用此模式。

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({
  highWaterMark: 16,
  backpressure: 'block',
});

const consuming = text(readable);

// Safe -- awaited writes block until the consumer reads.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({
    highWaterMark: 16,
    backpressure: 'block',
  });

  const consuming = text(readable);

  // Safe -- awaited writes block until the consumer reads.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);
丟棄最舊 (Drop-oldest)#

寫入永不等待。當槽位緩衝區滿時,最舊的緩衝區塊會被移除,為傳入的寫入騰出空間。消費者永遠看到最新的資料。適用於即時饋送、遙測或任何過期資料價值低於最新資料的情況。

import { push } from 'node:stream/iter';

// Keep only the 5 most recent readings
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});
const { push } = require('node:stream/iter');

// Keep only the 5 most recent readings
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});
丟棄最新 (Drop-newest)#

寫入永不等待。當槽位緩衝區滿時,傳入的寫入會被默默丟棄。消費者處理已緩衝的資料,而不會被新資料淹沒。適用於限流或在高壓下削減負載。

import { push } from 'node:stream/iter';

// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});
const { push } = require('node:stream/iter');

// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});

寫入器介面#

寫入器是任何符合寫入器介面的物件。僅 write() 是必須的;所有其他方法皆為選擇性。

每個非同步方法都有一個設計用於 try-fallback 模式的同步 *Sync 對應版本:先嘗試快速的同步路徑,僅在同步呼叫指示無法完成時才退回到非同步版本。

if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err);  // Always synchronous, no fallback needed

writer.desiredSize#

達到高水位線之前可用的緩衝槽位數量。若寫入器已關閉或消費者已斷開連線,回傳 null

該值永遠為非負數。

writer.end([options])#

  • options <Object>
    • signal <AbortSignal> 僅取消此操作。該訊號僅取消待處理的 end() 呼叫;不會使寫入器本身失效。
  • 回傳:{Promise} 寫入的總位元組數。

訊號通知不會再有資料寫入。

writer.endSync()#

  • 回傳:<number> 寫入的總位元組數,若寫入器未開啟則為 -1

writer.end() 的同步版本。若寫入器已關閉或發生錯誤,回傳 -1。可用於 try-fallback 模式。

const result = writer.endSync();
if (result < 0) {
  writer.end();
}

writer.fail(reason)#

將寫入器置於終止錯誤狀態。若寫入器已關閉或發生錯誤,則此操作為空操作 (no-op)。與 write()end() 不同,fail() 是絕對同步的,因為使寫入器失效是一個純粹的狀態轉換,無需執行任何非同步工作。

writer.write(chunk[, options])#

寫入區塊。當緩衝區空間可用時,Promise 會結算。

writer.writeSync(chunk)#

同步寫入。不會阻塞;若反壓機制處於活動狀態,回傳 false

writer.writev(chunks[, options])#

將多個區塊作為單一批次寫入。

writer.writevSync(chunks)#

同步批次寫入。

stream/iter 模組#

所有函式皆可透過具名匯出或作為 Stream 命名空間物件的屬性使用。

// Named exports
import { from, pull, bytes, Stream } from 'node:stream/iter';

// Namespace access
Stream.from('hello');
// Named exports
const { from, pull, bytes, Stream } = require('node:stream/iter');

// Namespace access
Stream.from('hello');

在模組識別碼前加上 node: 前綴是可選的。

來源#

from(input)#

從給定輸入建立非同步位元組串流。字串會進行 UTF-8 編碼。ArrayBufferArrayBufferView 的值會被包裝為 Uint8Array。陣列和可迭代物件會被遞迴展平並正規化。

實作 Symbol.for('Stream.toAsyncStreamable')Symbol.for('Stream.toStreamable') 的物件會透過這些協定進行轉換。toAsyncStreamable 協定優先於 toStreamable,後者又優先於迭代協定 (Symbol.asyncIterator, Symbol.iterator)。

import { Buffer } from 'node:buffer';
import { from, text } from 'node:stream/iter';

console.log(await text(from('hello')));       // 'hello'
console.log(await text(from(Buffer.from('hello')))); // 'hello'
const { Buffer } = require('node:buffer');
const { from, text } = require('node:stream/iter');

async function run() {
  console.log(await text(from('hello')));       // 'hello'
  console.log(await text(from(Buffer.from('hello')))); // 'hello'
}

run().catch(console.error);

fromSync(input)#

from() 的同步版本。回傳同步可迭代物件。無法接受非同步可迭代物件或 Promise。實作 Symbol.for('Stream.toStreamable') 的物件會透過該協定進行轉換(優先於 Symbol.iterator)。toAsyncStreamable 協定則會被完全忽略。

import { fromSync, textSync } from 'node:stream/iter';

console.log(textSync(fromSync('hello'))); // 'hello'
const { fromSync, textSync } = require('node:stream/iter');

console.log(textSync(fromSync('hello'))); // 'hello'

管線#

pipeTo(source[, ...transforms], writer[, options])#

  • source <AsyncIterable> | <Iterable> 資料來源。
  • ...transforms <Function> | <Object> 零個或多個要應用的轉換。
  • writer <Object> 具有 write(chunk) 方法的目標。
  • options <Object>
    • signal <AbortSignal> 中止管線。
    • preventClose <boolean> 若為 true,當來源結束時不呼叫 writer.end()預設: false
    • preventFail <boolean> 若為 true,發生錯誤時不呼叫 writer.fail()預設: false
  • 回傳:{Promise} 寫入的總位元組數。

透過轉換將來源導向寫入器。若寫入器具有 writev(chunks) 方法,整個批次將在單一呼叫中傳遞(啟用 scatter/gather I/O)。

若寫入器實作了選擇性的 *Sync 方法 (writeSync, writevSync, endSync),pipeTo() 會嘗試優先使用同步方法作為快速路徑,僅在同步方法指示無法完成(例如反壓或等待下一個 tick)時才退回到非同步版本。fail() 永遠同步呼叫。

import { from, pipeTo } from 'node:stream/iter';
import { compressGzip } from 'node:zlib/iter';
import { open } from 'node:fs/promises';

const fh = await open('output.gz', 'w');
const totalBytes = await pipeTo(
  from('Hello, world!'),
  compressGzip(),
  fh.writer({ autoClose: true }),
);
const { from, pipeTo } = require('node:stream/iter');
const { compressGzip } = require('node:zlib/iter');
const { open } = require('node:fs/promises');

async function run() {
  const fh = await open('output.gz', 'w');
  const totalBytes = await pipeTo(
    from('Hello, world!'),
    compressGzip(),
    fh.writer({ autoClose: true }),
  );
}

run().catch(console.error);

pipeToSync(source[, ...transforms], writer[, options])#

pipeTo() 的同步版本。source、所有轉換和 writer 必須是同步的。無法接受非同步可迭代物件或 Promise。

writer 必須具有 *Sync 方法 (writeSync, writevSync, endSync) 以及 fail() 才能運作。

pull(source[, ...transforms][, options])#

建立延遲載入的非同步管線。在回傳的可迭代物件被消費前,不會從 source 讀取資料。轉換會按順序應用。

import { from, pull, text } from 'node:stream/iter';

const asciiUpper = (chunks) => {
  if (chunks === null) return null;
  return chunks.map((c) => {
    for (let i = 0; i < c.length; i++) {
      c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
    }
    return c;
  });
};

const result = pull(from('hello'), asciiUpper);
console.log(await text(result)); // 'HELLO'
const { from, pull, text } = require('node:stream/iter');

const asciiUpper = (chunks) => {
  if (chunks === null) return null;
  return chunks.map((c) => {
    for (let i = 0; i < c.length; i++) {
      c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
    }
    return c;
  });
};

async function run() {
  const result = pull(from('hello'), asciiUpper);
  console.log(await text(result)); // 'HELLO'
}

run().catch(console.error);

使用 AbortSignal

import { pull } from 'node:stream/iter';

const ac = new AbortController();
const result = pull(source, transform, { signal: ac.signal });
ac.abort(); // Pipeline throws AbortError on next iteration
const { pull } = require('node:stream/iter');

const ac = new AbortController();
const result = pull(source, transform, { signal: ac.signal });
ac.abort(); // Pipeline throws AbortError on next iteration

pullSync(source[, ...transforms])#

  • source <Iterable> 同步資料來源。
  • ...transforms <Function> | <Object> 零個或多個同步轉換。
  • 回傳:{Iterable<Uint8Array[]>}

pull() 的同步版本。所有轉換必須是同步的。

推送串流#

push([...transforms][, options])#

  • ...transforms <Function> | <Object> 應用於可讀取端的選擇性轉換。
  • options <Object>
    • highWaterMark <number> 在應用反壓之前最大緩衝槽位數。必須大於或等於 1;小於 1 的值會被截斷為 1。預設: 4
    • backpressure <string> 反壓策略:'strict', 'block', 'drop-oldest''drop-newest'預設: 'strict'
    • signal <AbortSignal> 中止串流。
  • 傳回:<Object>
    • writer {PushWriter} 寫入器端。
    • readable {AsyncIterable<Uint8Array[]>} 可讀取端。

建立帶有反壓的推送串流。寫入器將資料推入;可讀取端作為非同步可迭代物件進行消費。

import { push, text } from 'node:stream/iter';

const { writer, readable } = push();

// Producer and consumer must run concurrently. With strict backpressure
// (the default), awaited writes block until the consumer reads.
const producing = (async () => {
  await writer.write('hello');
  await writer.write(' world');
  await writer.end();
})();

console.log(await text(readable)); // 'hello world'
await producing;
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push();

  // Producer and consumer must run concurrently. With strict backpressure
  // (the default), awaited writes block until the consumer reads.
  const producing = (async () => {
    await writer.write('hello');
    await writer.write(' world');
    await writer.end();
  })();

  console.log(await text(readable)); // 'hello world'
  await producing;
}

run().catch(console.error);

push() 回傳的寫入器符合 [Writer 介面][]。

雙向通道#

duplex([options])#

  • options <Object>
    • highWaterMark <number> 兩個方向的緩衝區大小。預設: 4
    • backpressure <string> 兩個方向的策略。預設: 'strict'
    • signal <AbortSignal> 兩個通道的取消訊號。
    • a <Object> A-to-B 方向的特定選項。覆蓋共用選項。
    • highWaterMark <number>
    • backpressure <string>
  • b <Object> B-to-A 方向的特定選項。覆蓋共用選項。
  • highWaterMark <number>
  • backpressure <string>
  • 回傳:<Array> 一對雙向通道 [channelA, channelB]

    建立一對連接的雙向通道以進行雙向通訊,類似於 socketpair()。寫入一個通道寫入器的資料會出現在另一個通道的可讀取端。

    每個通道具有:

    • writer — 用於向對端發送資料的 [Writer 介面][] 物件。
    • readable — 用於從對端讀取資料的 AsyncIterable<Uint8Array[]>
    • close() — 關閉通道此端(冪等)。
    • [Symbol.asyncDispose]() — 用於 await using 的非同步處置支援。
    import { duplex, text } from 'node:stream/iter';
    
    const [client, server] = duplex();
    
    // Server echoes back
    const serving = (async () => {
      for await (const chunks of server.readable) {
        await server.writer.writev(chunks);
      }
    })();
    
    await client.writer.write('hello');
    await client.writer.end();
    
    console.log(await text(server.readable)); // handled by echo
    await serving;
    const { duplex, text } = require('node:stream/iter');
    
    async function run() {
      const [client, server] = duplex();
    
      // Server echoes back
      const serving = (async () => {
        for await (const chunks of server.readable) {
          await server.writer.writev(chunks);
        }
      })();
    
      await client.writer.write('hello');
      await client.writer.end();
    
      console.log(await text(server.readable)); // handled by echo
      await serving;
    }
    
    run().catch(console.error);
    
  • 消費者#

    array(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
      • signal <AbortSignal>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:{Promise<Uint8Array[]>}

    將所有區塊收集為 Uint8Array 值的陣列(不進行串接)。

    arrayBuffer(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
      • signal <AbortSignal>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:{Promise}

    將所有位元組收集到一個 ArrayBuffer 中。

    arrayBufferSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:<ArrayBuffer>

    arrayBuffer() 的同步版本。

    arraySync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:<Uint8Array[]>

    array() 的同步版本。

    bytes(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
      • signal <AbortSignal>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:{Promise}

    將串流中的所有位元組收集為單一 Uint8Array

    import { from, bytes } from 'node:stream/iter';
    
    const data = await bytes(from('hello'));
    console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
    const { from, bytes } = require('node:stream/iter');
    
    async function run() {
      const data = await bytes(from('hello'));
      console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
    }
    
    run().catch(console.error);
    

    bytesSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:<Uint8Array>

    bytes() 的同步版本。

    text(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
      • encoding <string> 文字編碼。預設: 'utf-8'
      • signal <AbortSignal>
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 回傳:{Promise}

    收集所有位元組並解碼為文字。

    import { from, text } from 'node:stream/iter';
    
    console.log(await text(from('hello'))); // 'hello'
    const { from, text } = require('node:stream/iter');
    
    async function run() {
      console.log(await text(from('hello'))); // 'hello'
    }
    
    run().catch(console.error);
    

    textSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
      • encoding <string> 預設: 'utf-8'
      • limit <number> 最大消費位元組數。若收集的總位元組超過限制,會拋出 ERR_OUT_OF_RANGE 錯誤。
    • 傳回:<string>

    text() 的同步版本。

    公用程式#

    ondrain(drainable)#

    • drainable <Object> 實作 drainable 協定的物件。
    • 回傳:{Promise|null}

    等待 drainable 寫入器的反壓解除。當寫入器可以接收更多資料時,回傳結算為 true 的 Promise;若物件未實作 drainable 協定,則回傳 null

    import { push, ondrain, text } from 'node:stream/iter';
    
    const { writer, readable } = push({ highWaterMark: 2 });
    writer.writeSync('a');
    writer.writeSync('b');
    
    // Start consuming so the buffer can actually drain
    const consuming = text(readable);
    
    // Buffer is full -- wait for drain
    const canWrite = await ondrain(writer);
    if (canWrite) {
      await writer.write('c');
    }
    await writer.end();
    await consuming;
    const { push, ondrain, text } = require('node:stream/iter');
    
    async function run() {
      const { writer, readable } = push({ highWaterMark: 2 });
      writer.writeSync('a');
      writer.writeSync('b');
    
      // Start consuming so the buffer can actually drain
      const consuming = text(readable);
    
      // Buffer is full -- wait for drain
      const canWrite = await ondrain(writer);
      if (canWrite) {
        await writer.write('c');
      }
      await writer.end();
      await consuming;
    }
    
    run().catch(console.error);
    

    merge(...sources[, options])#

    • ...sources {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} 兩個或多個可迭代物件。
    • options <Object>
    • 回傳:{AsyncIterable<Uint8Array[]>}

    透過依時間順序產出批次來合併多個非同步可迭代物件(哪一個來源先產生資料,就先產出哪一個)。所有來源皆同時被消費。

    import { from, merge, text } from 'node:stream/iter';
    
    const merged = merge(from('hello '), from('world'));
    console.log(await text(merged)); // Order depends on timing
    const { from, merge, text } = require('node:stream/iter');
    
    async function run() {
      const merged = merge(from('hello '), from('world'));
      console.log(await text(merged)); // Order depends on timing
    }
    
    run().catch(console.error);
    

    tap(callback)#

    • callback <Function> (chunks) => void 每個批次呼叫一次。
    • 回傳:<Function> 一個無狀態轉換。

    建立一個觀察批次而不修改它們的直通轉換 (pass-through transform)。對於日誌記錄、指標收集或除錯非常有用。

    import { from, pull, text, tap } from 'node:stream/iter';
    
    const result = pull(
      from('hello'),
      tap((chunks) => console.log('Batch size:', chunks.length)),
    );
    console.log(await text(result));
    const { from, pull, text, tap } = require('node:stream/iter');
    
    async function run() {
      const result = pull(
        from('hello'),
        tap((chunks) => console.log('Batch size:', chunks.length)),
      );
      console.log(await text(result));
    }
    
    run().catch(console.error);
    

    tap() 刻意不防止回呼函數就地修改區塊,但回傳值會被忽略。

    tapSync(callback)#

    tap() 的同步版本。

    多消費者#

    broadcast([options])#

    • options <Object>
      • highWaterMark <number> 槽位中的緩衝區大小。必須大於或等於 1;小於 1 的值會被截斷為 1。預設: 16
      • backpressure <string> 'strict', 'block', 'drop-oldest''drop-newest'預設: 'strict'
      • signal <AbortSignal>
    • 傳回:<Object>
      • writer {BroadcastWriter}
      • broadcast {Broadcast}

    建立推送模型的多消費者廣播通道。單一寫入器將資料推送到多個消費者。每個消費者都有一個指向共用緩衝區的獨立游標。

    import { broadcast, text } from 'node:stream/iter';
    
    const { writer, broadcast: bc } = broadcast();
    
    // Create consumers before writing
    const c1 = bc.push();  // Consumer 1
    const c2 = bc.push();  // Consumer 2
    
    // Producer and consumers must run concurrently. Awaited writes
    // block when the buffer fills until consumers read.
    const producing = (async () => {
      await writer.write('hello');
      await writer.end();
    })();
    
    const [r1, r2] = await Promise.all([text(c1), text(c2)]);
    console.log(r1); // 'hello'
    console.log(r2); // 'hello'
    await producing;
    const { broadcast, text } = require('node:stream/iter');
    
    async function run() {
      const { writer, broadcast: bc } = broadcast();
    
      // Create consumers before writing
      const c1 = bc.push();  // Consumer 1
      const c2 = bc.push();  // Consumer 2
    
      // Producer and consumers must run concurrently. Awaited writes
      // block when the buffer fills until consumers read.
      const producing = (async () => {
        await writer.write('hello');
        await writer.end();
      })();
    
      const [r1, r2] = await Promise.all([text(c1), text(c2)]);
      console.log(r1); // 'hello'
      console.log(r2); // 'hello'
      await producing;
    }
    
    run().catch(console.error);
    
    broadcast.bufferSize#

    目前緩衝的區塊數。

    broadcast.cancel([reason])#

    取消廣播。所有消費者皆會收到錯誤。

    broadcast.consumerCount#

    活動消費者數量。

    broadcast.push([...transforms][, options])#

    建立新消費者。每個消費者接收從訂閱點開始寫入廣播的所有資料。選擇性轉換會應用於該消費者看到的資料視圖。

    broadcast[Symbol.dispose]()#

    broadcast.cancel() 的別名。

    Broadcast.from(input[, options])#

    從現有來源建立 {Broadcast}。來源會被自動消費並推送給所有訂閱者。

    share(source[, options])#

    • source <AsyncIterable> 要共用的來源。
    • options <Object>
      • highWaterMark <number> 緩衝區大小。必須大於或等於 1;小於 1 的值會被截斷為 1。預設: 16
      • backpressure <string> 'strict', 'block', 'drop-oldest''drop-newest'預設: 'strict'
    • 回傳:{Share}

    建立拉取模型的多消費者共用串流。與 broadcast() 不同,來源僅在消費者拉取時才讀取。多個消費者共用單一緩衝區。

    import { from, share, text } from 'node:stream/iter';
    
    const shared = share(from('hello'));
    
    const c1 = shared.pull();
    const c2 = shared.pull();
    
    // Consume concurrently to avoid deadlock with small buffers.
    const [r1, r2] = await Promise.all([text(c1), text(c2)]);
    console.log(r1); // 'hello'
    console.log(r2); // 'hello'
    const { from, share, text } = require('node:stream/iter');
    
    async function run() {
      const shared = share(from('hello'));
    
      const c1 = shared.pull();
      const c2 = shared.pull();
    
      // Consume concurrently to avoid deadlock with small buffers.
      const [r1, r2] = await Promise.all([text(c1), text(c2)]);
      console.log(r1); // 'hello'
      console.log(r2); // 'hello'
    }
    
    run().catch(console.error);
    
    share.bufferSize#

    目前緩衝的區塊數。

    share.cancel([reason])#

    取消共用。所有消費者皆會收到錯誤。

    share.consumerCount#

    活動消費者數量。

    share.pull([...transforms][, options])#

    建立共用來源的新消費者。

    share[Symbol.dispose]()#

    share.cancel() 的別名。

    Share.from(input[, options])#

    從現有來源建立 {Share}。

    shareSync(source[, options])#

    • source <Iterable> 要共用的同步來源。
    • options <Object>
      • highWaterMark <number> 必須大於或等於 1;小於 1 的值會被截斷為 1。預設: 16
      • backpressure <string> 預設: 'strict'
    • 回傳:{SyncShare}

    share() 的同步版本。

    SyncShare.fromSync(input[, options])#

    壓縮與解壓縮轉換#

    用於 pull()pullSync()pipeTo()pipeToSync() 的壓縮與解壓縮轉換可透過 node:zlib/iter 模組取得。詳細資訊請參閱 node:zlib/iter 文件

    協定符號#

    這些著名的符號允許第三方物件參與串流協定,而無需直接從 node:stream/iter 匯入。

    Stream.broadcastProtocol#

    • 值:Symbol.for('Stream.broadcastProtocol')

    該值必須是一個函式。當被 Broadcast.from() 呼叫時,它會接收傳遞給 Broadcast.from() 的選項,且必須回傳符合 {Broadcast} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理消費者、緩衝和反壓。

    import { Broadcast, text } from 'node:stream/iter';
    
    // This example defers to the built-in Broadcast, but a custom
    // implementation could use any mechanism.
    class MessageBus {
      #broadcast;
      #writer;
    
      constructor() {
        const { writer, broadcast } = Broadcast();
        this.#writer = writer;
        this.#broadcast = broadcast;
      }
    
      [Symbol.for('Stream.broadcastProtocol')](options) {
        return this.#broadcast;
      }
    
      send(data) {
        this.#writer.write(new TextEncoder().encode(data));
      }
    
      close() {
        this.#writer.end();
      }
    }
    
    const bus = new MessageBus();
    const { broadcast } = Broadcast.from(bus);
    const consumer = broadcast.push();
    bus.send('hello');
    bus.close();
    console.log(await text(consumer)); // 'hello'
    const { Broadcast, text } = require('node:stream/iter');
    
    // This example defers to the built-in Broadcast, but a custom
    // implementation could use any mechanism.
    class MessageBus {
      #broadcast;
      #writer;
    
      constructor() {
        const { writer, broadcast } = Broadcast();
        this.#writer = writer;
        this.#broadcast = broadcast;
      }
    
      [Symbol.for('Stream.broadcastProtocol')](options) {
        return this.#broadcast;
      }
    
      send(data) {
        this.#writer.write(new TextEncoder().encode(data));
      }
    
      close() {
        this.#writer.end();
      }
    }
    
    const bus = new MessageBus();
    const { broadcast } = Broadcast.from(bus);
    const consumer = broadcast.push();
    bus.send('hello');
    bus.close();
    text(consumer).then(console.log); // 'hello'
    

    Stream.drainableProtocol#

    • 值:Symbol.for('Stream.drainableProtocol')

    實作此以使寫入器與 ondrain() 相容。該方法應回傳一個在反壓解除時結算的 Promise,若無反壓則回傳 null

    import { ondrain } from 'node:stream/iter';
    
    class CustomWriter {
      #queue = [];
      #drain = null;
      #closed = false;
      [Symbol.for('Stream.drainableProtocol')]() {
        if (this.#closed) return null;
        if (this.#queue.length < 3) return Promise.resolve(true);
        this.#drain ??= Promise.withResolvers();
        return this.#drain.promise;
      }
      write(chunk) {
        this.#queue.push(chunk);
      }
      flush() {
        this.#queue.length = 0;
        this.#drain?.resolve(true);
        this.#drain = null;
      }
      close() {
        this.#closed = true;
      }
    }
    const writer = new CustomWriter();
    const ready = ondrain(writer);
    console.log(ready); // Promise { true } -- no backpressure
    const { ondrain } = require('node:stream/iter');
    
    class CustomWriter {
      #queue = [];
      #drain = null;
      #closed = false;
    
      [Symbol.for('Stream.drainableProtocol')]() {
        if (this.#closed) return null;
        if (this.#queue.length < 3) return Promise.resolve(true);
        this.#drain ??= Promise.withResolvers();
        return this.#drain.promise;
      }
    
      write(chunk) {
        this.#queue.push(chunk);
      }
    
      flush() {
        this.#queue.length = 0;
        this.#drain?.resolve(true);
        this.#drain = null;
      }
    
      close() {
        this.#closed = true;
      }
    }
    
    const writer = new CustomWriter();
    const ready = ondrain(writer);
    console.log(ready); // Promise { true } -- no backpressure
    

    Stream.shareProtocol#

    • 值:Symbol.for('Stream.shareProtocol')

    該值必須是一個函式。當被 Share.from() 呼叫時,它會接收傳遞給 Share.from() 的選項,且必須回傳符合 {Share} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理共用來源、消費者、緩衝和反壓。

    import { share, Share, text } from 'node:stream/iter';
    
    // This example defers to the built-in share(), but a custom
    // implementation could use any mechanism.
    class DataPool {
      #share;
    
      constructor(source) {
        this.#share = share(source);
      }
    
      [Symbol.for('Stream.shareProtocol')](options) {
        return this.#share;
      }
    }
    
    const pool = new DataPool(
      (async function* () {
        yield 'hello';
      })(),
    );
    
    const shared = Share.from(pool);
    const consumer = shared.pull();
    console.log(await text(consumer)); // 'hello'
    const { share, Share, text } = require('node:stream/iter');
    
    // This example defers to the built-in share(), but a custom
    // implementation could use any mechanism.
    class DataPool {
      #share;
    
      constructor(source) {
        this.#share = share(source);
      }
    
      [Symbol.for('Stream.shareProtocol')](options) {
        return this.#share;
      }
    }
    
    const pool = new DataPool(
      (async function* () {
        yield 'hello';
      })(),
    );
    
    const shared = Share.from(pool);
    const consumer = shared.pull();
    text(consumer).then(console.log); // 'hello'
    

    Stream.shareSyncProtocol#

    • 值:Symbol.for('Stream.shareSyncProtocol')

    該值必須是一個函式。當被 SyncShare.fromSync() 呼叫時,它會接收傳遞給 SyncShare.fromSync() 的選項,且必須回傳符合 {SyncShare} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理共用來源、消費者和緩衝。

    import { shareSync, SyncShare, textSync } from 'node:stream/iter';
    
    // This example defers to the built-in shareSync(), but a custom
    // implementation could use any mechanism.
    class SyncDataPool {
      #share;
    
      constructor(source) {
        this.#share = shareSync(source);
      }
    
      [Symbol.for('Stream.shareSyncProtocol')](options) {
        return this.#share;
      }
    }
    
    const encoder = new TextEncoder();
    const pool = new SyncDataPool(
      function* () {
        yield [encoder.encode('hello')];
      }(),
    );
    
    const shared = SyncShare.fromSync(pool);
    const consumer = shared.pull();
    console.log(textSync(consumer)); // 'hello'
    const { shareSync, SyncShare, textSync } = require('node:stream/iter');
    
    // This example defers to the built-in shareSync(), but a custom
    // implementation could use any mechanism.
    class SyncDataPool {
      #share;
    
      constructor(source) {
        this.#share = shareSync(source);
      }
    
      [Symbol.for('Stream.shareSyncProtocol')](options) {
        return this.#share;
      }
    }
    
    const encoder = new TextEncoder();
    const pool = new SyncDataPool(
      function* () {
        yield [encoder.encode('hello')];
      }(),
    );
    
    const shared = SyncShare.fromSync(pool);
    const consumer = shared.pull();
    console.log(textSync(consumer)); // 'hello'
    

    Stream.toAsyncStreamable#

    • 值:Symbol.for('Stream.toAsyncStreamable')

    該值必須是一個將物件轉換為可串流值的函式。當物件在串流管線中的任何位置出現時(例如作為傳遞給 from() 的來源,或作為轉換回傳的值),會呼叫此方法來產生實際資料。它可以回傳(或結算為)任何可串流的值:字串、Uint8ArrayAsyncIterableIterable 或另一個可串流物件。

    import { from, text } from 'node:stream/iter';
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toAsyncStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = from(new Greeting('world'));
    console.log(await text(stream)); // 'hello world'
    const { from, text } = require('node:stream/iter');
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toAsyncStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = from(new Greeting('world'));
    text(stream).then(console.log); // 'hello world'
    

    Stream.toStreamable#

    • 值:Symbol.for('Stream.toStreamable')

    該值必須是一個同步將物件轉換為可串流值的函式。當物件在串流管線中的任何位置出現時(例如作為傳遞給 fromSync() 的來源,或作為同步轉換回傳的值),會呼叫此方法來產生實際資料。它必須同步回傳一個可串流的值:字串、Uint8ArrayIterable

    import { fromSync, textSync } from 'node:stream/iter';
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = fromSync(new Greeting('world'));
    console.log(textSync(stream)); // 'hello world'
    const { fromSync, textSync } = require('node:stream/iter');
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = fromSync(new Greeting('world'));
    console.log(textSync(stream)); // 'hello world'