流中的背壓

資料處理中存在一個普遍問題,稱為背壓(backpressure),它描述了資料傳輸過程中緩衝區後方資料的積壓。當傳輸的接收端有複雜操作,或由於某種原因速度較慢時,來自傳入源的資料就有可能積聚起來,就像堵塞一樣。

為了解決這個問題,必須有一個協調系統來確保資料從一個源頭到另一個源頭的順暢流動。不同的社群針對其程式以獨特的方式解決了這個問題,Unix 管道和 TCP 套接字就是很好的例子,它們通常被稱為*流量控制*。在 Node.js 中,流(streams)是採納的解決方案。

本指南的目的是進一步詳細說明什麼是背壓,以及流在 Node.js 原始碼中究竟如何解決這個問題。指南的第二部分將介紹建議的最佳實踐,以確保您在實現流時應用程式程式碼是安全和最佳化的。

我們假設您對 Node.js 中的背壓BufferEventEmitter的一般定義有所瞭解,並有一些使用Stream的經驗。如果您還沒有閱讀過這些文件,不妨先看一下 API 文件,這將有助於您在閱讀本指南時加深理解。

資料處理的問題

在計算機系統中,資料透過管道、套接字和訊號從一個程序傳輸到另一個程序。在 Node.js 中,我們找到了一個類似的機制,稱為Stream。流非常棒!它們為 Node.js 做了很多工作,幾乎內部程式碼庫的每個部分都利用了該模組。作為開發者,我們也非常鼓勵您使用它們!

const  = ('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const  = .({
  : .,
  : .,
});

.('Why should you use streams? ',  => {
  .(`Maybe it's ${}, maybe it's because they are awesome! :)`);

  .();
});

透過比較 Node.js 的Stream實現與內部系統工具,可以很好地說明為什麼透過流實現的背壓機制是一項出色的最佳化。

在一個場景中,我們將一個大檔案(大約 9 GB)用大家熟悉的zip(1)工具進行壓縮。

zip The.Matrix.1080p.mkv

雖然這需要幾分鐘才能完成,但在另一個 shell 中,我們可以執行一個指令碼,使用 Node.js 的zlib模組,它封裝了另一個壓縮工具gzip(1)

const  = ('node:fs');
const  = ('node:zlib').();

const  = .('The.Matrix.1080p.mkv');
const  = .('The.Matrix.1080p.mkv.gz');

.().();

要測試結果,請嘗試開啟每個壓縮檔案。由zip(1)工具壓縮的檔案會通知您檔案已損壞,而由Stream完成的壓縮則可以無誤地解壓。

在這個例子中,我們使用 .pipe() 將資料來源從一端傳輸到另一端。但是,請注意沒有附加適當的錯誤處理程式。如果一個數據塊未能被正確接收,Readable源或gzip流將不會被銷燬。pump是一個實用工具,如果管道中的某個流失敗或關閉,它會正確地銷燬所有流,在這種情況下是必不可少的!

pump僅適用於 Node.js 8.x 或更早版本,因為在 Node.js 10.x 或更高版本中,引入了pipeline來替代pump。這是一個模組方法,用於在流之間建立管道,轉發錯誤,並妥善清理,並在管道完成時提供回撥。

這是一個使用 pipeline 的例子

const  = ('node:fs');
const {  } = ('node:stream');
const  = ('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

(
  .('The.Matrix.1080p.mkv'),
  .(),
  .('The.Matrix.1080p.mkv.gz'),
   => {
    if () {
      .('Pipeline failed', );
    } else {
      .('Pipeline succeeded');
    }
  }
);

你也可以使用stream/promises模組,透過async / await來使用 pipeline。

const  = ('node:fs');
const {  } = ('node:stream/promises');
const  = ('node:zlib');

async function () {
  try {
    await (
      .('The.Matrix.1080p.mkv'),
      .(),
      .('The.Matrix.1080p.mkv.gz')
    );
    .('Pipeline succeeded');
  } catch () {
    .('Pipeline failed', );
  }
}

資料太多、太快

在某些情況下,Readable流可能會以過快的速度將資料提供給Writable流——遠超消費者的處理能力!

當這種情況發生時,消費者將開始將所有資料塊排隊等待稍後消費。寫入佇列會越來越長,因此必須在記憶體中保留更多資料,直到整個過程完成。

寫入磁碟比從磁碟讀取要慢得多,因此,當我們嘗試壓縮檔案並將其寫入硬碟時,會發生背壓,因為寫入磁碟的速度跟不上讀取的速度。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

這就是為什麼背壓機制很重要。如果沒有背壓系統,程序會耗盡系統記憶體,從而有效地減慢其他程序的速度,並獨佔系統的大部分資源直到完成。

這會導致幾個問題

  • 減慢所有其他當前程序的速度
  • 一個工作負荷極重的垃圾回收器
  • 記憶體耗盡

在下面的示例中,我們將取出 .write() 函式的返回值並將其更改為 true,這實際上停用了 Node.js 核心中的背壓支援。在任何提到“修改後”的二進位制檔案時,我們指的是執行不帶 return ret; 行,而是替換為 return true;node 二進位制檔案。

對垃圾回收的過度拖累

讓我們來看一個快速的基準測試。使用上面相同的例子,我們進行了一些計時試驗,以獲得兩種二進位制檔案的中位數時間。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

兩者執行時間都大約一分鐘,所以差別不大,但讓我們仔細看看以確認我們的懷疑是否正確。我們使用 Linux 工具dtrace來評估 V8 垃圾回收器的情況。

GC(垃圾回收器)測量的時間表示垃圾回收器完成一次完整清除週期的間隔

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

雖然這兩個程序開始時相同,並且似乎以相同的速率使 GC 工作,但很明顯,幾秒鐘後,在有正常工作的背壓系統的情況下,它會將 GC 負載分散到 4-8 毫秒的持續間隔中,直到資料傳輸結束。

然而,當沒有背壓系統時,V8 的垃圾回收開始變得拖沓。正常的二進位制檔案在一分鐘內大約觸發了 75 次 GC,而修改後的二進位制檔案只觸發了 36 次。

這是由於記憶體使用量增長而緩慢積累的效能債務。在沒有背壓系統的情況下,隨著資料傳輸,每個資料塊的傳輸都會佔用更多記憶體。

分配的記憶體越多,GC 在一次清理中需要處理的就越多。清理範圍越大,GC 就需要更多時間來決定哪些可以被釋放,而在更大的記憶體空間中掃描遊離指標將消耗更多的計算能力。

記憶體耗盡

為了確定每個二進位制檔案的記憶體消耗,我們分別使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js 對每個程序進行了計時。

這是正常二進位制檔案的輸出

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虛擬記憶體佔用的最大位元組大小約為 87.81 MB。

現在改變 .write() 函式的返回值,我們得到

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虛擬記憶體佔用的最大位元組大小約為 1.52 GB。

如果沒有流來協調背壓,分配的記憶體空間會大一個數量級——同一個程序之間存在巨大的差異!

這個實驗展示了 Node.js 的背壓機制對於你的計算系統是多麼最佳化和高效。現在,讓我們來分析一下它是如何工作的!

背壓如何解決這些問題?

有不同的函式可以將資料從一個程序傳輸到另一個程序。在 Node.js 中,有一個內建的內部函式叫做 .pipe()。你也可以使用其他包!但最終,在這個過程的基礎層面上,我們有兩個獨立的元件:資料*源*和*消費者*。

當從源頭呼叫 .pipe() 時,它會向消費者發出訊號,表示有資料要傳輸。pipe 函式幫助為事件觸發器設定適當的背壓閉包。

在 Node.js 中,源是 Readable 流,消費者是 Writable 流(這兩者都可以與 DuplexTransform 流互換,但這超出了本指南的範圍)。

背壓被觸發的時刻可以精確地縮小到 Writable.write() 函式的返回值。當然,這個返回值是由一些條件決定的。

在任何資料緩衝區超過 highWaterMark 或寫入隊列當前繁忙的情況下,.write() 將返回 false

當返回一個 false 值時,背壓系統就會啟動。它會暫停傳入的 Readable 流傳送任何資料,並等待消費者再次準備好。一旦資料緩衝區被清空,就會發出一個 'drain' 事件,並恢復傳入的資料流。

一旦佇列處理完成,背壓機制將允許資料再次傳送。之前被佔用的記憶體空間將被釋放,為下一批資料做準備。

這有效地允許在任何給定時間為 .pipe() 函式使用固定量的記憶體。不會有記憶體洩漏,也不會有無限緩衝,垃圾回收器只需要處理記憶體中的一個區域!

那麼,如果背壓如此重要,你(可能)為什麼沒有聽說過它?嗯,答案很簡單:Node.js 會自動為你完成這一切。

這太棒了!但當我們試圖理解如何實現我們自己的自定義流時,這又不是那麼好。

在大多數機器中,有一個位元組大小決定了緩衝區何時已滿(這在不同機器上會有所不同)。Node.js 允許你設定自定義的 highWaterMark,但通常預設設定為 16kb(16384,或者對於 objectMode 流是 16)。在某些情況下,你可能想提高這個值,可以這樣做,但要謹慎!

.pipe() 的生命週期

為了更好地理解背壓,這裡有一個關於 Readable 流被管道傳輸Writable 流的生命週期流程圖

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

如果你正在設定一個管道來將幾個流連線起來以操作你的資料,你很可能會實現 Transform 流。

在這種情況下,你從 Readable 流的輸出將進入 Transform 流,然後管道傳輸到 Writable 流。

Readable.pipe(Transformable).pipe(Writable);

背壓會自動應用,但請注意,Transform 流的傳入和傳出 highWaterMark 都可以被操作,並且會影響背壓系統。

背壓指南

Node.js v0.10 起,Stream 類提供了修改 .read().write() 行為的能力,透過使用這些相應函式的下劃線版本(._read()._write())。

有關於實現可讀流實現可寫流的文件指南。我們假設您已經閱讀過這些內容,下一節將進行更深入的探討。

實現自定義流時應遵守的規則

流的黃金法則是**始終尊重背壓**。最佳實踐就是不相互矛盾的實踐。只要你小心避免與內部背壓支援衝突的行為,你就可以確保你遵循了良好的實踐。

總的來說,

  1. 如果沒有被要求,絕不要 .push()
  2. .write() 返回 false 後,絕不要再呼叫它,而應等待 'drain' 事件。
  3. 流在不同的 Node.js 版本和你使用的庫之間會有變化。請小心並進行測試。

關於第 3 點,一個用於構建瀏覽器流的非常有用的包是readable-stream。Rodd Vagg 寫了一篇很棒的部落格文章,描述了這個庫的實用性。簡而言之,它為Readable流提供了一種自動的優雅降級,並支援舊版本的瀏覽器和 Node.js。

針對可讀流的特定規則

到目前為止,我們已經探討了 .write() 如何影響背壓,並且主要關注了 Writable 流。由於 Node.js 的功能,資料技術上是從 Readable 流向 Writable 流。然而,正如我們在任何資料、物質或能量傳輸中觀察到的那樣,源頭和目的地同樣重要,Readable 流對於如何處理背壓至關重要。

這兩個過程相互依賴以實現有效通訊,如果 Readable 流忽略了 Writable 流要求其停止傳送資料的請求,這可能與 .write() 的返回值不正確一樣 problematic。

因此,除了尊重 .write() 的返回之外,我們還必須尊重在 ._read() 方法中使用的 .push() 的返回值。如果 .push() 返回一個 false 值,流將停止從源頭讀取。否則,它將不間斷地繼續。

這是一個使用 .push() 的不良實踐示例

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class  extends Readable {
  () {
    let ;
    while (null !== ( = getNextChunk())) {
      this.push();
    }
  }
}

這是一個良好實踐的例子,其中 Readable 流透過檢查 this.push() 的返回值來尊重背壓

class  extends Readable {
  () {
    let ;
    let  = true;
    while ( && null !== ( = getNextChunk())) {
       = this.push();
    }
  }
}

此外,從自定義流外部,忽略背壓也存在陷阱。在這個良好實踐的反例中,應用程式的程式碼在資料可用時(由 'data' 事件發出訊號)強制推送資料

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data',  => writable.write());

這裡有一個在可讀流中使用 .push() 的例子。

const {  } = ('node:stream');

// Create a custom Readable stream
const  = new ({
  : true,
  () {
    // Push some data onto the stream
    this.({ : 'Hello, world!' });
    this.(null); // Mark the end of the stream
  },
});

// Consume the stream
.('data',  => {
  .();
});

// Output:
// { message: 'Hello, world!' }

在這個例子中,我們建立了一個自定義的可讀流,它使用 .push() 將單個物件推入流中。._read() 方法在流準備好消費資料時被呼叫,在這種情況下,我們立即將一些資料推入流中,並透過推送 null 來標記流的結束。

然後,我們透過監聽 'data' 事件來消費流,並記錄推入流中的每個資料塊。在這種情況下,我們只向流中推送了一個數據塊,所以我們只看到一條日誌訊息。

特定於可寫流的規則

回想一下,.write() 可能會根據一些條件返回 true 或 false。幸運的是,當我們構建自己的 Writable 流時,流狀態機會處理我們的回撥,並決定何時處理背壓以及為我們最佳化資料流。

然而,當我們想直接使用 Writable 時,我們必須尊重 .write() 的返回值,並密切關注這些條件

  • 如果寫佇列繁忙,.write() 將返回 false。
  • 如果資料塊太大,.write() 將返回 false(限制由變數 highWaterMark 指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class  extends Writable {
  (, , ) {
    if (.toString().indexOf('a') >= 0) {
      ();
    } else if (.toString().indexOf('b') >= 0) {
      ();
    }
    ();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) {
  return callback();
}

if (chunk.contains('b')) {
  return callback();
}
callback();

在實現 ._writev() 時也需要注意一些事情。該函式與 .cork() 配對使用,但在編寫時有一個常見的錯誤

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
.(, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
.(, ws);

// As a global function.
function () {
  .uncork();
}

.cork() 可以被呼叫任意多次,我們只需要小心地呼叫 .uncork() 相同次數,以使其再次流動。

結論

流是 Node.js 中常用的模組。它們對內部結構很重要,對於開發者來說,擴充套件和連線 Node.js 模組生態系統也很重要。

希望現在你能夠在考慮背壓的情況下,安全地編寫自己的 WritableReadable 流,並與同事和朋友分享你的知識。

請務必多閱讀關於 Stream 的其他 API 函式,以幫助你在使用 Node.js 構建應用程式時,提升和釋放你的流處理能力。