Node.js v25.0.0 文件
- Node.js v25.0.0
-
目錄
- 流
- 本文件的組織結構
- 流的型別
- 面向流消費者的 API
- 可寫流
- 類:
stream.Writable- 事件:
'close' - 事件:
'drain' - 事件:
'error' - 事件:
'finish' - 事件:
'pipe' - 事件:
'unpipe' writable.cork()writable.destroy([error])writable.closedwritable.destroyedwritable.end([chunk[, encoding]][, callback])writable.setDefaultEncoding(encoding)writable.uncork()writable.writablewritable.writableAbortedwritable.writableEndedwritable.writableCorkedwritable.erroredwritable.writableFinishedwritable.writableHighWaterMarkwritable.writableLengthwritable.writableNeedDrainwritable.writableObjectModewritable[Symbol.asyncDispose]()writable.write(chunk[, encoding][, callback])
- 事件:
- 類:
- 可讀流
- 兩種讀取模式
- 三種狀態
- 選擇一種 API 風格
- 類:
stream.Readable- 事件:
'close' - 事件:
'data' - 事件:
'end' - 事件:
'error' - 事件:
'pause' - 事件:
'readable' - 事件:
'resume' readable.destroy([error])readable.closedreadable.destroyedreadable.isPaused()readable.pause()readable.pipe(destination[, options])readable.read([size])readable.readablereadable.readableAbortedreadable.readableDidReadreadable.readableEncodingreadable.readableEndedreadable.erroredreadable.readableFlowingreadable.readableHighWaterMarkreadable.readableLengthreadable.readableObjectModereadable.resume()readable.setEncoding(encoding)readable.unpipe([destination])readable.unshift(chunk[, encoding])readable.wrap(stream)readable[Symbol.asyncIterator]()readable[Symbol.asyncDispose]()readable.compose(stream[, options])readable.iterator([options])readable.map(fn[, options])readable.filter(fn[, options])readable.forEach(fn[, options])readable.toArray([options])readable.some(fn[, options])readable.find(fn[, options])readable.every(fn[, options])readable.flatMap(fn[, options])readable.drop(limit[, options])readable.take(limit[, options])readable.reduce(fn[, initial[, options]])
- 事件:
- 雙工流和轉換流
stream.finished(stream[, options], callback)stream.pipeline(source[, ...transforms], destination, callback)stream.pipeline(streams, callback)stream.compose(...streams)stream.isErrored(stream)stream.isReadable(stream)stream.isWritable(stream)stream.Readable.from(iterable[, options])stream.Readable.fromWeb(readableStream[, options])stream.Readable.isDisturbed(stream)stream.Readable.toWeb(streamReadable[, options])stream.Writable.fromWeb(writableStream[, options])stream.Writable.toWeb(streamWritable)stream.Duplex.from(src)stream.Duplex.fromWeb(pair[, options])stream.Duplex.toWeb(streamDuplex)stream.addAbortSignal(signal, stream)stream.getDefaultHighWaterMark(objectMode)stream.setDefaultHighWaterMark(objectMode, value)
- 可寫流
- 面向流實現者的 API
- 補充說明
- 流
-
索引
- 斷言測試
- 非同步上下文跟蹤
- 非同步鉤子
- 緩衝區
- C++ 外掛
- 使用 Node-API 的 C/C++ 外掛
- C++ 嵌入器 API
- 子程序
- 叢集
- 命令列選項
- 控制檯
- 加密
- 偵錯程式
- 已棄用的 API
- 診斷通道
- DNS
- 域
- 環境變數
- 錯誤
- 事件
- 檔案系統
- 全域性物件
- HTTP
- HTTP/2
- HTTPS
- 檢查器
- 國際化
- 模組:CommonJS 模組
- 模組:ECMAScript 模組
- 模組:
node:moduleAPI - 模組:包
- 模組:TypeScript
- 網路
- 作業系統
- 路徑
- 效能鉤子
- 許可權
- 程序
- Punycode
- 查詢字串
- 逐行讀取
- REPL
- 報告
- 單一可執行檔案應用
- SQLite
- 流
- 字串解碼器
- 測試執行器
- 定時器
- TLS/SSL
- 跟蹤事件
- TTY
- UDP/資料報
- URL
- 實用工具
- V8
- 虛擬機器
- WASI
- Web Crypto API
- Web Streams API
- 工作執行緒
- Zlib
- 其他版本
- 選項
流[原始碼]#
原始碼: lib/stream.js
流是在 Node.js 中處理流式資料的抽象介面。node:stream 模組提供了一個用於實現流介面的 API。
Node.js 提供了許多流物件。例如,一個對 HTTP 伺服器的請求和 process.stdout 都是流的例項。
流可以是可讀的、可寫的,或兩者兼備。所有流都是 EventEmitter 的例項。
要訪問 node:stream 模組
const stream = require('node:stream');
node:stream 模組對於建立新型的流例項非常有用。通常消費流並不需要使用 node:stream 模組。
本文件的組織結構#
本文件包含兩個主要部分和一個用於說明的第三部分。第一部分解釋瞭如何在應用程式中使用現有的流。第二部分解釋瞭如何建立新型的流。
流的型別#
Node.js 中有四種基本的流型別
可寫流(Writable):可以向其寫入資料的流(例如,fs.createWriteStream())。可讀流(Readable):可以從中讀取資料的流(例如,fs.createReadStream())。雙工流(Duplex):既是可讀流又是可寫流的流(例如,net.Socket)。轉換流(Transform):在寫入和讀取資料時可以修改或轉換資料的雙工流(例如,zlib.createDeflate())。
此外,該模組還包括實用函式 stream.duplexPair()、stream.pipeline()、stream.finished()、stream.Readable.from() 和 stream.addAbortSignal()。
流 Promises API#
stream/promises API 為流提供了一組替代的非同步實用函式,這些函式返回 Promise 物件而不是使用回撥。該 API 可透過 require('node:stream/promises') 或 require('node:stream').promises 訪問。
stream.pipeline(streams[, options])#
stream.pipeline(source[, ...transforms], destination[, options])#
streams<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source<Stream> | <Iterable> | <AsyncIterable> | <Function>- 返回: <Promise> | <AsyncIterable>
...transforms<Stream> | <Function>source<AsyncIterable>- 返回: <Promise> | <AsyncIterable>
destination<Stream> | <Function>source<AsyncIterable>- 返回: <Promise> | <AsyncIterable>
options<Object> 管道選項signal<AbortSignal>end<boolean> 當源流結束時結束目標流。轉換流總是會被結束,即使該值為false。預設值:true。
- 返回:<Promise> 當管道完成時履行。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
要使用 AbortSignal,請將其作為最後一個引數在一個選項物件中傳遞。當訊號中止時,destroy 將在底層管道上被呼叫,並帶有一個 AbortError。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortErrorimport { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
pipeline API 也支援非同步生成器
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
請記住處理傳入非同步生成器的 signal 引數。特別是在非同步生成器是管道的源頭(即第一個引數)的情況下,否則管道將永遠不會完成。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
pipeline API 提供了回撥版本
stream.finished(stream[, options])#
stream<Stream> | <ReadableStream> | <WritableStream> 一個可讀和/或可寫的流/網路流。options<Object>error<boolean> | <undefined>readable<boolean> | <undefined>writable<boolean> | <undefined>signal<AbortSignal> | <undefined>cleanup<boolean> | <undefined> 如果為true,在 promise 被履行前移除此函式註冊的監聽器。預設值:false。
- 返回:<Promise> 當流不再可讀或可寫時履行。
const { finished } = require('node:stream/promises');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
finished API 也提供了一個回撥版本。
stream.finished() 在返回的 promise 被解決或拒絕後,會留下懸空的事件監聽器(特別是 'error'、'end'、'finish' 和 'close')。這樣做的原因是為了防止因不正確的流實現而導致的意外 'error' 事件引起意外崩潰。如果不希望出現這種行為,應將 options.cleanup 設定為 true。
await finished(rs, { cleanup: true });
物件模式#
由 Node.js API 建立的所有流都只操作字串、<Buffer>、<TypedArray> 和 <DataView> 物件。
字串和緩衝區是流最常用的型別。TypedArray和DataView讓你能處理像Int32Array或Uint8Array這樣的二進位制資料型別。當你向流中寫入一個 TypedArray 或 DataView 時,Node.js 會處理其原始位元組。
然而,流的實現也可以處理其他型別的 JavaScript 值(null 除外,它在流中有特殊用途)。這樣的流被認為是在“物件模式”下執行。
流例項透過在建立流時使用 objectMode 選項切換到物件模式。嘗試將現有流切換到物件模式是不安全的。
緩衝#
潛在緩衝的資料量取決於傳入流建構函式的 highWaterMark 選項。對於普通流,highWaterMark 選項指定位元組總數。對於在物件模式下執行的流,highWaterMark 指定物件的總數。對於操作(但不解碼)字串的流,highWaterMark 指定 UTF-16 程式碼單元的總數。
當實現呼叫 stream.push(chunk) 時,資料被緩衝在可讀流中。如果流的消費者不呼叫 stream.read(),資料將一直存放在內部佇列中,直到被消費。
一旦內部讀取緩衝區的總大小達到 highWaterMark 指定的閾值,流將暫時停止從底層資源讀取資料,直到當前緩衝的資料可以被消費(也就是說,流將停止呼叫用於填充讀取緩衝區的內部 readable._read() 方法)。
當重複呼叫 writable.write(chunk) 方法時,資料被緩衝在可寫流中。當內部寫入緩衝區的總大小低於 highWaterMark 設定的閾值時,對 writable.write() 的呼叫將返回 true。一旦內部緩衝區的大小達到或超過 highWaterMark,將返回 false。
stream API 的一個關鍵目標,特別是 stream.pipe() 方法,是將資料緩衝限制在可接受的水平,這樣不同速度的源和目的地就不會耗盡可用記憶體。
highWaterMark 選項是一個閾值,而不是一個限制:它決定了流在停止請求更多資料之前緩衝的資料量。它通常不強制執行嚴格的記憶體限制。特定的流實現可能會選擇強制執行更嚴格的限制,但這樣做是可選的。
因為雙工流和轉換流既是可讀的又是可寫的,所以它們各自維護著兩個獨立的用於讀和寫的內部緩衝區,允許每一側獨立於另一側操作,同時保持適當且高效的資料流。例如,net.Socket 例項是雙工流,其可讀側允許消費從套接字接收的資料,其可寫側允許向套接字寫入資料。因為寫入套接字的資料速率可能比接收資料的速率快或慢,所以每一側都應該獨立於另一側進行操作(和緩衝)。
內部緩衝的機制是內部實現細節,可能隨時更改。然而,對於某些高階實現,可以使用 writable.writableBuffer 或 readable.readableBuffer 檢索內部緩衝區。不鼓勵使用這些未文件化的屬性。
面向流消費者的 API#
幾乎所有的 Node.js 應用程式,無論多麼簡單,都在某種程度上使用流。以下是在一個實現 HTTP 伺服器的 Node.js 應用程式中使用流的示例:
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
可寫流(如示例中的 res)暴露了諸如 write() 和 end() 等方法,用於向流中寫入資料。
可讀流使用 EventEmitter API 來通知應用程式程式碼何時有資料可從流中讀取。可用的資料可以透過多種方式從流中讀取。
可寫流和可讀流都以各種方式使用 EventEmitter API 來傳達流的當前狀態。
向流寫入資料或從流消費資料的應用程式不需要直接實現流介面,並且通常沒有理由呼叫 require('node:stream')。
希望實現新型流的開發者應參考面向流實現者的 API部分。
可寫流#
可寫流是對寫入資料的目的地的抽象。
可寫流的例子包括:
所有可寫流都實現了由 stream.Writable 類定義的介面。
雖然可寫流的具體例項可能在各方面有所不同,但所有可寫流都遵循下例所示的基本使用模式:
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
類:stream.Writable#
事件:'close'#
當流及其任何底層資源(例如檔案描述符)被關閉時,會發出 'close' 事件。該事件表明不會再發出更多事件,也不會發生進一步的計算。
如果一個可寫流是使用 emitClose 選項建立的,它將總是發出 'close' 事件。
事件:'drain'#
如果對 stream.write(chunk) 的呼叫返回 false,則當適合恢復向流中寫入資料時,會發出 'drain' 事件。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
事件:'error'#
- 型別: <Error>
如果在寫入或管道傳輸資料時發生錯誤,則會發出 'error' 事件。監聽器回撥在被呼叫時會傳入一個 Error 引數。
除非在建立流時將 autoDestroy 選項設定為 false,否則在發出 'error' 事件時流會被關閉。
在 'error' 事件之後,除了 'close' 之外,不應該再發出其他事件(包括 'error' 事件)。
事件:'finish'#
在呼叫 stream.end() 方法後,並且所有資料都已重新整理到底層系統時,會發出 'finish' 事件。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
事件:'pipe'#
src<stream.Readable> 正在管道傳輸到此可寫流的源流
當在可讀流上呼叫 stream.pipe() 方法,將此可寫流新增到其目標集時,會發出 'pipe' 事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
事件:'unpipe'#
src<stream.Readable> 從此可寫流取消管道傳輸的源流
當在可讀流上呼叫 stream.unpipe() 方法,將此可寫流從其目標集中移除時,會發出 'unpipe' 事件。
當一個可讀流管道傳輸到此可寫流時,如果該可寫流發出錯誤,也會發出此事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#
writable.cork() 方法強制所有寫入的資料都緩衝在記憶體中。當呼叫 stream.uncork() 或 stream.end() 方法時,緩衝的資料將被重新整理。
writable.cork() 的主要目的是為了適應一種情況,即在短時間內向流中寫入多個小資料塊。writable.cork() 不會立即將它們轉發到底層目標,而是緩衝所有資料塊,直到呼叫 writable.uncork(),屆時如果存在 writable._writev(),它會將所有資料塊傳遞給該方法。這可以防止因等待處理第一個小資料塊而導致資料被緩衝的“隊頭阻塞”情況。但是,在沒有實現 writable._writev() 的情況下使用 writable.cork() 可能會對吞吐量產生不利影響。
另請參閱: writable.uncork(), writable._writev().
writable.destroy([error])#
銷燬流。可選擇性地發出一個 'error' 事件,併發出一個 'close' 事件(除非 emitClose 設定為 false)。此呼叫之後,可寫流已結束,後續對 write() 或 end() 的呼叫將導致 ERR_STREAM_DESTROYED 錯誤。這是一種破壞性且立即銷燬流的方式。之前對 write() 的呼叫可能尚未排空,並可能觸發 ERR_STREAM_DESTROYED 錯誤。如果資料應在關閉前重新整理,請使用 end() 代替 destroy,或等待 'drain' 事件後再銷燬流。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
一旦呼叫了 destroy(),任何進一步的呼叫都將是空操作,並且除了來自 _destroy() 的錯誤外,不會再有其他錯誤作為 'error' 發出。
實現者不應覆蓋此方法,而應實現 writable._destroy()。
writable.destroyed#
- 型別:<boolean>
在 writable.destroy() 被呼叫後為 true。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])#
chunk<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可選的要寫入的資料。對於非物件模式的流,chunk必須是 <string>、<Buffer>、<TypedArray> 或 <DataView>。對於物件模式的流,chunk可以是除null之外的任何 JavaScript 值。encoding<string> 如果chunk是字串,則為編碼callback<Function> 當流完成時的回撥函式。- 返回:<this>
呼叫 writable.end() 方法表示不會再有資料寫入到可寫流。可選的 chunk 和 encoding 引數允許在關閉流之前立即寫入最後一塊額外的資料。
在呼叫 stream.end() 之後呼叫 stream.write() 方法將引發錯誤。
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
writable.uncork()#
writable.uncork() 方法會重新整理自 stream.cork() 被呼叫以來緩衝的所有資料。
當使用 writable.cork() 和 writable.uncork() 來管理對流的寫入緩衝時,使用 process.nextTick() 來推遲對 writable.uncork() 的呼叫。這樣做可以將在給定的 Node.js 事件迴圈階段內發生的所有 writable.write() 呼叫進行批處理。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果在一個流上多次呼叫 writable.cork() 方法,則必須呼叫相同次數的 writable.uncork() 才能重新整理緩衝的資料。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
另請參閱: writable.cork().
writable.writableEnded#
- 型別:<boolean>
在呼叫 writable.end() 後為 true。此屬性不表示資料是否已重新整理,請改用 writable.writableFinished。
writable[Symbol.asyncDispose]()#
使用一個 AbortError 呼叫 writable.destroy(),並返回一個在流完成時履行的 promise。
writable.write(chunk[, encoding][, callback])#
chunk<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可選的要寫入的資料。對於非物件模式的流,chunk必須是 <string>、<Buffer>、<TypedArray> 或 <DataView>。對於物件模式的流,chunk可以是除null之外的任何 JavaScript 值。encoding<string> | <null> 編碼,如果chunk是字串。預設值:'utf8'callback<Function> 當此資料塊被重新整理時的回撥函式。- 返回: <boolean> 如果流希望呼叫程式碼在繼續寫入額外資料之前等待
'drain'事件發出,則為false;否則為true。
writable.write() 方法向流中寫入一些資料,並在資料被完全處理後呼叫提供的 callback。如果發生錯誤,callback 將被呼叫,並將錯誤作為其第一個引數。callback 是非同步呼叫的,並且在發出 'error' 事件之前呼叫。
如果接納 chunk 後,內部緩衝區小於建立流時配置的 highWaterMark,則返回值為 true。如果返回 false,則應停止向流中寫入資料的進一步嘗試,直到發出 'drain' 事件。
當流未排空時,對 write() 的呼叫將緩衝 chunk 並返回 false。一旦所有當前緩衝的塊都被排空(即被作業系統接受以進行傳遞),將會觸發 'drain' 事件。一旦 write() 返回 false,在 'drain' 事件觸發前,請勿寫入更多的資料塊。雖然在流未排空時呼叫 write() 是允許的,但 Node.js 會緩衝所有寫入的塊,直到達到最大記憶體使用量,此時它將無條件地中止。即使在中止之前,高記憶體使用也會導致垃圾回收器效能下降和高 RSS(常駐記憶體大小,即使記憶體不再需要,通常也不會釋放回系統)。由於如果遠端對等方不讀取資料,TCP 套接字可能永遠不會排空,因此向一個未排空的套接字寫入資料可能導致可被遠端利用的漏洞。
在流未排空時寫入資料對於 Transform 流尤其成問題,因為 Transform 流預設是暫停的,直到它們被管道連線(pipe)或添加了 'data' 或 'readable' 事件處理程式。
如果要寫入的資料可以按需生成或獲取,建議將邏輯封裝到 Readable 流中並使用 stream.pipe()。但是,如果傾向於呼叫 write(),則可以透過使用 'drain' 事件來遵循背壓機制並避免記憶體問題。
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
處於物件模式(object mode)的 Writable 流將始終忽略 encoding 引數。
可讀流(Readable streams)#
可讀流是對資料消費來源的一種抽象。
Readable 流的示例包括:
所有 Readable 流都實現了 stream.Readable 類定義的介面。
兩種讀取模式#
Readable 流實際上以兩種模式之一執行:流動模式(flowing)和暫停模式(paused)。這些模式與物件模式是分開的。一個 Readable 流可以處於物件模式或非物件模式,無論它是處於流動模式還是暫停模式。
-
在流動模式下,資料會自動從底層系統讀取,並透過
EventEmitter介面的事件儘快提供給應用程式。 -
在暫停模式下,必須顯式呼叫
stream.read()方法來從流中讀取資料塊。
所有 Readable 流都以暫停模式開始,但可以透過以下方式之一切換到流動模式:
- 新增一個
'data'事件處理程式。 - 呼叫
stream.resume()方法。 - 呼叫
stream.pipe()方法將資料傳送到Writable流。
Readable 流可以透過以下方式之一切換回暫停模式:
- 如果沒有管道目標(pipe destinations),則呼叫
stream.pause()方法。 - 如果有管道目標,則移除所有管道目標。可以透過呼叫
stream.unpipe()方法來移除多個管道目標。
需要記住的重要概念是,Readable 流在提供消費或忽略其資料的機制之前,不會生成資料。如果消費機制被停用或移除,Readable 流將嘗試停止生成資料。
出於向後相容的原因,移除 'data' 事件處理程式不會自動暫停流。此外,如果存在管道目標,呼叫 stream.pause() 並不保證流在這些目標排空並請求更多資料後會保持暫停狀態。
如果一個 Readable 流被切換到流動模式,並且沒有可用的消費者來處理資料,那麼這些資料將會丟失。例如,當呼叫 readable.resume() 方法而沒有為 'data' 事件附加監聽器時,或者當從流中移除 'data' 事件處理程式時,就會發生這種情況。
新增 'readable' 事件處理程式會自動使流停止流動,並且資料必須透過 readable.read() 來消費。如果移除了 'readable' 事件處理程式,並且存在 'data' 事件處理程式,那麼流將再次開始流動。
三種狀態#
Readable 流的“兩種模式”操作是對 Readable 流實現內部發生的更復雜的內部狀態管理的簡化抽象。
具體來說,在任何給定時間點,每個 Readable 流都處於以下三種可能的狀態之一:
readable.readableFlowing === nullreadable.readableFlowing === falsereadable.readableFlowing === true
當 readable.readableFlowing 為 null 時,沒有提供消費流資料的機制。因此,流不會生成資料。在這種狀態下,為 'data' 事件附加監聽器、呼叫 readable.pipe() 方法或呼叫 readable.resume() 方法都會將 readable.readableFlowing 切換為 true,導致 Readable 流在資料生成時開始主動觸發事件。
呼叫 readable.pause()、readable.unpipe() 或接收到背壓(backpressure)將導致 readable.readableFlowing 被設定為 false,暫時停止事件的流動,但不會停止資料的生成。在此狀態下,為 'data' 事件附加監聽器不會將 readable.readableFlowing 切換為 true。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
當 readable.readableFlowing 為 false 時,資料可能會在流的內部緩衝區中累積。
選擇一種 API 風格#
Readable 流的 API 經過多個 Node.js 版本的演變,提供了多種消費流資料的方法。總的來說,開發者應該選擇一種消費資料的方法,並且永遠不應使用多種方法來消費同一個流的資料。具體來說,混合使用 on('data')、on('readable')、pipe() 或非同步迭代器可能導致不直觀的行為。
類:stream.Readable#
事件:'close'#
當流及其任何底層資源(例如檔案描述符)被關閉時,會發出 'close' 事件。該事件表明不會再發出更多事件,也不會發生進一步的計算。
如果 Readable 流建立時帶有 emitClose 選項,它將總是觸發 'close' 事件。
事件:'data'#
chunk<Buffer> | <string> | <any> 資料塊。對於非物件模式的流,chunk將是字串或Buffer。對於物件模式的流,chunk可以是除null之外的任何 JavaScript 值。
當流將一塊資料的所有權移交給消費者時,會觸發 'data' 事件。這可能發生在透過呼叫 readable.pipe()、readable.resume() 或為 'data' 事件附加監聽器回撥函式將流切換到流動模式時。當呼叫 readable.read() 方法且有可用資料塊返回時,也會觸發 'data' 事件。
為一個未被顯式暫停的流附加 'data' 事件監聽器會將其切換到流動模式。資料將在可用時立即傳遞。
如果使用 readable.setEncoding() 方法為流指定了預設編碼,則監聽器回撥函式將接收到字串形式的資料塊;否則,資料將作為 Buffer 傳遞。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
事件:'end'#
當流中沒有更多資料可供消費時,會觸發 'end' 事件。
除非資料被完全消費,否則 'end' 事件不會被觸發。這可以透過將流切換到流動模式,或者重複呼叫 stream.read() 直到所有資料都被消費完來實現。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
事件:'error'#
- 型別: <Error>
'error' 事件可以由 Readable 實現隨時觸發。通常,這可能發生在底層流因內部故障而無法生成資料,或者當流實現嘗試推送一個無效的資料塊時。
監聽器回撥函式將接收到一個單獨的 Error 物件。
事件:'pause'#
當呼叫 stream.pause() 並且 readableFlowing 不為 false 時,會觸發 'pause' 事件。
事件:'readable'#
當有資料可從流中讀取時,會觸發 'readable' 事件,可讀取的資料量最多達到配置的高水位線(state.highWaterMark)。實際上,它表示流的緩衝區中有新的資訊。如果此緩衝區中有可用資料,可以呼叫 stream.read() 來檢索該資料。此外,當到達流的末尾時,也可能觸發 'readable' 事件。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
如果到達了流的末尾,呼叫 stream.read() 將返回 null 並觸發 'end' 事件。如果從來沒有任何資料可讀,情況也是如此。例如,在下面的例子中,foo.txt 是一個空檔案:
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
執行此指令碼的輸出是:
$ node test.js
readable: null
end
在某些情況下,為 'readable' 事件附加監聽器會導致一定量的資料被讀入內部緩衝區。
總的來說,readable.pipe() 和 'data' 事件機制比 'readable' 事件更容易理解。然而,處理 'readable' 可能會帶來更高的吞吐量。
如果同時使用 'readable' 和 'data','readable' 在控制流方面具有優先權,即只有在呼叫 stream.read() 時才會觸發 'data'。readableFlowing 屬性將變為 false。如果在移除 'readable' 時存在 'data' 監聽器,流將開始流動,即無需呼叫 .resume() 就會觸發 'data' 事件。
事件:'resume'#
當呼叫 stream.resume() 並且 readableFlowing 不為 true 時,會觸發 'resume' 事件。
readable.destroy([error])#
銷燬流。可選擇性地觸發一個 'error' 事件,並觸發一個 'close' 事件(除非 emitClose 設定為 false)。此呼叫之後,可讀流將釋放任何內部資源,後續對 push() 的呼叫將被忽略。
一旦呼叫了 destroy(),任何進一步的呼叫都將是空操作,並且除了來自 _destroy() 的錯誤外,不會再有其他錯誤作為 'error' 發出。
實現者不應覆蓋此方法,而應實現 readable._destroy()。
readable.isPaused()#
- 返回:<boolean>
readable.isPaused() 方法返回 Readable 流的當前操作狀態。這主要由 readable.pipe() 方法底層的機制使用。在大多數典型情況下,沒有理由直接使用此方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()#
- 返回:<this>
readable.pause() 方法將導致處於流動模式的流停止觸發 'data' 事件,從而退出流動模式。任何可用的資料將保留在內部緩衝區中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
如果存在 'readable' 事件監聽器,readable.pause() 方法無效。
readable.pipe(destination[, options])#
destination<stream.Writable> 用於寫入資料的目標options<Object> 管道選項end<boolean> 當讀取器結束時,結束寫入器。預設值:true。
- 返回:<stream.Writable> 目標流,如果它是
Duplex或Transform流,則允許管道鏈
readable.pipe() 方法將一個 Writable 流附加到 readable 流,使其自動切換到流動模式,並將其所有資料推送到附加的 Writable 流。資料流將被自動管理,以確保目標 Writable 流不會被速度更快的 Readable 流淹沒。
以下示例將 readable 流的所有資料透過管道傳輸到一個名為 file.txt 的檔案中:
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
可以將多個 Writable 流附加到一個 Readable 流。
readable.pipe() 方法返回對目標流的引用,從而可以建立管道流鏈:
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
預設情況下,當源 Readable 流觸發 'end' 事件時,會在目標 Writable 流上呼叫 stream.end(),從而使目標流不再可寫。要停用此預設行為,可以將 end 選項設定為 false,這會使目標流保持開啟狀態:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
一個重要的注意事項是,如果 Readable 流在處理過程中觸發了錯誤,Writable 目標不會自動關閉。如果發生錯誤,需要手動關閉每個流以防止記憶體洩漏。
process.stderr 和 process.stdout 這兩個 Writable 流在 Node.js 程序退出之前永遠不會關閉,無論指定的選項如何。
readable.read([size])#
readable.read() 方法從內部緩衝區讀取資料並返回它。如果沒有可讀資料,則返回 null。預設情況下,資料作為 Buffer 物件返回,除非使用 readable.setEncoding() 方法指定了編碼,或者流處於物件模式。
可選的 size 引數指定要讀取的特定位元組數。如果沒有 size 位元組的資料可讀,將返回 null,除非流已經結束,在這種情況下,將返回內部緩衝區中剩餘的所有資料。
如果未指定 size 引數,將返回內部緩衝區中包含的所有資料。
size 引數必須小於或等於 1 GiB。
readable.read() 方法只應在處於暫停模式的 Readable 流上呼叫。在流動模式下,readable.read() 會被自動呼叫,直到內部緩衝區完全排空。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
每次呼叫 readable.read() 都會返回一個數據塊或 null,表示此時沒有更多資料可讀。這些資料塊不會自動連線。由於單次 read() 呼叫不會返回所有資料,可能需要使用 while 迴圈來連續讀取資料塊,直到檢索到所有資料。在讀取大檔案時,.read() 可能會暫時返回 null,表示它已消耗完所有緩衝內容,但可能還有更多資料待緩衝。在這種情況下,一旦緩衝區中有更多資料,就會觸發一個新的 'readable' 事件,而 'end' 事件則表示資料傳輸結束。
因此,要從一個 readable 流中讀取檔案的全部內容,需要在多個 'readable' 事件中收集資料塊:
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
處於物件模式的 Readable 流在呼叫 readable.read(size) 時將始終返回單個專案,無論 size 引數的值是多少。
如果 readable.read() 方法返回一個數據塊,那麼也會觸發一個 'data' 事件。
在 'end' 事件觸發後呼叫 stream.read([size]) 將返回 null。不會引發執行時錯誤。
readable.readableEncoding#
獲取給定 Readable 流的 encoding 屬性的 getter。encoding 屬性可以使用 readable.setEncoding() 方法設定。
readable.resume()#
- 返回:<this>
readable.resume() 方法使一個被明確暫停的 Readable 流恢復觸發 'data' 事件,將流切換到流動模式。
readable.resume() 方法可用於完全消費流中的資料,而無需實際處理任何資料:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
如果存在 'readable' 事件監聽器,readable.resume() 方法無效。
readable.setEncoding(encoding)#
readable.setEncoding() 方法為從 Readable 流讀取的資料設定字元編碼。
預設情況下,沒有分配編碼,流資料將作為 Buffer 物件返回。設定編碼會使流資料以指定編碼的字串形式返回,而不是作為 Buffer 物件。例如,呼叫 readable.setEncoding('utf8') 會使輸出資料被解釋為 UTF-8 資料,並以字串形式傳遞。呼叫 readable.setEncoding('hex') 會使資料被編碼為十六進位制字串格式。
Readable 流將正確處理透過流傳遞的多位元組字元,這些字元如果僅作為 Buffer 物件從流中拉取,可能會被錯誤解碼。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])#
destination<stream.Writable> 可選的特定流以取消管道連線- 返回:<this>
readable.unpipe() 方法分離一個先前使用 stream.pipe() 方法附加的 Writable 流。
如果未指定 destination,則所有管道都將被分離。
如果指定了 destination,但沒有為它設定管道,則該方法不執行任何操作。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])#
chunk<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要插回到讀取佇列開頭的資料塊。對於非物件模式的流,chunk必須是 <string>、<Buffer>、<TypedArray>、<DataView> 或null。對於物件模式的流,chunk可以是任何 JavaScript 值。encoding<string> 字串塊的編碼。必須是有效的Buffer編碼,如'utf8'或'ascii'。
將 chunk 作為 null 傳遞表示流的結束(EOF),其行為與 readable.push(null) 相同,之後不能再寫入更多資料。EOF 訊號被放在緩衝區的末尾,任何已緩衝的資料仍將被重新整理。
readable.unshift() 方法將一塊資料推回到內部緩衝區。這在某些情況下很有用,例如當一個流被需要“反消費”(un-consume)一些它已從源中樂觀拉取的資料的程式碼消費時,以便這些資料可以傳遞給其他方。
在 'end' 事件觸發後,不能呼叫 stream.unshift(chunk) 方法,否則將丟擲執行時錯誤。
使用 stream.unshift() 的開發者通常應考慮切換到使用 Transform 流。有關更多資訊,請參見流實現者 API部分。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
與 stream.push(chunk) 不同,stream.unshift(chunk) 不會透過重置流的內部讀取狀態來結束讀取過程。如果在讀取期間(即在自定義流的 stream._read() 實現內部)呼叫 readable.unshift(),可能會導致意外結果。在呼叫 readable.unshift() 之後立即呼叫 stream.push('') 將適當地重置讀取狀態,但最好的做法是簡單地避免在執行讀取過程中呼叫 readable.unshift()。
readable.wrap(stream)#
在 Node.js 0.10 之前,流並未實現當前定義的整個 node:stream 模組 API。(有關更多資訊,請參見與舊版 Node.js 的相容性。)
當使用一個會觸發 'data' 事件並且其 stream.pause() 方法僅為建議性的舊 Node.js 庫時,可以使用 readable.wrap() 方法建立一個使用舊流作為其資料來源的 Readable 流。
很少需要使用 readable.wrap(),但提供此方法是為了方便與舊的 Node.js 應用程式和庫進行互動。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()#
- 返回:<AsyncIterator> 用於完全消費流。
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
如果迴圈以 break、return 或 throw 終止,流將被銷燬。換句話說,迭代一個流將完全消費它。流將以等於 highWaterMark 選項大小的塊進行讀取。在上面的程式碼示例中,如果檔案的資料小於 64 KiB,資料將位於單個塊中,因為沒有向 fs.createReadStream() 提供 highWaterMark 選項。
readable[Symbol.asyncDispose]()#
使用 AbortError 呼叫 readable.destroy() 並返回一個在流完成時履行的 promise。
readable.compose(stream[, options])#
stream<Stream> | <Iterable> | <AsyncIterable> | <Function>options<Object>signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Duplex> 一個與
stream流組合的流。
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
參見 stream.compose 瞭解更多資訊。
readable.iterator([options])#
options<Object>destroyOnReturn<boolean> 當設定為false時,在非同步迭代器上呼叫return,或使用break、return或throw退出for await...of迭代,將不會銷燬流。預設值:true。
- 返回: <AsyncIterator> 用於消費流。
此方法建立的迭代器為使用者提供了一個選項:如果 for await...of 迴圈因 return、break 或 throw 而退出,可以取消流的銷燬;或者如果流在迭代期間發出錯誤,迭代器應銷燬流。
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])#
fn<Function> | <AsyncFunction> 一個用於對映流中每個資料塊的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。highWaterMark<number> 在等待使用者消費對映後的專案時,可以緩衝的專案數量。預設值:concurrency * 2 - 1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Readable> 一個使用函式
fn對映過的流。
此方法允許對流進行對映。對於流中的每個資料塊,都會呼叫 fn 函式。如果 fn 函式返回一個 promise,那麼在將結果傳遞給結果流之前,會 await 這個 promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
readable.filter(fn[, options])#
fn<Function> | <AsyncFunction> 一個用於從流中過濾資料塊的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。highWaterMark<number> 在等待使用者消費過濾後的專案時,可以緩衝的專案數量。預設值:concurrency * 2 - 1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Readable> 一個使用謂詞
fn過濾過的流。
此方法允許過濾流。對於流中的每個資料塊,都會呼叫 fn 函式,如果它返回一個真值(truthy value),該資料塊將被傳遞到結果流。如果 fn 函式返回一個 promise,那麼會 await 這個 promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
readable.forEach(fn[, options])#
fn<Function> | <AsyncFunction> 一個在流的每個資料塊上呼叫的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Promise> 一個在流完成時履行的 promise。
此方法允許迭代一個流。對於流中的每個資料塊,都會呼叫 fn 函式。如果 fn 函式返回一個 promise,那麼會 await 這個 promise。
此方法與 for await...of 迴圈的不同之處在於,它可以選擇性地併發處理資料塊。此外,forEach 迭代只能透過傳遞一個 signal 選項並中止相關的 AbortController 來停止,而 for await...of 可以用 break 或 return 來停止。無論哪種情況,流都將被銷燬。
此方法與監聽 'data' 事件的不同之處在於,它在底層機制中使用 readable 事件,並且可以限制併發 fn 呼叫的數量。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
readable.toArray([options])#
options<Object>signal<AbortSignal> 允許在訊號中止時取消 toArray 操作。
- 返回: <Promise> 一個包含流內容的陣列的 promise。
此方法可以方便地獲取流的內容。
由於此方法會將整個流讀入記憶體,它抵消了流的優勢。它旨在用於互操作性和便利性,而不是作為消費流的主要方式。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])#
fn<Function> | <AsyncFunction> 一個在流的每個資料塊上呼叫的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Promise> 如果
fn對至少一個數據塊返回了真值,則 promise 的評估值為true。
此方法類似於 Array.prototype.some,它對流中的每個資料塊呼叫 fn,直到等待的返回值為 true(或任何真值)。一旦對某個資料塊的 fn 呼叫等待的返回值為真值,流就會被銷燬,並且 promise 會以 true 履行。如果所有對資料塊的 fn 呼叫都未返回真值,則 promise 會以 false 履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.find(fn[, options])#
fn<Function> | <AsyncFunction> 一個在流的每個資料塊上呼叫的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Promise> 一個 promise,其評估值為
fn評估為真值的第一個資料塊,如果未找到任何元素,則為undefined。
此方法類似於 Array.prototype.find,它對流中的每個資料塊呼叫 fn 以查詢一個使 fn 返回真值的資料塊。一旦某個 fn 呼叫等待的返回值為真值,流就會被銷燬,並且 promise 會以該資料塊履行。如果所有對資料塊的 fn 呼叫都返回假值(falsy value),則 promise 會以 undefined 履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.every(fn[, options])#
fn<Function> | <AsyncFunction> 一個在流的每個資料塊上呼叫的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Promise> 如果
fn對所有資料塊都返回了真值,則 promise 的評估值為true。
此方法類似於 Array.prototype.every,它對流中的每個資料塊呼叫 fn,以檢查所有等待的返回值是否都為真值。一旦對某個資料塊的 fn 呼叫等待的返回值為假值,流就會被銷燬,並且 promise 會以 false 履行。如果所有對資料塊的 fn 呼叫都返回真值,則 promise 會以 true 履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
readable.flatMap(fn[, options])#
fn<Function> | <AsyncGeneratorFunction> | <AsyncFunction> 一個用於對映流中每個資料塊的函式。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
options<Object>concurrency<number> 在流上一次性併發呼叫fn的最大次數。預設值:1。signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Readable> 一個使用函式
fn進行扁平化對映的流。
此方法透過將給定的回撥應用於流的每個資料塊然後將結果扁平化來返回一個新流。
可以從 fn 返回一個流或其他可迭代或非同步可迭代物件,結果流將被合併(扁平化)到返回的流中。
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
readable.drop(limit[, options])#
limit<number> 要從可讀流中丟棄的資料塊數量。options<Object>signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Readable> 一個丟棄了
limit個數據塊的流。
此方法返回一個丟棄了前 limit 個數據塊的新流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])#
limit<number> 要從可讀流中獲取的資料塊數量。options<Object>signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Readable> 一個包含
limit個數據塊的流。
此方法返回一個包含前 limit 個數據塊的新流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])#
fn<Function> | <AsyncFunction> 一個在流的每個資料塊上呼叫的歸納函式。previous<any> 上一次呼叫fn獲得的值,或者如果指定了initial值則是該值,否則是流的第一個資料塊。data<any> 來自流的一個數據塊。options<Object>signal<AbortSignal> 如果流被銷燬,該訊號會中止,從而允許儘早中止fn呼叫。
initial<any> 在歸納中使用的初始值。options<Object>signal<AbortSignal> 允許在訊號中止時銷燬流。
- 返回: <Promise> 一個歸納最終值的 promise。
此方法按順序對流的每個資料塊呼叫 fn,並將上一個元素計算的結果傳遞給它。它返回一個歸納最終值的 promise。
如果沒有提供 initial 值,則使用流的第一個資料塊作為初始值。如果流為空,則 promise 會被拒絕,並帶有一個具有 ERR_INVALID_ARGS 程式碼屬性的 TypeError。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
歸納函式逐個元素地迭代流,這意味著沒有 concurrency 引數或並行性。要併發執行 reduce,你可以將非同步函式提取到 readable.map 方法中。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
雙工流和轉換流#
類: stream.Transform#
轉換流是 Duplex 流,其輸出與輸入以某種方式相關。與所有 Duplex 流一樣,Transform 流同時實現了 Readable 和 Writable 介面。
Transform 流的示例包括
transform.destroy([error])#
銷燬流,並可選擇性地發出一個 'error' 事件。此呼叫後,轉換流將釋放任何內部資源。實現者不應覆蓋此方法,而應實現 readable._destroy()。Transform 的 _destroy() 的預設實現也會發出 'close',除非 emitClose 設定為 false。
一旦呼叫了 destroy(),任何進一步的呼叫都將是空操作,並且除了來自 _destroy() 的錯誤外,不會再有其他錯誤作為 'error' 發出。
stream.finished(stream[, options], callback)#
stream<Stream> | <ReadableStream> | <WritableStream> 一個可讀和/或可寫的流/網路流。options<Object>error<boolean> 如果設定為false,則對emit('error', err)的呼叫不被視為完成。預設值:true。readable<boolean> 當設定為false時,即使流可能仍然可讀,當流結束時也會呼叫回撥。預設值:true。writable<boolean> 當設定為false時,即使流可能仍然可寫,當流結束時也會呼叫回撥。預設值:true。signal<AbortSignal> 允許中止等待流完成。如果訊號被中止,底層的流將不會被中止。回撥將帶有一個AbortError被呼叫。此函式新增的所有已註冊監聽器也將被移除。
callback<Function> 一個回撥函式,它接受一個可選的錯誤引數。- 返回: <Function> 一個清理函式,用於移除所有已註冊的監聽器。
一個用於在流不再可讀、可寫或遇到錯誤或過早關閉事件時獲得通知的函式。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
在錯誤處理場景中特別有用,例如流被過早銷燬(如中止的 HTTP 請求),並且不會發出 'end' 或 'finish'。
finished API 提供了 promise 版本。
在呼叫 callback 之後,stream.finished() 會留下懸掛的事件監聽器(特別是 'error'、'end'、'finish' 和 'close')。這樣做的原因是為了防止意外的 'error' 事件(由於不正確的流實現)導致意外的崩潰。如果不需要這種行為,則需要在回撥中呼叫返回的清理函式。
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)#
stream.pipeline(streams, callback)#
streams<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- 返回: <Iterable> | <AsyncIterable>
...transforms<Stream> | <Function> | <TransformStream>source<AsyncIterable>- 返回: <AsyncIterable>
destination<Stream> | <Function> | <WritableStream>source<AsyncIterable>- 返回: <AsyncIterable> | <Promise>
callback<Function> 在管道完全完成後呼叫。err<Error>val由destination返回的Promise的解析值。
- 返回: <Stream>
一個模組方法,用於在流和生成器之間進行管道傳輸,轉發錯誤並正確清理,並在管道完成時提供回撥。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
pipeline API 提供了 promise 版本。
stream.pipeline() 會在所有流上呼叫 stream.destroy(err),除了
- 已經發出
'end'或'close'的Readable流。 - 已經發出
'finish'或'close'的Writable流。
在呼叫 callback 後,stream.pipeline() 會在流上留下懸掛的事件監聽器。在失敗後重用流的情況下,這可能導致事件監聽器洩漏和錯誤被吞噬。如果最後一個流是可讀的,懸掛的事件監聽器將被移除,以便之後可以消費最後一個流。
當發生錯誤時,stream.pipeline() 會關閉所有流。將 IncomingRequest 與 pipeline 一起使用可能會導致意外行為,因為它會在不傳送預期響應的情況下銷燬套接字。請參見下面的示例。
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.compose(...streams)#
stream.compose 是實驗性的。streams<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 返回: <stream.Duplex>
將兩個或多個流組合成一個 Duplex 流,該流寫入第一個流並從最後一個流讀取。每個提供的流都透過 stream.pipeline 管道連線到下一個流。如果任何一個流出錯,則所有流都會被銷燬,包括外部的 Duplex 流。
因為 stream.compose 返回一個新流,這個新流本身可以(也應該)被管道連線到其他流,所以它支援組合。相比之下,當將流傳遞給 stream.pipeline 時,通常第一個流是可讀流,最後一個是可寫流,形成一個閉合迴路。
如果傳遞一個 Function,它必須是一個工廠方法,接受一個 source Iterable。
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose 可用於將非同步可迭代物件、生成器和函式轉換為流。
AsyncIterable轉換為可讀的Duplex。不能 yieldnull。AsyncGeneratorFunction轉換為可讀/可寫的轉換Duplex。必須接受一個源AsyncIterable作為第一個引數。不能 yieldnull。AsyncFunction轉換為可寫的Duplex。必須返回null或undefined。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
參見 readable.compose(stream) 以瞭解作為運算子的 stream.compose。
stream.isErrored(stream)#
stream<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 返回:<boolean>
返回流是否遇到了錯誤。
stream.isReadable(stream)#
stream<Readable> | <Duplex> | <ReadableStream>- 返回: <boolean> | <null> - 僅當
stream不是有效的Readable、Duplex或ReadableStream時才返回null。
返回流是否可讀。
stream.isWritable(stream)#
stream<Writable> | <Duplex> | <WritableStream>- 返回: <boolean> | <null> - 僅當
stream不是有效的Writable、Duplex或WritableStream時才返回null。
返回流是否可寫。
stream.Readable.from(iterable[, options])#
iterable<Iterable> 實現了Symbol.asyncIterator或Symbol.iterator可迭代協議的物件。如果傳遞了 null 值,則會發出 'error' 事件。options<Object> 提供給new stream.Readable([options])的選項。預設情況下,Readable.from()會將options.objectMode設定為true,除非透過將options.objectMode設定為false來明確選擇退出。- 返回: <stream.Readable>
一個用於從迭代器建立可讀流的工具方法。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
出於效能原因,呼叫 Readable.from(string) 或 Readable.from(buffer) 不會對字串或緩衝區進行迭代以匹配其他流的語義。
如果將包含 promise 的 Iterable 物件作為引數傳遞,可能會導致未處理的拒絕(unhandled rejection)。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Readable.fromWeb(readableStream[, options])#
readableStream<ReadableStream>options<Object>encoding<string>highWaterMark<number>objectMode<boolean>signal<AbortSignal>
- 返回: <stream.Readable>
stream.Readable.isDisturbed(stream)#
stream<stream.Readable> | <ReadableStream>- 返回:
boolean
返回流是否已被讀取或取消。
stream.Readable.toWeb(streamReadable[, options])#
streamReadable<stream.Readable>options<Object>strategy<Object>highWaterMark<number> 在從給定的stream.Readable讀取時應用背壓之前,建立的ReadableStream的最大內部佇列大小。如果未提供值,則將從給定的stream.Readable中獲取。size<Function> 一個計算給定資料塊大小的函式。如果未提供值,則所有資料塊的大小將為1。
- 返回:<ReadableStream>
stream.Writable.fromWeb(writableStream[, options])#
writableStream<WritableStream>options<Object>decodeStrings<boolean>highWaterMark<number>objectMode<boolean>signal<AbortSignal>
- 返回: <stream.Writable>
stream.Writable.toWeb(streamWritable)#
streamWritable<stream.Writable>- 返回: <WritableStream>
stream.Duplex.from(src)#
src<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
一個用於建立雙工流的實用方法。
Stream將可寫流轉換為可寫的Duplex,將可讀流轉換為Duplex。Blob轉換為可讀的Duplex。string轉換為可讀的Duplex。ArrayBuffer轉換為可讀的Duplex。AsyncIterable轉換為可讀的Duplex。不能 yieldnull。AsyncGeneratorFunction轉換為可讀/可寫的轉換Duplex。必須接受一個源AsyncIterable作為第一個引數。不能 yieldnull。AsyncFunction轉換為可寫的Duplex。必須返回null或undefinedObject ({ writable, readable })將readable和writable轉換為Stream,然後將它們組合成一個Duplex,該Duplex將寫入writable並從readable讀取。Promise轉換為可讀的Duplex。值null被忽略。ReadableStream轉換為可讀的Duplex。WritableStream轉換為可寫的Duplex。- 返回: <stream.Duplex>
如果將包含 promise 的 Iterable 物件作為引數傳遞,可能會導致未處理的拒絕(unhandled rejection)。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.fromWeb(pair[, options])#
pair<Object>readable<ReadableStream>writable<WritableStream>
options<Object>- 返回: <stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)#
streamDuplex<stream.Duplex>- 返回:<Object>
readable<ReadableStream>writable<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)#
signal<AbortSignal> 一個表示可能取消的訊號stream<Stream> | <ReadableStream> | <WritableStream> 要附加訊號的流。
將 AbortSignal 附加到可讀或可寫流。這讓程式碼可以使用 AbortController 控制流的銷燬。
在與傳遞的 AbortSignal 相對應的 AbortController 上呼叫 abort 的行為,將與在流上呼叫 .destroy(new AbortError()) 的行為相同,對於 Web 流則是 controller.error(new AbortError())。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
或者將 AbortSignal 與可讀流一起用作非同步迭代器
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
或者將 AbortSignal 與 ReadableStream 一起使用
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)#
返回流使用的預設 highWaterMark。預設為 65536 (64 KiB),對於 objectMode 則為 16。
面向流實現者的 API#
node:stream 模組的 API 旨在使得使用 JavaScript 的原型繼承模型輕鬆實現流成為可能。
首先,流開發者需要宣告一個新的 JavaScript 類,該類繼承自四個基本流類之一(stream.Writable、stream.Readable、stream.Duplex 或 stream.Transform),並確保呼叫了相應的父類建構函式
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
在擴充套件流時,請記住使用者可以且應該提供哪些選項,然後再將這些選項轉發給基類建構函式。例如,如果實現對 autoDestroy 和 emitClose 選項有特定假設,則不要允許使用者覆蓋這些選項。應明確說明轉發了哪些選項,而不是隱式轉發所有選項。
新的流類必須實現一個或多個特定方法,具體取決於所建立的流的型別,如下表所示
| 用例 | 類 | 需要實現的方法 |
|---|---|---|
| 只讀 | Readable | _read() |
| 只寫 | Writable | _write(), _writev(), _final() |
| 讀和寫 | Duplex | _read(), _write(), _writev(), _final() |
| 對寫入的資料進行操作,然後讀取結果 | Transform | _transform(), _flush(), _final() |
流的實現程式碼絕不應該呼叫供消費者使用的流的“公共”方法(如面向流消費者的 API部分所述)。這樣做可能會導致使用該流的應用程式程式碼產生不良副作用。
避免覆蓋公共方法,如 write()、end()、cork()、uncork()、read() 和 destroy(),或者透過 .emit() 觸發內部事件,如 'error'、'data'、'end'、'finish' 和 'close'。這樣做會破壞當前和未來的流不變數,導致與其他流、流實用工具和使用者期望的行為和/或相容性問題。
簡化的構造#
對於許多簡單情況,可以在不依賴繼承的情況下建立流。這可以透過直接建立 stream.Writable、stream.Readable、stream.Duplex 或 stream.Transform 物件的例項,並將適當的方法作為建構函式選項傳遞來實現。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
實現一個可寫流#
stream.Writable 類被擴充套件以實現一個 Writable 流。
自定義的 Writable 流必須呼叫 new stream.Writable([options]) 建構函式並實現 writable._write() 和/或 writable._writev() 方法。
new stream.Writable([options])#
options<Object>highWaterMark<number> 當stream.write()開始返回false時的緩衝區級別。預設值:65536(64 KiB),對於objectMode流則為16。decodeStrings<boolean> 在將傳遞給stream.write()的string傳遞給stream._write()之前,是否將其編碼為Buffer(使用stream.write()呼叫中指定的編碼)。其他型別的資料不會被轉換(即Buffer不會被解碼為string)。設定為 false 將阻止string的轉換。預設值:true。defaultEncoding<string> 當沒有指定編碼作為stream.write()的引數時使用的預設編碼。預設值:'utf8'。objectMode<boolean>stream.write(anyObj)是否是有效操作。當設定時,如果流實現支援,則可以寫入除 string、<Buffer>、<TypedArray> 或 <DataView> 之外的 JavaScript 值。預設值:false。emitClose<boolean> 流在被銷燬後是否應發出'close'事件。預設值:true。write<Function>stream._write()方法的實現。writev<Function>stream._writev()方法的實現。destroy<Function>stream._destroy()方法的實現。final<Function>stream._final()方法的實現。construct<Function>stream._construct()方法的實現。autoDestroy<boolean> 此流在結束後是否應自動呼叫.destroy()。預設值:true。signal<AbortSignal> 一個表示可能取消的訊號。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
或者,在使用 ES6 之前的建構函式風格時
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用簡化的建構函式方法
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
在與傳遞的 AbortSignal 相對應的 AbortController 上呼叫 abort 的行為將與在可寫流上呼叫 .destroy(new AbortError()) 的行為相同。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)#
callback<Function> 當流完成初始化時呼叫此函式(可選地帶一個錯誤引數)。
_construct() 方法不能被直接呼叫。它可以由子類實現,如果實現,則僅由內部 Writable 類的方法呼叫。
這個可選函式將在流建構函式返回後的一個 tick 中被呼叫,將任何 _write()、_final() 和 _destroy() 呼叫延遲到 callback 被呼叫之後。這對於在流可以使用之前初始化狀態或非同步初始化資源很有用。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)#
chunk<Buffer> | <string> | <any> 要寫入的Buffer,由傳遞給stream.write()的string轉換而來。如果流的decodeStrings選項為false或流在物件模式下操作,則 chunk 不會被轉換,並且將是傳遞給stream.write()的任何內容。encoding<string> 如果 chunk 是一個字串,則encoding是該字串的字元編碼。如果 chunk 是一個Buffer,或者流在物件模式下操作,encoding可能會被忽略。callback<Function> 當所提供的資料塊處理完成時呼叫此函式(可選地帶一個錯誤引數)。
所有 Writable 流的實現都必須提供一個 writable._write() 和/或 writable._writev() 方法來將資料傳送到底層資源。
Transform 流提供了它們自己的 writable._write() 實現。
此函式不能由應用程式程式碼直接呼叫。它應該由子類實現,並僅由內部 Writable 類的方法呼叫。
callback 函式必須在 writable._write() 內部同步呼叫,或非同步呼叫(即在不同的 tick 中)以表明寫入成功完成或因錯誤而失敗。傳遞給 callback 的第一個引數必須是 Error 物件(如果呼叫失敗)或 null(如果寫入成功)。
在 writable._write() 被呼叫和 callback 被呼叫之間發生的所有對 writable.write() 的呼叫都將導致寫入的資料被緩衝。當 callback 被呼叫時,流可能會發出一個 'drain' 事件。如果一個流的實現能夠一次處理多個數據塊,那麼應該實現 writable._writev() 方法。
如果在建構函式選項中將 decodeStrings 屬性顯式設定為 false,則 chunk 將保持與傳遞給 .write() 的物件相同,並且可能是一個字串而不是 Buffer。這是為了支援對某些字串資料編碼有最佳化處理的實現。在這種情況下,encoding 引數將指示字串的字元編碼。否則,encoding 引數可以安全地忽略。
writable._write() 方法帶有下劃線字首,因為它對於定義它的類是內部的,並且絕不應由使用者程式直接呼叫。
writable._writev(chunks, callback)#
chunks<Object[]> 要寫入的資料。該值是一個 <Object> 陣列,每個物件代表一個要寫入的離散資料塊。這些物件的屬性是callback<Function> 一個回撥函式(可選地帶一個錯誤引數),在處理完所提供的資料塊後被呼叫。
此函式不能由應用程式程式碼直接呼叫。它應該由子類實現,並僅由內部 Writable 類的方法呼叫。
writable._writev() 方法可以作為 writable._write() 的補充或替代方案,在能夠一次處理多個數據塊的流實現中實現。如果實現了該方法,並且存在來自先前寫入的緩衝資料,則將呼叫 _writev() 而不是 _write()。
writable._writev() 方法帶有下劃線字首,因為它對於定義它的類是內部的,並且絕不應由使用者程式直接呼叫。
writable._destroy(err, callback)#
err<Error> 一個可能的錯誤。callback<Function> 一個回撥函式,它接受一個可選的錯誤引數。
_destroy() 方法由 writable.destroy() 呼叫。它可以被子類覆蓋,但絕不能直接呼叫。
writable._final(callback)#
callback<Function> 當寫完所有剩餘資料後呼叫此函式(可選地帶一個錯誤引數)。
_final() 方法絕不能直接呼叫。它可以由子類實現,如果實現,則僅由內部 Writable 類的方法呼叫。
這個可選函式將在流關閉之前被呼叫,將 'finish' 事件延遲到 callback 被呼叫之後。這對於在流結束前關閉資源或寫入緩衝資料很有用。
寫入時出錯#
在處理 writable._write()、writable._writev() 和 writable._final() 方法期間發生的錯誤必須透過呼叫回撥函式並將錯誤作為第一個引數傳遞來傳播。在這些方法中丟擲 Error 或手動發出 'error' 事件會導致未定義的行為。
如果一個 Readable 流透過管道連線到一個 Writable 流,當 Writable 發出錯誤時,Readable 流將被取消管道連線。
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
一個可寫流的例子#
下面展示了一個相當簡單(且有些無意義)的自定義 Writable 流實現。雖然這個特定的 Writable 流例項並沒有什麼實際用途,但這個例子展示了自定義 Writable 流例項的每個必需元素。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
在可寫流中解碼緩衝區#
解碼緩衝區是一項常見任務,例如,當使用輸入為字串的轉換器時。當使用多位元組字元編碼(如 UTF-8)時,這並不是一個簡單的過程。下面的例子展示瞭如何使用 StringDecoder 和 Writable 來解碼多位元組字串。
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options?.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
實現一個可讀流#
stream.Readable 類被擴充套件以實現一個 Readable 流。
自定義的 Readable 流必須呼叫 new stream.Readable([options]) 建構函式並實現 readable._read() 方法。
new stream.Readable([options])#
options<Object>highWaterMark<number> 在停止從底層資源讀取之前,內部緩衝區中儲存的最大位元組數。預設值:65536(64 KiB),對於objectMode流則為16。encoding<string> 如果指定,則緩衝區將使用指定的編碼解碼為字串。預設值:null。objectMode<boolean> 此流是否應表現為物件流。這意味著stream.read(n)返回單個值而不是大小為n的Buffer。預設值:false。emitClose<boolean> 流在被銷燬後是否應發出'close'事件。預設值:true。read<Function>stream._read()方法的實現。destroy<Function>stream._destroy()方法的實現。construct<Function>stream._construct()方法的實現。autoDestroy<boolean> 此流在結束後是否應自動呼叫.destroy()。預設值:true。signal<AbortSignal> 一個表示可能取消的訊號。
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
}
或者,在使用 ES6 之前的建構函式風格時
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者,使用簡化的建構函式方法
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
在與傳遞的 AbortSignal 對應的 AbortController 上呼叫 abort 的行為,將與在建立的可讀流上呼叫 .destroy(new AbortError()) 的行為相同。
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
readable._construct(callback)#
callback<Function> 當流完成初始化時呼叫此函式(可選地帶一個錯誤引數)。
_construct() 方法不能被直接呼叫。它可以由子類實現,如果實現,則僅由內部 Readable 類的方法呼叫。
這個可選函式將由流建構函式在下一個 tick 中安排執行,將任何 _read() 和 _destroy() 呼叫延遲到 callback 被呼叫之後。這對於在流可以使用之前初始化狀態或非同步初始化資源很有用。
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)#
size<number> 非同步讀取的位元組數
此函式不能由應用程式程式碼直接呼叫。它應該由子類實現,並僅由內部 Readable 類的方法呼叫。
所有 Readable 流的實現都必須提供一個 readable._read() 方法的實現,以從底層資源獲取資料。
當 readable._read() 被呼叫時,如果資源中有可用資料,實現應該開始使用 this.push(dataChunk) 方法將資料推入讀取佇列。在每次呼叫 this.push(dataChunk) 之後,一旦流準備好接受更多資料,_read() 將再次被呼叫。_read() 可以繼續從資源中讀取並推送資料,直到 readable.push() 返回 false。只有在它停止後再次呼叫 _read() 時,它才應恢復向佇列中推送額外的資料。
一旦 readable._read() 方法被呼叫,直到透過 readable.push() 方法推送更多資料之前,它不會再次被呼叫。空資料,如空緩衝區和空字串,不會導致 readable._read() 被呼叫。
size 引數是建議性的。對於那些“讀取”是返回資料的單個操作的實現,可以使用 size 引數來確定要獲取多少資料。其他實現可以忽略此引數,並在資料可用時簡單地提供資料。沒有必要在呼叫 stream.push(chunk) 之前“等待”直到 size 位元組可用。
readable._read() 方法帶有下劃線字首,因為它對於定義它的類是內部的,並且絕不應由使用者程式直接呼叫。
readable._destroy(err, callback)#
err<Error> 一個可能的錯誤。callback<Function> 一個回撥函式,它接受一個可選的錯誤引數。
_destroy() 方法由 readable.destroy() 呼叫。它可以被子類覆蓋,但絕不能直接呼叫。
readable.push(chunk[, encoding])#
chunk<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要推入讀取佇列的資料塊。對於非物件模式的流,chunk必須是 <string>、<Buffer>、<TypedArray> 或 <DataView>。對於物件模式的流,chunk可以是任何 JavaScript 值。encoding<string> 字串塊的編碼。必須是有效的Buffer編碼,如'utf8'或'ascii'。- 返回: <boolean> 如果可以繼續推送額外的資料塊,則為
true;否則為false。
當 chunk 是一個 <Buffer>、<TypedArray>、<DataView> 或 <string> 時,資料塊將被新增到內部佇列中,供流的消費者使用。將 chunk 作為 null 傳遞表示流的結束(EOF),之後不能再寫入任何資料。
當 Readable 流在暫停模式下執行時,可以透過在 'readable' 事件觸發時呼叫 readable.read() 方法來讀取透過 readable.push() 新增的資料。
當 Readable 流在流動模式下執行時,透過 readable.push() 新增的資料會透過觸發 'data' 事件來交付。
readable.push() 方法的設計儘可能靈活。例如,當包裝一個提供某種形式的暫停/恢復機制和資料回撥的底層源時,可以透過自定義的 Readable 例項來包裝這個底層源。
// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
readable.push() 方法用於將內容推送到內部緩衝區。它可以由 readable._read() 方法驅動。
對於非物件模式下執行的流,如果 readable.push() 的 chunk 引數是 undefined,它將被視為空字串或空緩衝區。更多資訊請參見 readable.push('')。
讀取時出錯#
在處理 readable._read() 期間發生的錯誤必須透過 readable.destroy(err) 方法傳播。在 readable._read() 內部丟擲 Error 或手動觸發 'error' 事件會導致未定義的行為。
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
一個計數流的例子#
下面是一個 Readable 流的基本示例,它會按升序發出從 1 到 1,000,000 的數字,然後結束。
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
實現一個雙工流#
一個 Duplex(雙工)流是同時實現了 Readable 和 Writable 的流,例如 TCP 套接字連線。
由於 JavaScript 不支援多重繼承,stream.Duplex 類被擴充套件以實現 Duplex 流(而不是同時擴充套件 stream.Readable *和* stream.Writable 類)。
stream.Duplex 類在原型上繼承自 stream.Readable,並寄生式地繼承自 stream.Writable,但由於在 stream.Writable 上重寫了 Symbol.hasInstance,instanceof 對於這兩個基類都能正常工作。
自定義的 Duplex 流*必須*呼叫 new stream.Duplex([options]) 建構函式,並實現 readable._read() 和 writable._write() 這兩個方法。
new stream.Duplex(options)#
options<Object> 傳遞給Writable和Readable的建構函式。同時還有以下欄位:allowHalfOpen<boolean> 如果設定為false,那麼當可讀端結束時,流將自動結束可寫端。預設值:true。readable<boolean> 設定Duplex是否應為可讀的。預設值:true。writable<boolean> 設定Duplex是否應為可寫的。預設值:true。readableObjectMode<boolean> 為流的可讀端設定objectMode。如果objectMode為true,則此項無效。預設值:false。writableObjectMode<boolean> 為流的可寫端設定objectMode。如果objectMode為true,則此項無效。預設值:false。readableHighWaterMark<number> 為流的可讀端設定highWaterMark。如果已提供了highWaterMark,則此項無效。writableHighWaterMark<number> 為流的可寫端設定highWaterMark。如果已提供了highWaterMark,則此項無效。
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者,在使用 ES6 之前的建構函式風格時
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
或者,使用簡化的建構函式方法
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
當使用管道時
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
一個雙工流的例子#
下面演示了一個簡單的 Duplex 流示例,它包裝了一個假設的底層源物件,資料可以寫入該物件,也可以從中讀取資料,儘管其使用的 API 與 Node.js 流不相容。下面這個簡單的 Duplex 流示例,透過 Writable 介面緩衝傳入的寫入資料,然後透過 Readable 介面將這些資料讀出。
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex 流最重要的一個方面是,儘管 Readable 和 Writable 兩側共存於同一個物件例項中,但它們是獨立運作的。
物件模式的雙工流#
對於 Duplex 流,可以使用 readableObjectMode 和 writableObjectMode 選項分別為 Readable 或 Writable 端單獨設定 objectMode。
例如,在下面的例子中,建立了一個新的 Transform 流(它是一種 Duplex 流),其可寫端是物件模式,接受 JavaScript 數字,然後在可讀端將這些數字轉換為十六進位制字串。
const { Transform } = require('node:stream');
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
實現一個轉換流#
一個 Transform(轉換)流是一種 Duplex 流,其輸出是以某種方式根據輸入計算得出的。例子包括用於壓縮、加密或解密資料的 zlib 流或 crypto 流。
輸出的大小、塊數或到達時間不要求與輸入相同。例如,一個 Hash 流永遠只有一個輸出塊,它在輸入結束時提供。一個 zlib 流產生的輸出可能比其輸入小得多或大得多。
要實現一個 Transform 流,需要擴充套件 stream.Transform 類。
stream.Transform 類在原型上繼承自 stream.Duplex,並實現了它自己版本的 writable._write() 和 readable._read() 方法。自定義的 Transform 實現*必須*實現 transform._transform() 方法,並且*可以*實現 transform._flush() 方法。
使用 Transform 流時必須小心,因為如果可讀端的輸出未被消費,寫入流的資料可能會導致流的可寫端暫停。
new stream.Transform([options])#
options<Object> 傳遞給Writable和Readable的建構函式。同時還有以下欄位:transform<Function>stream._transform()方法的實現。flush<Function>stream._flush()方法的實現。
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
或者,在使用 ES6 之前的建構函式風格時
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
或者,使用簡化的建構函式方法
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
事件:'end'#
'end' 事件來自 stream.Readable 類。'end' 事件在所有資料都已輸出後觸發,這發生在 transform._flush() 中的回撥被呼叫之後。如果發生錯誤,則不應觸發 'end' 事件。
事件:'finish'#
'finish' 事件來自 stream.Writable 類。'finish' 事件在呼叫 stream.end() 並且所有資料塊都已被 stream._transform() 處理後觸發。如果發生錯誤,則不應觸發 'finish' 事件。
transform._flush(callback)#
callback<Function> 一個回撥函式(可選地帶有一個錯誤引數和資料),在剩餘資料被刷清時呼叫。
此函式不能由應用程式程式碼直接呼叫。它應該由子類實現,並僅由內部 Readable 類的方法呼叫。
在某些情況下,轉換操作可能需要在流的末尾發出一些額外的資料。例如,一個 zlib 壓縮流會儲存一定量的內部狀態,用於最佳化壓縮輸出。然而,當流結束時,需要刷清這些額外的資料,以確保壓縮資料是完整的。
自定義的 Transform 實現*可以*實現 transform._flush() 方法。這將在沒有更多寫入資料需要消費時被呼叫,但在標誌 Readable 流結束的 'end' 事件觸發之前。
在 transform._flush() 實現中,可以根據需要呼叫 transform.push() 方法零次或多次。當刷清操作完成時,必須呼叫 callback 函式。
transform._flush() 方法帶有一個下劃線字首,因為它是定義它的類的內部方法,絕不應被使用者程式直接呼叫。
transform._transform(chunk, encoding, callback)#
chunk<Buffer> | <string> | <any> 要轉換的Buffer,由傳遞給stream.write()的string轉換而來。如果流的decodeStrings選項為false或流在物件模式下執行,則該資料塊不會被轉換,而是保持傳遞給stream.write()的原始值。encoding<string> 如果資料塊是字串,則這是編碼型別。如果資料塊是 buffer,則這是一個特殊值'buffer'。在這種情況下可以忽略它。callback<Function> 一個回撥函式(可選地帶有一個錯誤引數和資料),在提供的chunk被處理後呼叫。
此函式不能由應用程式程式碼直接呼叫。它應該由子類實現,並僅由內部 Readable 類的方法呼叫。
所有 Transform 流的實現都必須提供一個 _transform() 方法來接收輸入和產生輸出。transform._transform() 的實現處理正在寫入的位元組,計算出一個輸出,然後使用 transform.push() 方法將該輸出傳遞給可讀部分。
transform.push() 方法可以被呼叫零次或多次,以從單個輸入塊生成輸出,這取決於該塊需要產生多少輸出。
可能任何給定的輸入資料塊都不會產生任何輸出。
只有在當前資料塊被完全消費後,才必須呼叫 callback 函式。如果處理輸入時發生錯誤,傳遞給 callback 的第一個引數必須是一個 Error 物件,否則為 null。如果第二個引數被傳遞給 callback,它將被轉發給 transform.push() 方法,但前提是第一個引數為假值。換句話說,以下兩種寫法是等價的:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform() 方法帶有一個下劃線字首,因為它是定義它的類的內部方法,絕不應被使用者程式直接呼叫。
transform._transform() 永遠不會並行呼叫;流實現了一個佇列機制,要接收下一個資料塊,必須呼叫 callback,可以是同步的也可以是非同步的。
附加說明#
流與非同步生成器和非同步迭代器的相容性#
隨著 JavaScript 對非同步生成器和迭代器的支援,非同步生成器在目前實際上已成為一種一流的語言級流構造。
下面提供了一些將 Node.js 流與非同步生成器和非同步迭代器一起使用的常見互操作案例。
使用非同步迭代器消費可讀流#
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
非同步迭代器會在流上註冊一個永久的錯誤處理器,以防止任何銷燬後未處理的錯誤。
使用非同步生成器建立可讀流#
可以使用 Readable.from() 工具方法從非同步生成器建立 Node.js 可讀流。
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
從非同步迭代器管道到可寫流#
當從非同步迭代器寫入可寫流時,請確保正確處理背壓和錯誤。stream.pipeline() 抽象了背壓及相關錯誤的處理。
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
與舊版 Node.js 的相容性#
在 Node.js 0.10 之前,Readable 流的介面更簡單,但也功能較弱且用處不大。
'data'事件會立即開始觸發,而不是等待對stream.read()方法的呼叫。那些需要執行一些工作來決定如何處理資料的應用程式,必須將讀取的資料儲存到緩衝區中,以防資料丟失。stream.pause()方法是建議性的,而不是保證性的。這意味著,*即使流處於暫停狀態*,仍然需要準備好接收'data'事件。
在 Node.js 0.10 中,添加了 Readable 類。為了與舊的 Node.js 程式向後相容,當新增 'data' 事件處理器或呼叫 stream.resume() 方法時,Readable 流會切換到“流動模式”。其效果是,即使不使用新的 stream.read() 方法和 'readable' 事件,也不再需要擔心丟失 'data' 資料塊了。
雖然大多數應用程式會繼續正常執行,但這在以下情況下引入了一個邊界情況:
- 沒有新增
'data'事件監聽器。 - 從未呼叫
stream.resume()方法。 - 流沒有被管道到任何可寫的目標。
例如,考慮以下程式碼:
// WARNING! BROKEN!
net.createServer((socket) => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
在 Node.js 0.10 之前,傳入的訊息資料會被簡單地丟棄。然而,在 Node.js 0.10 及更高版本中,套接字會永遠保持暫停狀態。
這種情況下的解決方法是呼叫 stream.resume() 方法來開始資料流動。
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// Start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了新的 Readable 流會切換到流動模式外,0.10 之前的舊式流可以使用 readable.wrap() 方法包裝在一個 Readable 類中。
readable.read(0)#
在某些情況下,有必要觸發底層可讀流機制的重新整理,而不實際消費任何資料。在這種情況下,可以呼叫 readable.read(0),它將始終返回 null。
如果內部讀取緩衝區低於 highWaterMark,並且流當前沒有在讀取,那麼呼叫 stream.read(0) 將觸發一個底層的 stream._read() 呼叫。
雖然大多數應用程式幾乎永遠不需要這樣做,但在 Node.js 的某些情況下會這樣做,特別是在 Readable 流類的內部實現中。
readable.push('')#
不建議使用 readable.push('')。
將一個零位元組的 <string>、<Buffer>、<TypedArray> 或 <DataView> 推送到一個非物件模式的流中會產生一個有趣的副作用。因為這*是*一次對 readable.push() 的呼叫,所以這個呼叫會結束讀取過程。然而,由於引數是一個空字串,沒有資料被新增到可讀緩衝區,所以使用者沒有任何東西可以消費。
呼叫 readable.setEncoding() 後 highWaterMark 的差異#
使用 readable.setEncoding() 會改變 highWaterMark 在非物件模式下的行為方式。
通常,當前緩衝區的大小是根據 highWaterMark 以*位元組*為單位來衡量的。然而,在呼叫 setEncoding() 之後,比較函式將開始以*字元*為單位來衡量緩衝區的大小。
在處理 latin1 或 ascii 的常見情況下,這不成問題。但是,在處理可能包含多位元組字元的字串時,建議注意這種行為。