工作執行緒#

穩定性:2 - 穩定

原始碼: lib/worker_threads.js

node:worker_threads 模組允許使用並行執行 JavaScript 的執行緒。要訪問它:

import worker_threads from 'node:worker_threads';'use strict';

const worker_threads = require('node:worker_threads');

工作執行緒(Worker)對於執行 CPU 密集型的 JavaScript 操作非常有用。它們對 I/O 密集型的工作幫助不大。Node.js 內建的非同步 I/O 操作比工作執行緒更高效。

child_processcluster 不同,worker_threads 可以共享記憶體。它們透過傳輸 ArrayBuffer 例項或共享 SharedArrayBuffer 例項來實現這一點。

import {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} from 'node:worker_threads';

if (!isMainThread) {
  const { parse } = await import('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

export default function parseJSAsync(script) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL(import.meta.url), {
      workerData: script,
    });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};'use strict';

const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require('node:worker_threads');

if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script,
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

以上示例為每個 parseJSAsync() 呼叫都生成一個工作執行緒。在實踐中,對此類任務應使用工作執行緒池。否則,建立工作執行緒的開銷可能會超過其帶來的好處。

在實現工作執行緒池時,請使用 AsyncResource API 來告知診斷工具(例如,提供非同步堆疊跟蹤)任務與其結果之間的關聯。有關示例實現,請參閱 async_hooks 文件中的“為 Worker 執行緒池使用 AsyncResource

工作執行緒預設繼承非程序特定的選項。請參閱 Worker 建構函式選項 以瞭解如何自定義工作執行緒選項,特別是 argvexecArgv 選項。

worker_threads.getEnvironmentData(key)#

  • key <any> 任何可用作 <Map> 鍵的任意、可克隆的 JavaScript 值。
  • 返回:<any>

在工作執行緒內部,worker.getEnvironmentData() 返回傳遞給生成執行緒的 worker.setEnvironmentData() 的資料的克隆。每個新的 Worker 都會自動接收其自己的一份環境資料副本。

import {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} from 'node:worker_threads';

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(new URL(import.meta.url));
} else {
  console.log(getEnvironmentData('Hello'));  // Prints 'World!'.
}'use strict';

const {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} = require('node:worker_threads');

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(__filename);
} else {
  console.log(getEnvironmentData('Hello'));  // Prints 'World!'.
}

worker_threads.isInternalThread#

如果此程式碼在內部 Worker 執行緒(例如載入器執行緒)中執行,則為 true

node --experimental-loader ./loader.js main.js 
// loader.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // true// loader.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // true
// main.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // false// main.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // false

worker_threads.isMainThread#

如果此程式碼不在 Worker 執行緒中執行,則為 true

import { Worker, isMainThread } from 'node:worker_threads';

if (isMainThread) {
  // This re-loads the current file inside a Worker instance.
  new Worker(new URL(import.meta.url));
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Prints 'false'.
}'use strict';

const { Worker, isMainThread } = require('node:worker_threads');

if (isMainThread) {
  // This re-loads the current file inside a Worker instance.
  new Worker(__filename);
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Prints 'false'.
}

worker_threads.markAsUntransferable(object)#

  • object <any> 任何任意的 JavaScript 值。

將一個物件標記為不可傳輸。如果 object 出現在 port.postMessage() 呼叫的傳輸列表中,將丟擲一個錯誤。如果 object 是一個原始值,則此操作為空操作。

特別是,這對於可以被克隆而不是被傳輸,並且在傳送端被其他物件使用的物件很有意義。例如,Node.js 用此方法標記其用於 BufferArrayBuffer

此操作無法撤銷。

import { MessageChannel, markAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // This will throw an error, because pooledBuffer is not transferable.
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // error.name === 'DataCloneError'
}

// The following line prints the contents of typedArray1 -- it still owns
// its memory and has not been transferred. Without
// `markAsUntransferable()`, this would print an empty Uint8Array and the
// postMessage call would have succeeded.
// typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);'use strict';

const { MessageChannel, markAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // This will throw an error, because pooledBuffer is not transferable.
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // error.name === 'DataCloneError'
}

// The following line prints the contents of typedArray1 -- it still owns
// its memory and has not been transferred. Without
// `markAsUntransferable()`, this would print an empty Uint8Array and the
// postMessage call would have succeeded.
// typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);

瀏覽器中沒有與此 API 等效的功能。

worker_threads.isMarkedAsUntransferable(object)#

檢查一個物件是否已使用 markAsUntransferable() 標記為不可傳輸。

import { markAsUntransferable, isMarkedAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // Returns true.'use strict';

const { markAsUntransferable, isMarkedAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // Returns true.

瀏覽器中沒有與此 API 等效的功能。

worker_threads.markAsUncloneable(object)#

  • object <any> 任何任意的 JavaScript 值。

將一個物件標記為不可克隆。如果 objectport.postMessage() 呼叫中用作 message,將丟擲一個錯誤。如果 object 是一個原始值,則此操作為空操作。

這對 ArrayBuffer 或任何類似 Buffer 的物件沒有影響。

此操作無法撤銷。

import { markAsUncloneable } from 'node:worker_threads';

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // This will throw an error, because anyObject is not cloneable.
  port1.postMessage(anyObject);
} catch (error) {
  // error.name === 'DataCloneError'
}'use strict';

const { markAsUncloneable } = require('node:worker_threads');

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // This will throw an error, because anyObject is not cloneable.
  port1.postMessage(anyObject);
} catch (error) {
  // error.name === 'DataCloneError'
}

瀏覽器中沒有與此 API 等效的功能。

worker_threads.moveMessagePortToContext(port, contextifiedSandbox)#

將一個 MessagePort 傳輸到另一個 vm 上下文。原始的 port 物件變得不可用,返回的 MessagePort 例項將取而代之。

返回的 MessagePort 是目標上下文中的一個物件,並繼承自其全域性 Object 類。傳遞給 port.onmessage() 監聽器的物件也在目標上下文中建立,並繼承自其全域性 Object 類。

然而,建立的 MessagePort 不再繼承自 <EventTarget>,只能使用 port.onmessage() 來接收事件。

worker_threads.parentPort#

如果當前執行緒是一個 Worker,這是一個允許與父執行緒通訊的 MessagePort。使用 parentPort.postMessage() 傳送的訊息在父執行緒中可以透過 worker.on('message') 獲得,而從父執行緒使用 worker.postMessage() 傳送的訊息在此執行緒中可以透過 parentPort.on('message') 獲得。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  worker.once('message', (message) => {
    console.log(message);  // Prints 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // When a message from the parent thread is received, send it back:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    console.log(message);  // Prints 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // When a message from the parent thread is received, send it back:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

worker_threads.postMessageToThread(threadId, value[, transferList][, timeout])#

穩定性:1.1 - 活躍開發

向另一個工作執行緒傳送一個值,透過其執行緒 ID 來標識。

如果目標執行緒沒有 workerMessage 事件的監聽器,那麼該操作將丟擲 ERR_WORKER_MESSAGING_FAILED 錯誤。

如果在處理 workerMessage 事件時目標執行緒丟擲了錯誤,那麼該操作將丟擲 ERR_WORKER_MESSAGING_ERRORED 錯誤。

當目標執行緒不是當前執行緒的直接父執行緒或子執行緒時,應使用此方法。如果兩個執行緒是父子關係,請使用 require('node:worker_threads').parentPort.postMessage()worker.postMessage() 讓執行緒進行通訊。

下面的示例展示了 postMessageToThread 的用法:它建立了 10 個巢狀執行緒,最後一個執行緒將嘗試與主執行緒通訊。

import process from 'node:process';
import {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(new URL(import.meta.url), {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;'use strict';

const process = require('node:process');
const {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} = require('node:worker_threads');

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(__filename, {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;

worker_threads.receiveMessageOnPort(port)#

從給定的 MessagePort 接收單條訊息。如果沒有可用訊息,則返回 undefined,否則返回一個帶有名為 message 的單一屬性的物件,該屬性包含訊息的有效負載,對應於 MessagePort 佇列中最舊的訊息。

import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined'use strict';

const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

當使用此函式時,不會觸發 'message' 事件,也不會呼叫 onmessage 監聽器。

worker_threads.resourceLimits#

提供此工作執行緒內部的 JS 引擎資源約束集。如果 resourceLimits 選項已傳遞給 Worker 建構函式,則此值與其值匹配。

如果在主執行緒中使用此屬性,其值為空物件。

worker_threads.SHARE_ENV#

一個特殊值,可以作為 Worker 建構函式的 env 選項傳遞,以表明當前執行緒和工作執行緒應共享對同一組環境變數的讀寫訪問許可權。

import process from 'node:process';
import { Worker, SHARE_ENV } from 'node:worker_threads';
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });'use strict';

const { Worker, SHARE_ENV } = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });

worker_threads.setEnvironmentData(key[, value])#

  • key <any> 任何可用作 <Map> 鍵的任意、可克隆的 JavaScript 值。
  • value <any> 任何任意的、可克隆的 JavaScript 值,它將被克隆並自動傳遞給所有新的 Worker 例項。如果 value 作為 undefined 傳遞,則將刪除之前為 key 設定的任何值。

worker.setEnvironmentData() API 設定當前執行緒和從當前上下文派生的所有新 Worker 例項中 worker.getEnvironmentData() 的內容。

worker_threads.threadId#

當前執行緒的整數識別符號。在相應的工作執行緒物件上(如果有),它作為 worker.threadId 可用。此值對於單個程序內的每個 Worker 例項都是唯一的。

worker_threads.threadName#

當前執行緒的字串識別符號,如果執行緒未執行則為 null。在相應的工作執行緒物件上(如果有),它作為 worker.threadName 可用。

worker_threads.workerData#

一個任意的 JavaScript 值,其中包含傳遞給該執行緒的 Worker 建構函式的資料的克隆。

資料是根據 HTML 結構化克隆演算法,如同使用 postMessage() 一樣被克隆的。

import { Worker, isMainThread, workerData } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url), { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}'use strict';

const { Worker, isMainThread, workerData } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}

worker_threads.locks#

穩定性:1 - 實驗性

一個 LockManager 例項,可用於協調對同一程序內多個執行緒可能共享的資源的訪問。該 API 模仿了 瀏覽器 LockManager 的語義。

類:Lock#

Lock 介面提供有關透過 locks.request() 授予的鎖的資訊。

lock.name#

鎖的名稱。

lock.mode#

鎖的模式。可以是 sharedexclusive

類:LockManager#

LockManager 介面提供了請求和內省鎖的方法。要獲取一個 LockManager 例項,請使用:

import { locks } from 'node:worker_threads';'use strict';

const { locks } = require('node:worker_threads');

此實現與 瀏覽器 LockManager API 匹配。

locks.request(name[, options], callback)#
  • name <string>
  • options <Object>
    • mode <string> 可以是 'exclusive''shared'預設值: 'exclusive'
    • ifAvailable <boolean> 如果為 true,則僅當鎖未被持有時才會授予請求。如果無法授予,callback 將以 null 而非 Lock 例項被呼叫。預設值: false
    • steal <boolean> 如果為 true,則任何同名的現有鎖都將被釋放,並且請求會立即被授予,搶佔任何排隊的請求。預設值: false
    • signal <AbortSignal> 可用於中止一個待處理(但尚未授予)的鎖請求。
  • callback <Function> 在鎖被授予後(或者如果 ifAvailabletrue 且鎖不可用時,立即以 null 呼叫)被呼叫一次。當函式返回時,鎖會自動釋放;或者——如果函式返回一個 promise——當該 promise 解決時釋放。
  • 返回:<Promise> 在鎖被釋放後解析。
import { locks } from 'node:worker_threads';

await locks.request('my_resource', async (lock) => {
  // The lock has been acquired.
});
// The lock has been released here.'use strict';

const { locks } = require('node:worker_threads');

locks.request('my_resource', async (lock) => {
  // The lock has been acquired.
}).then(() => {
  // The lock has been released here.
});
locks.query()#

解析為一個 LockManagerSnapshot,描述當前程序中當前持有和待處理的鎖。

import { locks } from 'node:worker_threads';

const snapshot = await locks.query();
for (const lock of snapshot.held) {
  console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
}
for (const pending of snapshot.pending) {
  console.log(`pending lock: name ${pending.name}, mode ${pending.mode}`);
}'use strict';

const { locks } = require('node:worker_threads');

locks.query().then((snapshot) => {
  for (const lock of snapshot.held) {
    console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
  }
  for (const pending of snapshot.pending) {
    console.log(`pending lock: name ${pending.name}, mode ${pending.mode}`);
  }
});

類:BroadcastChannel extends EventTarget#

BroadcastChannel 的例項允許與繫結到相同頻道名稱的所有其他 BroadcastChannel 例項進行非同步的一對多通訊。

import {
  isMainThread,
  BroadcastChannel,
  Worker,
} from 'node:worker_threads';

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(new URL(import.meta.url));
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}'use strict';

const {
  isMainThread,
  BroadcastChannel,
  Worker,
} = require('node:worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(__filename);
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}

new BroadcastChannel(name)#

  • name <any> 要連線的頻道名稱。任何可以使用 `${name}` 轉換為字串的 JavaScript 值都是允許的。

broadcastChannel.close()#

關閉 BroadcastChannel 連線。

broadcastChannel.onmessage#

  • 型別:<Function> 當收到訊息時,以單個 MessageEvent 引數呼叫。

broadcastChannel.onmessageerror#

  • 型別:<Function> 當接收到的訊息無法反序列化時呼叫。

broadcastChannel.postMessage(message)#

  • message <any> 任何可克隆的 JavaScript 值。

broadcastChannel.ref()#

unref() 相反。在一個先前被 unref() 的 BroadcastChannel 上呼叫 ref() 並*不*會讓程式在它是唯一活動控制代碼的情況下退出(預設行為)。如果埠已經被 ref(),再次呼叫 ref() 沒有效果。

broadcastChannel.unref()#

在一個 BroadcastChannel 上呼叫 unref() 允許執行緒在它是事件系統中唯一活動控制代碼時退出。如果 BroadcastChannel 已經被 unref(),再次呼叫 unref() 沒有效果。

類:MessageChannel#

worker.MessageChannel 類的例項表示一個非同步的雙向通訊通道。MessageChannel 本身沒有方法。new MessageChannel() 會產生一個帶有 port1port2 屬性的物件,這兩個屬性引用了相互連結的 MessagePort 例項。

import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener'use strict';

const { MessageChannel } = require('node:worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

類:MessagePort#

worker.MessagePort 類的例項代表一個非同步、雙向通訊通道的一端。它可以用來在不同的 Worker 之間傳輸結構化資料、記憶體區域和其他 MessagePort

此實現與瀏覽器 MessagePort 匹配。

事件:'close'#

一旦通道的任何一方斷開連線,就會發出 'close' 事件。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

事件:'message'#

  • value <any> 傳輸的值

對於任何傳入的訊息,都會發出 'message' 事件,其中包含 port.postMessage() 的克隆輸入。

此事件的監聽器接收傳遞給 postMessage()value 引數的克隆,沒有其他引數。

事件:'messageerror'#

當反序列化訊息失敗時,會發出 'messageerror' 事件。

目前,當在接收端例項化釋出的 JS 物件時發生錯誤,會發出此事件。這種情況很少見,但可能會發生,例如,當在 vm.Context 中接收到某些 Node.js API 物件時(目前 Node.js API 在其中不可用)。

port.close()#

停用連線雙方進一步傳送訊息。當不再透過此 MessagePort 進行通訊時,可以呼叫此方法。

在作為通道一部分的兩個 MessagePort 例項上都會發出 'close' 事件

port.postMessage(value[, transferList])#

向此通道的接收方傳送一個 JavaScript 值。value 的傳輸方式與 HTML 結構化克隆演算法相容。

特別是,與 JSON 的顯著區別是:

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);

transferList 可以是 <ArrayBuffer>MessagePortFileHandle 物件的列表。傳輸後,它們在通道的傳送端將不再可用(即使它們不包含在 value 中)。與子程序不同,目前不支援傳輸網路套接字等控制代碼。

如果 value 包含 <SharedArrayBuffer> 例項,這些例項可以從任一執行緒訪問。它們不能在 transferList 中列出。

value 仍可能包含不在 transferList 中的 ArrayBuffer 例項;在這種情況下,底層記憶體會被複制而不是移動。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

訊息物件會立即被克隆,傳送後可以修改而不會產生副作用。

有關此 API 背後的序列化和反序列化機制的更多資訊,請參閱 node:v8 模組的序列化 API

傳輸 TypedArray 和 Buffer 時的注意事項#

所有 <TypedArray> | <Buffer> 例項都是底層 <ArrayBuffer> 的檢視。也就是說,實際儲存原始資料的是 ArrayBuffer,而 TypedArrayBuffer 物件提供了一種檢視和操作資料的方式。在同一個 ArrayBuffer 例項上建立多個檢視是可能且常見的。使用傳輸列表傳輸 ArrayBuffer 時必須非常小心,因為這樣做會導致所有共享該 ArrayBufferTypedArrayBuffer 例項變得不可用。

const ab = new ArrayBuffer(10);

const u1 = new Uint8Array(ab);
const u2 = new Uint16Array(ab);

console.log(u2.length);  // prints 5

port.postMessage(u1, [u1.buffer]);

console.log(u2.length);  // prints 0 

特別是對於 Buffer 例項,其底層的 ArrayBuffer 是否可以被傳輸或克隆完全取決於例項的建立方式,而這通常無法可靠地確定。

一個 ArrayBuffer 可以用 markAsUntransferable() 標記,以表明它應始終被克隆而不是傳輸。

根據 Buffer 例項的建立方式,它可能擁有也可能不擁有其底層的 ArrayBuffer。除非已知 Buffer 例項擁有它,否則不應傳輸 ArrayBuffer。特別是,對於從內部 Buffer 池建立的 Buffer(例如使用 Buffer.from()Buffer.allocUnsafe()),傳輸它們是不可能的,它們總是被克隆,這會發送整個 Buffer 池的副本。這種行為可能會帶來意想不到的更高記憶體使用和潛在的安全問題。

有關 Buffer 池的更多詳細資訊,請參見 Buffer.allocUnsafe()

使用 Buffer.alloc()Buffer.allocUnsafeSlow() 建立的 Buffer 例項的 ArrayBuffer 總是可以被傳輸,但這樣做會使這些 ArrayBuffer 的所有其他現有檢視變得不可用。

克隆帶有原型、類和訪問器的物件時的注意事項#

因為物件克隆使用 HTML 結構化克隆演算法,不可列舉的屬性、屬性訪問器和物件原型不會被保留。特別是,<Buffer> 物件在接收端將被讀取為普通的 <Uint8Array>,而 JavaScript 類的例項將被克隆為普通的 JavaScript 物件。

const b = Symbol('b');

class Foo {
  #a = 1;
  constructor() {
    this[b] = 2;
    this.c = 3;
  }

  get d() { return 4; }
}

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new Foo());

// Prints: { c: 3 } 

這個限制擴充套件到了許多內建物件,比如全域性的 URL 物件:

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new URL('https://example.org'));

// Prints: { } 

port.hasRef()#

如果為 true,MessagePort 物件將保持 Node.js 事件迴圈的活動狀態。

port.ref()#

unref() 相反。在一個先前被 unref() 的埠上呼叫 ref() 並*不*讓程式在它是唯一活動控制代碼的情況下退出(預設行為)。如果埠已經被 ref(),再次呼叫 ref() 沒有效果。

如果使用 .on('message') 新增或移除監聽器,埠會根據事件監聽器是否存在而自動進行 ref()unref() 操作。

port.start()#

開始在此 MessagePort 上接收訊息。當將此埠用作事件發射器時,一旦附加了 'message' 監聽器,此方法會自動呼叫。

此方法的存在是為了與 Web MessagePort API 保持一致。在 Node.js 中,它僅用於在沒有事件監聽器時忽略訊息。Node.js 在處理 .onmessage 方面也存在差異。設定它會自動呼叫 .start(),但取消設定它會讓訊息排隊,直到設定了新的處理程式或埠被丟棄。

port.unref()#

在一個埠上呼叫 unref() 允許執行緒在它是事件系統中唯一活動控制代碼時退出。如果埠已經被 unref(),再次呼叫 unref() 沒有效果。

如果使用 .on('message') 新增或移除監聽器,埠會根據事件監聽器是否存在而自動進行 ref()unref() 操作。

類:Worker#

Worker 類代表一個獨立的 JavaScript 執行執行緒。大多數 Node.js API 在其中都可用。

在 Worker 環境中的顯著差異是:

可以在其他 Worker 內部建立 Worker 例項。

Web Workersnode:cluster 模組一樣,可以透過執行緒間訊息傳遞實現雙向通訊。在內部,一個 Worker 有一對內建的 MessagePort,在 Worker 建立時就已經相互關聯。雖然父端的 MessagePort 物件沒有直接暴露,但其功能透過 worker.postMessage() 和父執行緒 Worker 物件上的 worker.on('message') 事件暴露出來。

要建立自定義的訊息通道(鼓勵使用,因為它有助於關注點分離),使用者可以在任一執行緒上建立一個 MessageChannel 物件,並透過一個預先存在的通道(如全域性通道)將該 MessageChannel 上的一個 MessagePort 傳遞給另一個執行緒。

有關訊息如何傳遞以及哪些型別的 JavaScript 值可以成功地跨越執行緒屏障的更多資訊,請參閱 port.postMessage()

import assert from 'node:assert';
import {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} from 'node:worker_threads';
if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}'use strict';

const assert = require('node:assert');
const {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} = require('node:worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}

new Worker(filename[, options])#

  • filename <string> | <URL> 工作執行緒主指令碼或模組的路徑。必須是絕對路徑或以 ./../ 開頭的相對路徑(即相對於當前工作目錄),或者是一個使用 file:data: 協議的 WHATWG URL 物件。當使用 data: URL 時,資料會使用 ECMAScript 模組載入器根據 MIME 型別進行解釋。如果 options.evaltrue,則這是一個包含 JavaScript 程式碼的字串,而不是一個路徑。
  • options <Object>
    • argv <any[]> 引數列表,這些引數將被字串化並附加到工作執行緒中的 process.argv。這在很大程度上類似於 workerData,但這些值在全域性 process.argv 上可用,就好像它們是作為命令列選項傳遞給指令碼一樣。
    • env <Object> 如果設定,指定工作執行緒內部 process.env 的初始值。作為一個特殊值,可以使用 worker.SHARE_ENV 來指定父執行緒和子執行緒應共享它們的環境變數;在這種情況下,對一個執行緒的 process.env 物件的更改也會影響另一個執行緒。預設值:process.env
    • eval <boolean> 如果為 true 且第一個引數是 string,則將建構函式的第一個引數解釋為在工作執行緒上線後執行的指令碼。
    • execArgv <string[]> 傳遞給工作執行緒的 node CLI 選項列表。不支援 V8 選項(如 --max-old-space-size)和影響程序的選項(如 --title)。如果設定,這將在工作執行緒內部作為 process.execArgv 提供。預設情況下,選項從父執行緒繼承。
    • stdin <boolean> 如果此項設定為 true,則 worker.stdin 提供一個可寫流,其內容在工作執行緒內顯示為 process.stdin。預設情況下,不提供資料。
    • stdout <boolean> 如果此項設定為 true,則 worker.stdout 不會自動透過管道傳輸到父程序的 process.stdout
    • stderr <boolean> 如果此項設定為 true,則 worker.stderr 不會自動透過管道傳輸到父程序的 process.stderr
    • workerData <any> 任何被克隆並作為 require('node:worker_threads').workerData 可用的 JavaScript 值。克隆過程如 HTML 結構化克隆演算法中所述,如果物件無法克隆(例如,因為它包含 function),則會丟擲錯誤。
    • trackUnmanagedFds <boolean> 如果設定為 true,則 Worker 會跟蹤透過 fs.open()fs.close() 管理的原始檔案描述符,並在 Worker 退出時關閉它們,類似於網路套接字或透過 FileHandle API 管理的檔案描述符等其他資源。此選項會自動被所有巢狀的 Worker 繼承。預設值:true
    • transferList <Object[]> 如果在 workerData 中傳遞了一個或多個類 MessagePort 的物件,則需要為這些專案提供一個 transferList,否則會丟擲 ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST 錯誤。更多資訊請參見 port.postMessage()
    • resourceLimits <Object> 新 JS 引擎例項的可選資源限制集。達到這些限制會導致 Worker 例項的終止。這些限制隻影響 JS 引擎,不影響包括 ArrayBuffers 在內的任何外部資料。即使設定了這些限制,如果程序遇到全域性記憶體不足的情況,仍可能中止。
      • maxOldGenerationSizeMb <number> 主堆的最大大小(MB)。如果設定了命令列引數 --max-old-space-size,它將覆蓋此設定。
      • maxYoungGenerationSizeMb <number> 用於最近建立物件的堆空間的最大大小。如果設定了命令列引數 --max-semi-space-size,它將覆蓋此設定。
      • codeRangeSizeMb <number> 用於生成程式碼的預分配記憶體範圍的大小。
      • stackSizeMb <number> 執行緒的預設最大堆疊大小。較小的值可能導致 Worker 例項不可用。預設值: 4
    • name <string> 一個可選的 name,用於替換執行緒名稱和工作執行緒標題,以用於除錯/識別目的,最終標題為 [worker ${id}] ${name}。此引數有最大允許大小,具體取決於作業系統。如果提供的名稱超出限制,它將被截斷。
      • 最大大小
        • Windows:32,767 個字元
        • macOS:64 個字元
        • Linux:16 個字元
        • NetBSD:受限於 PTHREAD_MAX_NAMELEN_NP
        • FreeBSD 和 OpenBSD:受限於 MAXCOMLEN 預設值: 'WorkerThread'

事件:'error'#

如果工作執行緒丟擲一個未捕獲的異常,就會發出 'error' 事件。在這種情況下,工作執行緒將被終止。

事件:'exit'#

一旦工作執行緒停止,就會發出 'exit' 事件。如果工作執行緒透過呼叫 process.exit() 退出,exitCode 引數是傳遞的退出碼。如果工作執行緒被終止,exitCode 引數是 1

這是任何 Worker 例項發出的最後一個事件。

事件:'message'#

  • value <any> 傳輸的值

當工作執行緒呼叫 require('node:worker_threads').parentPort.postMessage() 時,會發出 'message' 事件。更多詳情請參閱 port.on('message') 事件。

從工作執行緒傳送的所有訊息都會在 Worker 物件上發出 'exit' 事件之前發出。

事件:'messageerror'#

當反序列化訊息失敗時,會發出 'messageerror' 事件。

事件:'online'#

當工作執行緒開始執行 JavaScript 程式碼時,會發出 'online' 事件。

worker.cpuUsage([prev])#

此方法返回一個 Promise,它將解析為一個與 process.threadCpuUsage() 相同的物件,或者如果工作執行緒不再執行,則以 ERR_WORKER_NOT_RUNNING 錯誤拒絕。此方法允許從實際執行緒外部觀察統計資訊。

worker.getHeapSnapshot([options])#

  • options <Object>
    • exposeInternals <boolean> 如果為 true,則在堆快照中暴露內部資訊。預設值:false
    • exposeNumericValues <boolean> 如果為 true,則在人工欄位中暴露數值。預設值:false
  • 返回:<Promise> 一個包含 V8 堆快照的可讀流的 Promise

返回一個可讀流,用於獲取工作執行緒當前狀態的 V8 快照。更多詳情請參閱 v8.getHeapSnapshot()

如果工作執行緒不再執行,這可能在發出 'exit' 事件之前發生,返回的 Promise 將立即以 ERR_WORKER_NOT_RUNNING 錯誤拒絕。

worker.getHeapStatistics()#

此方法返回一個 Promise,它將解析為一個與 v8.getHeapStatistics() 相同的物件,或者如果工作執行緒不再執行,則以 ERR_WORKER_NOT_RUNNING 錯誤拒絕。此方法允許從實際執行緒外部觀察統計資訊。

worker.performance#

一個可用於從工作執行緒例項查詢效能資訊的物件。類似於 perf_hooks.performance

performance.eventLoopUtilization([utilization1[, utilization2]])#
  • utilization1 <Object> 上一次呼叫 eventLoopUtilization() 的結果。
  • utilization2 <Object>utilization1 之前上一次呼叫 eventLoopUtilization() 的結果。
  • 返回:<Object>

perf_hooks eventLoopUtilization() 的呼叫相同,只是返回的是工作執行緒例項的值。

一個區別是,與主執行緒不同,工作執行緒內的引導是在事件迴圈內完成的。因此,一旦工作執行緒的指令碼開始執行,事件迴圈利用率就立即可用。

一個不增加的 idle 時間並不表示工作執行緒卡在引導階段。以下示例顯示了工作執行緒的整個生命週期從未累積任何 idle 時間,但仍能處理訊息。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}

工作執行緒的事件迴圈利用率僅在發出 'online' 事件後可用,如果在此之前或在 'exit' 事件之後呼叫,則所有屬性的值都為 0

worker.postMessage(value[, transferList])#

向工作執行緒傳送一條訊息,該訊息透過 require('node:worker_threads').parentPort.on('message') 接收。更多詳情請參閱 port.postMessage()

worker.ref()#

unref() 相反,在一個先前被 unref() 的工作執行緒上呼叫 ref() 並*不*讓程式在它是唯一活動控制代碼的情況下退出(預設行為)。如果工作執行緒已經被 ref(),再次呼叫 ref() 沒有效果。

worker.resourceLimits#

提供此工作執行緒的 JS 引擎資源約束集。如果 resourceLimits 選項已傳遞給 Worker 建構函式,則此值與其值匹配。

如果工作執行緒已停止,則返回值為一個空物件。

worker.startCpuProfile()#

啟動 CPU 分析,然後返回一個 Promise,該 Promise 會兌現為一個錯誤或一個 CPUProfileHandle 物件。此 API 支援 await using 語法。

const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startCpuProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
}); 

await using 示例。

const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // Stop profile automatically when return and profile will be discarded
  await using handle = await w.startCpuProfile();
}); 

worker.startHeapProfile()#

啟動堆分析,然後返回一個 Promise,該 Promise 會兌現為一個錯誤或一個 HeapProfileHandle 物件。此 API 支援 await using 語法。

const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startHeapProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
}); 

await using 示例。

const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // Stop profile automatically when return and profile will be discarded
  await using handle = await w.startHeapProfile();
}); 

worker.stderr#

這是一個可讀流,其中包含寫入工作執行緒內部 process.stderr 的資料。如果沒有將 stderr: true 傳遞給 Worker 建構函式,那麼資料將被管道傳輸到父執行緒的 process.stderr 流。

worker.stdin#

如果將 stdin: true 傳遞給了 Worker 建構函式,那麼這是一個可寫流。寫入此流的資料將在工作執行緒中作為 process.stdin 可用。

worker.stdout#

這是一個可讀流,其中包含寫入工作執行緒內部 process.stdout 的資料。如果沒有將 stdout: true 傳遞給 Worker 建構函式,那麼資料將被管道傳輸到父執行緒的 process.stdout 流。

worker.terminate()#

儘快停止工作執行緒中的所有 JavaScript 執行。返回一個 Promise,該 Promise 在 'exit' 事件發出時兌現退出碼。

worker.threadId#

所引用執行緒的整數識別符號。在工作執行緒內部,它作為 require('node:worker_threads').threadId 可用。此值對於單個程序內的每個 Worker 例項都是唯一的。

worker.threadName#

所引用執行緒的字串識別符號,如果執行緒未執行則為 null。在工作執行緒內部,它作為 require('node:worker_threads').threadName 可用。

worker.unref()#

在一個工作執行緒上呼叫 unref() 允許執行緒在它是事件系統中唯一活動控制代碼時退出。如果工作執行緒已經被 unref(),再次呼叫 unref() 沒有效果。

worker[Symbol.asyncDispose]()#

當 dispose 作用域退出時,呼叫 worker.terminate()

async function example() {
  await using worker = new Worker('for (;;) {}', { eval: true });
  // Worker is automatically terminate when the scope is exited.
} 

注意#

stdio 的同步阻塞#

Worker 利用透過 <MessagePort> 的訊息傳遞來實現與 stdio 的互動。這意味著源自 Workerstdio 輸出可能會被接收端的同步程式碼阻塞,該程式碼阻塞了 Node.js 事件迴圈。

import {
  Worker,
  isMainThread,
} from 'node:worker_threads';

if (isMainThread) {
  new Worker(new URL(import.meta.url));
  for (let n = 0; n < 1e10; n++) {
    // Looping to simulate work.
  }
} else {
  // This output will be blocked by the for loop in the main thread.
  console.log('foo');
}'use strict';

const {
  Worker,
  isMainThread,
} = require('node:worker_threads');

if (isMainThread) {
  new Worker(__filename);
  for (let n = 0; n < 1e10; n++) {
    // Looping to simulate work.
  }
} else {
  // This output will be blocked by the for loop in the main thread.
  console.log('foo');
}

從預載入指令碼啟動工作執行緒#

從預載入指令碼(使用 -r 命令列標誌載入並執行的指令碼)啟動工作執行緒時要小心。除非明確設定了 execArgv 選項,否則新的工作執行緒會自動繼承正在執行的程序的命令列標誌,並將預載入與主執行緒相同的預載入指令碼。如果預載入指令碼無條件地啟動一個工作執行緒,那麼每個派生的執行緒都會派生另一個執行緒,直到應用程式崩潰。