如何使用流

在 Node.js 應用程式中處理大量資料可能是一把雙刃劍。處理海量資料的能力非常方便,但也可能導致效能瓶頸和記憶體耗盡。傳統上,開發人員透過一次性將整個資料集讀入記憶體來應對這一挑戰。這種方法雖然對於較小的資料集很直觀,但對於大資料(例如檔案、網路請求等)則變得效率低下且資源密集。

這就是 Node.js 流發揮作用的地方。流提供了一種根本不同的方法,允許您增量處理資料並最佳化記憶體使用。透過將資料處理成可管理的塊,流使您能夠構建可擴充套件的應用程式,從而高效地處理即使是最艱鉅的資料集。正如一句流行語所說,“流是隨時間變化的陣列。”

在本指南中,我們概述了流的概念、歷史和 API,並就如何使用和操作它們提供了一些建議。

什麼是 Node.js 流?

Node.js 流為管理應用程式中的資料流提供了強大的抽象。它們擅長處理大型資料集,例如從檔案和網路請求中讀取或寫入資料,而不會影響效能。

這種方法不同於一次性將整個資料集載入到記憶體中。流以塊的形式處理資料,從而顯著減少記憶體使用。Node.js 中的所有流都繼承自 EventEmitter 類,允許它們在資料處理的各個階段發出事件。這些流可以是可讀的、可寫的,或兩者兼備,為不同的資料處理場景提供了靈活性。

事件驅動架構

Node.js 以事件驅動架構為核心,使其成為即時 I/O 的理想選擇。這意味著一旦有輸入可用就立即消費,一旦應用程式生成輸出就立即傳送。流與這種方法無縫整合,實現了持續的資料處理。

它們透過在關鍵階段發出事件來實現這一點。這些事件包括接收到資料的訊號(data 事件)和流完成的訊號(end 事件)。開發人員可以監聽這些事件並相應地執行自定義邏輯。這種事件驅動的特性使得流在處理來自外部源的資料時非常高效。

為什麼要使用流?

與其他資料處理方法相比,流提供了三個關鍵優勢

  • 記憶體效率:流以增量方式處理資料,以塊的形式消費和處理資料,而不是將整個資料集載入到記憶體中。在處理大型資料集時,這是一個主要優勢,因為它顯著減少了記憶體使用並防止了與記憶體相關的效能問題。
  • 改善響應時間:流允許立即處理資料。當一塊資料到達時,可以立即處理,而無需等待整個有效載荷或資料集接收完畢。這減少了延遲並提高了應用程式的整體響應能力。
  • 即時處理的可擴充套件性:透過分塊處理資料,Node.js 流可以用有限的資源高效地處理大量資料。這種可擴充套件性使流成為即時處理大量資料的應用程式的理想選擇。

這些優勢使流成為構建高效能、可擴充套件的 Node.js 應用程式的強大工具,尤其是在處理大型資料集或即時資料時。

關於效能的說明

如果您的應用程式中已經有所有可用的資料,使用流可能會增加不必要的開銷、複雜性,並減慢您的應用程式。

流的歷史

本節參考了 Node.js 中流的歷史。除非您正在處理為 Node.js 0.11.5(2013 年)之前的版本編寫的程式碼庫,否則您很少會遇到舊版本的流 API,但這些術語可能仍在使用中。

流 0

第一版流與 Node.js 同時釋出。雖然當時還沒有 Stream 類,但不同的模組使用了這個概念並實現了 read/write 函式。util.pump() 函式可用於控制流之間的資料流。

流 1(經典)

隨著 2011 年 Node v0.4.0 的釋出,引入了 Stream 類以及 pipe() 方法。

流 2

2012 年,隨著 Node v0.10.0 的釋出,流 2 問世。此更新帶來了新的流子類,包括 Readable、Writable、Duplex 和 Transform。此外,還添加了 readable 事件。為了保持向後相容性,可以透過新增 data 事件監聽器或呼叫 pause()resume() 方法將流切換到舊模式。

流 3

2013 年,隨著 Node v0.11.5 的釋出,流 3 問世,以解決流同時具有 datareadable 事件處理程式的問題。這消除了在“當前”和“舊”模式之間進行選擇的需要。流 3 是 Node.js 中流的當前版本。

流型別

可讀流

Readable 是我們用來順序讀取資料來源的類。Node.js API 中 Readable 流的典型示例包括讀取檔案時的 fs.ReadStream 、讀取 HTTP 請求時的 http.IncomingMessage 以及從標準輸入讀取時的 process.stdin

關鍵方法和事件

可讀流通過幾個核心方法和事件進行操作,這些方法和事件允許對資料處理進行精細控制。

  • on('data'):每當流中有資料可用時,就會觸發此事件。它非常快,因為流會盡可能快地推送資料,使其適用於高吞吐量場景。
  • on('end'):當流中沒有更多資料可讀時發出。它表示資料傳遞的完成。此事件僅在流中的所有資料都已被消費後才會觸發。
  • on('readable'):當流中有資料可讀或已到達流末尾時,會觸發此事件。它允許在需要時更可控地讀取資料。
  • on('close'):當流及其底層資源已關閉時會發出此事件,並表示不會再發出任何事件。
  • on('error'):此事件可在任何時候發出,表示處理過程中出現錯誤。此事件的處理程式可用於避免未捕獲的異常。

以下各節將演示這些事件的使用。

基本可讀流

以下是一個動態生成資料的簡單可讀流實現示例:

class  extends Readable {
  #count = 0;
  () {
    this.push(':-)');
    if (++this.#count === 5) {
      this.push(null);
    }
  }
}

const  = new ();

.on('data',  => {
  .(.toString());
});

在此程式碼中,MyStream 類擴充套件了 Readable 並重寫了 _read() 方法,以將字串“:-)”推送到內部緩衝區。在推送該字串五次後,它透過推送 null 來表示流的結束。on('data') 事件處理程式會在接收到每個資料塊時將其記錄到控制檯。

使用 readable 事件進行高階控制

為了更精細地控制資料流,可以使用 readable 事件。此事件更復雜,但透過允許明確控制何時從流中讀取資料,為某些應用程式提供了更好的效能:

const  = new MyStream({
  : 1,
});

.on('readable', () => {
  .('>> readable event');
  let ;
  while (( = .read()) !== null) {
    .(.toString()); // Process the chunk
  }
});
.on('end', () => .('>> end event'));

在這裡,readable 事件用於根據需要手動從流中拉取資料。readable 事件處理程式內的迴圈會繼續從流緩衝區中讀取資料,直到返回 null,這表示緩衝區暫時為空或流已結束。將 highWaterMark 設定為 1 會保持較小的緩衝區大小,從而更頻繁地觸發 readable 事件,並允許對資料流進行更精細的控制。

使用前面的程式碼,您將得到如下輸出:

>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event

我們來分析一下。當我們附加 on('readable') 事件時,它會首次呼叫 read(),因為這可能會觸發 readable 事件的發射。在該事件發射之後,我們在 while 迴圈的第一次迭代中呼叫 read。這就是為什麼我們在同一行中得到前兩個笑臉。之後,我們繼續呼叫 read,直到推送了 null。每次呼叫 read 都會安排發射一個新的 readable 事件,但由於我們處於“流動”模式(即使用 readable 事件),發射被安排在 nextTick。這就是為什麼我們在迴圈的同步程式碼結束後,最後一次性得到它們的原因。

注意:您可以嘗試使用 NODE_DEBUG=stream 執行程式碼,以檢視在每次 push 後都會觸發 emitReadable

如果我們想在每個笑臉之前看到 readable 事件被呼叫,我們可以將 push 包裝在 setImmediateprocess.nextTick 中,如下所示:

class  extends Readable {
  #count = 0;
  () {
    (() => {
      this.push(':-)');
      if (++this.#count === 5) {
        return this.push(null);
      }
    });
  }
}

然後我們會得到:

>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event

可寫流

Writable 流對於建立檔案、上傳資料或任何涉及順序輸出資料的任務都很有用。雖然可讀流提供了資料來源,但 Node.js 中的可寫流則充當資料的目的地。Node.js API 中可寫流的典型示例是 fs.WriteStream process.stdout process.stderr

可寫流中的關鍵方法和事件

  • .write():此方法用於向流中寫入一塊資料。它透過緩衝資料直到達到定義的限制 (highWaterMark) 來處理資料,並返回一個布林值,指示是否可以立即寫入更多資料。
  • .end():此方法表示資料寫入過程的結束。它向流發出訊號,以完成寫入操作,並可能執行任何必要的清理工作。

建立可寫流

以下是建立一個可寫流的示例,該流在將所有傳入資料寫入標準輸出之前將其轉換為大寫:

const {  } = ('node:events');
const {  } = ('node:stream');

class  extends  {
  constructor() {
    super({ : 10 /* 10 bytes */ });
  }
  (, , ) {
    ..(.toString().toUpperCase() + '\n', );
  }
}

async function () {
  const  = new ();

  for (let  = 0;  < 10; ++) {
    const  = !.('hello');

    if () {
      .('>> wait drain');
      await (, 'drain');
    }
  }

  .('world');
}

// Call the async function
().(.);

在這段程式碼中,MyStream 是一個自定義的 Writable 流,其緩衝區容量(highWaterMark)為 10 位元組。它重寫了 _write 方法,在寫出資料之前將其轉換為大寫。

迴圈嘗試向流中寫入十次 hello。如果緩衝區已滿(waitDrain 變為 true),它會等待一個 drain 事件,然後再繼續,以確保我們不會使流的緩衝區過載。

輸出將是:

HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD

雙工流

Duplex 流同時實現了可讀和可寫介面。

雙工流中的關鍵方法和事件

雙工流實現了可讀流和可寫流中描述的所有方法和事件。

一個很好的雙工流示例是 net 模組中的 Socket 類:

const  = ('node:net');

// Create a TCP server
const  = .( => {
  .('Hello from server!\n');

  .('data',  => {
    .(`Client says: ${.()}`);
  });

  // Handle client disconnection
  .('end', () => {
    .('Client disconnected');
  });
});

// Start the server on port 8080
.(8080, () => {
  .('Server listening on port 8080');
});

之前的程式碼將在 8080 埠上開啟一個 TCP 套接字,向任何連線的客戶端傳送 Hello from server!,並記錄任何接收到的資料。

const  = ('node:net');

// Connect to the server at localhost:8080
const  = .({ : 8080 }, () => {
  .('Hello from client!\n');
});

.('data',  => {
  .(`Server says: ${.()}`);
});

// Handle the server closing the connection
.('end', () => {
  .('Disconnected from server');
});

前面的程式碼將連線到 TCP 套接字,傳送一條 Hello from client 訊息,並記錄任何接收到的資料。

轉換流

Transform 流是雙工流,其輸出是根據輸入計算得出的。顧名思義,它們通常用於在可讀流和可寫流之間轉換資料。

轉換流中的關鍵方法和事件

除了雙工流中的所有方法和事件之外,還有:

  • _transform:此函式在內部呼叫,用於處理可讀和可寫部分之間的資料流。應用程式程式碼不得呼叫此函式。

建立轉換流

要建立一個新的轉換流,我們可以向 Transform 建構函式傳遞一個 options 物件,其中包含一個 transform 函式,該函式使用 push 方法處理如何從輸入資料計算輸出資料。

const {  } = ('node:stream');

const  = new ({
  (, , ) {
    this.(.toString().toUpperCase());
    ();
  },
});

此流將接收任何輸入並以大寫形式輸出。

如何操作流

在使用流時,我們通常希望從一個源讀取並寫入一個目標,期間可能需要對資料進行一些轉換。以下各節將介紹實現此目的的不同方法。

.pipe()

.pipe() 方法將一個可讀流連線到一個可寫(或轉換)流。雖然這看起來是實現我們目標的簡單方法,但它將所有錯誤處理都委託給了程式設計師,這使得正確實現變得困難。

下面的示例展示了一個 pipe 試圖將當前檔案以大寫形式輸出到控制檯。

const  = ('node:fs');
const {  } = ('node:stream');

let  = 0;
const  = new ({
  (, , ) {
    if ( === 10) {
      return (new ('BOOM!'));
    }
    ++;
    this.(.toString().toUpperCase());
    ();
  },
});

const  = .(, { : 1 });
const  = .;

.().();

.('close', () => {
  .('Readable stream closed');
});

.('close', () => {
  .('Transform stream closed');
});

.('error',  => {
  .('\nError in transform stream:', .);
});

.('close', () => {
  .('Writable stream closed');
});

寫入 10 個字元後,upper 將在回撥中返回一個錯誤,這將導致流關閉。但是,其他流不會收到通知,從而導致記憶體洩漏。輸出將是:

CONST FS =
Error in transform stream: BOOM!
Transform stream closed

pipeline()

為了避免 .pipe() 方法的陷阱和低階複雜性,在大多數情況下,建議使用 pipeline() 方法。此方法是一種更安全、更健壯的管道流方式,可自動處理錯誤和清理工作。

下面的示例演示瞭如何使用 pipeline() 來避免前面示例的陷阱:

const  = ('node:fs');
const { ,  } = ('node:stream');

let  = 0;
const  = new ({
  (, , ) {
    if ( === 10) {
      return (new ('BOOM!'));
    }
    ++;
    this.(.toString().toUpperCase());
    ();
  },
});

const  = .(, { : 1 });
const  = .;

.('close', () => {
  .('Readable stream closed');
});

.('close', () => {
  .('\nTransform stream closed');
});

.('close', () => {
  .('Writable stream closed');
});

(, , ,  => {
  if () {
    return .('Pipeline error:', .);
  }
  .('Pipeline succeeded');
});

在這種情況下,所有流都將被關閉,並顯示以下輸出:

CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed

pipeline() 方法還有一個 async pipeline() 版本,它不接受回撥,而是返回一個在管道失敗時被拒絕的 promise。

非同步迭代器

非同步迭代器被推薦為與流 API 互動的標準方式。與 Web 和 Node.js 中的所有流原語相比,非同步迭代器更易於理解和使用,有助於減少錯誤並提高程式碼的可維護性。在 Node.js 的最新版本中,非同步迭代器已成為一種更優雅、更易讀的與流互動的方式。基於事件的基礎,非同步迭代器提供了一個更高級別的抽象,簡化了流的消費。

在 Node.js 中,所有可讀流都是非同步可迭代的。這意味著您可以使用 for await...of 語法來迴圈遍歷流的資料,並在資料可用時,以非同步程式碼的效率和簡單性處理每一塊資料。

將非同步迭代器與流一起使用的好處

將非同步迭代器與流一起使用,可以從以下幾個方面簡化非同步資料流的處理:

  • 增強可讀性:程式碼結構更清晰、更易讀,尤其是在處理多個非同步資料來源時。
  • 錯誤處理:非同步迭代器允許使用 try/catch 塊進行直接的錯誤處理,類似於常規的非同步函式。
  • 流量控制:它們透過消費者等待下一塊資料來固有地管理背壓,從而允許更高效的記憶體使用和處理。

非同步迭代器提供了一種更現代且通常更易讀的方式來處理可讀流,尤其是在處理非同步資料來源或當您更喜歡採用更順序的、基於迴圈的資料處理方法時。

下面是一個演示如何將非同步迭代器與可讀流一起使用的示例:

const  = ('node:fs');
const {  } = ('node:stream/promises');

async function () {
  await (
    .(),
    async function* () {
      for await (let  of ) {
        yield .toString().toUpperCase();
      }
    },
    .
  );
}

().(.);

此程式碼實現了與前面示例相同的結果,但無需定義新的轉換流。為了簡潔起見,已刪除了前面示例中的錯誤。已使用管道的非同步版本,應將其包裝在 try...catch 塊中以處理可能的錯誤。

物件模式

預設情況下,流可以處理字串、BufferTypedArrayDataView。如果將不同於這些型別的任意值(例如物件)推送到流中,則會丟擲 TypeError。但是,可以透過將 objectMode 選項設定為 true 來處理物件。這允許流處理任何 JavaScript 值,除了 null,它用於表示流的結束。這意味著您可以在可讀流中 pushread 任何值,並在可寫流中 write 任何值。

const {  } = ('node:stream');

const  = ({
  : true,
  () {
    this.push({ : 'world' });
    this.push(null);
  },
});

在物件模式下工作時,重要的是要記住 highWaterMark 選項指的是物件的數量,而不是位元組數。

背壓

使用流時,確保生產者不會壓垮消費者非常重要。為此,背壓機制在 Node.js API 的所有流中都得到了使用,並且實現者有責任維護這種行為。

在任何資料緩衝區超過 highWaterMark 或寫入隊列當前繁忙的情況下,.write() 將返回 false

當返回 false 值時,背壓系統就會啟動。它將暫停傳入的 Readable 流傳送任何資料,並等待消費者再次準備就緒。一旦資料緩衝區清空,將發出一個 'drain' 事件以恢復傳入的資料流。

要更深入地瞭解背壓,請檢視 背壓指南

流 vs Web 流

流的概念並非 Node.js 獨有。實際上,Node.js 對流概念有不同的實現,稱為 Web Streams,它實現了 WHATWG Streams Standard。雖然它們背後的概念相似,但重要的是要意識到它們有不同的 API 並且不直接相容。

Web Streams 實現了 ReadableStreamWritableStreamTransformStream 類,它們與 Node.js 的 ReadableWritableTransform 流同源。

流和 Web 流的互操作性

Node.js 提供了在 Web Streams 和 Node.js streams 之間進行轉換的實用函式。這些函式在每個流類中作為 toWebfromWeb 方法實現。

Duplex 類中的以下示例演示瞭如何處理轉換為 Web 流的可讀和可寫流:

const {  } = ('node:stream');

const  = ({
  () {
    this.push('world');
    this.push(null);
  },
  (, , ) {
    .('writable', );
    ();
  },
});

const { ,  } = .();
.().('hello');


  .()
  .()
  .( => {
    .('readable', .);
  });

如果您需要從 Node.js 模組返回一個 Web 流,反之亦然,那麼這些輔助函式會很有用。對於常規的流消費,非同步迭代器可以實現與 Node.js 和 Web 流的無縫互動。

const {  } = ('node:stream/promises');

async function () {
  const {  } = await ('https://nodejs.com.tw/api/stream.html');

  await (
    ,
    new (),
    async function* () {
      for await (const  of ) {
        yield .toString().toUpperCase();
      }
    },
    .
  );
}

().(.);

請注意,fetch 主體是 ReadableStream<Uint8Array>,因此需要一個 TextDecoderStream 來將資料塊作為字串處理。

這項工作源自 Matteo CollinaPlatformatic 部落格上釋出的內容。