Node.js v26.0.0 說明文件
- Node.js v26.0.0
- 目錄
- 可迭代串流 (Iterable Streams)
- 概念
stream/iter模組- 來源 (Sources)
- 管線 (Pipelines)
- 推送串流 (Push streams)
- 雙向通道 (Duplex channels)
- 消費者 (Consumers)
- 公用工具
- 多消費者 (Multi-consumer)
- 壓縮與解壓縮轉換
- 協定符號 (Protocol symbols)
- 可迭代串流 (Iterable Streams)
- 索引
- 關於此說明文件
- 用法與範例
- 斷言測試
- 非同步內容追蹤
- Async hooks
- Buffer
- C++ 擴充套件
- 使用 Node-API 的 C/C++ 擴充套件
- C++ 嵌入器 API
- 子程序
- 叢集
- 命令列選項
- Console
- Crypto
- 除錯器
- 棄用的 API
- Diagnostics Channel
- DNS
- 網域 (Domain)
- 環境變數
- 錯誤
- 事件
- 檔案系統
- 全域變數
- HTTP
- HTTP/2
- HTTPS
- 檢查器
- 國際化
- 模組:CommonJS 模組
- 模組:ECMAScript 模組
- 模組:
node:moduleAPI - 模組:套件
- 模組:TypeScript
- Net
- Iterable Streams API
- OS
- Path
- 效能勾子 (Performance hooks)
- 權限
- 程序
- Punycode
- 查詢字串
- Readline
- REPL
- 報告
- 單一可執行應用程式
- SQLite
- Stream
- 字串解碼器
- 測試執行器
- 計時器
- TLS/SSL
- 追蹤事件
- TTY
- UDP/資料報
- URL
- 公用工具
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- 工作執行緒
- Zlib
- Zlib 可反覆運算壓縮
- 其他版本
- 選項
可迭代串流 (Iterable Streams)#
穩定性:1 - 實驗性
node:stream/iter 模組提供了一種基於可迭代物件 (iterables) 的串流 API,而非基於事件驅動的 Readable/Writable/Transform 類別階層,也非 Web Streams 的 ReadableStream/WritableStream/TransformStream 介面。
此模組僅在啟用 --experimental-stream-iter CLI 旗標時可用。
串流表示為 AsyncIterable<Uint8Array[]>(非同步)或 Iterable<Uint8Array[]>(同步)。沒有需要繼承的基底類別——任何實作可迭代協定的物件皆可參與。轉換則是單純的函式或具有 transform 方法的物件。
資料以批次 (batches)(每次迭代為 Uint8Array[])流動,以攤提非同步操作的成本。
import { from, pull, text } from 'node:stream/iter'; import { compressGzip, decompressGzip } from 'node:zlib/iter'; // Compress and decompress a string const compressed = pull(from('Hello, world!'), compressGzip()); const result = await text(pull(compressed, decompressGzip())); console.log(result); // 'Hello, world!'const { from, pull, text } = require('node:stream/iter'); const { compressGzip, decompressGzip } = require('node:zlib/iter'); async function run() { // Compress and decompress a string const compressed = pull(from('Hello, world!'), compressGzip()); const result = await text(pull(compressed, decompressGzip())); console.log(result); // 'Hello, world!' } run().catch(console.error);
import { open } from 'node:fs/promises'; import { text, pipeTo } from 'node:stream/iter'; import { compressGzip, decompressGzip } from 'node:zlib/iter'; // Read a file, compress, write to another file const src = await open('input.txt', 'r'); const dst = await open('output.gz', 'w'); await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true })); await src.close(); // Read it back const gz = await open('output.gz', 'r'); console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));const { open } = require('node:fs/promises'); const { text, pipeTo } = require('node:stream/iter'); const { compressGzip, decompressGzip } = require('node:zlib/iter'); async function run() { // Read a file, compress, write to another file const src = await open('input.txt', 'r'); const dst = await open('output.gz', 'w'); await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true })); await src.close(); // Read it back const gz = await open('output.gz', 'r'); console.log(await text(gz.pull(decompressGzip(), { autoClose: true }))); } run().catch(console.error);
概念#
位元組串流#
此 API 中的所有資料皆表示為 Uint8Array 位元組。當字串傳遞給 from()、push() 或 pipeTo() 時,會自動進行 UTF-8 編碼。這消除了編碼方面的歧義,並實現了串流與原生程式碼之間的零複製傳輸。
批次處理#
每次迭代會產出一個批次 (batch)——即 Uint8Array 區塊的陣列 (Uint8Array[])。批次處理將 await 和 Promise 建立的成本攤提至多個區塊中。若消費者一次處理一個區塊,只需迭代內部陣列即可。
for await (const batch of source) { for (const chunk of batch) { handle(chunk); } }async function run() { for await (const batch of source) { for (const chunk of batch) { handle(chunk); } } }
轉換#
轉換分為兩種形式:
-
無狀態 (Stateless) —— 函式
(chunks, options) => result,每個批次呼叫一次。接收Uint8Array[](或作為清除訊號的null)與options物件。回傳Uint8Array[]、null或區塊的可迭代物件。 -
有狀態 (Stateful) —— 物件
{ transform(source, options) },其中transform是一個產生器(同步或非同步),接收整個上游可迭代物件與options物件,並產出輸出。此形式用於壓縮、加密以及任何需要跨批次緩衝的轉換。
兩種形式都會接收包含以下屬性的 options 參數:
options.signal<AbortSignal>當管線取消、發生錯誤或消費者停止讀取時觸發的 AbortSignal。轉換可以檢查signal.aborted或監聽'abort'事件來執行早期清理。
清除訊號 (null) 會在來源結束後發送,讓轉換有機會發出結尾資料(例如壓縮頁尾)。
// Stateless: uppercase transform
const upper = (chunks) => {
if (chunks === null) return null; // flush
return chunks.map((c) => new TextEncoder().encode(
new TextDecoder().decode(c).toUpperCase(),
));
};
// Stateful: line splitter
const lines = {
transform: async function*(source) {
let partial = '';
for await (const chunks of source) {
if (chunks === null) {
if (partial) yield [new TextEncoder().encode(partial)];
continue;
}
for (const chunk of chunks) {
const str = partial + new TextDecoder().decode(chunk);
const parts = str.split('\n');
partial = parts.pop();
for (const line of parts) {
yield [new TextEncoder().encode(`${line}\n`)];
}
}
}
},
};
拉取與推送#
本 API 支援兩種模型:
-
拉取 (Pull) —— 資料按需流動。
pull()和pullSync()建立延遲載入的管線,僅在消費者迭代時才從來源讀取。 -
推送 (Push) —— 資料被明確寫入。
push()建立一個帶有反壓 (backpressure) 的寫入器/可讀取對。寫入器將資料推入;可讀取端作為非同步可迭代物件進行消費。
反壓 (Backpressure)#
拉取串流具有天然的反壓機制——消費者驅動步調,因此讀取速度永遠不會超過消費者的處理速度。推送串流則需要明確的反壓,因為生產者與消費者是獨立運作的。push()、broadcast() 和 share() 上的 highWaterMark 和 backpressure 選項控制此運作方式。
雙緩衝區模型#
推送串流使用兩部分的緩衝系統。將其想像成一個水桶(槽位),透過一條軟管(待處理寫入)注入水,並配有一個在水桶滿時關閉的浮球閥:
highWaterMark (e.g., 3)
|
Producer v
| +---------+
v | |
[ write() ] ----+ +--->| slots |---> Consumer pulls
[ write() ] | | | (bucket)| for await (...)
[ write() ] v | +---------+
+--------+ ^
| pending| |
| writes | float valve
| (hose) | (backpressure)
+--------+
^
|
'strict' mode limits this too!
-
槽位 (水桶) —— 準備好給消費者使用的資料,上限為
highWaterMark。當消費者拉取時,它會一次將所有槽位排空至單一批次中。 -
待處理寫入 (軟管) —— 等待槽位空間的寫入。在消費者排空後,待處理寫入會提升至現已清空的槽位中,其 Promise 隨之結算。
各策略如何使用這些緩衝區:
| 策略 | 槽位限制 | 待處理寫入限制 |
|---|---|---|
'strict' |
highWaterMark |
highWaterMark |
'block' |
highWaterMark |
無限制 |
'drop-oldest' |
highWaterMark |
不適用 (永不等待) |
'drop-newest' |
highWaterMark |
不適用 (永不等待) |
嚴格模式 (預設)#
嚴格模式會攔截「發後即忘」(fire-and-forget) 的模式,即生產者在未 await 的情況下呼叫 write(),這會導致無限制的記憶體增長。它將槽位緩衝區和待處理寫入佇列皆限制為 highWaterMark。
如果您正確地 await 每個寫入,則一次最多只能有一個待處理寫入,因此永遠不會觸發待處理寫入限制。未 await 的寫入會累積在待處理佇列中,並在溢出時拋出錯誤。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 16 }); // Consumer must run concurrently -- without it, the first write // that fills the buffer blocks the producer forever. const consuming = text(readable); // GOOD: awaited writes. The producer waits for the consumer to // make room when the buffer is full. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming);const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 16 }); // Consumer must run concurrently -- without it, the first write // that fills the buffer blocks the producer forever. const consuming = text(readable); // GOOD: awaited writes. The producer waits for the consumer to // make room when the buffer is full. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming); } run().catch(console.error);
忘記 await 最終會導致拋出錯誤。
// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"
阻塞 (Block)#
阻塞模式將槽位上限設為 highWaterMark,但對待處理寫入佇列沒有限制。已 await 的寫入會阻塞,直到消費者騰出空間,這與嚴格模式相同。區別在於未 await 的寫入會默默地永遠排隊,而不是拋出錯誤——如果生產者忘記 await,這可能導致記憶體洩漏。
這是現有 Node.js 傳統串流和 Web Streams 的預設模式。當您能控制生產者並確信其正確使用 await,或是從這些 API 遷移程式碼時使用此模式。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 16, backpressure: 'block', }); const consuming = text(readable); // Safe -- awaited writes block until the consumer reads. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming);const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 16, backpressure: 'block', }); const consuming = text(readable); // Safe -- awaited writes block until the consumer reads. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming); } run().catch(console.error);
丟棄最舊 (Drop-oldest)#
寫入永不等待。當槽位緩衝區滿時,最舊的緩衝區塊會被移除,為傳入的寫入騰出空間。消費者永遠看到最新的資料。適用於即時饋送、遙測或任何過期資料價值低於最新資料的情況。
import { push } from 'node:stream/iter'; // Keep only the 5 most recent readings const { writer, readable } = push({ highWaterMark: 5, backpressure: 'drop-oldest', });const { push } = require('node:stream/iter'); // Keep only the 5 most recent readings const { writer, readable } = push({ highWaterMark: 5, backpressure: 'drop-oldest', });
丟棄最新 (Drop-newest)#
寫入永不等待。當槽位緩衝區滿時,傳入的寫入會被默默丟棄。消費者處理已緩衝的資料,而不會被新資料淹沒。適用於限流或在高壓下削減負載。
import { push } from 'node:stream/iter'; // Accept up to 10 buffered items; discard anything beyond that const { writer, readable } = push({ highWaterMark: 10, backpressure: 'drop-newest', });const { push } = require('node:stream/iter'); // Accept up to 10 buffered items; discard anything beyond that const { writer, readable } = push({ highWaterMark: 10, backpressure: 'drop-newest', });
寫入器介面#
寫入器是任何符合寫入器介面的物件。僅 write() 是必須的;所有其他方法皆為選擇性。
每個非同步方法都有一個設計用於 try-fallback 模式的同步 *Sync 對應版本:先嘗試快速的同步路徑,僅在同步呼叫指示無法完成時才退回到非同步版本。
if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err); // Always synchronous, no fallback needed
writer.desiredSize#
達到高水位線之前可用的緩衝槽位數量。若寫入器已關閉或消費者已斷開連線,回傳 null。
該值永遠為非負數。
writer.end([options])#
options<Object>signal<AbortSignal>僅取消此操作。該訊號僅取消待處理的end()呼叫;不會使寫入器本身失效。
- 回傳:{Promise
} 寫入的總位元組數。
訊號通知不會再有資料寫入。
writer.endSync()#
- 回傳:
<number>寫入的總位元組數,若寫入器未開啟則為-1。
writer.end() 的同步版本。若寫入器已關閉或發生錯誤,回傳 -1。可用於 try-fallback 模式。
const result = writer.endSync();
if (result < 0) {
writer.end();
}
writer.fail(reason)#
reason<any>
將寫入器置於終止錯誤狀態。若寫入器已關閉或發生錯誤,則此操作為空操作 (no-op)。與 write() 和 end() 不同,fail() 是絕對同步的,因為使寫入器失效是一個純粹的狀態轉換,無需執行任何非同步工作。
writer.write(chunk[, options])#
chunk<Uint8Array>|<string>options<Object>signal<AbortSignal>僅取消此寫入操作。該訊號僅取消待處理的write()呼叫;不會使寫入器本身失效。
- 回傳:{Promise
}
寫入區塊。當緩衝區空間可用時,Promise 會結算。
writer.writeSync(chunk)#
chunk<Uint8Array>|<string>- 回傳:
<boolean>寫入被接受則回傳true,若緩衝區已滿則回傳false。
同步寫入。不會阻塞;若反壓機制處於活動狀態,回傳 false。
writer.writev(chunks[, options])#
chunks<Uint8Array[]>|<string[]>options<Object>signal<AbortSignal>僅取消此寫入操作。該訊號僅取消待處理的writev()呼叫;不會使寫入器本身失效。
- 回傳:{Promise
}
將多個區塊作為單一批次寫入。
writer.writevSync(chunks)#
chunks<Uint8Array[]>|<string[]>- 回傳:
<boolean>寫入被接受則回傳true,若緩衝區已滿則回傳false。
同步批次寫入。
stream/iter 模組#
所有函式皆可透過具名匯出或作為 Stream 命名空間物件的屬性使用。
// Named exports import { from, pull, bytes, Stream } from 'node:stream/iter'; // Namespace access Stream.from('hello');// Named exports const { from, pull, bytes, Stream } = require('node:stream/iter'); // Namespace access Stream.from('hello');
在模組識別碼前加上 node: 前綴是可選的。
來源#
from(input)#
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<AsyncIterable>|<Object>不可為null或undefined。- 回傳:{AsyncIterable<Uint8Array[]>}
從給定輸入建立非同步位元組串流。字串會進行 UTF-8 編碼。ArrayBuffer 和 ArrayBufferView 的值會被包裝為 Uint8Array。陣列和可迭代物件會被遞迴展平並正規化。
實作 Symbol.for('Stream.toAsyncStreamable') 或 Symbol.for('Stream.toStreamable') 的物件會透過這些協定進行轉換。toAsyncStreamable 協定優先於 toStreamable,後者又優先於迭代協定 (Symbol.asyncIterator, Symbol.iterator)。
import { Buffer } from 'node:buffer'; import { from, text } from 'node:stream/iter'; console.log(await text(from('hello'))); // 'hello' console.log(await text(from(Buffer.from('hello')))); // 'hello'const { Buffer } = require('node:buffer'); const { from, text } = require('node:stream/iter'); async function run() { console.log(await text(from('hello'))); // 'hello' console.log(await text(from(Buffer.from('hello')))); // 'hello' } run().catch(console.error);
fromSync(input)#
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<Object>不可為null或undefined。- 回傳:{Iterable<Uint8Array[]>}
from() 的同步版本。回傳同步可迭代物件。無法接受非同步可迭代物件或 Promise。實作 Symbol.for('Stream.toStreamable') 的物件會透過該協定進行轉換(優先於 Symbol.iterator)。toAsyncStreamable 協定則會被完全忽略。
import { fromSync, textSync } from 'node:stream/iter'; console.log(textSync(fromSync('hello'))); // 'hello'const { fromSync, textSync } = require('node:stream/iter'); console.log(textSync(fromSync('hello'))); // 'hello'
管線#
pipeTo(source[, ...transforms], writer[, options])#
source<AsyncIterable>|<Iterable>資料來源。...transforms<Function>|<Object>零個或多個要應用的轉換。writer<Object>具有write(chunk)方法的目標。options<Object>signal<AbortSignal>中止管線。preventClose<boolean>若為true,當來源結束時不呼叫writer.end()。預設:false。preventFail<boolean>若為true,發生錯誤時不呼叫writer.fail()。預設:false。
- 回傳:{Promise
} 寫入的總位元組數。
透過轉換將來源導向寫入器。若寫入器具有 writev(chunks) 方法,整個批次將在單一呼叫中傳遞(啟用 scatter/gather I/O)。
若寫入器實作了選擇性的 *Sync 方法 (writeSync, writevSync, endSync),pipeTo() 會嘗試優先使用同步方法作為快速路徑,僅在同步方法指示無法完成(例如反壓或等待下一個 tick)時才退回到非同步版本。fail() 永遠同步呼叫。
import { from, pipeTo } from 'node:stream/iter'; import { compressGzip } from 'node:zlib/iter'; import { open } from 'node:fs/promises'; const fh = await open('output.gz', 'w'); const totalBytes = await pipeTo( from('Hello, world!'), compressGzip(), fh.writer({ autoClose: true }), );const { from, pipeTo } = require('node:stream/iter'); const { compressGzip } = require('node:zlib/iter'); const { open } = require('node:fs/promises'); async function run() { const fh = await open('output.gz', 'w'); const totalBytes = await pipeTo( from('Hello, world!'), compressGzip(), fh.writer({ autoClose: true }), ); } run().catch(console.error);
pipeToSync(source[, ...transforms], writer[, options])#
source<Iterable>同步資料來源。...transforms<Function>|<Object>零個或多個同步轉換。writer<Object>具有write(chunk)方法的目標。options<Object>- 回傳:
<number>寫入的總位元組數。
pipeTo() 的同步版本。source、所有轉換和 writer 必須是同步的。無法接受非同步可迭代物件或 Promise。
writer 必須具有 *Sync 方法 (writeSync, writevSync, endSync) 以及 fail() 才能運作。
pull(source[, ...transforms][, options])#
source<AsyncIterable>|<Iterable>資料來源。...transforms<Function>|<Object>零個或多個要應用的轉換。options<Object>signal<AbortSignal>中止管線。
- 回傳:{AsyncIterable<Uint8Array[]>}
建立延遲載入的非同步管線。在回傳的可迭代物件被消費前,不會從 source 讀取資料。轉換會按順序應用。
import { from, pull, text } from 'node:stream/iter'; const asciiUpper = (chunks) => { if (chunks === null) return null; return chunks.map((c) => { for (let i = 0; i < c.length; i++) { c[i] -= (c[i] >= 97 && c[i] <= 122) * 32; } return c; }); }; const result = pull(from('hello'), asciiUpper); console.log(await text(result)); // 'HELLO'const { from, pull, text } = require('node:stream/iter'); const asciiUpper = (chunks) => { if (chunks === null) return null; return chunks.map((c) => { for (let i = 0; i < c.length; i++) { c[i] -= (c[i] >= 97 && c[i] <= 122) * 32; } return c; }); }; async function run() { const result = pull(from('hello'), asciiUpper); console.log(await text(result)); // 'HELLO' } run().catch(console.error);
使用 AbortSignal
import { pull } from 'node:stream/iter'; const ac = new AbortController(); const result = pull(source, transform, { signal: ac.signal }); ac.abort(); // Pipeline throws AbortError on next iterationconst { pull } = require('node:stream/iter'); const ac = new AbortController(); const result = pull(source, transform, { signal: ac.signal }); ac.abort(); // Pipeline throws AbortError on next iteration
pullSync(source[, ...transforms])#
source<Iterable>同步資料來源。...transforms<Function>|<Object>零個或多個同步轉換。- 回傳:{Iterable<Uint8Array[]>}
pull() 的同步版本。所有轉換必須是同步的。
推送串流#
push([...transforms][, options])#
...transforms<Function>|<Object>應用於可讀取端的選擇性轉換。options<Object>highWaterMark<number>在應用反壓之前最大緩衝槽位數。必須大於或等於 1;小於 1 的值會被截斷為 1。預設:4。backpressure<string>反壓策略:'strict','block','drop-oldest'或'drop-newest'。預設:'strict'。signal<AbortSignal>中止串流。
- 傳回:
<Object>writer{PushWriter} 寫入器端。readable{AsyncIterable<Uint8Array[]>} 可讀取端。
建立帶有反壓的推送串流。寫入器將資料推入;可讀取端作為非同步可迭代物件進行消費。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push(); // Producer and consumer must run concurrently. With strict backpressure // (the default), awaited writes block until the consumer reads. const producing = (async () => { await writer.write('hello'); await writer.write(' world'); await writer.end(); })(); console.log(await text(readable)); // 'hello world' await producing;const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push(); // Producer and consumer must run concurrently. With strict backpressure // (the default), awaited writes block until the consumer reads. const producing = (async () => { await writer.write('hello'); await writer.write(' world'); await writer.end(); })(); console.log(await text(readable)); // 'hello world' await producing; } run().catch(console.error);
push() 回傳的寫入器符合 [Writer 介面][]。
雙向通道#
duplex([options])#
建立一對連接的雙向通道以進行雙向通訊,類似於 socketpair()。寫入一個通道寫入器的資料會出現在另一個通道的可讀取端。
每個通道具有:
writer— 用於向對端發送資料的 [Writer 介面][] 物件。readable— 用於從對端讀取資料的AsyncIterable<Uint8Array[]>。close()— 關閉通道此端(冪等)。[Symbol.asyncDispose]()— 用於await using的非同步處置支援。
import { duplex, text } from 'node:stream/iter'; const [client, server] = duplex(); // Server echoes back const serving = (async () => { for await (const chunks of server.readable) { await server.writer.writev(chunks); } })(); await client.writer.write('hello'); await client.writer.end(); console.log(await text(server.readable)); // handled by echo await serving;const { duplex, text } = require('node:stream/iter'); async function run() { const [client, server] = duplex(); // Server echoes back const serving = (async () => { for await (const chunks of server.readable) { await server.writer.writev(chunks); } })(); await client.writer.write('hello'); await client.writer.end(); console.log(await text(server.readable)); // handled by echo await serving; } run().catch(console.error);
消費者#
array(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:{Promise<Uint8Array[]>}
將所有區塊收集為 Uint8Array 值的陣列(不進行串接)。
arrayBuffer(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:{Promise
}
將所有位元組收集到一個 ArrayBuffer 中。
arrayBufferSync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:
<ArrayBuffer>
arrayBuffer() 的同步版本。
arraySync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:
<Uint8Array[]>
array() 的同步版本。
bytes(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:{Promise
}
將串流中的所有位元組收集為單一 Uint8Array。
import { from, bytes } from 'node:stream/iter'; const data = await bytes(from('hello')); console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]const { from, bytes } = require('node:stream/iter'); async function run() { const data = await bytes(from('hello')); console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ] } run().catch(console.error);
bytesSync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:
<Uint8Array>
bytes() 的同步版本。
text(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>encoding<string>文字編碼。預設:'utf-8'。signal<AbortSignal>limit<number>最大消費位元組數。若收集的總位元組超過限制,會拋出ERR_OUT_OF_RANGE錯誤。
- 回傳:{Promise
}
收集所有位元組並解碼為文字。
import { from, text } from 'node:stream/iter'; console.log(await text(from('hello'))); // 'hello'const { from, text } = require('node:stream/iter'); async function run() { console.log(await text(from('hello'))); // 'hello' } run().catch(console.error);
textSync(source[, options])#
text() 的同步版本。
公用程式#
ondrain(drainable)#
drainable<Object>實作 drainable 協定的物件。- 回傳:{Promise
|null}
等待 drainable 寫入器的反壓解除。當寫入器可以接收更多資料時,回傳結算為 true 的 Promise;若物件未實作 drainable 協定,則回傳 null。
import { push, ondrain, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 2 }); writer.writeSync('a'); writer.writeSync('b'); // Start consuming so the buffer can actually drain const consuming = text(readable); // Buffer is full -- wait for drain const canWrite = await ondrain(writer); if (canWrite) { await writer.write('c'); } await writer.end(); await consuming;const { push, ondrain, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 2 }); writer.writeSync('a'); writer.writeSync('b'); // Start consuming so the buffer can actually drain const consuming = text(readable); // Buffer is full -- wait for drain const canWrite = await ondrain(writer); if (canWrite) { await writer.write('c'); } await writer.end(); await consuming; } run().catch(console.error);
merge(...sources[, options])#
...sources{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} 兩個或多個可迭代物件。options<Object>signal<AbortSignal>
- 回傳:{AsyncIterable<Uint8Array[]>}
透過依時間順序產出批次來合併多個非同步可迭代物件(哪一個來源先產生資料,就先產出哪一個)。所有來源皆同時被消費。
import { from, merge, text } from 'node:stream/iter'; const merged = merge(from('hello '), from('world')); console.log(await text(merged)); // Order depends on timingconst { from, merge, text } = require('node:stream/iter'); async function run() { const merged = merge(from('hello '), from('world')); console.log(await text(merged)); // Order depends on timing } run().catch(console.error);
tap(callback)#
callback<Function>(chunks) => void每個批次呼叫一次。- 回傳:
<Function>一個無狀態轉換。
建立一個觀察批次而不修改它們的直通轉換 (pass-through transform)。對於日誌記錄、指標收集或除錯非常有用。
import { from, pull, text, tap } from 'node:stream/iter'; const result = pull( from('hello'), tap((chunks) => console.log('Batch size:', chunks.length)), ); console.log(await text(result));const { from, pull, text, tap } = require('node:stream/iter'); async function run() { const result = pull( from('hello'), tap((chunks) => console.log('Batch size:', chunks.length)), ); console.log(await text(result)); } run().catch(console.error);
tap() 刻意不防止回呼函數就地修改區塊,但回傳值會被忽略。
tapSync(callback)#
callback<Function>- 回傳:
<Function>
tap() 的同步版本。
多消費者#
broadcast([options])#
options<Object>highWaterMark<number>槽位中的緩衝區大小。必須大於或等於 1;小於 1 的值會被截斷為 1。預設:16。backpressure<string>'strict','block','drop-oldest'或'drop-newest'。預設:'strict'。signal<AbortSignal>
- 傳回:
<Object>writer{BroadcastWriter}broadcast{Broadcast}
建立推送模型的多消費者廣播通道。單一寫入器將資料推送到多個消費者。每個消費者都有一個指向共用緩衝區的獨立游標。
import { broadcast, text } from 'node:stream/iter'; const { writer, broadcast: bc } = broadcast(); // Create consumers before writing const c1 = bc.push(); // Consumer 1 const c2 = bc.push(); // Consumer 2 // Producer and consumers must run concurrently. Awaited writes // block when the buffer fills until consumers read. const producing = (async () => { await writer.write('hello'); await writer.end(); })(); const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' await producing;const { broadcast, text } = require('node:stream/iter'); async function run() { const { writer, broadcast: bc } = broadcast(); // Create consumers before writing const c1 = bc.push(); // Consumer 1 const c2 = bc.push(); // Consumer 2 // Producer and consumers must run concurrently. Awaited writes // block when the buffer fills until consumers read. const producing = (async () => { await writer.write('hello'); await writer.end(); })(); const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' await producing; } run().catch(console.error);
broadcast.bufferSize#
目前緩衝的區塊數。
broadcast.cancel([reason])#
reason<Error>
取消廣播。所有消費者皆會收到錯誤。
broadcast.consumerCount#
活動消費者數量。
broadcast.push([...transforms][, options])#
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- 回傳:{AsyncIterable<Uint8Array[]>}
建立新消費者。每個消費者接收從訂閱點開始寫入廣播的所有資料。選擇性轉換會應用於該消費者看到的資料視圖。
broadcast[Symbol.dispose]()#
broadcast.cancel() 的別名。
Broadcast.from(input[, options])#
input<AsyncIterable>|<Iterable>options<Object>同broadcast()。- 回傳:
<Object>{ writer, broadcast }
從現有來源建立 {Broadcast}。來源會被自動消費並推送給所有訂閱者。
share(source[, options])#
source<AsyncIterable>要共用的來源。options<Object>- 回傳:{Share}
建立拉取模型的多消費者共用串流。與 broadcast() 不同,來源僅在消費者拉取時才讀取。多個消費者共用單一緩衝區。
import { from, share, text } from 'node:stream/iter'; const shared = share(from('hello')); const c1 = shared.pull(); const c2 = shared.pull(); // Consume concurrently to avoid deadlock with small buffers. const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello'const { from, share, text } = require('node:stream/iter'); async function run() { const shared = share(from('hello')); const c1 = shared.pull(); const c2 = shared.pull(); // Consume concurrently to avoid deadlock with small buffers. const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' } run().catch(console.error);
share.bufferSize#
目前緩衝的區塊數。
share.cancel([reason])#
reason<Error>
取消共用。所有消費者皆會收到錯誤。
share.consumerCount#
活動消費者數量。
share.pull([...transforms][, options])#
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- 回傳:{AsyncIterable<Uint8Array[]>}
建立共用來源的新消費者。
share[Symbol.dispose]()#
share.cancel() 的別名。
Share.from(input[, options])#
input<AsyncIterable>options<Object>同share()。- 回傳:{Share}
從現有來源建立 {Share}。
shareSync(source[, options])#
source<Iterable>要共用的同步來源。options<Object>- 回傳:{SyncShare}
share() 的同步版本。
SyncShare.fromSync(input[, options])#
input<Iterable>options<Object>- 回傳:{SyncShare}
壓縮與解壓縮轉換#
用於 pull()、pullSync()、pipeTo() 和 pipeToSync() 的壓縮與解壓縮轉換可透過 node:zlib/iter 模組取得。詳細資訊請參閱 node:zlib/iter 文件。
協定符號#
這些著名的符號允許第三方物件參與串流協定,而無需直接從 node:stream/iter 匯入。
Stream.broadcastProtocol#
- 值:
Symbol.for('Stream.broadcastProtocol')
該值必須是一個函式。當被 Broadcast.from() 呼叫時,它會接收傳遞給 Broadcast.from() 的選項,且必須回傳符合 {Broadcast} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理消費者、緩衝和反壓。
import { Broadcast, text } from 'node:stream/iter'; // This example defers to the built-in Broadcast, but a custom // implementation could use any mechanism. class MessageBus { #broadcast; #writer; constructor() { const { writer, broadcast } = Broadcast(); this.#writer = writer; this.#broadcast = broadcast; } [Symbol.for('Stream.broadcastProtocol')](options) { return this.#broadcast; } send(data) { this.#writer.write(new TextEncoder().encode(data)); } close() { this.#writer.end(); } } const bus = new MessageBus(); const { broadcast } = Broadcast.from(bus); const consumer = broadcast.push(); bus.send('hello'); bus.close(); console.log(await text(consumer)); // 'hello'const { Broadcast, text } = require('node:stream/iter'); // This example defers to the built-in Broadcast, but a custom // implementation could use any mechanism. class MessageBus { #broadcast; #writer; constructor() { const { writer, broadcast } = Broadcast(); this.#writer = writer; this.#broadcast = broadcast; } [Symbol.for('Stream.broadcastProtocol')](options) { return this.#broadcast; } send(data) { this.#writer.write(new TextEncoder().encode(data)); } close() { this.#writer.end(); } } const bus = new MessageBus(); const { broadcast } = Broadcast.from(bus); const consumer = broadcast.push(); bus.send('hello'); bus.close(); text(consumer).then(console.log); // 'hello'
Stream.drainableProtocol#
- 值:
Symbol.for('Stream.drainableProtocol')
實作此以使寫入器與 ondrain() 相容。該方法應回傳一個在反壓解除時結算的 Promise,若無反壓則回傳 null。
import { ondrain } from 'node:stream/iter'; class CustomWriter { #queue = []; #drain = null; #closed = false; [Symbol.for('Stream.drainableProtocol')]() { if (this.#closed) return null; if (this.#queue.length < 3) return Promise.resolve(true); this.#drain ??= Promise.withResolvers(); return this.#drain.promise; } write(chunk) { this.#queue.push(chunk); } flush() { this.#queue.length = 0; this.#drain?.resolve(true); this.#drain = null; } close() { this.#closed = true; } } const writer = new CustomWriter(); const ready = ondrain(writer); console.log(ready); // Promise { true } -- no backpressureconst { ondrain } = require('node:stream/iter'); class CustomWriter { #queue = []; #drain = null; #closed = false; [Symbol.for('Stream.drainableProtocol')]() { if (this.#closed) return null; if (this.#queue.length < 3) return Promise.resolve(true); this.#drain ??= Promise.withResolvers(); return this.#drain.promise; } write(chunk) { this.#queue.push(chunk); } flush() { this.#queue.length = 0; this.#drain?.resolve(true); this.#drain = null; } close() { this.#closed = true; } } const writer = new CustomWriter(); const ready = ondrain(writer); console.log(ready); // Promise { true } -- no backpressure
Stream.shareProtocol#
- 值:
Symbol.for('Stream.shareProtocol')
該值必須是一個函式。當被 Share.from() 呼叫時,它會接收傳遞給 Share.from() 的選項,且必須回傳符合 {Share} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理共用來源、消費者、緩衝和反壓。
import { share, Share, text } from 'node:stream/iter'; // This example defers to the built-in share(), but a custom // implementation could use any mechanism. class DataPool { #share; constructor(source) { this.#share = share(source); } [Symbol.for('Stream.shareProtocol')](options) { return this.#share; } } const pool = new DataPool( (async function* () { yield 'hello'; })(), ); const shared = Share.from(pool); const consumer = shared.pull(); console.log(await text(consumer)); // 'hello'const { share, Share, text } = require('node:stream/iter'); // This example defers to the built-in share(), but a custom // implementation could use any mechanism. class DataPool { #share; constructor(source) { this.#share = share(source); } [Symbol.for('Stream.shareProtocol')](options) { return this.#share; } } const pool = new DataPool( (async function* () { yield 'hello'; })(), ); const shared = Share.from(pool); const consumer = shared.pull(); text(consumer).then(console.log); // 'hello'
Stream.shareSyncProtocol#
- 值:
Symbol.for('Stream.shareSyncProtocol')
該值必須是一個函式。當被 SyncShare.fromSync() 呼叫時,它會接收傳遞給 SyncShare.fromSync() 的選項,且必須回傳符合 {SyncShare} 介面的物件。實作是完全自訂的——它可以隨心所欲地管理共用來源、消費者和緩衝。
import { shareSync, SyncShare, textSync } from 'node:stream/iter'; // This example defers to the built-in shareSync(), but a custom // implementation could use any mechanism. class SyncDataPool { #share; constructor(source) { this.#share = shareSync(source); } [Symbol.for('Stream.shareSyncProtocol')](options) { return this.#share; } } const encoder = new TextEncoder(); const pool = new SyncDataPool( function* () { yield [encoder.encode('hello')]; }(), ); const shared = SyncShare.fromSync(pool); const consumer = shared.pull(); console.log(textSync(consumer)); // 'hello'const { shareSync, SyncShare, textSync } = require('node:stream/iter'); // This example defers to the built-in shareSync(), but a custom // implementation could use any mechanism. class SyncDataPool { #share; constructor(source) { this.#share = shareSync(source); } [Symbol.for('Stream.shareSyncProtocol')](options) { return this.#share; } } const encoder = new TextEncoder(); const pool = new SyncDataPool( function* () { yield [encoder.encode('hello')]; }(), ); const shared = SyncShare.fromSync(pool); const consumer = shared.pull(); console.log(textSync(consumer)); // 'hello'
Stream.toAsyncStreamable#
- 值:
Symbol.for('Stream.toAsyncStreamable')
該值必須是一個將物件轉換為可串流值的函式。當物件在串流管線中的任何位置出現時(例如作為傳遞給 from() 的來源,或作為轉換回傳的值),會呼叫此方法來產生實際資料。它可以回傳(或結算為)任何可串流的值:字串、Uint8Array、AsyncIterable、Iterable 或另一個可串流物件。
import { from, text } from 'node:stream/iter'; class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toAsyncStreamable')]() { return `hello ${this.#name}`; } } const stream = from(new Greeting('world')); console.log(await text(stream)); // 'hello world'const { from, text } = require('node:stream/iter'); class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toAsyncStreamable')]() { return `hello ${this.#name}`; } } const stream = from(new Greeting('world')); text(stream).then(console.log); // 'hello world'
Stream.toStreamable#
- 值:
Symbol.for('Stream.toStreamable')
該值必須是一個同步將物件轉換為可串流值的函式。當物件在串流管線中的任何位置出現時(例如作為傳遞給 fromSync() 的來源,或作為同步轉換回傳的值),會呼叫此方法來產生實際資料。它必須同步回傳一個可串流的值:字串、Uint8Array 或 Iterable。
import { fromSync, textSync } from 'node:stream/iter'; class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toStreamable')]() { return `hello ${this.#name}`; } } const stream = fromSync(new Greeting('world')); console.log(textSync(stream)); // 'hello world'const { fromSync, textSync } = require('node:stream/iter'); class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toStreamable')]() { return `hello ${this.#name}`; } } const stream = fromSync(new Greeting('world')); console.log(textSync(stream)); // 'hello world'