Web Streams API#

穩定性:2 - 穩定

WHATWG Streams 標準的實現。

概述#

WHATWG Streams 標準(或稱“Web Streams”)定義了一個用於處理流式資料的 API。它與 Node.js 的 Streams API 類似,但出現得更晚,並已成為跨多個 JavaScript 環境處理流式資料的“標準”API。

主要有三種類型的物件

  • ReadableStream - 表示流式資料的來源。
  • WritableStream - 表示流式資料的目的地。
  • TransformStream - 表示轉換流式資料的演算法。

ReadableStream 示例#

此示例建立了一個簡單的 ReadableStream,它會每秒推送一次當前的 performance.now() 時間戳,並無限持續。一個非同步可迭代物件被用來從流中讀取資料。

import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

Node.js 流的互操作性#

Node.js 流可以透過 stream.Readablestream.Writablestream.Duplex 物件上的 toWebfromWeb 方法與 Web Streams 進行相互轉換。

更多詳情請參閱相關文件

API#

類: ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#
  • underlyingSource <Object>
    • start <Function> 一個使用者定義的函式,在 ReadableStream 建立時立即被呼叫。
    • pull <Function> 一個使用者定義的函式,當 ReadableStream 內部佇列未滿時會重複呼叫。此操作可以是同步或非同步的。如果是非同步的,直到前一個返回的 promise 被兌現後,該函式才會再次被呼叫。
    • cancel <Function> 一個使用者定義的函式,當 ReadableStream 被取消時呼叫。
      • reason <any>
      • 返回:一個解析為 undefined 的 promise。
    • type <string> 必須是 'bytes'undefined
    • autoAllocateChunkSize <number> 僅在 type 等於 'bytes' 時使用。當設定為非零值時,會自動為 ReadableByteStreamController.byobRequest 分配一個檢視緩衝區。未設定時,必須使用流的內部佇列透過預設讀取器 ReadableStreamDefaultReader 來傳輸資料。
  • strategy <Object>
    • highWaterMark <number> 在施加背壓之前的最大內部佇列大小。
    • size <Function> 一個使用者定義的函式,用於確定每個資料塊的大小。
readableStream.locked#

readableStream.locked 屬性預設為 false,當有活動的讀取器消耗流資料時,會切換為 true

readableStream.cancel([reason])#
  • reason <any>
  • 返回:一個在取消操作完成後解析為 undefined 的 promise。
readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

使 readableStream.locked 變為 true

readableStream.pipeThrough(transform[, options])#
  • transform <Object>
    • readable <ReadableStream> transform.writable 會將從該 ReadableStream 接收到的可能已修改的資料推送到此 ReadableStream 中。
    • writable <WritableStream>ReadableStream 的資料將被寫入的 WritableStream
  • options <Object>
    • preventAbort <boolean> 當為 true 時,此 ReadableStream 中的錯誤不會導致 transform.writable 被中止。
    • preventCancel <boolean> 當為 true 時,目標 transform.writable 中的錯誤不會導致此 ReadableStream 被取消。
    • preventClose <boolean> 當為 true 時,關閉此 ReadableStream 不會導致 transform.writable 被關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 來取消資料傳輸。
  • 返回:<ReadableStream> 來自 transform.readable

將此 <ReadableStream> 連線到 transform 引數中提供的一對 <ReadableStream><WritableStream>,使得此 <ReadableStream> 的資料被寫入 transform.writable,可能經過轉換,然後推送到 transform.readable。一旦管道配置完成,將返回 transform.readable

在管道操作期間,使 readableStream.locked 變為 true

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: Aconst {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();
readableStream.pipeTo(destination[, options])#
  • destination <WritableStream>ReadableStream 的資料將被寫入的 <WritableStream>
  • options <Object>
    • preventAbort <boolean> 當為 true 時,此 ReadableStream 中的錯誤不會導致 destination 被中止。
    • preventCancel <boolean> 當為 true 時,destination 中的錯誤不會導致此 ReadableStream 被取消。
    • preventClose <boolean> 當為 true 時,關閉此 ReadableStream 不會導致 destination 被關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 來取消資料傳輸。
  • 返回:一個解析為 undefined 的 promise

在管道操作期間,使 readableStream.locked 變為 true

readableStream.tee()#

返回一對新的 <ReadableStream> 例項,此 ReadableStream 的資料將被轉發到這兩個例項中。每個例項將接收相同的資料。

使 readableStream.locked 變為 true

readableStream.values([options])#

建立並返回一個可用於消費此 ReadableStream 資料的非同步迭代器。

在非同步迭代器活動期間,使 readableStream.locked 變為 true

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString()); 
非同步迭代#

<ReadableStream> 物件支援使用 for await 語法的非同步迭代器協議。

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString()); 

非同步迭代器將消費 <ReadableStream> 直到其終止。

預設情況下,如果非同步迭代器提前退出(透過 breakreturnthrow),<ReadableStream> 將被關閉。為防止自動關閉 <ReadableStream>,請使用 readableStream.values() 方法獲取非同步迭代器,並將 preventCancel 選項設定為 true

<ReadableStream> 必須未被鎖定(即,它不能有已存在的活動讀取器)。在非同步迭代期間,<ReadableStream> 將被鎖定。

使用 postMessage() 傳輸#

一個 <ReadableStream> 例項可以使用 <MessagePort> 進行傳輸。

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]); 

ReadableStream.from(iterable)#

  • iterable <Iterable> 實現 Symbol.asyncIteratorSymbol.iterator 可迭代協議的物件。

一個從可迭代物件建立新的 <ReadableStream> 的實用工具方法。

import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();

要將生成的 <ReadableStream> 管道傳輸到一個 <WritableStream><Iterable> 應該產生一個由 <Buffer><TypedArray><DataView> 物件組成的序列。

import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

await stream.pipeTo(createWritableStreamSomehow());const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

(async () => {
  await stream.pipeTo(createWritableStreamSomehow());
})();

類: ReadableStreamDefaultReader#

預設情況下,不帶引數呼叫 readableStream.getReader() 將返回一個 ReadableStreamDefaultReader 的例項。預設讀取器將透過流的資料塊視為不透明的值,這使得 <ReadableStream> 可以處理幾乎任何 JavaScript 值。

new ReadableStreamDefaultReader(stream)#

建立一個新的 <ReadableStreamDefaultReader>,並鎖定到給定的 <ReadableStream>

readableStreamDefaultReader.cancel([reason])#
  • reason <any>
  • 返回:一個解析為 undefined 的 promise。

取消 <ReadableStream> 並返回一個在底層流被取消時兌現的 promise。

readableStreamDefaultReader.closed#
  • 型別: <Promise> 當關聯的 <ReadableStream> 關閉時,解析為 undefined;如果流出錯或在流完成關閉前讀取器的鎖被釋放,則被拒絕。
readableStreamDefaultReader.read()#
  • 返回:一個解析為物件的 promise

從底層的 <ReadableStream> 請求下一個資料塊,並返回一個在資料可用時解析為該資料的 promise。

readableStreamDefaultReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類: ReadableStreamBYOBReader#

ReadableStreamBYOBReader 是面向位元組的 <ReadableStream>(即在建立 ReadableStreamunderlyingSource.type 設定為 'bytes' 的流)的另一種消費者。

BYOB 是“bring your own buffer”(自帶緩衝區)的縮寫。這是一種模式,可以更高效地讀取面向位元組的資料,避免不必要的複製。

import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString()); 
new ReadableStreamBYOBReader(stream)#

建立一個新的 ReadableStreamBYOBReader,並鎖定到給定的 <ReadableStream>

readableStreamBYOBReader.cancel([reason])#
  • reason <any>
  • 返回:一個解析為 undefined 的 promise。

取消 <ReadableStream> 並返回一個在底層流被取消時兌現的 promise。

readableStreamBYOBReader.closed#
  • 型別: <Promise> 當關聯的 <ReadableStream> 關閉時,解析為 undefined;如果流出錯或在流完成關閉前讀取器的鎖被釋放,則被拒絕。
readableStreamBYOBReader.read(view[, options])#

從底層的 <ReadableStream> 請求下一個資料塊,並返回一個在資料可用時解析為該資料的 promise。

不要將池化的 <Buffer> 物件例項傳遞給此方法。池化的 Buffer 物件是使用 Buffer.allocUnsafe()Buffer.from() 建立的,或者通常由各種 node:fs 模組的回撥返回。這些型別的 Buffer 使用共享的底層 <ArrayBuffer> 物件,該物件包含所有池化 Buffer 例項的資料。當一個 Buffer<TypedArray><DataView> 傳遞到 readableStreamBYOBReader.read() 中時,檢視的底層 ArrayBuffer 會被分離,這會使該 ArrayBuffer 上的所有現有檢視失效。這可能會對您的應用程式造成災難性的後果。

readableStreamBYOBReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類: ReadableStreamDefaultController#

每個 <ReadableStream> 都有一個控制器,負責流的內部狀態和佇列管理。ReadableStreamDefaultController 是非面向位元組的 ReadableStream 的預設控制器實現。

readableStreamDefaultController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableStreamDefaultController.desiredSize#

返回填滿 <ReadableStream> 佇列所需的剩餘資料量。

readableStreamDefaultController.enqueue([chunk])#

將一個新的資料塊附加到 <ReadableStream> 的佇列中。

readableStreamDefaultController.error([error])#

發出一個錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類: ReadableByteStreamController#

每個 <ReadableStream> 都有一個控制器,負責流的內部狀態和佇列管理。ReadableByteStreamController 用於面向位元組的 ReadableStream

readableByteStreamController.byobRequest#
readableByteStreamController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableByteStreamController.desiredSize#

返回填滿 <ReadableStream> 佇列所需的剩餘資料量。

readableByteStreamController.enqueue(chunk)#

將一個新的資料塊附加到 <ReadableStream> 的佇列中。

readableByteStreamController.error([error])#

發出一個錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類: ReadableStreamBYOBRequest#

在面向位元組的流中使用 ReadableByteStreamController,並且使用 ReadableStreamBYOBReader 時,readableByteStreamController.byobRequest 屬性提供對 ReadableStreamBYOBRequest 例項的訪問,該例項代表當前的讀取請求。此物件用於訪問為讀取請求提供的 ArrayBuffer/TypedArray,並提供用於通知資料已提供的方法。

readableStreamBYOBRequest.respond(bytesWritten)#

通知已有 bytesWritten 數量的位元組被寫入到 readableStreamBYOBRequest.view

readableStreamBYOBRequest.respondWithNewView(view)#

通知請求已透過寫入位元組到一個新的 BufferTypedArrayDataView 中來完成。

readableStreamBYOBRequest.view#

類: WritableStream#

WritableStream 是流資料傳送的目的地。

import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World'); 
new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <Object>
    • start <Function> 一個使用者定義的函式,在 WritableStream 建立時立即被呼叫。
    • write <Function> 一個使用者定義的函式,當一個數據塊被寫入 WritableStream 時呼叫。
    • close <Function> 一個使用者定義的函式,當 WritableStream 關閉時呼叫。
      • 返回:一個解析為 undefined 的 promise。
    • abort <Function> 一個使用者定義的函式,用於突然關閉 WritableStream
      • reason <any>
      • 返回:一個解析為 undefined 的 promise。
    • type <any> type 選項為將來使用保留,並且必須為 undefined。
  • strategy <Object>
    • highWaterMark <number> 在施加背壓之前的最大內部佇列大小。
    • size <Function> 一個使用者定義的函式,用於確定每個資料塊的大小。
writableStream.abort([reason])#
  • reason <any>
  • 返回:一個解析為 undefined 的 promise。

突然終止 WritableStream。所有排隊的寫入都將被取消,並且其關聯的 promise 將被拒絕。

writableStream.close()#
  • 返回:一個解析為 undefined 的 promise。

在預計沒有更多寫入時關閉 WritableStream

writableStream.getWriter()#

建立並返回一個新的寫入器例項,可用於將資料寫入 WritableStream

writableStream.locked#

writableStream.locked 屬性預設為 false,當有活動的寫入器附加到此 WritableStream 時,會切換為 true

使用 postMessage() 傳輸#

一個 <WritableStream> 例項可以使用 <MessagePort> 進行傳輸。

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]); 

類: WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

建立一個新的 WritableStreamDefaultWriter,並鎖定到給定的 WritableStream

writableStreamDefaultWriter.abort([reason])#
  • reason <any>
  • 返回:一個解析為 undefined 的 promise。

突然終止 WritableStream。所有排隊的寫入都將被取消,並且其關聯的 promise 將被拒絕。

writableStreamDefaultWriter.close()#
  • 返回:一個解析為 undefined 的 promise。

在預計沒有更多寫入時關閉 WritableStream

writableStreamDefaultWriter.closed#
  • 型別: <Promise> 當關聯的 <WritableStream> 關閉時,解析為 undefined;如果流出錯或在流完成關閉前寫入器的鎖被釋放,則被拒絕。
writableStreamDefaultWriter.desiredSize#

填滿 <WritableStream> 佇列所需的資料量。

writableStreamDefaultWriter.ready#
  • 型別: <Promise> 當寫入器準備好使用時,解析為 undefined
writableStreamDefaultWriter.releaseLock()#

釋放此寫入器對底層 <ReadableStream> 的鎖定。

writableStreamDefaultWriter.write([chunk])#
  • chunk <any>
  • 返回:一個解析為 undefined 的 promise。

將一個新的資料塊附加到 <WritableStream> 的佇列中。

類: WritableStreamDefaultController#

WritableStreamDefaultController 管理 <WritableStream> 的內部狀態。

writableStreamDefaultController.error([error])#

由使用者程式碼呼叫,以通知在處理 WritableStream 資料時發生錯誤。呼叫時,<WritableStream> 將被中止,當前待處理的寫入將被取消。

writableStreamDefaultController.signal#

類: TransformStream#

一個 TransformStream 由一個 <ReadableStream> 和一個 <WritableStream> 組成,它們被連線起來,使得寫入 WritableStream 的資料在被推送到 ReadableStream 的佇列之前被接收並可能被轉換。

import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]); 
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
  • transformer <Object>
    • start <Function> 一個使用者定義的函式,在 TransformStream 建立時立即被呼叫。
    • transform <Function> 一個使用者定義的函式,接收並可能修改寫入 transformStream.writable 的資料塊,然後將其轉發到 transformStream.readable
    • flush <Function> 一個使用者定義的函式,在 TransformStream 的可寫側關閉之前立即呼叫,標誌著轉換過程的結束。
    • readableType <any> readableType 選項為將來使用保留,並且必須undefined
    • writableType <any> writableType 選項為將來使用保留,並且必須undefined
  • writableStrategy <Object>
    • highWaterMark <number> 在施加背壓之前的最大內部佇列大小。
    • size <Function> 一個使用者定義的函式,用於確定每個資料塊的大小。
  • readableStrategy <Object>
    • highWaterMark <number> 在施加背壓之前的最大內部佇列大小。
    • size <Function> 一個使用者定義的函式,用於確定每個資料塊的大小。
transformStream.readable#
transformStream.writable#
使用 postMessage() 傳輸#

一個 <TransformStream> 例項可以使用 <MessagePort> 進行傳輸。

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]); 

類: TransformStreamDefaultController#

TransformStreamDefaultController 管理 TransformStream 的內部狀態。

transformStreamDefaultController.desiredSize#

填滿可讀側佇列所需的資料量。

transformStreamDefaultController.enqueue([chunk])#

將一個數據塊附加到可讀側的佇列中。

transformStreamDefaultController.error([reason])#

向可讀和可寫兩側傳送訊號,通知在處理轉換資料時發生錯誤,導致兩側都被突然關閉。

transformStreamDefaultController.terminate()#

關閉傳輸的可讀側,並導致可寫側因錯誤而突然關閉。

類: ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(init)#
byteLengthQueuingStrategy.highWaterMark#
byteLengthQueuingStrategy.size#

類: CountQueuingStrategy#

new CountQueuingStrategy(init)#
countQueuingStrategy.highWaterMark#
countQueuingStrategy.size#

類: TextEncoderStream#

new TextEncoderStream()#

建立一個新的 TextEncoderStream 例項。

textEncoderStream.encoding#

TextEncoderStream 例項支援的編碼。

textEncoderStream.readable#
textEncoderStream.writable#

類: TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string> 標識此 TextDecoder 例項支援的 encoding預設值: 'utf-8'
  • options <Object>
    • fatal <boolean> 如果解碼失敗是致命的,則為 true
    • ignoreBOM <boolean> 當為 true 時,TextDecoderStream 將在解碼結果中包含位元組順序標記 (BOM)。當為 false 時,位元組順序標記將從輸出中移除。此選項僅在 encoding'utf-8''utf-16be''utf-16le' 時使用。預設值: false

建立一個新的 TextDecoderStream 例項。

textDecoderStream.encoding#

TextDecoderStream 例項支援的編碼。

textDecoderStream.fatal#

如果解碼錯誤會導致丟擲 TypeError,則該值為 true

textDecoderStream.ignoreBOM#

如果解碼結果將包含位元組順序標記,則該值為 true

textDecoderStream.readable#
textDecoderStream.writable#

類: CompressionStream#

new CompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 之一。
compressionStream.readable#
compressionStream.writable#

類: DecompressionStream#

new DecompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 之一。
decompressionStream.readable#
decompressionStream.writable#

實用工具消費者#

實用工具消費者函式提供了消費流的常用選項。

它們透過以下方式訪問

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});
streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});
streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});
streamConsumers.text(stream)#
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});