Node.js v26.0.0 說明文件
- Node.js v26.0.0
- 目錄
- Stream
- 本文件架構
- 串流類型
- 給串流使用者的 API
- 可寫串流 (Writable streams)
- 類別:
stream.Writable- 事件:
'close' - 事件:
'drain' - 事件:
'error' - 事件:
'finish' - 事件:
'pipe' - 事件:
'unpipe' writable.cork()writable.destroy([error])writable.closedwritable.destroyedwritable.end([chunk[, encoding]][, callback])writable.setDefaultEncoding(encoding)writable.uncork()writable.writablewritable.writableAbortedwritable.writableEndedwritable.writableCorkedwritable.erroredwritable.writableFinishedwritable.writableHighWaterMarkwritable.writableLengthwritable.writableNeedDrainwritable.writableObjectModewritable[Symbol.asyncDispose]()writable.write(chunk[, encoding][, callback])
- 事件:
- 類別:
- 可讀串流 (Readable streams)
- 兩種讀取模式
- 三種狀態
- 選擇一種 API 風格
- 類別:
stream.Readable- 事件:
'close' - 事件:
'data' - 事件:
'end' - 事件:
'error' - 事件:
'pause' - 事件:
'readable' - 事件:
'resume' readable.destroy([error])readable.closedreadable.destroyedreadable.isPaused()readable.pause()readable.pipe(destination[, options])readable.read([size])readable.readablereadable.readableAbortedreadable.readableDidReadreadable.readableEncodingreadable.readableEndedreadable.erroredreadable.readableFlowingreadable.readableHighWaterMarkreadable.readableLengthreadable.readableObjectModereadable.resume()readable.setEncoding(encoding)readable.unpipe([destination])readable.unshift(chunk[, encoding])readable.wrap(stream)readable[Symbol.asyncIterator]()readable[Symbol.asyncDispose]()readable.compose(stream[, options])readable.iterator([options])readable.map(fn[, options])readable.filter(fn[, options])readable.forEach(fn[, options])readable.toArray([options])readable.some(fn[, options])readable.find(fn[, options])readable.every(fn[, options])readable.flatMap(fn[, options])readable.drop(limit[, options])readable.take(limit[, options])readable.reduce(fn[, initial[, options]])
- 事件:
- 雙工與轉換串流 (Duplex and transform streams)
stream.finished(stream[, options], callback)stream.pipeline(source[, ...transforms], destination, callback)stream.pipeline(streams, callback)stream.compose(...streams)stream.isErrored(stream)stream.isReadable(stream)stream.isWritable(stream)stream.Readable.from(iterable[, options])stream.Readable.fromWeb(readableStream[, options])stream.Readable.isDisturbed(stream)stream.Readable.toWeb(streamReadable[, options])stream.Writable.fromWeb(writableStream[, options])stream.Writable.toWeb(streamWritable)stream.Duplex.from(src)stream.Duplex.fromWeb(pair[, options])stream.Duplex.toWeb(streamDuplex[, options])stream.addAbortSignal(signal, stream)stream.getDefaultHighWaterMark(objectMode)stream.setDefaultHighWaterMark(objectMode, value)
- 可寫串流 (Writable streams)
- 給串流實作者的 API
- 補充說明
- Stream
- 索引
- 關於此說明文件
- 用法與範例
- 斷言測試
- 非同步內容追蹤
- Async hooks
- Buffer
- C++ 擴充套件
- 使用 Node-API 的 C/C++ 擴充套件
- C++ 嵌入器 API
- 子程序
- 叢集
- 命令列選項
- Console
- Crypto
- 除錯器
- 棄用的 API
- Diagnostics Channel
- DNS
- 網域 (Domain)
- 環境變數
- 錯誤
- 事件
- 檔案系統
- 全域變數
- HTTP
- HTTP/2
- HTTPS
- 檢查器
- 國際化
- 模組:CommonJS 模組
- 模組:ECMAScript 模組
- 模組:
node:moduleAPI - 模組:套件
- 模組:TypeScript
- Net
- Iterable Streams API
- OS
- Path
- 效能勾子 (Performance hooks)
- 權限
- 程序
- Punycode
- 查詢字串
- Readline
- REPL
- 報告
- 單一可執行應用程式
- SQLite
- Stream
- 字串解碼器
- 測試執行器
- 計時器
- TLS/SSL
- 追蹤事件
- TTY
- UDP/資料報
- URL
- 公用工具
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- 工作執行緒
- Zlib
- Zlib 可反覆運算壓縮
- 其他版本
- 選項
串流 (Stream)#
穩定度:2 - 穩定
串流 (stream) 是 Node.js 中處理串流資料的抽象介面。node:stream 模組提供了實作此串流介面的 API。
Node.js 提供了許多串流物件。例如,對 HTTP 伺服器的請求與 process.stdout 都是串流實例。
串流可以是可讀、可寫,或兩者兼具。所有串流都是 EventEmitter 的實例。
存取 node:stream 模組
const stream = require('node:stream');
node:stream 模組對於建立新類型的串流實例非常有用。通常不需要使用 node:stream 模組來消耗 (consume) 串流。
本文件架構#
本文件包含兩個主要章節與第三個補充說明章節。第一部分說明如何在應用程式中使用現有的串流。第二部分說明如何建立新類型的串流。
串流類型#
Node.js 中有四種基本的串流類型:
Writable:可寫入資料的串流(例如fs.createWriteStream())。Readable:可從中讀取資料的串流(例如fs.createReadStream())。Duplex:同時為Readable和Writable的串流(例如net.Socket)。Transform:在寫入和讀取時可以修改或轉換資料的Duplex串流(例如zlib.createDeflate())。
此外,此模組還包含公用函式 stream.duplexPair()、stream.pipeline()、stream.finished()、stream.Readable.from() 與 stream.addAbortSignal()。
串流 Promise API#
stream/promises API 為串流提供了一組替代的非同步公用函式,它們會回傳 Promise 物件而非使用回呼 (callback)。此 API 可透過 require('node:stream/promises') 或 require('node:stream').promises 存取。
stream.pipeline(streams[, options])#
stream.pipeline(source[, ...transforms], destination[, options])#
streams<Stream[]>|<Iterable[]>|<AsyncIterable[]>|<Function[]>source<Stream>|<Iterable>|<AsyncIterable>|<Function>- 回傳:
<Promise>|<AsyncIterable>
- 回傳:
...transforms<Stream>|<Function>source<AsyncIterable>- 回傳:
<Promise>|<AsyncIterable>
destination<Stream>|<Function>source<AsyncIterable>- 回傳:
<Promise>|<AsyncIterable>
options<Object>管道選項signal<AbortSignal>end<boolean>當來源串流結束時,是否結束目標串流。轉換串流 (Transform streams) 總是會結束,即使此值為false。預設值:true。
- 回傳:
<Promise>在管道完成時履行 (fulfill)。
const { pipeline } = require('node:stream/promises'); const fs = require('node:fs'); const zlib = require('node:zlib'); async function run() { await pipeline( fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), ); console.log('Pipeline succeeded.'); } run().catch(console.error);import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { createGzip } from 'node:zlib'; await pipeline( createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), ); console.log('Pipeline succeeded.');
若要使用 AbortSignal,請將其放入選項物件中作為最後一個參數傳遞。當信號被中止時,底層管道將被呼叫 destroy,並帶有 AbortError。
const { pipeline } = require('node:stream/promises'); const fs = require('node:fs'); const zlib = require('node:zlib'); async function run() { const ac = new AbortController(); const signal = ac.signal; setImmediate(() => ac.abort()); await pipeline( fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), { signal }, ); } run().catch(console.error); // AbortErrorimport { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { createGzip } from 'node:zlib'; const ac = new AbortController(); const { signal } = ac; setImmediate(() => ac.abort()); try { await pipeline( createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal }, ); } catch (err) { console.error(err); // AbortError }
pipeline API 也支援非同步產生器:
const { pipeline } = require('node:stream/promises'); const fs = require('node:fs'); async function run() { await pipeline( fs.createReadStream('lowercase.txt'), async function* (source, { signal }) { source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. for await (const chunk of source) { yield await processChunk(chunk, { signal }); } }, fs.createWriteStream('uppercase.txt'), ); console.log('Pipeline succeeded.'); } run().catch(console.error);import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; await pipeline( createReadStream('lowercase.txt'), async function* (source, { signal }) { source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. for await (const chunk of source) { yield await processChunk(chunk, { signal }); } }, createWriteStream('uppercase.txt'), ); console.log('Pipeline succeeded.');
請記得處理傳入非同步產生器的 signal 參數。特別是在非同步產生器作為管道來源(即第一個參數)的情況下,否則管道可能永遠無法完成。
const { pipeline } = require('node:stream/promises'); const fs = require('node:fs'); async function run() { await pipeline( async function* ({ signal }) { await someLongRunningfn({ signal }); yield 'asd'; }, fs.createWriteStream('uppercase.txt'), ); console.log('Pipeline succeeded.'); } run().catch(console.error);import { pipeline } from 'node:stream/promises'; import fs from 'node:fs'; await pipeline( async function* ({ signal }) { await someLongRunningfn({ signal }); yield 'asd'; }, fs.createWriteStream('uppercase.txt'), ); console.log('Pipeline succeeded.');
pipeline API 亦提供 回呼版本。
stream.finished(stream[, options])#
stream<Stream>|<ReadableStream>|<WritableStream>一個可讀及/或可寫的串流/網頁串流 (webstream)。options<Object>error<boolean>|<undefined>readable<boolean>|<undefined>writable<boolean>|<undefined>signal<AbortSignal>|<undefined>cleanup<boolean>|<undefined>若為true,則在 promise 履行前移除由此函式註冊的接聽器。預設值:false。
- 回傳:
<Promise>當串流不再可讀或可寫時履行。
const { finished } = require('node:stream/promises'); const fs = require('node:fs'); const rs = fs.createReadStream('archive.tar'); async function run() { await finished(rs); console.log('Stream is done reading.'); } run().catch(console.error); rs.resume(); // Drain the stream.import { finished } from 'node:stream/promises'; import { createReadStream } from 'node:fs'; const rs = createReadStream('archive.tar'); async function run() { await finished(rs); console.log('Stream is done reading.'); } run().catch(console.error); rs.resume(); // Drain the stream.
finished API 亦提供 回呼版本。
在回傳的 promise 被解析 (resolved) 或拒絕 (rejected) 後,stream.finished() 會留下懸掛的事件接聽器(特別是 'error'、'end'、'finish' 與 'close')。這樣做是為了避免非預期的 'error' 事件(由於錯誤的串流實作)導致程式異常崩潰。如果不希望有此行為,應將 options.cleanup 設定為 true。
await finished(rs, { cleanup: true });
物件模式#
Node.js API 建立的所有串流僅操作字串、<Buffer>、<TypedArray> 與 <DataView> 物件。
Strings與Buffers是串流中最常用的類型。TypedArray與DataView讓您可以使用Int32Array或Uint8Array等類型處理二進位資料。當您將 TypedArray 或 DataView 寫入串流時,Node.js 會處理原始位元組。
然而,串流實作可以處理其他類型的 JavaScript 值(null 除外,它在串流中具有特殊用途)。此類串流被認為運作於「物件模式」。
在建立串流時使用 objectMode 選項可將串流實例切換到物件模式。嘗試將現有串流切換到物件模式是不安全的。
緩衝#
Writable 與 Readable 串流都會將資料儲存在內部緩衝區中。
潛在緩衝的資料量取決於傳入串流建構函式的 highWaterMark 選項。對於一般串流,highWaterMark 選項指定了位元組總數。對於在物件模式下運作的串流,highWaterMark 指定了物件總數。對於操作字串(但未解碼)的串流,highWaterMark 指定了 UTF-16 編碼單元的總數。
當實作呼叫 stream.push(chunk) 時,資料會緩存在 Readable 串流中。如果串流的使用者未呼叫 stream.read(),資料將留在內部隊列中直到被消耗。
一旦內部讀取緩衝區的總大小達到 highWaterMark 指定的閾值,串流將暫時停止從底層資源讀取資料,直到目前緩衝的資料可以被消耗(也就是說,串流將停止呼叫用於填充讀取緩衝區的內部 readable._read() 方法)。
當重複呼叫 writable.write(chunk) 方法時,資料會緩存在 Writable 串流中。當內部寫入緩衝區的總大小低於 highWaterMark 設定的閾值時,呼叫 writable.write() 將回傳 true。一旦內部緩衝區的大小達到或超過 highWaterMark,將回傳 false。
stream API 的一個關鍵目標,特別是 stream.pipe() 方法,是將資料緩衝限制在可接受的水平,使得速度不同的來源和目標不會耗盡可用記憶體。
highWaterMark 選項是一個閾值,而不是限制:它規定了串流在停止請求更多資料之前緩衝的資料量。它通常不強制執行嚴格的記憶體限制。特定的串流實作可以選擇強制執行更嚴格的限制,但這是選配的。
由於 Duplex 與 Transform 串流同時是 Readable 與 Writable,因此各別維護了*兩個*獨立的內部緩衝區用於讀取和寫入,允許各側獨立運作,同時保持適當且高效的資料流。例如,net.Socket 實例是 Duplex 串流,其 Readable 側允許消耗*從* socket 接收的資料,而其 Writable 側允許將資料寫入*至* socket。由於寫入 socket 的速度可能比接收資料的速度快或慢,因此各側應獨立運作(和緩衝)。
內部緩衝機制是內部實作細節,隨時可能更改。然而,對於某些進階實作,可以使用 writable.writableBuffer 或 readable.readableBuffer 取得內部緩衝區。不鼓勵使用這些未公開的屬性。
給串流使用者的 API#
幾乎所有的 Node.js 應用程式,無論多麼簡單,都會以某種方式使用串流。以下是在實作 HTTP 伺服器的 Node.js 應用程式中使用串流的範例:
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable 串流(如範例中的 res)公開了 write() 與 end() 等方法,用於將資料寫入串流。
Readable 串流使用 EventEmitter API 來通知應用程式程式碼何時可以從串流中讀取資料。這些可用資料可以透過多種方式從串流中讀取。
Writable 與 Readable 串流都以各種方式使用 EventEmitter API 來傳達串流的目前狀態。
Duplex 與 Transform 串流同時是 Writable 與 Readable。
正在將資料寫入串流或從串流消耗資料的應用程式不需要直接實作串流介面,通常也沒有理由呼叫 require('node:stream')。
希望實作新類型串流的開發人員應參考 給串流實作者的 API 章節。
可寫串流 (Writable streams)#
可寫串流是資料寫入之*目的地*的抽象化。
Writable 串流的例子包括:
- 用戶端上的 HTTP 請求
- 伺服器上的 HTTP 回應
- fs 寫入串流
- zlib 串流
- crypto 串流
- TCP sockets
- 子程序 stdin
process.stdout,process.stderr
其中一些範例實際上是實作了 Writable 介面的 Duplex 串流。
所有 Writable 串流都實作了由 stream.Writable 類別定義的介面。
雖然特定的 Writable 串流實例在各方面可能有所不同,但所有 Writable 串流都遵循相同的基本用法模式,如下例所示:
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
類別:stream.Writable#
事件:'close'#
當串流及其任何底層資源(例如檔案描述符)已關閉時,會發出 'close' 事件。該事件表示將不再發出任何事件,也不會再進行任何計算。
如果 Writable 串流是在帶有 emitClose 選項的情況下建立的,則它始終會發出 'close' 事件。
事件:'drain'#
如果對 stream.write(chunk) 的呼叫回傳 false,則當適合恢復向串流寫入資料時,將發出 'drain' 事件。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
事件:'error'#
- 類型:
<Error>
如果在寫入或導流資料時發生錯誤,則會發出 'error' 事件。呼叫時會向接聽器回呼傳遞單個 Error 參數。
除非在建立串流時將 autoDestroy 選項設定為 false,否則發出 'error' 事件時串流會關閉。
在 'error' 之後,除了 'close' 之外*不應*再發出任何其他事件(包括 'error' 事件)。
事件:'finish'#
在呼叫 stream.end() 方法且所有資料都已排空 (flushed) 到底層系統後,會發出 'finish' 事件。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
事件:'pipe'#
src<stream.Readable>正在導流到此可寫串流的來源串流
當在可讀串流上呼叫 stream.pipe() 方法並將此可寫串流新增到其目標集合時,會發出 'pipe' 事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
事件:'unpipe'#
src<stream.Readable>對此可寫串流執行了 unpipe 的來源串流
當在 Readable 串流上呼叫 stream.unpipe() 方法,將此 Writable 從其目標集合中移除時,會發出 'unpipe' 事件。
當 Readable 串流導流到此 Writable 串流但後者發出錯誤時,也會發出此事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#
writable.cork() 方法強制將所有寫入的資料緩存在記憶體中。當呼叫 stream.uncork() 或 stream.end() 方法時,緩存的資料將被排空 (flushed)。
writable.cork() 的主要目的是應對在極短時間內連續向串流寫入多個小區塊的情況。writable.cork() 不會立即將它們轉發到底層目的地,而是緩存所有區塊,直到呼叫 writable.uncork(),這會將它們全部傳遞給 writable._writev() (如果存在)。這可以防止在等待處理第一個小區塊時發生隊頭阻塞 (head-of-line blocking)。但是,在未實作 writable._writev() 的情況下使用 writable.cork() 可能會對吞吐量產生不利影響。
writable.destroy([error])#
銷毀串流。選配地發出 'error' 事件,並發出 'close' 事件(除非 emitClose 設定為 false)。在此呼叫之後,可寫串流已結束,隨後對 write() 或 end() 的呼叫將導致 ERR_STREAM_DESTROYED 錯誤。這是一種破壞性且立即銷毀串流的方法。之前的 write() 呼叫可能尚未排空,且可能觸發 ERR_STREAM_DESTROYED 錯誤。如果資料應在關閉前排空,請使用 end() 而非 destroy,或者在銷毀串流前等待 'drain' 事件。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
一旦呼叫了 destroy(),任何進一步的呼叫都將是空操作,除了來自 _destroy() 的錯誤外,不會再發出進一步的 'error'。
實作編寫者不應覆寫此方法,而應實作 writable._destroy()。
writable.closed#
- 類型:
<boolean>
在 'close' 發出後為 true。
writable.destroyed#
- 類型:
<boolean>
在呼叫 writable.destroy() 後為 true。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])#
chunk<string>|<Buffer>|<TypedArray>|<DataView>|<any>選配的寫入資料。對於不在物件模式下運作的串流,chunk必須是<string>,<Buffer>,<TypedArray>或<DataView>。對於物件模式串流,chunk可以是除null以外的任何 JavaScript 值。encoding<string>如果chunk是字串,則為其編碼格式callback<Function>串流結束時的回呼。- 傳回:
<this>
呼叫 writable.end() 方法表示不會再向 Writable 寫入更多資料。選配的 chunk 與 encoding 參數允許在關閉串流前立即寫入最後一塊資料。
在呼叫 stream.end() 之後呼叫 stream.write() 方法將會引發錯誤。
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
writable.setDefaultEncoding(encoding)#
writable.setDefaultEncoding() 方法為 Writable 串流設定預設 encoding。
writable.uncork()#
writable.uncork() 方法會排空 (flush) 自呼叫 stream.cork() 以來緩存的所有資料。
當使用 writable.cork() 與 writable.uncork() 來管理串流寫入的緩衝時,請使用 process.nextTick() 延遲呼叫 writable.uncork()。這樣做可以批次處理在給定 Node.js 事件迴圈階段中發生的所有 writable.write() 呼叫。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果在串流上多次呼叫 writable.cork() 方法,則必須呼叫相同次數的 writable.uncork() 才能完全排空緩存的資料。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
另請參閱:writable.cork()。
writable.writable#
- 類型:
<boolean>
如果呼叫 writable.write() 是安全的(表示串流尚未被銷毀、報錯或結束),則為 true。
writable.writableAborted#
- 類型:
<boolean>
回傳串流是否在發出 'finish' 之前已被銷毀或報錯。
writable.writableEnded#
- 類型:
<boolean>
在呼叫 writable.end() 後為 true。此屬性不表示資料是否已排空,若要判斷此項,請改用 writable.writableFinished。
writable.writableCorked#
- 類型:
<integer>
需要呼叫 writable.uncork() 的次數,以便完全取消串流的緩衝 (uncork)。
writable.errored#
- 類型:
<Error>
如果串流在銷毀時帶有錯誤,則回傳該錯誤。
writable.writableFinished#
- 類型:
<boolean>
在發出 'finish' 事件前立即設定為 true。
writable.writableHighWaterMark#
- 類型:
<number>
回傳建立此 Writable 時傳入的 highWaterMark 值。
writable.writableLength#
- 類型:
<number>
此屬性包含隊列中準備寫入的位元組數(或物件數)。此值提供了關於 highWaterMark 狀態的內省資料。
writable.writableNeedDrain#
- 類型:
<boolean>
如果串流的緩衝區已滿且串流將發出 'drain',則為 true。
writable.writableObjectMode#
- 類型:
<boolean>
給定 Writable 串流之 objectMode 屬性的 getter。
writable[Symbol.asyncDispose]()#
以 AbortError 呼叫 writable.destroy() 並回傳一個在串流結束時履行的 promise。
writable.write(chunk[, encoding][, callback])#
chunk<string>|<Buffer>|<TypedArray>|<DataView>|<any>選配的寫入資料。對於不在物件模式下運作的串流,chunk必須是<string>,<Buffer>,<TypedArray>或<DataView>。對於物件模式串流,chunk可以是除null以外的任何 JavaScript 值。encoding<string>|<null>如果chunk是字串,則為其編碼。預設值:'utf8'callback<Function>當此資料區塊排空後的回呼。- 回傳:
<boolean>如果串流希望呼叫端程式碼在繼續寫入額外資料前等待'drain'事件發出,則回傳false;否則回傳true。
writable.write() 方法將一些資料寫入串流,並在資料被完全處理後呼叫提供的 callback。如果發生錯誤,callback 將以錯誤作為其第一個參數被呼叫。callback 是非同步呼叫的,且在 'error' 發出之前。
如果在接受 chunk 後,內部緩衝區小於建立串流時配置的 highWaterMark,則回傳值為 true。如果回傳 false,則應停止進一步嘗試向串流寫入資料,直到發出 'drain' 事件。
當串流未處於消耗 (draining) 狀態時,呼叫 write() 會緩存 chunk 並回傳 false。一旦所有目前緩存的區塊被消耗完畢(作業系統接受傳遞),將發出 'drain' 事件。一旦 write() 回傳 false,在發出 'drain' 事件前請勿再寫入更多區塊。雖然允許在未消耗完畢的串流上呼叫 write(),但 Node.js 會緩存所有寫入的區塊,直到達到最大記憶體使用量,屆時它將無條件中止。即使在中止前,高記憶體使用量也會導致垃圾回收器效能不佳以及高 RSS(通常即使在不再需要記憶體後也不會釋放回系統)。由於如果遠端對等端不讀取資料,TCP socket 可能永遠不會消耗完畢,因此寫入未在消耗狀態的 socket 可能會導致遠端可利用的漏洞。
在串流未處於消耗狀態時寫入資料對於 Transform 來說尤其成問題,因為 Transform 串流預設處於暫停狀態,直到它們被導流 (piped) 或新增了 'data' 或 'readable' 事件處理程式。
如果待寫入的資料可以按需產生或獲取,建議將邏輯封裝到 Readable 中並使用 stream.pipe()。但是,如果偏好呼叫 write(),可以使用 'drain' 事件來尊重反壓 (backpressure) 並避免記憶體問題:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
物件模式下的 Writable 串流將始終忽略 encoding 參數。
可讀串流 (Readable streams)#
可讀串流是消耗資料之*來源*的抽象化。
Readable 串流的例子包括:
所有 Readable 串流都實作了由 stream.Readable 類別定義的介面。
兩種讀取模式#
Readable 串流實際上在以下兩種模式之一運作:流動模式 (flowing) 與暫停模式 (paused)。這些模式與物件模式無關。不論是否處於流動模式或暫停模式,Readable 串流都可以是或不是物件模式。
-
在流動模式下,資料會自動從底層系統讀取,並透過
EventEmitter介面的事件儘快提供給應用程式。 -
在暫停模式下,必須明確呼叫
stream.read()方法來從串流中讀取資料區塊。
所有 Readable 串流最初都處於暫停模式,但可以透過以下方式之一切換到流動模式:
- 新增
'data'事件處理程式。 - 呼叫
stream.resume()方法。 - 呼叫
stream.pipe()方法將資料傳送到Writable。
Readable 可以使用以下方式之一切換回暫停模式:
- 如果沒有導流目標,則呼叫
stream.pause()方法。 - 如果有導流目標,則移除所有導流目標。可以透過呼叫
stream.unpipe()方法移除多個導流目標。
要記住的重要概念是,除非提供了消耗或忽略資料的機制,否則 Readable 不會產生資料。如果消耗機制被停用或移除,Readable 將*嘗試*停止產生資料。
基於回溯相容性,移除 'data' 事件處理程式**不會**自動暫停串流。此外,如果有導流目標,呼叫 stream.pause() 並不能保證串流在這些目標消耗完畢並請求更多資料時會*保持*暫停狀態。
如果 Readable 切換到流動模式,但沒有可用的使用者來處理資料,則該資料將會丟失。例如,當呼叫 readable.resume() 方法但未附加 'data' 事件接聽器時,或從串流中移除 'data' 事件處理程式時,就會發生這種情況。
新增 'readable' 事件處理程式會自動使串流停止流動,資料必須透過 readable.read() 消耗。如果移除了 'readable' 事件處理程式,且有 'data' 事件處理程式,則串流將再次開始流動。
三種狀態#
Readable 串流的「兩種模式」是對 Readable 串流實作內部發生的更複雜狀態管理的簡化抽象。
具體而言,在任何給定時間點,每個 Readable 都處於以下三種可能狀態之一:
readable.readableFlowing === nullreadable.readableFlowing === falsereadable.readableFlowing === true
當 readable.readableFlowing 為 null 時,不提供消耗串流資料的機制。因此,串流不會產生資料。在此狀態下,附加 'data' 事件的接聽器、呼叫 readable.pipe() 方法或呼叫 readable.resume() 方法將使 readable.readableFlowing 切換為 true,導致 Readable 在產生資料時開始積極地發出事件。
呼叫 readable.pause()、readable.unpipe() 或接收到反壓將導致 readable.readableFlowing 被設定為 false,暫時停止事件流,但*不*停止資料產生。在此狀態下,附加 'data' 事件的接聽器不會將 readable.readableFlowing 切換為 true。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
當 readable.readableFlowing 為 false 時,資料可能會在串流的內部緩衝區中累積。
選擇一種 API 風格#
Readable 串流 API 跨多個 Node.js 版本演進,並提供多種消耗串流資料的方法。通常,開發人員應選擇*其中一種*消耗資料的方法,*絕不應*使用多種方法從單個串流消耗資料。具體而言,結合使用 on('data')、on('readable')、pipe() 或非同步迭代器可能會導致不直覺的行為。
類別:stream.Readable#
事件:'close'#
當串流及其任何底層資源(例如檔案描述符)已關閉時,會發出 'close' 事件。該事件表示將不再發出任何事件,也不會再進行任何計算。
如果 Readable 串流是在帶有 emitClose 選項的情況下建立的,則它始終會發出 'close' 事件。
事件:'data'#
chunk<Buffer>|<string>|<any>資料區塊。對於不在物件模式下運作的串流,區塊將是字串或Buffer。對於物件模式串流,區塊可以是除null以外的任何 JavaScript 值。
每當串流將資料區塊的所有權移交給使用者時,都會發出 'data' 事件。每當透過呼叫 readable.pipe()、readable.resume() 或透過為 'data' 事件附加接聽器回呼而將串流切換到流動模式時,都可能發生這種情況。每當呼叫 readable.read() 方法且有資料區塊可回傳時,也會發出 'data' 事件。
為未明確暫停的串流附加 'data' 事件接聽器會將串流切換到流動模式。資料將在可用時立即傳遞。
如果使用 readable.setEncoding() 方法為串流指定了預設編碼,則接聽器回呼將接收到字串形式的資料區塊;否則資料將以 Buffer 形式傳遞。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
事件:'end'#
當串流中不再有資料可供消耗時,會發出 'end' 事件。
除非資料已被完全消耗,否則 'end' 事件**不會發出**。這可以透過將串流切換到流動模式,或重複呼叫 stream.read() 直到所有資料都被消耗來達成。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
事件:'error'#
- 類型:
<Error>
Readable 實作可能隨時發出 'error' 事件。通常,如果底層串流由於底層內部故障而無法產生資料,或者當串流實作嘗試推送無效的資料區塊時,就會發生這種情況。
接聽器回呼將接收到單個 Error 物件。
事件:'pause'#
當呼叫 stream.pause() 且 readableFlowing 不為 false 時,會發出 'pause' 事件。
事件:'readable'#
當串流中有資料可供讀取(最多到配置的高水位線 state.highWaterMark)時,會發出 'readable' 事件。實際上,它表示串流在緩衝區內有新資訊。如果緩衝區內有資料,可以呼叫 stream.read() 來檢索該資料。此外,當達到串流結尾時,也可能發出 'readable' 事件。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
如果已達到串流結尾,呼叫 stream.read() 將回傳 null 並觸發 'end' 事件。如果從未有任何資料可讀,情況也是如此。例如,在下例中,foo.txt 是一個空檔案:
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
執行此腳本的輸出為:
$ node test.js
readable: null
end
在某些情況下,為 'readable' 事件附加接聽器會導致一定數量的資料被讀入內部緩衝區。
通常,readable.pipe() 與 'data' 事件機制比 'readable' 事件更容易理解。然而,處理 'readable' 可能會提高吞吐量。
如果同時使用 'readable' 與 'data',則 'readable' 在控制流動方面具有優先權,即 'data' 僅在呼叫 stream.read() 時發出。readableFlowing 屬性將變為 false。如果在移除 'readable' 時有 'data' 接聽器,串流將開始流動,即無需呼叫 .resume() 即可發出 'data' 事件。
事件:'resume'#
當呼叫 stream.resume() 且 readableFlowing 不為 true 時,會發出 'resume' 事件。
readable.destroy([error])#
銷毀串流。選配地發出 'error' 事件,並發出 'close' 事件(除非 emitClose 設定為 false)。在此呼叫之後,可讀串流將釋放任何內部資源,隨後對 push() 的呼叫將被忽略。
一旦呼叫了 destroy(),任何進一步的呼叫都將是空操作,除了來自 _destroy() 的錯誤外,不會再發出進一步的 'error'。
實作編寫者不應覆寫此方法,而應實作 readable._destroy()。
readable.closed#
- 類型:
<boolean>
在 'close' 發出後為 true。
readable.destroyed#
- 類型:
<boolean>
在呼叫 readable.destroy() 後為 true。
readable.isPaused()#
- 傳回:
<boolean>
readable.isPaused() 方法回傳 Readable 目前的運作狀態。這主要由 readable.pipe() 方法底層的機制使用。在大多數典型情況下,沒有理由直接使用此方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()#
- 傳回:
<this>
readable.pause() 方法將使處於流動模式的串流停止發出 'data' 事件,並切換出流動模式。任何變為可用的資料都將保留在內部緩衝區中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
如果存在 'readable' 事件接聽器,readable.pause() 方法將不起作用。
readable.pipe(destination[, options])#
destination<stream.Writable>資料寫入的目標options<Object>導流選項end<boolean>當讀取端結束時結束寫入端。預設值:true。
- 回傳:
<stream.Writable>目標 (*destination*),如果它是Duplex或Transform串流,則允許鏈式導流
readable.pipe() 方法將 Writable 串流連接到 readable,使其自動切換到流動模式並將其所有資料推送到連接的 Writable。資料流將自動管理,以便目標 Writable 串流不會被更快的 Readable 串流淹沒。
以下範例將 readable 的所有資料導流到名為 file.txt 的檔案中:
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
可以將多個 Writable 串流連接到單個 Readable 串流。
readable.pipe() 方法回傳對目標串流的引用,使得建立導流串流鏈成為可能:
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
預設情況下,當來源 Readable 串流發出 'end' 時,會在目標 Writable 串流上呼叫 stream.end(),使得目標不再可寫。若要停用此預設行為,可以將 end 選項傳遞為 false,讓目標串流保持開啟:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
一個重要的警告是,如果 Readable 串流在處理過程中發出錯誤,目標 Writable *不會*自動關閉。如果發生錯誤,則需要*手動*關閉每個串流以防止記憶體洩漏。
無論指定的選項為何,process.stderr 與 process.stdout Writable 串流直到 Node.js 程序結束前都不會關閉。
readable.read([size])#
readable.read() 方法從內部緩衝區中讀取資料並將其回傳。如果沒有資料可讀,則回傳 null。預設情況下,資料以 Buffer 物件形式回傳,除非已使用 readable.setEncoding() 方法指定編碼或串流運作於物件模式。
選配的 size 參數指定要讀取的特定位元組數。如果沒有 size 位元組的資料可讀,則將回傳 null,*除非*串流已結束,在這種情況下將回傳內部緩衝區中剩餘的所有資料。
如果未指定 size 參數,則將回傳內部緩衝區中包含的所有資料。
size 參數必須小於或等於 1 GiB。
readable.read() 方法應僅在處於暫停模式的 Readable 串流上呼叫。在流動模式下,readable.read() 會自動被呼叫,直到內部緩衝區完全消耗完畢。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
每次對 readable.read() 的呼叫都會回傳一個資料區塊或 null,表示目前沒有更多資料可讀。這些區塊不會自動串接。由於單次 read() 呼叫不會回傳所有資料,因此可能需要使用 while 迴圈持續讀取區塊,直到檢索到所有資料。讀取大檔案時,.read() 可能會暫時回傳 null,表示它已消耗完所有緩衝內容,但可能還有更多資料待緩衝。在這種情況下,一旦緩衝區中有更多資料,就會發出新的 'readable' 事件,而 'end' 事件表示資料傳輸結束。
因此,若要從 readable 讀取檔案的全部內容,必須在多個 'readable' 事件中收集資料區塊:
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
不論 size 參數的值為何,物件模式下的 Readable 串流呼叫 readable.read(size) 始終會回傳單個項目。
如果 readable.read() 方法回傳一個資料區塊,也會發出 'data' 事件。
在發出 'end' 事件後呼叫 stream.read([size]) 將回傳 null。不會引發執行階段錯誤。
readable.readable#
- 類型:
<boolean>
如果呼叫 readable.read() 是安全的(表示串流尚未被銷毀、報錯或結束),則為 true。
readable.readableAborted#
- 類型:
<boolean>
回傳串流是否在發出 'end' 之前已被銷毀或報錯。
readable.readableDidRead#
- 類型:
<boolean>
回傳是否已發出 'data'。
readable.readableEncoding#
給定 Readable 串流之 encoding 屬性的 getter。可以使用 readable.setEncoding() 方法設定 encoding 屬性。
readable.readableEnded#
- 類型:
<boolean>
當發出 'end' 事件時變為 true。
readable.errored#
- 類型:
<Error>
如果串流在銷毀時帶有錯誤,則回傳該錯誤。
readable.readableFlowing#
- 類型:
<boolean>
此屬性反映了 三種狀態 章節中描述的 Readable 串流目前狀態。
readable.readableHighWaterMark#
- 類型:
<number>
回傳建立此 Readable 時傳入的 highWaterMark 值。
readable.readableLength#
- 類型:
<number>
此屬性包含隊列中準備讀取的位元組數(或物件數)。此值提供了關於 highWaterMark 狀態的內省資料。
readable.readableObjectMode#
- 類型:
<boolean>
給定 Readable 串流之 objectMode 屬性的 getter。
readable.resume()#
- 傳回:
<this>
readable.resume() 方法使明確暫停的 Readable 串流恢復發出 'data' 事件,並將串流切換到流動模式。
readable.resume() 方法可以用於完全消耗來自串流的資料,而無需實際處理任何資料:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
如果存在 'readable' 事件接聽器,readable.resume() 方法將不起作用。
readable.setEncoding(encoding)#
readable.setEncoding() 方法設定從 Readable 串流讀取之資料的字元編碼。
預設情況下,未分配編碼,串流資料將以 Buffer 物件形式回傳。設定編碼會導致串流資料以指定編碼的字串而非 Buffer 物件形式回傳。例如,呼叫 readable.setEncoding('utf8') 將導致輸出資料被解釋為 UTF-8 資料並作為字串傳遞。呼叫 readable.setEncoding('hex') 將使資料以十六進位字串格式編碼。
Readable 串流將正確處理透過串流傳遞的多位元組字元,如果僅將其作為 Buffer 物件從串流中拉取,這些字元可能會被錯誤地解碼。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])#
destination<stream.Writable>選配,要取消導流的特定串流- 傳回:
<this>
readable.unpipe() 方法分離先前使用 stream.pipe() 方法附加的 Writable 串流。
如果未指定 destination,則分離*所有*管道。
如果指定了 destination,但未對其設置管道,則該方法不執行任何操作。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])#
chunk<Buffer>|<TypedArray>|<DataView>|<string>|<null>|<any>推回讀取隊列的資料區塊。對於不在物件模式下運作的串流,chunk必須是<string>,<Buffer>,<TypedArray>,<DataView>或null。對於物件模式串流,chunk可以是任何 JavaScript 值。encoding<string>字串區塊的編碼。必須是有效的Buffer編碼,例如'utf8'或'ascii'。
將 chunk 作為 null 傳遞表示串流結束 (EOF),其行為與 readable.push(null) 相同,之後無法再寫入任何資料。EOF 信號被置於緩衝區末尾,任何已緩衝的資料仍將被排空。
readable.unshift() 方法將資料區塊推回內部緩衝區。這在某些情況下很有用,例如當程式碼消耗串流時,需要「退回」一些它樂觀地從來源拉取的資料,以便將資料傳遞給其他方。
在發出 'end' 事件後不能呼叫 stream.unshift(chunk) 方法,否則將拋出執行階段錯誤。
使用 stream.unshift() 的開發者通常應考慮改用 Transform 串流。更多資訊請參閱 串流實作者 API 章節。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
與 stream.push(chunk) 不同,stream.unshift(chunk) 不會透過重設串流內部的讀取狀態來結束讀取程序。如果在讀取過程中(即從自訂串流的 stream._read() 實作內部)呼叫 readable.unshift(),可能會導致非預期的結果。在呼叫 readable.unshift() 後立即呼叫 stream.push('') 會適當地重設讀取狀態,然而,最好避免在執行讀取的過程中呼叫 readable.unshift()。
readable.wrap(stream)#
在 Node.js 0.10 之前,串流並未實作目前定義的整個 node:stream 模組 API。(更多資訊請參閱 相容性。)
當使用會發出 'data' 事件且具有僅具建議性的 stream.pause() 方法的舊版 Node.js 函式庫時,可以使用 readable.wrap() 方法建立一個使用該舊串流作為其資料來源的 Readable 串流。
很少需要使用 readable.wrap(),但該方法是為了方便與舊版 Node.js 應用程式和函式庫進行互動而提供的。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()#
- 回傳:
<AsyncIterator>用於完全消耗串流。
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
如果迴圈以 break、return 或 throw 終止,串流將被銷毀。換句話說,迭代串流將完全消耗該串流。串流將以等於 highWaterMark 選項大小的區塊進行讀取。在上面的程式碼範例中,如果檔案的資料少於 64 KiB,則資料將在單個區塊中,因為沒有為 fs.createReadStream() 提供 highWaterMark 選項。
readable[Symbol.asyncDispose]()#
使用 AbortError 呼叫 readable.destroy() 並回傳一個在串流結束時完成 (fulfill) 的 promise。
readable.compose(stream[, options])#
stream<Writable>|<Duplex>|<WritableStream>|<TransformStream>|<Function>options<Object>signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Duplex>與stream組合的串流。
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['text passed through', 'composed stream']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['text', 'passed', 'through', 'composed', 'stream']
readable.compose(s) 等同於 stream.compose(readable, s)。
此方法還允許提供 <AbortSignal>,當中止時會銷毀組合後的串流。
更多資訊請參閱 stream.compose(...streams)。
readable.iterator([options])#
options<Object>destroyOnReturn<boolean>當設定為false時,在非同步迭代器上呼叫return,或使用break、return或throw結束for await...of迭代將不會銷毀串流。預設值:true。
- 回傳:
<AsyncIterator>用於消耗串流。
此方法建立的迭代器讓使用者可以選擇在 for await...of 迴圈透過 return、break 或 throw 結束時,是否取消串流的銷毀,或者在迭代期間串流發出錯誤時,迭代器是否應該銷毀串流。
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>用於對串流中的每個區塊進行映射 (map) 的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。highWaterMark<number>在等待使用者消耗已映射項目時要緩衝多少個項目。預設值:concurrency * 2 - 1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Readable>使用函數fn映射後的串流。
此方法允許對串流進行映射。對串流中的每個區塊都會呼叫一次 fn 函數。如果 fn 函數回傳一個 promise,則在將結果傳遞給結果串流之前會先 await 該 promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
readable.filter(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>用於過濾串流區塊的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。highWaterMark<number>在等待使用者消耗已過濾項目時要緩衝多少個項目。預設值:concurrency * 2 - 1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Readable>使用謂詞fn過濾後的串流。
此方法允許過濾串流。對於串流中的每個區塊,都會呼叫 fn 函數,如果它回傳真值 (truthy),則該區塊將被傳遞給結果串流。如果 fn 函數回傳一個 promise,則會先 await 該 promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
readable.forEach(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>對串流的每個區塊呼叫的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Promise>串流完成時的 promise。
此方法允許迭代串流。對於串流中的每個區塊,都會呼叫 fn 函數。如果 fn 函數回傳一個 promise,則會先 await 該 promise。
此方法與 for await...of 迴圈不同之處在於它可以選擇性地並發處理區塊。此外,forEach 迭代只能透過傳遞 signal 選項並中止相關的 AbortController 來停止,而 for await...of 可以使用 break 或 return 停止。在任何一種情況下,串流都將被銷毀。
此方法與監聽 'data' 事件不同之處在於它在底層機制中使用了 readable 事件,並且可以限制並發 fn 呼叫的數量。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
readable.toArray([options])#
穩定性:1 - 實驗性
options<Object>signal<AbortSignal>如果信號被中止,允許取消 toArray 操作。
- 回傳:
<Promise>一個包含串流內容陣列的 promise。
此方法可以輕鬆獲取串流的內容。
由於此方法會將整個串流讀入記憶體,因此它抵消了串流的優點。它旨在用於互操作性和便利性,而不是作為消耗串流的主要方式。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
const resolver = new Resolver();
// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>對串流的每個區塊呼叫的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Promise>如果fn對於至少一個區塊回傳真值,則此 promise 的評估結果為true。
此方法類似於 Array.prototype.some,並對串流中的每個區塊呼叫 fn,直到 awaited 的回傳值為 true(或任何真值)。一旦某個區塊的 fn 呼叫 awaited 回傳值為真,串流將被銷毀,並且 promise 將以 true 完成。如果對所有區塊的 fn 呼叫都沒有回傳真值,則 promise 將以 false 完成。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.find(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>對串流的每個區塊呼叫的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Promise>此 promise 的評估結果為fn評估為真值的第一個區塊,如果未找到任何元素,則為undefined。
此方法類似於 Array.prototype.find,並對串流中的每個區塊呼叫 fn,以尋找使 fn 為真值的區塊。一旦某個 fn 呼叫的 awaited 回傳值為真,串流將被銷毀,並且 promise 將以 fn 回傳真值的該值完成。如果對所有區塊的 fn 呼叫都回傳假值 (falsy),則 promise 將以 undefined 完成。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.every(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>對串流的每個區塊呼叫的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Promise>如果fn對所有區塊都回傳真值,則此 promise 的評估結果為true。
此方法類似於 Array.prototype.every,並對串流中的每個區塊呼叫 fn,以檢查是否所有 awaited 回傳值對於 fn 都是真值。一旦某個區塊的 fn 呼叫 awaited 回傳值為假,串流將被銷毀,並且 promise 將以 false 完成。如果對所有區塊的 fn 呼叫都回傳真值,則 promise 將以 true 完成。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
readable.flatMap(fn[, options])#
穩定性:1 - 實驗性
fn<Function>|<AsyncGeneratorFunction>|<AsyncFunction>用於對串流中的每個區塊進行映射的函數。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
options<Object>concurrency<number>串流上一次可以同時呼叫fn的最大並發數。預設值:1。signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Readable>使用函數fn進行扁平映射 (flat-mapped) 後的串流。
此方法透過對串流的每個區塊應用給定的回呼函數,然後將結果扁平化,從而回傳一個新串流。
可以從 fn 回傳一個串流或其他可迭代對象或非同步可迭代對象,結果串流將合併(扁平化)到回傳的串流中。
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
readable.drop(limit[, options])#
穩定性:1 - 實驗性
limit<number>要從可讀串流中捨棄的區塊數量。options<Object>signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Readable>捨棄了limit個區塊後的串流。
此方法回傳一個捨棄了前 limit 個區塊的新串流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])#
穩定性:1 - 實驗性
limit<number>要從可讀串流中獲取的區塊數量。options<Object>signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Readable>獲取了limit個區塊後的串流。
此方法回傳一個包含前 limit 個區塊的新串流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])#
穩定性:1 - 實驗性
fn<Function>|<AsyncFunction>用於對串流中的每個區塊呼叫的歸納 (reducer) 函數。previous<any>從上一次呼叫fn獲得的值,或指定的initial值,否則為串流的第一個區塊。data<any>來自串流的一個資料區塊。options<Object>signal<AbortSignal>如果串流被銷毀則中止,允許提前中止fn呼叫。
initial<any>歸納計算中使用的初始值。options<Object>signal<AbortSignal>如果信號被中止,允許銷毀串流。
- 回傳:
<Promise>歸納計算最終值的 promise。
此方法按順序對串流的每個區塊呼叫 fn,並將上一個元素的計算結果傳遞給它。它會回傳一個歸納計算最終值的 promise。
如果沒有提供 initial 值,則使用串流的第一個區塊作為初始值。如果串流為空,則 promise 會被拒絕,並帶有 TypeError 和 ERR_INVALID_ARGS 錯誤代碼屬性。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
歸納函數逐個元素地迭代串流,這意味著沒有 concurrency 參數或並行處理。要同時執行 reduce,您可以將非同步函數提取到 readable.map 方法中。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
雙工與轉換串流#
類別:stream.Duplex#
雙工串流是同時實作了 Readable 和 Writable 介面的串流。
Duplex 串流的範例包括
duplex.allowHalfOpen#
- 類型:
<boolean>
如果為 false,則串流會在可讀端結束時自動結束可寫端。最初由 allowHalfOpen 建構函式選項設定,預設值為 true。
這可以手動更改以更改現有 Duplex 串流實例的半開放行為,但必須在發出 'end' 事件之前更改。
類別:stream.Transform#
轉換串流是輸出以某種方式與輸入相關聯的 Duplex 串流。與所有 Duplex 串流一樣,Transform 串流同時實作了 Readable 和 Writable 介面。
Transform 串流的範例包括
transform.destroy([error])#
銷毀串流,並選擇性地發出 'error' 事件。在此呼叫之後,轉換串流將釋放任何內部資源。實作者不應覆寫此方法,而應實作 readable._destroy()。Transform 的 _destroy() 預設實作也會發出 'close',除非 emitClose 設定為 false。
一旦呼叫了 destroy(),任何進一步的呼叫都將不起作用,除了來自 _destroy() 之外的任何進一步錯誤都不會以 'error' 的形式發出。
stream.duplexPair([options])#
工具函數 duplexPair 回傳一個包含兩個項目的陣列,每個項目都是一個連接到另一側的 Duplex 串流。
const [ sideA, sideB ] = duplexPair();
寫入其中一個串流的任何內容都會在另一個串流中變為可讀。它提供了類似於網路連接的行為,其中用戶端寫入的資料變為伺服器可讀的資料,反之亦然。
Duplex 串流是對稱的;可以使用其中之一,行為沒有任何區別。
stream.finished(stream[, options], callback)#
stream<Stream>|<ReadableStream>|<WritableStream>一個可讀及/或可寫的串流/網頁串流 (webstream)。options<Object>error<boolean>如果設定為false,則呼叫emit('error', err)不會被視為已結束。預設值:true。readable<boolean>當設定為false時,即使串流可能仍然可讀,當串流結束時也會呼叫回呼。預設值:true。writable<boolean>當設定為false時,即使串流可能仍然可寫,當串流結束時也會呼叫回呼。預設值:true。signal<AbortSignal>允許中止對串流完成的等待。如果信號被中止,底層串流將*不會*被中止。回呼將被呼叫並帶有AbortError。此函數新增的所有已註冊監聽器也將被移除。
callback<Function>接受一個選用的錯誤參數的回呼函數。- 回傳:
<Function>一個用於移除所有已註冊監聽器的清理函數。
當串流不再可讀、可寫或遇到錯誤或過早關閉事件時得到通知的函數。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
在串流過早被銷毀(例如中止的 HTTP 請求)且不會發出 'end' 或 'finish' 的錯誤處理場景中特別有用。
finished API 提供 Promise 版本。
stream.finished() 在 callback 被叫用後會留下懸空的事件監聽器(特別是 'error'、'end'、'finish' 和 'close')。原因是為了防止非預期的 'error' 事件(由於不正確的串流實作)導致非預期的崩潰。如果不希望這種行為,則需要在回呼中叫用回傳的清理函數。
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)#
stream.pipeline(streams, callback)#
streams<Stream[]>|<Iterable[]>|<AsyncIterable[]>|<Function[]>|<ReadableStream[]>|<WritableStream[]>|<TransformStream[]>source<Stream>|<Iterable>|<AsyncIterable>|<Function>|<ReadableStream>...transforms<Stream>|<Function>|<TransformStream>source<AsyncIterable>- 回傳:
<AsyncIterable>
destination<Stream>|<Function>|<WritableStream>source<AsyncIterable>- 回傳:
<AsyncIterable>|<Promise>
callback<Function>當管線完全完成時呼叫。err<Error>valdestination回傳的Promise的解析值。
- 回傳:
<Stream>
一個模組方法,用於在串流和產生器之間建立管線,轉發錯誤並正確清理,並在管線完成時提供回呼。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
pipeline API 提供 Promise 版本。
stream.pipeline() 將對所有串流呼叫 stream.destroy(err),除了
- 已經發出
'end'或'close'的Readable串流。 - 已經發出
'finish'或'close'的Writable串流。
stream.pipeline() 在叫用 callback 後會在串流上留下懸空的事件監聽器。在失敗後重複使用串流的情況下,這可能會導致事件監聽器洩漏和錯誤被忽略。如果最後一個串流是可讀的,懸空的事件監聽器將被移除,以便稍後可以消耗最後一個串流。
當引發錯誤時,stream.pipeline() 會關閉所有串流。將 IncomingRequest 與 pipeline 一起使用可能會導致非預期行為,因為它會在不發送預期回應的情況下銷毀 Socket。請參閱下面的範例
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.compose(...streams)#
穩定性:1 - stream.compose 為實驗性的。
streams<Stream[]>|<Iterable[]>|<AsyncIterable[]>|<Function[]>|<ReadableStream[]>|<WritableStream[]>|<TransformStream[]>|<Duplex[]>|<Function>- 回傳:
<stream.Duplex>
將兩個或多個串流組合成一個 Duplex 串流,該串流寫入第一個串流並從最後一個串流讀取。每個提供的串流都使用 stream.pipeline 導入下一個。如果任何串流出錯,則所有串流(包括外部 Duplex 串流)都將被銷毀。
因為 stream.compose 回傳一個新串流,而該串流又可以(且應該)導入其他串流,所以它啟用了組合。相比之下,當將串流傳遞給 stream.pipeline 時,通常第一個串流是可讀串流,最後一個是可寫串流,形成一個封閉電路。
如果傳遞的是 Function,它必須是接受 source Iterable 的工廠方法。
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose 可用於將非同步可迭代對象、產生器和函數轉換為串流。
AsyncIterable轉換為可讀的Duplex。不能 yieldnull。AsyncGeneratorFunction轉換為可讀/可寫的轉換Duplex。必須接受一個來源AsyncIterable作為第一個參數。不能 yieldnull。AsyncFunction轉換為可寫的Duplex。必須回傳null或undefined。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
為了方便起見,readable.compose(stream) 方法在 <Readable> 和 <Duplex> 串流上可用,作為此函數的包裝器。
stream.isErrored(stream)#
stream<Readable>|<Writable>|<Duplex>|<WritableStream>|<ReadableStream>- 傳回:
<boolean>
回傳串流是否遇到錯誤。
stream.isReadable(stream)#
stream<Readable>|<Duplex>|<ReadableStream>- 回傳:
<boolean>|<null>- 僅當stream不是有效的Readable、Duplex或ReadableStream時才回傳null。
回傳串流是否為可讀的。
stream.isWritable(stream)#
stream<Writable>|<Duplex>|<WritableStream>- 回傳:
<boolean>|<null>- 僅當stream不是有效的Writable、Duplex或WritableStream時才回傳null。
回傳串流是否為可寫的。
stream.Readable.from(iterable[, options])#
iterable<Iterable>實作Symbol.asyncIterator或Symbol.iterator迭代協定的物件。如果傳遞了 null 值,則發出 'error' 事件。options<Object>提供給new stream.Readable([options])的選項。預設情況下,Readable.from()會將options.objectMode設定為true,除非透過將options.objectMode設定為false明確退出。- 回傳:
<stream.Readable>
一個從迭代器建立可讀串流的工具方法。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
基於效能考量,呼叫 Readable.from(string) 或 Readable.from(buffer) 不會對字串或緩衝區進行迭代,以符合其他串流的語義。
如果傳遞了包含 promise 的 Iterable 物件作為參數,可能會導致未處理的拒絕 (unhandled rejection)。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Readable.fromWeb(readableStream[, options])#
readableStream<ReadableStream>options<Object>encoding<string>highWaterMark<number>objectMode<boolean>signal<AbortSignal>
- 回傳:
<stream.Readable>
stream.Readable.isDisturbed(stream)#
stream<stream.Readable>|<ReadableStream>- 回傳:
boolean
回傳串流是否已被讀取或取消。
stream.Readable.toWeb(streamReadable[, options])#
streamReadable<stream.Readable>options<Object>- 回傳:
<ReadableStream>
stream.Writable.fromWeb(writableStream[, options])#
writableStream<WritableStream>options<Object>decodeStrings<boolean>highWaterMark<number>objectMode<boolean>signal<AbortSignal>
- 回傳:
<stream.Writable>
stream.Writable.toWeb(streamWritable)#
streamWritable<stream.Writable>- 回傳:
<WritableStream>
stream.Duplex.from(src)#
src<Stream>|<Blob>|<ArrayBuffer>|<string>|<Iterable>|<AsyncIterable>|<AsyncGeneratorFunction>|<AsyncFunction>|<Promise>|<Object>|<ReadableStream>|<WritableStream>
一個建立雙工串流的工具方法。
Stream將可寫串流轉換為可寫Duplex,將可讀串流轉換為Duplex。Blob轉換為可讀的Duplex。string轉換為可讀的Duplex。ArrayBuffer轉換為可讀的Duplex。AsyncIterable轉換為可讀的Duplex。不能 yieldnull。AsyncGeneratorFunction轉換為可讀/可寫的轉換Duplex。必須接受一個來源AsyncIterable作為第一個參數。不能 yieldnull。AsyncFunction轉換為可寫的Duplex。必須回傳null或undefinedObject ({ writable, readable })將readable和writable轉換為Stream,然後將它們組合成Duplex,其中Duplex將寫入writable並從readable讀取。Promise轉換為可讀的Duplex。null值會被忽略。ReadableStream轉換為可讀的Duplex。WritableStream轉換為可寫的Duplex。- 回傳:
<stream.Duplex>
如果傳遞了包含 promise 的 Iterable 物件作為參數,可能會導致未處理的拒絕 (unhandled rejection)。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.fromWeb(pair[, options])#
pair<Object>readable<ReadableStream>writable<WritableStream>
options<Object>- 回傳:
<stream.Duplex>
import { Duplex } from 'node:stream'; import { ReadableStream, WritableStream, } from 'node:stream/web'; const readable = new ReadableStream({ start(controller) { controller.enqueue('world'); }, }); const writable = new WritableStream({ write(chunk) { console.log('writable', chunk); }, }); const pair = { readable, writable, }; const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true }); duplex.write('hello'); for await (const chunk of duplex) { console.log('readable', chunk); }const { Duplex } = require('node:stream'); const { ReadableStream, WritableStream, } = require('node:stream/web'); const readable = new ReadableStream({ start(controller) { controller.enqueue('world'); }, }); const writable = new WritableStream({ write(chunk) { console.log('writable', chunk); }, }); const pair = { readable, writable, }; const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true }); duplex.write('hello'); duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex[, options])#
streamDuplex<stream.Duplex>options<Object>readableType<string>指定建立的可讀可寫對的ReadableStream部分的類型。必須為'bytes'或 undefined。(options.type是此選項的已棄用別名。)
- 傳回:
<Object>readable<ReadableStream>writable<WritableStream>
import { Duplex } from 'node:stream'; const duplex = Duplex({ objectMode: true, read() { this.push('world'); this.push(null); }, write(chunk, encoding, callback) { console.log('writable', chunk); callback(); }, }); const { readable, writable } = Duplex.toWeb(duplex); writable.getWriter().write('hello'); const { value } = await readable.getReader().read(); console.log('readable', value);const { Duplex } = require('node:stream'); const duplex = Duplex({ objectMode: true, read() { this.push('world'); this.push(null); }, write(chunk, encoding, callback) { console.log('writable', chunk); callback(); }, }); const { readable, writable } = Duplex.toWeb(duplex); writable.getWriter().write('hello'); readable.getReader().read().then((result) => { console.log('readable', result.value); });
stream.addAbortSignal(signal, stream)#
signal<AbortSignal>代表可能取消的信號stream<Stream>|<ReadableStream>|<WritableStream>要附加信號的串流。
將 AbortSignal 附加到可讀或可寫串流。這讓程式碼可以使用 AbortController 控制串流銷毀。
在傳遞的 AbortSignal 對應的 AbortController 上呼叫 abort,其行為與在串流上呼叫 .destroy(new AbortError()) 以及對 webstreams 呼叫 controller.error(new AbortError()) 相同。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
或者將 AbortSignal 與作為非同步可迭代對象的可讀串流一起使用
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
或者將 AbortSignal 與 ReadableStream 一起使用
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)#
回傳串流使用的預設 highWaterMark。預設為 65536 (64 KiB),對於 objectMode 則為 16。
stream.setDefaultHighWaterMark(objectMode, value)#
設定串流使用的預設 highWaterMark。
串流實作者 API#
node:stream 模組 API 旨在讓使用 JavaScript 的原型繼承模型輕鬆實作串流成為可能。
首先,串流開發者會聲明一個繼承自四種基本串流類別(stream.Writable、stream.Readable、stream.Duplex 或 stream.Transform)之一的新 JavaScript 類別,並確保呼叫適當的父類別建構函式。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
繼承串流時,請記住使用者在將選項轉發給基礎建構函式之前可以且應該提供哪些選項。例如,如果實作對 autoDestroy 和 emitClose 選項有假設,則不允許使用者覆寫這些選項。明確說明哪些選項被轉發,而不是隱含地轉發所有選項。
新的串流類別隨後必須實作一個或多個特定方法,具體取決於正在建立的串流類型,如下表所示
| 使用案例 | 類別 | 要實作的方法 |
|---|---|---|
| 僅讀取 | Readable |
_read() |
| 僅寫入 | Writable |
_write(), _writev(), _final() |
| 讀取與寫入 | Duplex |
_read(), _write(), _writev(), _final() |
| 操作寫入的資料,然後讀取結果 | Transform |
_transform(), _flush(), _final() |
串流的實作程式碼*絕不*應該呼叫串流中供消費者使用的「公開」方法(如 串流消費者 API 章節中所述)。這樣做可能會導致消耗串流的應用程式程式碼產生不利的副作用。
避免覆寫公開方法,例如 write()、end()、cork()、uncork()、read() 和 destroy(),或透過 .emit() 發出內部事件,例如 'error'、'data'、'end'、'finish' 和 'close'。這樣做會破壞目前和未來的串流不變性 (invariants),導致與其他串流、串流工具以及使用者預期的行為和/或相容性問題。
簡化建構#
在許多簡單的情況下,可以不依賴繼承來建立串流。這可以透過直接建立 stream.Writable、stream.Readable、stream.Duplex 或 stream.Transform 物件的實例並傳遞適當的方法作為建構函式選項來實現。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
實作可寫串流#
繼承 stream.Writable 類別以實作 Writable 串流。
自訂 Writable 串流*必須*呼叫 new stream.Writable([options]) 建構函式,並實作 writable._write() 和/或 writable._writev() 方法。
new stream.Writable([options])#
options<Object>highWaterMark<number>當stream.write()開始回傳false時的緩衝層級。預設值:65536(64 KiB),對於objectMode串流則為16。decodeStrings<boolean>在將傳遞給stream.write()的string傳遞給stream._write()之前,是否要將其編碼為Buffer(使用stream.write()呼叫中指定的編碼)。其他類型的資料不會被轉換(即Buffer不會被解碼為string)。設定為 false 將防止string被轉換。預設值:true。defaultEncoding<string>當沒有編碼作為參數提供給stream.write()時所使用的預設編碼。預設值:'utf8'。objectMode<boolean>stream.write(anyObj)是否為有效操作。設定後,如果串流實作支援,則可以寫入字串、<Buffer>、<TypedArray>或<DataView>以外的 JavaScript 值。預設值:false。emitClose<boolean>串流在被銷毀後是否應發出'close'。預設值:true。write<Function>stream._write()方法的實作。writev<Function>stream._writev()方法的實作。destroy<Function>stream._destroy()方法的實作。final<Function>stream._final()方法的實作。construct<Function>stream._construct()方法的實作。autoDestroy<boolean>此串流是否應在結束後自動呼叫.destroy()。預設值:true。signal<AbortSignal>代表可能取消的信號。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
或者,當使用 ES6 之前的樣式建構函式時
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用簡化建構方式
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
在傳遞的 AbortSignal 對應的 AbortController 上呼叫 abort,其行為與在可寫串流上呼叫 .destroy(new AbortError()) 相同。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)#
callback<Function>當串流完成初始化時呼叫此函數(可選擇性地帶有一個錯誤參數)。
_construct() 方法絕不能被直接呼叫。它可以由子類別實作,如果是這樣,則僅會由內部的 Writable 類別方法呼叫。
此選用函數將在串流建構函式回傳後的下一個 tick 中被呼叫,延遲任何 _write()、_final() 和 _destroy() 呼叫,直到 callback 被呼叫。這對於在串流可以使用之前初始化狀態或非同步初始化資源非常有用。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)#
chunk<Buffer>|<string>|<any>要寫入的Buffer,是從傳遞給stream.write()的string轉換而來。如果串流的decodeStrings選項為false或串流以物件模式運作,則區塊將不會被轉換,並且將是傳遞給stream.write()的任何內容。encoding<string>如果區塊是字串,則encoding是該字串的字元編碼。如果區塊是Buffer,或者串流以物件模式運作,則可以忽略encoding。callback<Function>當提供的區塊處理完成時,呼叫此函數(可選擇性地帶有一個錯誤參數)。
所有 Writable 串流實作必須提供 writable._write() 和/或 writable._writev() 方法以將資料傳送到基礎資源。
Transform 串流提供其自己的 writable._write() 實作。
此函數絕不能由應用程式程式碼直接呼叫。它應該由子類別實作,且僅由內部的 Writable 類別方法呼叫。
必須在 writable._write() 內部同步呼叫 callback 函數,或非同步(例如在不同的 tick 中)呼叫,以表示寫入成功完成或因錯誤而失敗。如果呼叫失敗,傳遞給 callback 的第一個參數必須是 Error 物件,如果寫入成功,則為 null。
在呼叫 writable._write() 到呼叫 callback 之間發生的所有對 writable.write() 的呼叫都會導致寫入資料被緩衝。當 callback 被叫用時,串流可能會發出一個 'drain' 事件。如果串流實作能夠一次處理多個資料區塊,則應實作 writable._writev() 方法。
如果在建構函式選項中將 decodeStrings 屬性明確設定為 false,則 chunk 將保持為傳遞給 .write() 的同一個物件,並且可能是字串而非 Buffer。這是為了支援對某些字串資料編碼有最佳化處理的實作。在這種情況下,encoding 參數將指示字串的字元編碼。否則,可以安全地忽略 encoding 參數。
writable._write() 方法以前綴底線命名,因為它是定義它的類別的內部方法,且絕不應由使用者程式直接呼叫。
writable._writev(chunks, callback)#
chunks<Object[]>要寫入的資料。該值是一個<Object>陣列,每個物件代表一個離散的寫入資料區塊。這些物件的屬性是callback<Function>當提供的區塊處理完成時,要叫用的回呼函數(可選擇性地帶有一個錯誤參數)。
此函數絕不能由應用程式程式碼直接呼叫。它應該由子類別實作,且僅由內部的 Writable 類別方法呼叫。
在能夠一次處理多個資料區塊的串流實作中,除了 writable._write() 之外,或者作為其替代方法,還可以實作 writable._writev() 方法。如果已實作且有來自先前寫入的緩衝資料,則會呼叫 _writev() 而不是 _write()。
writable._writev() 方法以前綴底線命名,因為它是定義它的類別的內部方法,且絕不應由使用者程式直接呼叫。
writable._destroy(err, callback)#
err<Error>一個可能的錯誤。callback<Function>接受一個選用的錯誤參數的回呼函數。
_destroy() 方法由 writable.destroy() 呼叫。它可以由子類別覆寫,但*絕不能*被直接呼叫。
writable._final(callback)#
callback<Function>當完成寫入任何剩餘資料時呼叫此函數(可選擇性地帶有一個錯誤參數)。
_final() 方法*絕不能*被直接呼叫。它可以由子類別實作,如果是這樣,則僅會由內部的 Writable 類別方法呼叫。
此選用函數將在串流關閉前被呼叫,延遲 'finish' 事件直到 callback 被呼叫。這對於在串流結束前關閉資源或寫入緩衝資料非常有用。
寫入時的錯誤#
處理 writable._write()、writable._writev() 和 writable._final() 方法期間發生的錯誤,必須透過叫用回呼並將錯誤作為第一個參數傳遞來傳播。在這些方法中拋出 Error 或手動發出 'error' 事件會導致未定義的行為。
如果當 Writable 發出錯誤時,Readable 串流正導入 Writable 串流,則 Readable 串流將被斷開管線 (unpiped)。
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
一個可寫串流範例#
下面說明了一個相當簡單(且有些無意義)的自定義 Writable 串流實作。雖然此特定的 Writable 串流實例沒有任何實際用途,但範例說明了自定義 Writable 串流實例的每個必需元素
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
在可寫串流中解碼緩衝區#
解碼緩衝區是一項常見任務,例如使用輸入為字串的轉換器時。當使用多位元組字元編碼(例如 UTF-8)時,這不是一個簡單的過程。以下範例顯示如何使用 StringDecoder 和 Writable 解碼多位元組字串。
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options?.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
實作可讀串流#
繼承 stream.Readable 類別以實作 Readable 串流。
自訂 Readable 串流*必須*呼叫 new stream.Readable([options]) 建構函式,並實作 readable._read() 方法。
new stream.Readable([options])#
options<Object>highWaterMark<number>在停止從基礎資源讀取之前存儲在內部緩衝區中的最大 位元組數。預設值:65536(64 KiB),對於objectMode串流則為16。encoding<string>如果指定,則緩衝區將使用指定的編碼解碼為字串。預設值:null。objectMode<boolean>此串流是否應表現為物件串流。這意味著stream.read(n)回傳單個值,而不是大小為n的Buffer。預設值:false。emitClose<boolean>串流在被銷毀後是否應發出'close'。預設值:true。read<Function>stream._read()方法的實作。destroy<Function>stream._destroy()方法的實作。construct<Function>stream._construct()方法的實作。autoDestroy<boolean>此串流是否應在結束後自動呼叫.destroy()。預設值:true。signal<AbortSignal>代表可能取消的信號。
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
}
或者,當使用 ES6 之前的樣式建構函式時
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者,使用簡化建構方式
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
在傳遞的 AbortSignal 對應的 AbortController 上呼叫 abort,其行為與在建立的可讀串流上呼叫 .destroy(new AbortError()) 相同。
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
readable._construct(callback)#
callback<Function>當串流完成初始化時呼叫此函數(可選擇性地帶有一個錯誤參數)。
_construct() 方法絕不能被直接呼叫。它可以由子類別實作,如果是這樣,則僅會由內部的 Readable 類別方法呼叫。
此選用函數將由串流建構函式在下一個 tick 中排程呼叫,延遲任何 _read() 和 _destroy() 呼叫,直到 callback 被呼叫。這對於在串流可以使用之前初始化狀態或非同步初始化資源非常有用。
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)#
size<number>非同步讀取的位元組數
此函數絕不能由應用程式程式碼直接呼叫。它應該由子類別實作,且僅由內部的 Readable 類別方法呼叫。
所有 Readable 串流實作必須提供 readable._read() 方法的實作,以從基礎資源中提取資料。
當呼叫 readable._read() 時,如果資源有可用資料,實作應開始使用 this.push(dataChunk) 方法將該資料推入讀取佇列。一旦串流準備好接受更多資料,在每次呼叫 this.push(dataChunk) 之後,將再次呼叫 _read()。_read() 可能會繼續從資源讀取並推送資料,直到 readable.push() 回傳 false。只有當 _read() 在停止後再次被呼叫時,它才應恢復將額外資料推入佇列。
一旦呼叫了 readable._read() 方法,直到透過 readable.push() 方法推送更多資料之前,它都不會再次被呼叫。空資料(如空緩衝區和字串)不會導致呼叫 readable._read()。
size 參數是建議性的。對於「讀取」是回傳資料的單個操作的實作,可以使用 size 參數來確定要提取多少資料。其他實作可能會忽略此參數,並在資料可用時立即提供資料。在呼叫 stream.push(chunk) 之前,無需「等待」直到有 size 位元組可用。
readable._read() 方法以前綴底線命名,因為它是定義它的類別的內部方法,且絕不應由使用者程式直接呼叫。
readable._destroy(err, callback)#
err<Error>一個可能的錯誤。callback<Function>接受一個選用的錯誤參數的回呼函數。
_destroy() 方法由 readable.destroy() 呼叫。它可以由子類別覆寫,但*絕不能*被直接呼叫。
readable.push(chunk[, encoding])#
chunk<Buffer>|<TypedArray>|<DataView>|<string>|<null>|<any>推入讀取佇列的資料區塊。對於非物件模式運作的串流,chunk必須是<string>、<Buffer>、<TypedArray>或<DataView>。對於物件模式串流,chunk可以是任何 JavaScript 值。encoding<string>字串區塊的編碼。必須是有效的Buffer編碼,例如'utf8'或'ascii'。- 回傳:
<boolean>如果可以繼續推入額外的資料區塊,則為true;否則為false。
當 chunk 是 <Buffer>、<TypedArray>、<DataView> 或 <string> 時,資料 chunk 將被新增到內部佇列供串流使用者消耗。將 chunk 傳遞為 null 表示串流結束 (EOF),之後不能再寫入資料。
當 Readable 以暫停模式 (paused mode) 運作時,使用 readable.push() 新增的資料可以在發出 'readable' 事件時透過呼叫 readable.read() 方法讀出。
當 Readable 以流動模式 (flowing mode) 運作時,使用 readable.push() 新增的資料將透過發出 'data' 事件來交付。
readable.push() 方法設計得儘可能靈活。例如,當包裝一個提供某種暫停/恢復機制和資料回呼的低階來源時,可以使用自訂 Readable 實例來包裝該低階來源
// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
readable.push() 方法用於將內容推入內部緩衝區。它可以由 readable._read() 方法驅動。
對於非物件模式運作的串流,如果 readable.push() 的 chunk 參數為 undefined,它將被視為空字串或緩衝區。更多資訊請參閱 readable.push('')。
讀取時的錯誤#
處理 readable._read() 期間發生的錯誤,必須透過 readable.destroy(err) 方法傳播。在 readable._read() 中拋出 Error 或手動發出 'error' 事件會導致未定義的行為。
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
一個計數串流範例#
以下是一個 Readable 串流的基本範例,它按升序發出從 1 到 1,000,000 的數字,然後結束。
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
實作雙工串流#
Duplex 串流是同時實作了 Readable 和 Writable 的串流,例如 TCP Socket 連接。
由於 JavaScript 不支援多重繼承,因此繼承 stream.Duplex 類別來實作 Duplex 串流(而不是同時繼承 stream.Readable *和* stream.Writable 類別)。
stream.Duplex 類別原型繼承自 stream.Readable,並寄生繼承 (parasitically) 自 stream.Writable,但由於在 stream.Writable 上覆寫了 Symbol.hasInstance,instanceof 對於這兩個基礎類別都能正常運作。
自定義的 Duplex 串流必須呼叫 new stream.Duplex([options]) 建構子,並實作 readable._read() 與 writable._write() 兩個方法。
new stream.Duplex(options)#
options<Object>同時傳遞給Writable與Readable建構子。另外還包含以下欄位:allowHalfOpen<boolean>若設為false,則當可讀端結束時,串流將自動結束其可寫端。預設值:true。readable<boolean>設定Duplex是否應具備可讀性。預設值:true。writable<boolean>設定Duplex是否應具備可寫性。預設值:true。readableObjectMode<boolean>為串流的可讀端設定objectMode。若objectMode為true則此項無效。預設值:false。writableObjectMode<boolean>為串流的可寫端設定objectMode。若objectMode為true則此項無效。預設值:false。readableHighWaterMark<number>為串流的可讀端設定highWaterMark。若已提供highWaterMark則此項無效。writableHighWaterMark<number>為串流的可寫端設定highWaterMark。若已提供highWaterMark則此項無效。
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者,當使用 ES6 之前的樣式建構函式時
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
或者,使用簡化建構方式
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
使用 pipeline 時
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
Duplex 串流範例#
下文展示了一個簡單的 Duplex 串流範例,它封裝了一個假設的底層來源物件,該物件可以寫入資料並從中讀取資料,儘管其使用的 API 與 Node.js 串流不相容。下文展示了一個簡單的 Duplex 串流範例,它透過 Writable 介面快取傳入的寫入資料,並透過 Readable 介面將其讀回。
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex 串流最重要的特點在於,儘管 Readable 端和 Writable 端共存於單個物件實例中,但它們是彼此獨立運作的。
物件模式的 Duplex 串流#
對於 Duplex 串流,可以分別使用 readableObjectMode 和 writableObjectMode 選項,單獨為 Readable 端或 Writable 端設定 objectMode。
例如在以下範例中,建立了一個新的 Transform 串流(這是一種 Duplex 串流),其 Writable 端為物件模式,接收 JavaScript 數字,並在 Readable 端轉換為十六進位字串。
const { Transform } = require('node:stream');
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
實作 Transform 串流#
Transform 串流是一種 Duplex 串流,其輸出是根據輸入以某種方式計算出來的。範例包括進行資料壓縮、加密或解密的 zlib 串流或 crypto 串流。
並沒有規定輸出的大小必須與輸入相同、資料塊(chunk)數量必須一致,或必須同時到達。例如,Hash 串流永遠只會有一個輸出的資料塊,且是在輸入結束時才提供。而 zlib 串流產生的輸出可能會比輸入小得多或大得多。
透過繼承 stream.Transform 類別來實作 Transform 串流。
stream.Transform 類別在原型上繼承自 stream.Duplex,並實作了其專屬版本的 writable._write() 與 readable._read() 方法。自定義的 Transform 實作必須實作 transform._transform() 方法,且可以選擇實作 transform._flush() 方法。
使用 Transform 串流時必須注意,如果 Readable 端的輸出未被消耗,寫入串流的資料可能會導致串流的 Writable 端進入暫停狀態。
new stream.Transform([options])#
options<Object>同時傳遞給Writable與Readable建構子。另外還包含以下欄位:transform<Function>stream._transform()方法的實作。flush<Function>stream._flush()方法的實作。
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
或者,當使用 ES6 之前的樣式建構函式時
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
或者,使用簡化建構方式
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
事件:'end'#
'end' 事件來自 stream.Readable 類別。'end' 事件在所有資料輸出完畢後觸發,這發生在 transform._flush() 中的回呼函式被呼叫之後。如果發生錯誤,則不應觸發 'end' 事件。
事件:'finish'#
'finish' 事件來自 stream.Writable 類別。'finish' 事件在呼叫 stream.end() 且所有資料塊都已由 stream._transform() 處理完畢後觸發。如果發生錯誤,則不應觸發 'finish' 事件。
transform._flush(callback)#
callback<Function>當剩餘資料已清空(flushed)時呼叫的回呼函式(可選擇性地傳入錯誤參數和資料)。
此函數絕不能由應用程式程式碼直接呼叫。它應該由子類別實作,且僅由內部的 Readable 類別方法呼叫。
在某些情況下,轉換操作可能需要在串流末端發送額外的資料。例如,zlib 壓縮串流會儲存一些內部狀態,用於優化壓縮輸出。然而,當串流結束時,這些額外的資料需要被清空(flush),以使壓縮資料完整。
自定義的 Transform 實作可以實作 transform._flush() 方法。當沒有更多寫入資料需要消耗,但在觸發代表 Readable 串流結束的 'end' 事件之前,會呼叫此方法。
在 transform._flush() 的實作中,可以視需要呼叫 transform.push() 零次或多次。當清空操作完成時,必須呼叫 callback 函式。
transform._flush() 方法以底線開頭,是因為它是定義類別的內部方法,不應由使用者程式直接呼叫。
transform._transform(chunk, encoding, callback)#
chunk<Buffer>|<string>|<any>要轉換的Buffer,由傳遞給stream.write()的string轉換而來。如果串流的decodeStrings選項為false,或者串流正以物件模式運作,則資料塊將不會被轉換,且會是傳遞給stream.write()的原始內容。encoding<string>如果資料塊是字串,則這是編碼類型。如果資料塊是 Buffer,則這是特殊值'buffer',在這種情況下請忽略它。callback<Function>在處理完提供的chunk後要呼叫的回呼函式(可選擇性地傳入錯誤參數和資料)。
此函數絕不能由應用程式程式碼直接呼叫。它應該由子類別實作,且僅由內部的 Readable 類別方法呼叫。
所有 Transform 串流實作都必須提供 _transform() 方法來接收輸入並產生輸出。transform._transform() 的實作處理正在寫入的位元組、計算輸出,然後使用 transform.push() 方法將該輸出傳遞給可讀部分。
根據單個輸入資料塊產生的輸出量,transform.push() 方法可能會被呼叫零次或多次。
任何給定的輸入資料塊都有可能不產生任何輸出。
只有在當前資料塊完全消耗後,才必須呼叫 callback 函式。如果處理輸入時發生錯誤,傳遞給 callback 的第一個參數必須是 Error 物件,否則為 null。如果傳遞了第二個參數給 callback,它將被轉發到 transform.push() 方法,但僅在第一個參數為 falsey 值時才會執行。換句話說,以下兩者是等價的:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform() 方法以底線開頭,是因為它是定義類別的內部方法,不應由使用者程式直接呼叫。
transform._transform() 永遠不會平行呼叫;串流實作了佇列機制,為了接收下一個資料塊,必須同步或非同步地呼叫 callback。
類別:stream.PassThrough#
stream.PassThrough 類別是 Transform 串流的簡易實作,僅僅是將輸入的位元組傳遞到輸出。它的目的主要是用於範例和測試,但在某些情況下,stream.PassThrough 作為建構新型串流的基礎元件也是很有用的。
補充說明#
串流對非同步產生器與非同步迭代器的相容性#
隨著 JavaScript 支援非同步產生器(async generators)與迭代器,非同步產生器目前實際上已成為語言層級的一等串流結構。
下文提供了在非同步產生器與非同步迭代器中使用 Node.js 串流的一些常見互操作案例。
使用非同步迭代器消耗可讀串流#
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
非同步迭代器會在串流上註冊一個永久的錯誤處理常式,以防止任何未處理的銷毀後錯誤(post-destroy errors)。
使用非同步產生器建立可讀串流#
可以使用 Readable.from() 實用方法從非同步產生器建立 Node.js 可讀串流:
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
從非同步迭代器管線傳輸至可寫串流#
當從非同步迭代器寫入可寫串流時,請確保正確處理背壓(backpressure)與錯誤。stream.pipeline() 抽象化了背壓與背壓相關錯誤的處理過程。
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
與舊版 Node.js 版本的相容性#
在 Node.js 0.10 之前,Readable 串流介面較為簡單,但功能也較弱且較不實用。
- 與其等待呼叫
stream.read()方法,'data'事件會立即開始觸發。需要進行一定工作量來決定如何處理資料的應用程式,必須將讀取的資料儲存在緩衝區中,以免資料遺失。 stream.pause()方法是建議性質的,而非保證性質。這意味著即使當串流處於暫停狀態時,仍必須準備好接收'data'事件。
在 Node.js 0.10 中加入了 Readable 類別。為了與舊版 Node.js 程式保持向後相容,當加入 'data' 事件處理常式,或呼叫 stream.resume() 方法時,Readable 串流會切換到「流動模式」(flowing mode)。其效果是,即使不使用新的 stream.read() 方法與 'readable' 事件,也不再需要擔心遺失 'data' 資料塊。
雖然大多數應用程式將繼續正常運作,但在以下條件下會引入一種邊緣情況:
- 沒有加入
'data'事件監聽器。 - 從未呼叫
stream.resume()方法。 - 串流未透過管線(pipe)傳輸至任何可寫目的地。
例如,考慮以下程式碼:
// WARNING! BROKEN!
net.createServer((socket) => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
在 Node.js 0.10 之前,傳入的訊息資料會被直接丟棄。然而,在 Node.js 0.10 及以後版本中,該 socket 將永遠保持暫停狀態。
這種情況下的解決方法是呼叫 stream.resume() 方法來開始資料流:
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// Start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了新的 Readable 串流會切換到流動模式外,0.10 版之前的舊式串流可以使用 readable.wrap() 方法包裝在 Readable 類別中。
readable.read(0)#
在某些情況下,需要觸發底層可讀串流機制的重新整理,而不實際消耗任何資料。在這種情況下,可以呼叫 readable.read(0),它始終會返回 null。
如果內部讀取緩衝區低於 highWaterMark,且串流當前未在讀取中,則呼叫 stream.read(0) 將觸發底層的 stream._read() 呼叫。
雖然大多數應用程式幾乎不需要這樣做,但在 Node.js 內部存在這種情況,特別是在 Readable 串流類別的內部實作中。
readable.push('')#
不建議使用 readable.push('')。
向非物件模式的串流推入(push)零位元組的 <string>、<Buffer>、<TypedArray> 或 <DataView> 會產生一個有趣的副作用。因為它確實是對 readable.push() 的呼叫,該呼叫會結束讀取過程。然而,由於參數是空字串,沒有資料被添加到可讀緩衝區,因此使用者沒有東西可以消耗。
呼叫 readable.setEncoding() 後的 highWaterMark 差異#
使用 readable.setEncoding() 會改變 highWaterMark 在非物件模式下的運作方式。
通常情況下,當前緩衝區的大小是按位元組(bytes)計算並與 highWaterMark 比較的。但在呼叫 setEncoding() 後,比較函數將開始以字元(characters)為單位測量緩衝區的大小。
在 latin1 或 ascii 的常見情況下這不是問題。但在處理可能包含多位元組字元的字串時,建議留意此行為。