Cluster (叢集)#

穩定性:2 - 穩定

原始碼: lib/cluster.js

Node.js 程序叢集可用於執行多個 Node.js 例項,這些例項可以在其應用程式執行緒之間分配工作負載。當不需要程序隔離時,請改用 worker_threads 模組,它允許在單個 Node.js 例項中執行多個應用程式執行緒。

cluster 模組可以輕鬆建立共享伺服器埠的子程序。

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

現在,執行 Node.js 將在工作程序之間共享 8000 埠。

$ node server.js
Primary 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started 

在 Windows 上,目前還無法在工作程序中設定命名管道伺服器。

工作原理#

工作程序是使用 child_process.fork() 方法派生的,因此它們可以透過 IPC 與父程序通訊,並來回傳遞伺服器控制代碼。

cluster 模組支援兩種分發傳入連線的方法。

第一種(也是除 Windows 外所有平臺上的預設方法)是輪詢(round-robin)方法,即主程序監聽一個埠,接受新連線,並以輪詢方式將它們分發給工作程序,同時內建了一些智慧機制以避免工作程序過載。

第二種方法是主程序建立監聽套接字並將其傳送給感興趣的工作程序。然後,工作程序直接接受傳入的連線。

理論上,第二種方法應該能提供最佳效能。然而在實踐中,由於作業系統排程程式的變幻莫測,分發往往非常不均衡。曾觀察到,在總共八個程序中,超過 70% 的連線最終都落在了僅僅兩個程序上。

因為 server.listen() 將大部分工作交給了主程序,所以在以下三種情況下,普通 Node.js 程序和叢集工作程序的行為會有所不同:

  1. server.listen({fd: 7}) 因為訊息被傳遞給主程序,所以監聽的是父程序中的檔案描述符 7,控制代碼被傳遞給工作程序,而不是監聽工作程序自己認為的檔案描述符 7 所引用的內容。
  2. server.listen(handle) 顯式地監聽控制代碼將導致工作程序使用提供的控制代碼,而不是與主程序通訊。
  3. server.listen(0) 通常情況下,這會導致伺服器在隨機埠上監聽。然而,在叢集中,每個工作程序每次執行 listen(0) 時都會收到相同的“隨機”埠。實質上,埠在第一次是隨機的,但此後是可預測的。要監聽一個唯一的埠,請根據叢集工作程序的 ID 生成一個埠號。

Node.js 不提供路由邏輯。因此,設計應用程式時,對於像會話和登入這樣的東西,不要過分依賴記憶體中的資料物件,這一點很重要。

因為工作程序都是獨立的程序,可以根據程式的需要被殺死或重新派生,而不會影響其他工作程序。只要還有一些工作程序存活,伺服器就會繼續接受連線。如果沒有存活的工作程序,現有連線將被斷開,新的連線將被拒絕。然而,Node.js 不會自動管理工作程序的數量。應用程式有責任根據自身需求來管理工作程序池。

雖然 node:cluster 模組的一個主要用例是網路程式設計,但它也可以用於其他需要工作程序的場景。

類: Worker#

一個 Worker 物件包含關於一個工作程序的所有公共資訊和方法。在主程序中,可以透過 cluster.workers 獲取。在工作程序中,可以透過 cluster.worker 獲取。

事件: 'disconnect'#

類似於 cluster.on('disconnect') 事件,但特定於此工作程序。

cluster.fork().on('disconnect', () => {
  // Worker has disconnected
}); 

事件: 'error'#

此事件與 child_process.fork() 提供的事件相同。

在工作程序內部,也可以使用 process.on('error')

事件: 'exit'#

  • code <number> 如果是正常退出,則為退出碼。
  • signal <string> 導致程序被殺死的訊號名稱(例如 'SIGHUP')。

類似於 cluster.on('exit') 事件,但特定於此工作程序。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}

事件: 'listening'#

類似於 cluster.on('listening') 事件,但特定於此工作程序。

cluster.fork().on('listening', (address) => {
  // Worker is listening
});cluster.fork().on('listening', (address) => {
  // Worker is listening
});

它不會在工作程序中觸發。

事件: 'message'#

類似於 cluster'message' 事件,但特定於此工作程序。

在工作程序內部,也可以使用 process.on('message')

參見 process 事件: 'message'

下面是一個使用訊息系統的示例。它在主程序中記錄工作程序接收到的 HTTP 請求數量:

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = availableParallelism();
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

事件: 'online'#

類似於 cluster.on('online') 事件,但特定於此工作程序。

cluster.fork().on('online', () => {
  // Worker is online
}); 

它不會在工作程序中觸發。

worker.disconnect()#

在工作程序中,此函式將關閉所有伺服器,等待這些伺服器上的 'close' 事件,然後斷開 IPC 通道。

在主程序中,會向工作程序傳送一個內部訊息,使其在自身上呼叫 .disconnect()

會導致 .exitedAfterDisconnect 被設定。

伺服器關閉後,它將不再接受新的連線,但連線可能會被任何其他正在監聽的工作程序接受。現有連線將被允許正常關閉。當不再有連線存在時,參見 server.close(),與工作程序的 IPC 通道將關閉,使其能夠優雅地退出。

上述內容*僅*適用於伺服器連線,客戶端連線不會被工作程序自動關閉,並且 disconnect 不會等待它們關閉後再退出。

在工作程序中,存在 process.disconnect,但它不是此函式;它是 disconnect()

因為長時間存活的伺服器連線可能會阻止工作程序斷開連線,所以傳送一個訊息可能會很有用,以便應用程式可以採取特定操作來關閉它們。實現一個超時機制也可能很有用,如果在一段時間後 'disconnect' 事件仍未觸發,則殺死該工作程序。

if (cluster.isPrimary) {
  const worker = cluster.fork();
  let timeout;

  worker.on('listening', (address) => {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(() => {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', () => {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  const net = require('node:net');
  const server = net.createServer((socket) => {
    // Connections never end
  });

  server.listen(8000);

  process.on('message', (msg) => {
    if (msg === 'shutdown') {
      // Initiate graceful close of any connections to server
    }
  });
} 

worker.exitedAfterDisconnect#

如果工作程序因 .disconnect() 而退出,則此屬性為 true。如果工作程序以任何其他方式退出,則為 false。如果工作程序尚未退出,則為 undefined

布林值 worker.exitedAfterDisconnect 允許區分自願退出和意外退出,主程序可以根據此值選擇不重新派生工作程序。

cluster.on('exit', (worker, code, signal) => {
  if (worker.exitedAfterDisconnect === true) {
    console.log('Oh, it was just voluntary – no need to worry');
  }
});

// kill worker
worker.kill(); 

worker.id#

每個新的工作程序都會被賦予一個唯一的 ID,此 ID 儲存在 id 中。

當一個工作程序存活時,這個 ID 是它在 cluster.workers 中的索引鍵。

worker.isConnected()#

如果工作程序透過其 IPC 通道連線到其主程序,此函式返回 true,否則返回 false。一個工作程序在被建立後即連線到其主程序。在 'disconnect' 事件被觸發後,它會斷開連線。

worker.isDead()#

如果工作程序的程序已經終止(無論是由於退出還是被訊號殺死),此函式返回 true。否則,返回 false

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}

worker.kill([signal])#

  • signal <string> 傳送給工作程序的 kill 訊號的名稱。預設值: 'SIGTERM'

此函式將殺死工作程序。在主程序中,它透過斷開 worker.process 的連線來實現,並在斷開連線後,用 signal 殺死它。在工作程序中,它透過用 signal 殺死程序來實現。

kill() 函式會殺死工作程序而不等待其優雅斷開,其行為與 worker.process.kill() 相同。

為了向後相容,此方法也別名為 worker.destroy()

在工作程序中,存在 process.kill(),但它不是此函式;它是 kill()

worker.process#

所有工作程序都是使用 child_process.fork() 建立的,此函式返回的物件儲存為 .process。在工作程序中,儲存的是全域性的 process

參見:子程序模組

如果 process 上發生 'disconnect' 事件且 .exitedAfterDisconnect 不為 true,工作程序將呼叫 process.exit(0)。這可以防止意外斷開連線。

worker.send(message[, sendHandle[, options]][, callback])#

  • message <Object>
  • sendHandle <Handle>
  • options <Object> 如果存在 options 引數,它是一個用於引數化傳送某些型別控制代碼的物件。options 支援以下屬性:
    • keepOpen <boolean> 一個可以在傳遞 net.Socket 例項時使用的值。當為 true 時,套接字在傳送程序中保持開啟狀態。預設值: false
  • callback <Function>
  • 返回:<boolean>

向工作程序或主程序傳送訊息,可選擇性地附帶一個控制代碼。

在主程序中,這會向一個特定的工作程序傳送訊息。它與 ChildProcess.send() 完全相同。

在工作程序中,這會向主程序傳送訊息。它與 process.send() 完全相同。

此示例將回顯來自主程序的所有訊息:

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
  });
} 

事件: 'disconnect'#

在工作程序 IPC 通道斷開後觸發。這可能發生在工作程序正常退出、被殺死或被手動斷開連線(例如使用 worker.disconnect())時。

'disconnect''exit' 事件之間可能會有延遲。這些事件可用於檢測程序是否在清理過程中卡住,或者是否存在長時間存活的連線。

cluster.on('disconnect', (worker) => {
  console.log(`The worker #${worker.id} has disconnected`);
}); 

事件: 'exit'#

當任何一個工作程序死亡時,cluster 模組將觸發 'exit' 事件。

這可以用於透過再次呼叫 .fork() 來重啟工作程序。

cluster.on('exit', (worker, code, signal) => {
  console.log('worker %d died (%s). restarting...',
              worker.process.pid, signal || code);
  cluster.fork();
}); 

參見 child_process 事件: 'exit'

事件: 'fork'#

當一個新的工作程序被派生時,cluster 模組將觸發一個 'fork' 事件。這可以用於記錄工作程序活動,並建立自定義超時。

const timeouts = [];
function errorMsg() {
  console.error('Something must be wrong with the connection ...');
}

cluster.on('fork', (worker) => {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', (worker, address) => {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', (worker, code, signal) => {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
}); 

事件: 'listening'#

在工作程序中呼叫 listen() 之後,當伺服器上觸發 'listening' 事件時,在主程序的 cluster 上也會觸發一個 'listening' 事件。

事件處理程式執行時帶有兩個引數,worker 包含工作程序物件,address 物件包含以下連線屬性:addressportaddressType。如果工作程序正在監聽多個地址,這將非常有用。

cluster.on('listening', (worker, address) => {
  console.log(
    `A worker is now connected to ${address.address}:${address.port}`);
}); 

addressType 是以下之一:

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1 (Unix 域套接字)
  • 'udp4''udp6' (UDPv4 或 UDPv6)

事件: 'message'#

當叢集主程序收到來自任何工作程序的訊息時觸發。

參見 child_process 事件: 'message'

事件: 'online'#

派生一個新的工作程序後,該工作程序應該響應一個線上訊息。當主程序收到線上訊息時,它將觸發此事件。'fork''online' 的區別在於,fork 是在主程序派生工作程序時觸發的,而 'online' 是在工作程序執行時觸發的。

cluster.on('online', (worker) => {
  console.log('Yay, the worker responded after it was forked');
}); 

事件: 'setup'#

每次呼叫 .setupPrimary() 時都會觸發。

settings 物件是呼叫 .setupPrimary() 時的 cluster.settings 物件,並且僅供參考,因為在單個 tick 中可以多次呼叫 .setupPrimary()

如果準確性很重要,請使用 cluster.settings

cluster.disconnect([callback])#

  • callback <Function> 當所有工作程序都斷開連線並且控制代碼都關閉時呼叫。

cluster.workers 中的每個工作程序呼叫 .disconnect()

當它們斷開連線時,所有內部控制代碼將被關閉,如果沒有其他事件在等待,則允許主程序正常退出。

該方法接受一個可選的回撥引數,該引數將在完成後被呼叫。

這隻能從主程序呼叫。

cluster.fork([env])#

派生一個新的工作程序。

這隻能從主程序呼叫。

cluster.isMaster#

穩定性: 0 - 廢棄

cluster.isPrimary 的廢棄別名。

cluster.isPrimary#

如果程序是主程序,則為 True。這是由 process.env.NODE_UNIQUE_ID 決定的。如果 process.env.NODE_UNIQUE_ID 是 undefined,則 isPrimarytrue

cluster.isWorker#

如果程序不是主程序,則為 True(它是 cluster.isPrimary 的否定)。

cluster.schedulingPolicy#

排程策略,可以是用於輪詢的 cluster.SCHED_RR,或者交由作業系統處理的 cluster.SCHED_NONE。這是一個全域性設定,一旦第一個工作程序被派生,或者 .setupPrimary() 被呼叫(以先發生者為準),該設定實際上就被凍結了。

除了 Windows,SCHED_RR 是所有作業系統上的預設值。一旦 libuv 能夠有效分發 IOCP 控制代碼而不會導致大的效能損失,Windows 將更改為 SCHED_RR

cluster.schedulingPolicy 也可以透過 NODE_CLUSTER_SCHED_POLICY 環境變數來設定。有效值為 'rr''none'

cluster.settings#

  • 型別:<Object>
    • execArgv <string[]> 傳遞給 Node.js 可執行檔案的字串引數列表。預設值: process.execArgv
    • exec <string> 工作程序檔案的路徑。預設值: process.argv[1]
    • args <string[]> 傳遞給工作程序的字串引數。預設值: process.argv.slice(2)
    • cwd <string> 工作程序的當前工作目錄。預設值: undefined(從父程序繼承)。
    • serialization <string> 指定用於在程序之間傳送訊息的序列化型別。可能的值是 'json''advanced'。更多詳情請參見 child_process 的高階序列化預設值: false
    • silent <boolean> 是否將輸出傳送到父程序的 stdio。預設值: false
    • stdio <Array> 配置派生程序的 stdio。因為 cluster 模組依賴 IPC 來執行,這個配置必須包含一個 'ipc' 條目。當提供此選項時,它會覆蓋 silent。參見 child_process.spawn()stdio
    • uid <number> 設定程序的使用者標識。(參見 setuid(2).)
    • gid <number> 設定程序的組標識。(參見 setgid(2).)
    • inspectPort <number> | <Function> 設定工作程序的檢查器埠。這可以是一個數字,或者一個不帶引數並返回數字的函式。預設情況下,每個工作程序都會獲得自己的埠,從主程序的 process.debugPort 開始遞增。
    • windowsHide <boolean> 隱藏在 Windows 系統上通常會建立的派生程序的控制檯視窗。預設值: false

呼叫 .setupPrimary()(或 .fork())之後,此設定物件將包含設定,包括預設值。

不應手動更改或設定此物件。

cluster.setupMaster([settings])#

穩定性: 0 - 廢棄

.setupPrimary() 的廢棄別名。

cluster.setupPrimary([settings])#

setupPrimary 用於更改預設的 'fork' 行為。一旦呼叫,這些設定將存在於 cluster.settings 中。

任何設定更改僅影響未來的 .fork() 呼叫,對已經執行的工作程序沒有影響。

工作程序中唯一不能透過 .setupPrimary() 設定的屬性是傳遞給 .fork()env

上述預設值僅適用於首次呼叫;後續呼叫的預設值是呼叫 cluster.setupPrimary() 時的當前值。

import cluster from 'node:cluster';

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http workerconst cluster = require('node:cluster');

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http worker

這隻能從主程序呼叫。

cluster.worker#

對當前工作程序物件的引用。在主程序中不可用。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}

cluster.workers#

一個儲存活動工作程序物件的雜湊表,以 id 欄位為鍵。這使得遍歷所有工作程序變得容易。它僅在主程序中可用。

一個工作程序在斷開連線*並且*退出後會從 cluster.workers 中移除。這兩個事件的順序無法預先確定。但是,可以保證從 cluster.workers 列表中移除的操作發生在最後一個 'disconnect''exit' 事件觸發之前。

import cluster from 'node:cluster';

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}const cluster = require('node:cluster');

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}