Web Streams API#

穩定度:2 - 穩定

WHATWG 串流標準 (Streams Standard) 的一種實作。

總覽#

WHATWG 串流標準 (或稱「Web 串流」) 定義了處理串流資料的 API。它與 Node.js Streams API 相似,但出現較晚,且已成為許多 JavaScript 環境中串流資料的「標準」API。

主要有三種類型的物件:

  • ReadableStream - 代表串流資料的來源。
  • WritableStream - 代表串流資料的目的地。
  • TransformStream - 代表轉換串流資料的演算法。

ReadableStream 範例#

此範例建立了一個簡單的 ReadableStream,每隔一秒推送一次目前的 performance.now() 時間戳記。並使用非同步反覆運算器 (async iterable) 從串流中讀取資料。

import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);
const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

Node.js 串流互通性#

Node.js 串流可以透過 stream.Readablestream.Writablestream.Duplex 物件上的 toWebfromWeb 方法與 Web 串流進行互相轉換。

更多細節請參考相關說明文件:

API#

類別:ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#
  • underlyingSource <Object>
    • start <Function> 使用者定義的函式,在建立 ReadableStream 時會立即被調用。
    • pull <Function> 使用者定義的函式,當 ReadableStream 內部隊列未滿時會被重複呼叫。此操作可以是同步或非同步的。若為非同步,在先前傳回的 Promise 實現之前,該函式不會再次被呼叫。
    • cancel <Function>ReadableStream 被取消時所呼叫的使用者定義函式。
      • reason <any>
      • 回傳:一個已實現為 undefined 的 Promise。
    • type <string> 必須為 'bytes'undefined
    • autoAllocateChunkSize <number> 僅在 type 等於 'bytes' 時使用。設定為非零值時,會自動為 ReadableByteStreamController.byobRequest 分配一個視圖緩衝區 (view buffer)。未設定時,必須使用串流的內部隊列透過預設讀取器 ReadableStreamDefaultReader 傳輸資料。
  • strategy <Object>
    • highWaterMark <number> 施加背壓 (backpressure) 前的內部隊列最大值。
    • size <Function> 用於識別每個資料區塊 (chunk) 大小的使用者定義函式。
readableStream.locked#

readableStream.locked 屬性預設為 false,當有活動中的讀取器正在取用串流資料時會切換為 true

readableStream.cancel([reason])#
  • reason <any>
  • 回傳:一個在取消完成後實現為 undefined 的 Promise。
readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());
const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

導致 readableStream.locked 變為 true

readableStream.pipeThrough(transform[, options])#
  • transform <Object>
    • readable <ReadableStream> transform.writable 會將從此 ReadableStream 接收並可能經過修改的資料推送到此 ReadableStream
    • writable <WritableStream>ReadableStream 的資料將被寫入的 WritableStream
  • options <Object>
    • preventAbort <boolean>true 時,此 ReadableStream 中的錯誤不會導致 transform.writable 中斷。
    • preventCancel <boolean>true 時,目的地 transform.writable 中的錯誤不會導致此 ReadableStream 被取消。
    • preventClose <boolean>true 時,關閉此 ReadableStream 不會導致 transform.writable 被關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 取消資料傳輸。
  • 回傳:來自 transform.readable<ReadableStream>

將此 <ReadableStream> 連接到 transform 引數中提供的 <ReadableStream><WritableStream> 配對,使得來自此 <ReadableStream> 的資料寫入 transform.writable,經過可能的轉換後,推送到 transform.readable。管線配置完成後,將回傳 transform.readable

當導管 (pipe) 操作活動期間,會導致 readableStream.lockedtrue

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: A
const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();
readableStream.pipeTo(destination[, options])#
  • destination <WritableStream>ReadableStream 資料將寫入的 <WritableStream>
  • options <Object>
    • preventAbort <boolean>true 時,此 ReadableStream 中的錯誤不會導致 destination 中斷。
    • preventCancel <boolean>true 時,destination 中的錯誤不會導致此 ReadableStream 被取消。
    • preventClose <boolean>true 時,關閉此 ReadableStream 不會導致 destination 被關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 取消資料傳輸。
  • 回傳:一個實現為 undefined 的 Promise。

當導管 (pipe) 操作活動期間,會導致 readableStream.lockedtrue

readableStream.tee()#

回傳一對新的 <ReadableStream> 實例,此 ReadableStream 的資料將被轉發至其中。兩者都將接收到相同的資料。

導致 readableStream.locked 變為 true

readableStream.values([options])#

建立並回傳一個可用於取用此 ReadableStream 資料的非同步反覆運算器。

當非同步反覆運算器活動期間,會導致 readableStream.lockedtrue

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString());
非同步反覆運算 (Async Iteration)#

<ReadableStream> 物件支援使用 for await 語法的非同步反覆運算器協定。

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString());

非同步反覆運算器將取用 <ReadableStream> 直到其終止。

預設情況下,如果非同步反覆運算器提早結束(透過 breakreturnthrow),<ReadableStream> 將被關閉。若要防止 <ReadableStream> 自動關閉,請使用 readableStream.values() 方法取得非同步反覆運算器,並將 preventCancel 選項設為 true

<ReadableStream> 必須未被鎖定(即不得有現有的活動讀取器)。在非同步反覆運算期間,<ReadableStream> 將被鎖定。

使用 postMessage() 傳輸#

<ReadableStream> 實例可以使用 <MessagePort> 進行傳輸。

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]);

ReadableStream.from(iterable)#

  • iterable <Iterable> 實作 Symbol.asyncIteratorSymbol.iterator 反覆運算協定的物件。

從可反覆運算物件建立新 <ReadableStream> 的工具方法。

import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'
const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();

要將產生的 <ReadableStream> 導向 <WritableStream>,該 <Iterable> 應產生一序列的 <Buffer><TypedArray><DataView> 物件。

import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

await stream.pipeTo(createWritableStreamSomehow());
const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

(async () => {
  await stream.pipeTo(createWritableStreamSomehow());
})();

類別:ReadableStreamDefaultReader#

預設情況下,呼叫不帶引數的 readableStream.getReader() 將傳回 ReadableStreamDefaultReader 實例。預設讀取器將通過串流的資料區塊視為不透明值,這使得 <ReadableStream> 基本上可以處理任何 JavaScript 值。

new ReadableStreamDefaultReader(stream)#

建立一個鎖定至給定 <ReadableStream> 的新 <ReadableStreamDefaultReader>

readableStreamDefaultReader.cancel([reason])#
  • reason <any>
  • 回傳:一個已實現為 undefined 的 Promise。

取消 <ReadableStream> 並傳回一個在底層串流被取消時實現的 Promise。

readableStreamDefaultReader.closed#
  • 類型:<Promise> 當關聯的 <ReadableStream> 關閉時實現為 undefined;若串流出錯或讀取器的鎖定在串流完成關閉前被釋放,則被拒絕 (rejected)。
readableStreamDefaultReader.read()#
  • 回傳:一個實現為物件的 Promise

從底層 <ReadableStream> 請求下一個資料區塊,並回傳一個在資料可用時實現的 Promise。

readableStreamDefaultReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類別:ReadableStreamBYOBReader#

ReadableStreamBYOBReader 是針對位元組導向的 <ReadableStream> 的另一種取用方式(即建立 ReadableStream 時將 underlyingSource.type 設為 'bytes' 的串流)。

BYOB 是 "bring your own buffer" (自備緩衝區) 的縮寫。這是一種能更有效率地讀取位元組導向資料的模式,可避免不必要的複製動作。

import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)#

建立一個鎖定至給定 <ReadableStream> 的新 ReadableStreamBYOBReader

readableStreamBYOBReader.cancel([reason])#
  • reason <any>
  • 回傳:一個已實現為 undefined 的 Promise。

取消 <ReadableStream> 並傳回一個在底層串流被取消時實現的 Promise。

readableStreamBYOBReader.closed#
  • 類型:<Promise> 當關聯的 <ReadableStream> 關閉時實現為 undefined;若串流出錯或讀取器的鎖定在串流完成關閉前被釋放,則被拒絕 (rejected)。
readableStreamBYOBReader.read(view[, options])#

從底層 <ReadableStream> 請求下一個資料區塊,並回傳一個在資料可用時實現的 Promise。

請勿將池化 (pooled) 的 <Buffer> 物件實例傳入此方法。池化的 Buffer 物件是使用 Buffer.allocUnsafe()Buffer.from() 建立的,或常由各種 node:fs 模組回呼函式回傳。這類 Buffer 使用一個共享的底層 <ArrayBuffer> 物件,其中包含所有池化 Buffer 實例的資料。當 Buffer<TypedArray><DataView> 被傳入 readableStreamBYOBReader.read() 時,該視圖的底層 ArrayBuffer 會被「脫離」(detached),使該 ArrayBuffer 上可能存在的所有現有視圖失效。這可能會對您的應用程式造成災難性的後果。

readableStreamBYOBReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類別:ReadableStreamDefaultController#

每個 <ReadableStream> 都有一個控制器,負責內部狀態和串流隊列的管理。ReadableStreamDefaultController 是非位元組導向 ReadableStream 的預設控制器實作。

readableStreamDefaultController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableStreamDefaultController.desiredSize#

回傳填滿 <ReadableStream> 隊列所需的剩餘資料量。

readableStreamDefaultController.enqueue([chunk])#

將新的資料區塊附加到 <ReadableStream> 的隊列中。

readableStreamDefaultController.error([error])#

發出錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類別:ReadableByteStreamController#

每個 <ReadableStream> 都有一個控制器,負責內部狀態和串流隊列的管理。ReadableByteStreamController 用於位元組導向的 ReadableStream

readableByteStreamController.byobRequest#
readableByteStreamController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableByteStreamController.desiredSize#

回傳填滿 <ReadableStream> 隊列所需的剩餘資料量。

readableByteStreamController.enqueue(chunk)#

將新的資料區塊附加到 <ReadableStream> 的隊列中。

readableByteStreamController.error([error])#

發出錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類別:ReadableStreamBYOBRequest#

在位元組導向串流中使用 ReadableByteStreamController,且使用 ReadableStreamBYOBReader 時,readableByteStreamController.byobRequest 屬性提供了對 ReadableStreamBYOBRequest 實例的存取,該實例代表目前的讀取請求。此物件用於存取為讀取請求提供以供填充的 ArrayBuffer/TypedArray,並提供發送資料已提供訊號的方法。

readableStreamBYOBRequest.respond(bytesWritten)#

發出訊號,表示已有 bytesWritten 數量的位元組寫入 readableStreamBYOBRequest.view

readableStreamBYOBRequest.respondWithNewView(view)#

發出訊號,表示請求已透過寫入新的 BufferTypedArrayDataView 完成。

readableStreamBYOBRequest.view#

類別:WritableStream#

WritableStream 是發送串流資料的目的地。

import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <Object>
    • start <Function> 使用者定義的函式,在建立 WritableStream 時會立即被調用。
    • write <Function> 當資料區塊寫入 WritableStream 時所呼叫的使用者定義函式。
    • close <Function>WritableStream 關閉時所呼叫的使用者定義函式。
      • 回傳:一個已實現為 undefined 的 Promise。
    • abort <Function> 用於突然關閉 WritableStream 的使用者定義函式。
      • reason <any>
      • 回傳:一個已實現為 undefined 的 Promise。
    • type <any> type 選項保留供未來使用,且「必須」為 undefined。
  • strategy <Object>
    • highWaterMark <number> 施加背壓 (backpressure) 前的內部隊列最大值。
    • size <Function> 用於識別每個資料區塊 (chunk) 大小的使用者定義函式。
writableStream.abort([reason])#
  • reason <any>
  • 回傳:一個已實現為 undefined 的 Promise。

突然終止 WritableStream。所有排隊中的寫入操作都將被取消,其關聯的 Promise 將被拒絕。

writableStream.close()#
  • 回傳:一個已實現為 undefined 的 Promise。

當不再預期有額外的寫入時,關閉 WritableStream

writableStream.getWriter()#

建立並回傳一個可用於將資料寫入 WritableStream 的新寫入器實例。

writableStream.locked#

writableStream.locked 屬性預設為 false,當有活動中的寫入器附加到此 WritableStream 時會切換為 true

使用 postMessage() 傳輸#

<WritableStream> 實例可以使用 <MessagePort> 進行傳輸。

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]);

類別:WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

建立一個鎖定至給定 WritableStream 的新 WritableStreamDefaultWriter

writableStreamDefaultWriter.abort([reason])#
  • reason <any>
  • 回傳:一個已實現為 undefined 的 Promise。

突然終止 WritableStream。所有排隊中的寫入操作都將被取消,其關聯的 Promise 將被拒絕。

writableStreamDefaultWriter.close()#
  • 回傳:一個已實現為 undefined 的 Promise。

當不再預期有額外的寫入時,關閉 WritableStream

writableStreamDefaultWriter.closed#
  • 類型:<Promise> 當關聯的 <WritableStream> 關閉時實現為 undefined;若串流出錯或寫入器的鎖定在串流完成關閉前被釋放,則被拒絕。
writableStreamDefaultWriter.desiredSize#

填滿 <WritableStream> 隊列所需的資料量。

writableStreamDefaultWriter.ready#
  • 類型:<Promise> 當寫入器準備好可以使用時實現為 undefined
writableStreamDefaultWriter.releaseLock()#

釋放此寫入器對底層 <ReadableStream> 的鎖定。

writableStreamDefaultWriter.write([chunk])#
  • chunk <any>
  • 回傳:一個已實現為 undefined 的 Promise。

將新的資料區塊附加到 <WritableStream> 的隊列中。

類別:WritableStreamDefaultController#

WritableStreamDefaultController 管理 <WritableStream> 的內部狀態。

writableStreamDefaultController.error([error])#

由使用者程式碼呼叫,以發出處理 WritableStream 資料時發生錯誤的訊號。呼叫時,<WritableStream> 將被中斷,目前待處理的寫入操作將被取消。

writableStreamDefaultController.signal#

類別:TransformStream#

TransformStream 由一個 <ReadableStream> 和一個 <WritableStream> 組成,兩者相互連接,使得寫入 WritableStream 的資料在推送到 ReadableStream 隊列之前被接收並可能經過轉換。

import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
  • transformer <Object>
    • start <Function> 使用者定義的函式,在建立 TransformStream 時會立即被調用。
    • transform <Function> 使用者定義的函式,接收並可能修改寫入 transformStream.writable 的資料區塊,然後將其轉發給 transformStream.readable
    • flush <Function> 使用者定義的函式,在 TransformStream 的可寫端點關閉前立即呼叫,代表轉換過程結束。
    • readableType <any> readableType 選項保留供未來使用,且「必須」為 undefined
    • writableType <any> writableType 選項保留供未來使用,且「必須」為 undefined
  • writableStrategy <Object>
    • highWaterMark <number> 施加背壓 (backpressure) 前的內部隊列最大值。
    • size <Function> 用於識別每個資料區塊 (chunk) 大小的使用者定義函式。
  • readableStrategy <Object>
    • highWaterMark <number> 施加背壓 (backpressure) 前的內部隊列最大值。
    • size <Function> 用於識別每個資料區塊 (chunk) 大小的使用者定義函式。
transformStream.readable#
transformStream.writable#
使用 postMessage() 傳輸#

<TransformStream> 實例可以使用 <MessagePort> 進行傳輸。

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]);

類別:TransformStreamDefaultController#

TransformStreamDefaultController 管理 TransformStream 的內部狀態。

transformStreamDefaultController.desiredSize#

填滿可讀端點隊列所需的資料量。

transformStreamDefaultController.enqueue([chunk])#

將資料區塊附加到可讀端點的隊列中。

transformStreamDefaultController.error([reason])#

向可讀端與可寫端發出訊號,表示處理轉換資料時發生錯誤,導致兩端同時突然關閉。

transformStreamDefaultController.terminate()#

關閉傳輸的可讀端,並導致可寫端因錯誤而突然關閉。

類別:ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(init)#
byteLengthQueuingStrategy.highWaterMark#
byteLengthQueuingStrategy.size#

類別:CountQueuingStrategy#

new CountQueuingStrategy(init)#
countQueuingStrategy.highWaterMark#
countQueuingStrategy.size#

類別:TextEncoderStream#

new TextEncoderStream()#

建立一個新的 TextEncoderStream 實例。

textEncoderStream.encoding#

TextEncoderStream 實例支援的編碼方式。

textEncoderStream.readable#
textEncoderStream.writable#

類別:TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string> 識別此 TextDecoder 實例支援的編碼。預設值: 'utf-8'
  • options <Object>
    • fatal <boolean>true 時,解碼失敗會被視為嚴重錯誤。
    • ignoreBOM <boolean>true 時,TextDecoderStream 會在解碼結果中包含位元組順序標記 (BOM)。為 false 時,BOM 會從輸出中移除。此選項僅在 encoding'utf-8''utf-16be''utf-16le' 時使用。預設值: false

建立一個新的 TextDecoderStream 實例。

textDecoderStream.encoding#

TextDecoderStream 實例支援的編碼方式。

textDecoderStream.fatal#

若解碼錯誤會導致拋出 TypeError,則此值為 true

textDecoderStream.ignoreBOM#

若解碼結果包含位元組順序標記 (BOM),則此值為 true

textDecoderStream.readable#
textDecoderStream.writable#

類別:CompressionStream#

new CompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 其中之一。
compressionStream.readable#
compressionStream.writable#

類別:DecompressionStream#

new DecompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 其中之一。
decompressionStream.readable#
decompressionStream.writable#

工具取用端 (Utility Consumers)#

工具型取用端函式提供了取用串流的常見選項。

可以使用以下方式存取:

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';
const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});
streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});
streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
streamConsumers.bytes(stream)#
import { bytes } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await bytes(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
const { bytes } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
bytes(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});
streamConsumers.text(stream)#
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});