非同期コンテキスト追跡#

安定性: 2 - 安定版

ソースコード: lib/async_hooks.js

はじめに#

これらのクラスは、状態を関連付け、コールバックやPromiseチェーン全体にそれを伝播するために使用されます。これにより、Webリクエストやその他の非同期期間のライフタイム全体にわたってデータを保存できます。これは、他の言語におけるスレッドローカルストレージに似ています。

`AsyncLocalStorage`クラスと`AsyncResource`クラスは、`node:async_hooks`モジュールの一部です。

import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

クラス: `AsyncLocalStorage`#

このクラスは、非同期操作を通じて一貫性を保つストアを作成します。

`node:async_hooks`モジュールの上に独自のインプリメンテーションを作成できますが、`AsyncLocalStorage`は、実装が容易ではない重要な最適化を含む、パフォーマンスが高くメモリセーフなインプリメンテーションであるため、推奨されます。

次の例では、`AsyncLocalStorage`を使用して、着信HTTPリクエストにIDを割り当て、各リクエスト内でログに記録されたメッセージに含めるシンプルなロガーを構築します。

import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('https://127.0.0.1:8080');
http.get('https://127.0.0.1:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finishconst http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('https://127.0.0.1:8080');
http.get('https://127.0.0.1:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

`AsyncLocalStorage`の各インスタンスは、独立したストレージコンテキストを維持します。複数のインスタンスが同時に安全に存在でき、互いのデータに干渉するリスクはありません。

`new AsyncLocalStorage()`#

`AsyncLocalStorage`の新しいインスタンスを作成します。ストアは、`run()`呼び出し内、または`enterWith()`呼び出し後でのみ提供されます。

静的メソッド: `AsyncLocalStorage.bind(fn)`#

安定性: 1 - 実験的

  • `fn` <Function> 現在の実行コンテキストにバインドする関数。
  • 戻り値: <Function> キャプチャされた実行コンテキスト内で`fn`を呼び出す新しい関数。

指定された関数を現在の実行コンテキストにバインドします。

静的メソッド: `AsyncLocalStorage.snapshot()`#

安定性: 1 - 実験的

  • 戻り値: <Function> シグネチャ `(fn: (...args) : R, ...args) : R` の新しい関数。

現在の実行コンテキストをキャプチャし、引数として関数を取得する関数を返します。返された関数が呼び出されるたびに、キャプチャされたコンテキスト内で渡された関数を呼び出します。

const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result);  // returns 123 

AsyncLocalStorage.snapshot() は、単純な非同期コンテキスト追跡のために AsyncResource の使用を置き換えることができます。

class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot();

  get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}

const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123 

`asyncLocalStorage.disable()`#

安定性: 1 - 実験的

`AsyncLocalStorage`のインスタンスを無効にします。`asyncLocalStorage.run()`または`asyncLocalStorage.enterWith()`が再度呼び出されるまで、`asyncLocalStorage.getStore()`への後続の呼び出しはすべて`undefined`を返します。

`asyncLocalStorage.disable()`を呼び出すと、インスタンスにリンクされている現在のすべてのコンテキストが終了します。

`asyncLocalStorage`をガベージコレクションする前に、`asyncLocalStorage.disable()`を呼び出す必要があります。これは、`asyncLocalStorage`によって提供されるストアには適用されません。これらのオブジェクトは、対応する非同期リソースとともにガベージコレクションされます。

現在のプロセスで`asyncLocalStorage`が不要になった場合にこのメソッドを使用します。

`asyncLocalStorage.getStore()`#

現在のストアを返します。`asyncLocalStorage.run()`または`asyncLocalStorage.enterWith()`を呼び出すことによって初期化された非同期コンテキストの外で呼び出された場合、`undefined`を返します。

`asyncLocalStorage.enterWith(store)`#

安定性: 1 - 実験的

現在の同期実行の残りの部分のコンテキストに移行し、その後続の非同期呼び出しを通じてストアを永続化します。

const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
  asyncLocalStorage.getStore(); // Returns the same object
}); 

この移行は、全体の同期実行に継続されます。つまり、たとえば、コンテキストがイベントハンドラー内で開始された場合、後続のイベントハンドラーもそのコンテキスト内で実行されます(`AsyncResource`で別のコンテキストに特に関数バインドしない限り)。そのため、後者のメソッドを使用する正当な理由がない限り、`run()`を`enterWith()`よりも優先する必要があります。

const store = { id: 1 };

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
  asyncLocalStorage.getStore(); // Returns the same object
});

asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object 

`asyncLocalStorage.run(store, callback[, ...args])`#

コンテキスト内で関数を同期的に実行し、その戻り値を返します。ストアは、コールバック関数の外部ではアクセスできません。ストアは、コールバック内で作成された非同期操作からアクセスできます。

オプションの`args`は、コールバック関数に渡されます。

コールバック関数がエラーをスローした場合、エラーは`run()`によってもスローされます。スタックトレースはこの呼び出しの影響を受けず、コンテキストは終了します。

const store = { id: 2 };
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore(); // Returns the store object
    setTimeout(() => {
      asyncLocalStorage.getStore(); // Returns the store object
    }, 200);
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns undefined
  // The error will be caught here
} 

`asyncLocalStorage.exit(callback[, ...args])`#

安定性: 1 - 実験的

コンテキストの外で関数を同期的に実行し、その戻り値を返します。ストアは、コールバック関数またはコールバック内で作成された非同期操作内ではアクセスできません。コールバック関数内で実行される`getStore()`呼び出しは、常に`undefined`を返します。

オプションの`args`は、コールバック関数に渡されます。

コールバック関数がエラーをスローした場合、エラーは`exit()`によってもスローされます。スタックトレースはこの呼び出しの影響を受けず、コンテキストは再入力されます。

// Within a call to run
try {
  asyncLocalStorage.getStore(); // Returns the store object or value
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore(); // Returns undefined
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns the same object or value
  // The error will be caught here
} 

`async/await` での使用#

非同期関数内で、コンテキスト内で実行される`await`呼び出しが1つだけである場合、次のパターンを使用する必要があります。

async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value);
    return foo(); // The return value of foo will be awaited
  });
} 

この例では、ストアはコールバック関数と`foo`によって呼び出される関数でのみ使用可能です。`run`の外側では、`getStore`を呼び出すと`undefined`が返されます。

トラブルシューティング: コンテキストの消失#

ほとんどの場合、`AsyncLocalStorage`は問題なく動作します。まれに、現在のストアが非同期操作のいずれかで失われる場合があります。

コードがコールバックベースである場合、`util.promisify()`を使用してPromise化すれば、ネイティブPromiseで動作し始めます。

コールバックベースのAPIを使用する必要がある場合、またはコードがカスタムのThenable実装を想定している場合は、`AsyncResource`クラスを使用して、非同期操作を正しい実行コンテキストに関連付けます。コンテキストの消失の原因となっている関数呼び出しを、疑わしい呼び出しの後に`asyncLocalStorage.getStore()`の内容をログに記録することで特定します。コードが`undefined`をログに記録する場合、最後に呼び出されたコールバックがコンテキストの消失の原因となっている可能性があります。

クラス: `AsyncResource`#

`AsyncResource`クラスは、エンベッダーの非同期リソースによって拡張されるように設計されています。これを使用すると、ユーザーは独自の資源のライフタイムイベントを簡単にトリガーできます。

`init`フックは、`AsyncResource`がインスタンス化されるときにトリガーされます。

以下は、`AsyncResource` APIの概要です。

import { AsyncResource, executionAsyncId } from 'node:async_hooks';

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();const { AsyncResource, executionAsyncId } = require('node:async_hooks');

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();

`new AsyncResource(type[, options])`#

  • `type` <string> 非同期イベントの種類。
  • options <Object>
    • triggerAsyncId <number> この非同期イベントを作成した実行コンテキストのID。デフォルト: executionAsyncId()
    • requireManualDestroy <boolean> trueに設定すると、オブジェクトがガベージコレクションされたときにemitDestroyを無効にします。リソースのasyncIdが取得され、重要なAPIのemitDestroyがそれを使用して呼び出されない限り、通常は(emitDestroyを手動で呼び出した場合でも)設定する必要はありません。falseに設定されている場合、ガベージコレクション時のemitDestroy呼び出しは、アクティブなdestroyフックが少なくとも1つ存在する場合にのみ行われます。デフォルト: false

使用例

class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery');
    this.db = db;
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data);
    });
  }

  close() {
    this.db = null;
    this.emitDestroy();
  }
} 

静的メソッド: AsyncResource.bind(fn[, type[, thisArg]])#

  • `fn` <Function> 現在の実行コンテキストにバインドする関数。
  • type <string> 基になるAsyncResourceに関連付けるオプションの名前。
  • thisArg <any>

指定された関数を現在の実行コンテキストにバインドします。

asyncResource.bind(fn[, thisArg])#

  • fn <Function> 現在のAsyncResourceにバインドする関数。
  • thisArg <any>

指定された関数を、このAsyncResourceのスコープで実行するようにバインドします。

asyncResource.runInAsyncScope(fn[, thisArg, ...args])#

  • fn <Function> この非同期リソースの実行コンテキストで呼び出す関数。
  • thisArg <any> 関数呼び出しに使用するレシーバ。
  • ...args <any> 関数に渡すオプションの引数。

指定された引数を使用して、非同期リソースの実行コンテキストで指定された関数を呼び出します。これにより、コンテキストが確立され、コールバックの前にAsyncHooksがトリガーされ、関数が呼び出され、コールバックの後にAsyncHooksがトリガーされ、元のExecutionContextが復元されます。

asyncResource.emitDestroy()#

すべてのdestroyフックを呼び出します。これは一度だけ呼び出す必要があります。複数回呼び出すとエラーが発生します。これは**必ず**手動で呼び出す必要があります。リソースをGCによって収集するように残すと、destroyフックは決して呼び出されません。

asyncResource.asyncId()#

  • 戻り値: <number> リソースに割り当てられた一意のasyncId

asyncResource.triggerAsyncId()#

  • 戻り値: <number> AsyncResourceコンストラクタに渡されたものと同じtriggerAsyncId

ワーカー・スレッドプールでのAsyncResourceの使用#

次の例は、AsyncResourceクラスを使用して、Workerプールの非同期追跡を適切に提供する方法を示しています。データベース接続プールなどの他のリソースプールでも、同様のモデルに従うことができます。

タスクが2つの数値の加算であると仮定し、次の内容のtask_processor.jsという名前のファイルを使用する

import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

その周りのワーカープールは、次の構造を使用できます

import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

WorkerPoolTaskInfoオブジェクトによって追加された明示的な追跡がないと、コールバックは個々のWorkerオブジェクトに関連付けられているように見えます。しかし、Workerの作成はタスクの作成とは関連しておらず、タスクがいつスケジュールされたかについての情報は提供しません。

このプールは次のように使用できます

import WorkerPool from './worker_pool.js';
import os from 'node:os';

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}const WorkerPool = require('./worker_pool.js');
const os = require('node:os');

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}

AsyncResourceEventEmitterの統合#

EventEmitterによってトリガーされるイベントリスナーは、eventEmitter.on()が呼び出されたときにアクティブだったものとは異なる実行コンテキストで実行される場合があります。

次の例は、AsyncResourceクラスを使用して、イベントリスナーを正しい実行コンテキストに適切に関連付ける方法を示しています。同じアプローチをStreamまたは同様のイベント駆動型クラスに適用できます。

import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);