Node.js v25.0.0 文件
- Node.js v25.0.0
-
目錄
- 非同步上下文跟蹤
- 介紹
- 類:
AsyncLocalStoragenew AsyncLocalStorage([options])- 靜態方法:
AsyncLocalStorage.bind(fn) - 靜態方法:
AsyncLocalStorage.snapshot() asyncLocalStorage.disable()asyncLocalStorage.getStore()asyncLocalStorage.enterWith(store)asyncLocalStorage.nameasyncLocalStorage.run(store, callback[, ...args])asyncLocalStorage.exit(callback[, ...args])- 與
async/await一起使用 - 問題排查:上下文丟失
- 類:
AsyncResourcenew AsyncResource(type[, options])- 靜態方法:
AsyncResource.bind(fn[, type[, thisArg]]) asyncResource.bind(fn[, thisArg])asyncResource.runInAsyncScope(fn[, thisArg, ...args])asyncResource.emitDestroy()asyncResource.asyncId()asyncResource.triggerAsyncId()- 為
Worker執行緒池使用AsyncResource - 將
AsyncResource與EventEmitter整合
- 非同步上下文跟蹤
-
索引
- 斷言測試
- 非同步上下文跟蹤
- 非同步鉤子
- 緩衝區
- C++ 外掛
- 使用 Node-API 的 C/C++ 外掛
- C++ 嵌入器 API
- 子程序
- 叢集
- 命令列選項
- 控制檯
- 加密
- 偵錯程式
- 已棄用的 API
- 診斷通道
- DNS
- 域
- 環境變數
- 錯誤
- 事件
- 檔案系統
- 全域性物件
- HTTP
- HTTP/2
- HTTPS
- 檢查器
- 國際化
- 模組:CommonJS 模組
- 模組:ECMAScript 模組
- 模組:
node:moduleAPI - 模組:包
- 模組:TypeScript
- 網路
- 作業系統
- 路徑
- 效能鉤子
- 許可權
- 程序
- Punycode
- 查詢字串
- 逐行讀取
- REPL
- 報告
- 單一可執行檔案應用
- SQLite
- 流
- 字串解碼器
- 測試執行器
- 定時器
- TLS/SSL
- 跟蹤事件
- TTY
- UDP/資料報
- URL
- 實用工具
- V8
- 虛擬機器
- WASI
- Web Crypto API
- Web Streams API
- 工作執行緒
- Zlib
- 其他版本
- 選項
非同步上下文跟蹤#
原始碼: lib/async_hooks.js
介紹#
這些類用於關聯狀態並在回撥函式和 Promise 鏈中傳播。它們允許在 Web 請求或任何其他非同步持續時間的生命週期記憶體儲資料。這與其他語言中的執行緒區域性儲存(thread-local storage)類似。
AsyncLocalStorage 和 AsyncResource 類是 node:async_hooks 模組的一部分。
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
類:AsyncLocalStorage#
這個類建立的儲存在非同步操作中保持一致。
雖然你可以在 node:async_hooks 模組之上構建自己的實現,但應優先選擇 AsyncLocalStorage,因為它是一個高效能且記憶體安全的實現,其中包含了許多不易實現的顯著最佳化。
以下示例使用 AsyncLocalStorage 構建一個簡單的日誌記錄器,它為傳入的 HTTP 請求分配 ID,並將這些 ID 包含在每個請求中記錄的訊息裡。
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('https://:8080');
http.get('https://:8080');
// Prints:
// 0: start
// 0: finish
// 1: start
// 1: finishconst http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('https://:8080');
http.get('https://:8080');
// Prints:
// 0: start
// 0: finish
// 1: start
// 1: finish
每個 AsyncLocalStorage 例項都維護一個獨立的儲存上下文。多個例項可以安全地同時存在,而不會有互相干擾資料的風險。
new AsyncLocalStorage([options])#
建立一個新的 AsyncLocalStorage 例項。儲存僅在 run() 呼叫內部或 enterWith() 呼叫之後提供。
靜態方法:AsyncLocalStorage.bind(fn)#
fn<Function> 要繫結到當前執行上下文的函式。- 返回:<Function> 一個新函式,它在捕獲的執行上下文中呼叫
fn。
將給定的函式繫結到當前的執行上下文。
靜態方法:AsyncLocalStorage.snapshot()#
- 返回:<Function> 一個簽名為
(fn: (...args) : R, ...args) : R的新函式。
捕獲當前的執行上下文並返回一個接受函式作為引數的函式。每當呼叫返回的函式時,它將在捕獲的上下文中呼叫傳遞給它的函式。
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
AsyncLocalStorage.snapshot() 可以替代 AsyncResource 用於簡單的非同步上下文跟蹤,例如:
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot();
get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}
const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()#
停用 AsyncLocalStorage 例項。所有後續對 asyncLocalStorage.getStore() 的呼叫都將返回 undefined,直到再次呼叫 asyncLocalStorage.run() 或 asyncLocalStorage.enterWith()。
呼叫 asyncLocalStorage.disable() 時,所有與該例項關聯的當前上下文都將退出。
在 asyncLocalStorage 可以被垃圾回收之前,必須呼叫 asyncLocalStorage.disable()。這不適用於 asyncLocalStorage 提供的儲存,因為這些物件會與相應的非同步資源一起被垃圾回收。
當 asyncLocalStorage 在當前程序中不再使用時,請使用此方法。
asyncLocalStorage.getStore()#
- 返回:<any>
返回當前儲存。如果在透過呼叫 asyncLocalStorage.run() 或 asyncLocalStorage.enterWith() 初始化的非同步上下文之外呼叫,則返回 undefined。
asyncLocalStorage.enterWith(store)#
store<any>
在當前同步執行的剩餘部分進入該上下文,然後在任何後續的非同步呼叫中持久化該儲存。
示例
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // Returns the same object
});
這種轉換將持續整個同步執行過程。這意味著,例如,如果在一個事件處理程式中進入了上下文,那麼後續的事件處理程式也將在該上下文中執行,除非使用 AsyncResource 將它們明確繫結到另一個上下文。這就是為什麼應該優先使用 run() 而不是 enterWith(),除非有充分的理由使用後一種方法。
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // Returns the same object
});
asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])#
store<any>callback<Function>...args<any>
在一個上下文中同步執行一個函式並返回其返回值。儲存在回撥函式之外是不可訪問的。儲存對於在回撥函式內建立的任何非同步操作都是可訪問的。
可選的 args 會被傳遞給回撥函式。
如果回撥函式丟擲錯誤,run() 也會丟擲該錯誤。呼叫堆疊不會受此呼叫的影響,並且上下文會被退出。
示例
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // Returns the store object
setTimeout(() => {
asyncLocalStorage.getStore(); // Returns the store object
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns undefined
// The error will be caught here
}
asyncLocalStorage.exit(callback[, ...args])#
callback<Function>...args<any>
在一個上下文之外同步執行一個函式並返回其返回值。儲存在回撥函式或回撥函式內建立的非同步操作中是不可訪問的。在回撥函式內進行的任何 getStore() 呼叫都將始終返回 undefined。
可選的 args 會被傳遞給回撥函式。
如果回撥函式丟擲錯誤,exit() 也會丟擲該錯誤。呼叫堆疊不會受此呼叫的影響,並且上下文會被重新進入。
示例
// Within a call to run
try {
asyncLocalStorage.getStore(); // Returns the store object or value
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // Returns undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns the same object or value
// The error will be caught here
}
與 async/await 一起使用#
如果在一個非同步函式中,只有一個 await 呼叫需要在某個上下文中執行,應使用以下模式:
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // The return value of foo will be awaited
});
}
在這個例子中,儲存僅在回撥函式和被 foo 呼叫的函式中可用。在 run 之外呼叫 getStore 將返回 undefined。
問題排查:上下文丟失#
在大多數情況下,AsyncLocalStorage 都能正常工作。在極少數情況下,當前儲存會在某個非同步操作中丟失。
如果你的程式碼是基於回撥的,使用 util.promisify() 將其 Promise 化就足夠了,這樣它就能與原生 Promise 一起工作。
如果你需要使用基於回撥的 API,或者你的程式碼假定了一種自定義的 thenable 實現,請使用 AsyncResource 類將非同步操作與正確的執行上下文關聯起來。透過在你懷疑導致上下文丟失的呼叫之後記錄 asyncLocalStorage.getStore() 的內容,來找到導致上下文丟失的函式呼叫。當代碼記錄 undefined 時,最後被呼叫的那個回撥函式很可能就是導致上下文丟失的原因。
類:AsyncResource#
AsyncResource 類被設計為由嵌入者的非同步資源來擴充套件。透過這種方式,使用者可以輕鬆地觸發他們自己資源的生命週期事件。
當一個 AsyncResource 被例項化時,init 鉤子將被觸發。
以下是 AsyncResource API 的概述。
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])#
用法示例:
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
靜態方法:AsyncResource.bind(fn[, type[, thisArg]])#
fn<Function> 要繫結到當前執行上下文的函式。type<string> 一個可選的名稱,用於關聯底層的AsyncResource。thisArg<any>
將給定的函式繫結到當前的執行上下文。
asyncResource.bind(fn[, thisArg])#
fn<Function> 要繫結到當前AsyncResource的函式。thisArg<any>
將給定的函式繫結到此 AsyncResource 的作用域中執行。
asyncResource.runInAsyncScope(fn[, thisArg, ...args])#
fn<Function> 在此非同步資源的執行上下文中呼叫的函式。thisArg<any> 用於函式呼叫的接收者。...args<any> 傳遞給函式的可選引數。
在非同步資源的執行上下文中,使用提供的引數呼叫提供的函式。這將建立上下文,觸發 AsyncHooks 的 before 回撥,呼叫函式,觸發 AsyncHooks 的 after 回撥,然後恢復原始的執行上下文。
asyncResource.emitDestroy()#
- 返回:<AsyncResource> 對
asyncResource的引用。
呼叫所有 destroy 鉤子。這應該只被呼叫一次。如果呼叫超過一次,將會丟擲錯誤。這**必須**被手動呼叫。如果資源被留給垃圾回收器(GC)收集,那麼 destroy 鉤子將永遠不會被呼叫。
為 Worker 執行緒池使用 AsyncResource#
以下示例展示瞭如何使用 AsyncResource 類為 Worker 池正確地提供非同步跟蹤。其他資源池,如資料庫連線池,可以遵循類似的模型。
假設任務是計算兩個數字的和,使用一個名為 task_processor.js 的檔案,內容如下:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
圍繞它構建的 Worker 池可以使用以下結構:
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
如果沒有 WorkerPoolTaskInfo 物件新增的顯式跟蹤,回撥似乎會與單個 Worker 物件相關聯。然而,Worker 的建立與任務的建立無關,並且不提供任務何時被排程的資訊。
這個池可以按如下方式使用:
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
將 AsyncResource 與 EventEmitter 整合#
由 EventEmitter 觸發的事件監聽器可能在與呼叫 eventEmitter.on() 時活躍的執行上下文不同的上下文中執行。
以下示例展示瞭如何使用 AsyncResource 類將事件監聽器與正確的執行上下文正確關聯。同樣的方法可以應用於 Stream 或類似的事件驅動類。
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);