ストリームにおけるバックプレッシャー

データ処理中に発生する一般的な問題として、バックプレッシャーと呼ばれるものがあり、これはデータ転送中にバッファの後ろにデータが蓄積することを指します。転送の受信側が複雑な操作を行っている場合や、何らかの理由で速度が遅い場合、入ってくるソースからのデータが詰まりのように蓄積する傾向があります。

この問題を解決するには、あるソースから別のソースへのデータのスムーズな流れを確保するための委任システムが必要です。異なるコミュニティは、それぞれのプログラムに対して独自にこの問題を解決しており、Unix パイプや TCP ソケットは良い例であり、しばしばフロー制御と呼ばれます。Node.js では、ストリームが採用された解決策となっています。

このガイドの目的は、バックプレッシャーとは何か、そしてストリームが Node.js のソースコードでどのように正確にこの問題に対処しているかを詳しく説明することです。ガイドの後半では、ストリームを実装する際にアプリケーションのコードが安全かつ最適化されていることを保証するための推奨されるベストプラクティスを紹介します。

Node.js における バックプレッシャーBufferEventEmitters の一般的な定義について、および Stream の使用経験について、ある程度の知識があることを前提としています。これらのドキュメントをまだ読んでいない場合は、最初に API ドキュメントを見ておくことをお勧めします。そうすることで、このガイドを読みながら理解を深めるのに役立ちます。

データ処理の問題

コンピュータシステムでは、データはパイプ、ソケット、および信号を介してあるプロセスから別のプロセスに転送されます。Node.js では、Stream と呼ばれる同様のメカニズムが見られます。ストリームは素晴らしいです!Node.js に多くの機能を提供し、内部コードベースのほとんどすべての部分がそのモジュールを利用しています。開発者として、ストリームの使用も強く推奨されます!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

ストリームを通じて実装されるバックプレッシャーメカニズムが優れた最適化である理由の良い例は、Node.js の Stream 実装の内部システムツールを比較することで実証できます。

1 つのシナリオでは、大きなファイル(約〜9 GB)を取得し、おなじみの zip(1) ツールを使用して圧縮します。

zip The.Matrix.1080p.mkv

これには完了するまでに数分かかりますが、別のシェルで、Node.js のモジュール zlib を使用するスクリプトを実行できます。これは、別の圧縮ツールである gzip(1) をラップします。

const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

結果をテストするには、圧縮された各ファイルを開いてみてください。zip(1) ツールで圧縮されたファイルは、ファイルが破損していることを通知しますが、Stream で完了した圧縮はエラーなしで解凍されます。

この例では、.pipe() を使用して、ある端から別の端にデータソースを取得します。ただし、適切なエラーハンドラーがアタッチされていないことに注意してください。データのチャンクが適切に受信されなかった場合、Readable ソースまたは gzip ストリームは破棄されません。pump は、パイプライン内のいずれかのストリームが失敗またはクローズした場合に、パイプライン内のすべてのストリームを適切に破棄するユーティリティツールであり、この場合には必須です!

pump は、Node.js 8.x 以前でのみ必要です。Node.js 10.x 以降のバージョンでは、pipelinepump の代わりとして導入されています。これは、ストリーム間でエラーを転送し、パイプラインが完了したときに適切にクリーンアップしてコールバックを提供するモジュールメソッドです。

これはパイプラインの使用例です。

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

promisify をパイプラインで呼び出して、async / await で使用することもできます。

const stream = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
const util = require('node:util');

const pipeline = util.promisify(stream.pipeline);

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

データが多すぎる、速すぎる

Readable ストリームが Writable にデータを非常に速く、つまりコンシューマーが処理できる量よりもはるかに多く送信するインスタンスがあります。

それが起こると、コンシューマーは後で消費するためにデータのすべてのチャンクをキューに入れ始めます。書き込みキューはどんどん長くなり、このため、プロセス全体が完了するまで、より多くのデータをメモリに保持する必要があります。

ディスクへの書き込みはディスクからの読み込みよりもはるかに遅いため、ファイルを圧縮してハードディスクに書き込もうとすると、書き込みディスクが読み込み速度に追いつけなくなるため、バックプレッシャーが発生します。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

そのため、バックプレッシャーメカニズムが重要になります。バックプレッシャーシステムがないと、プロセスがシステムのメモリを使い果たし、他のプロセスを実質的に遅くし、完了するまでシステムの大半を独占することになります。

これにより、いくつかの問題が発生します。

  • 他のすべての現在のプロセスの速度低下
  • 過負荷になったガベージコレクター
  • メモリの枯渇

以下の例では、.write() 関数の戻り値を取り出し、それを true に変更します。これにより、Node.js コアのバックプレッシャーサポートが無効になります。「変更された」バイナリとは、return ret; の行なしで、代わりに return true; に置き換えて node バイナリを実行することを指します。

ガベージコレクションへの過剰な負荷

簡単なベンチマークを見てみましょう。上記と同じ例を使用して、両方のバイナリの中央値を求めるために、いくつかのタイムトライアルを実行しました。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

どちらも実行に約1分かかるため、あまり違いはありませんが、念のため詳しく見てみましょう。Linuxツールdtraceを使用して、V8ガベージコレクターで何が起こっているかを評価します。

GC(ガベージコレクター)の測定時間は、ガベージコレクターによる1回のスキャンサイクルの間隔を示します。

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

2つのプロセスは同じように始まり、GCを同じレートで動作させているように見えますが、バックプレッシャーシステムが正しく機能している場合、データ転送が完了するまで、GCの負荷を4〜8ミリ秒の一貫した間隔に分散することが明らかになります。

ただし、バックプレッシャーシステムがない場合、V8ガベージコレクションが引き延ばされ始めます。通常のバイナリでは、1分間に約75回GCが起動するのに対し、変更されたバイナリでは36回しか起動しません。

これは、メモリ使用量の増加によってゆっくりと蓄積される負債です。データが転送されるにつれて、バックプレッシャーシステムがないと、チャンク転送ごとに多くのメモリが使用されます。

割り当てられるメモリが多いほど、GCは1回のスキャンで多くのことを処理する必要があります。スキャンが大きいほど、GCは解放できるものを決定する必要があり、より大きなメモリ空間で分離されたポインターをスキャンすると、より多くの計算能力を消費します。

メモリの枯渇

各バイナリのメモリ消費量を調べるために、/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js を使用して各プロセスを個別に測定しました。

これが通常のバイナリの出力です

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

仮想メモリが占有する最大バイトサイズは約87.81MBであることがわかりました。

次に、.write() 関数の戻り値を変更すると、次のようになります

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

仮想メモリが占有する最大バイトサイズは約1.52GBであることがわかりました。

バックプレッシャーを委譲するストリームがない場合、割り当てられるメモリ空間は桁違いに大きくなります。同じプロセス間で大きな差があります!

この実験は、Node.jsのバックプレッシャーメカニズムがコンピューティングシステムに対してどれほど最適化され、コスト効率が高いかを示しています。では、その仕組みを詳しく見ていきましょう!

バックプレッシャーはどのようにこれらの問題を解決するのですか?

あるプロセスから別のプロセスにデータを転送するためのさまざまな関数があります。Node.jsには、.pipe() という組み込み関数があります。その他のパッケージも使用できます!しかし、最終的には、このプロセスの基本的なレベルでは、データのソースコンシューマーという2つの別々のコンポーネントがあります。

ソースから.pipe()が呼び出されると、転送するデータがあることをコンシューマーに通知します。pipe関数は、イベントトリガーの適切なバックプレッシャークロージャを設定するのに役立ちます。

Node.jsでは、ソースはReadableストリームで、コンシューマーはWritableストリームです(これらは両方ともDuplexストリームまたはTransformストリームと交換される場合がありますが、このガイドでは範囲外です)。

バックプレッシャーがトリガーされる瞬間は、Writable.write()関数の戻り値に正確に絞ることができます。この戻り値は、もちろんいくつかの条件によって決定されます。

データバッファーがhighWaterMarkを超えた場合、または書き込みキューが現在ビジー状態である場合、.write()false を返します。

false値が返されると、バックプレッシャーシステムが作動します。受信するReadableストリームがデータの送信を一時停止し、コンシューマーが再び準備が整うまで待機します。データバッファーが空になると、'drain'イベントが発行され、受信データフローが再開されます。

キューが完了すると、バックプレッシャーによりデータの再送信が可能になります。使用されていたメモリ空間は解放され、次のデータのバッチに備えます。

これにより、.pipe()関数で、任意の時点で固定量のメモリを使用できるようになります。メモリリークや無限バッファリングが発生せず、ガベージコレクターはメモリの1つの領域のみを処理する必要があります!

では、バックプレッシャーが非常に重要であるにもかかわらず、(おそらく)聞いたことがないのはなぜでしょうか?その答えは簡単です。Node.jsはこれをすべて自動的に実行しているからです。

それは素晴らしいことですが、カスタムストリームを実装する方法を理解しようとしている場合は、それほど素晴らしいことではありません。

ほとんどのマシンでは、バッファーがいっぱいになったかどうかを判断するバイトサイズがあります(これはマシンによって異なります)。Node.jsでは、カスタムのhighWaterMarkを設定できますが、通常、デフォルトは16kb(16384、またはobjectModeストリームの場合は16)に設定されています。その値を上げる必要がある場合は、実行してもかまいませんが、注意して実行してください!

.pipe()のライフサイクル

バックプレッシャーをより深く理解するために、ReadableストリームがパイプWritableストリームに接続されるライフサイクルに関するフローチャートを次に示します。

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

データを操作するためにいくつかのストリームを連鎖させるパイプラインを設定する場合、Transformストリームを実装する可能性が最も高くなります。

この場合、Readableストリームからの出力は、Transformに入り、Writableにパイプされます。

Readable.pipe(Transformable).pipe(Writable);

バックプレッシャーは自動的に適用されますが、Transformストリームの入力および出力のhighWaterMarkの両方を操作でき、バックプレッシャーシステムに影響を与えることに注意してください。

バックプレッシャーのガイドライン

Node.js v0.10以降、Streamクラスは、それぞれの関数のアンダースコアバージョン(._read()および._write())を使用して、.read()または.write()の動作を変更する機能を提供しています。

Readableストリームを実装するおよびWritableストリームを実装するためのガイドラインが文書化されています。これらを読んで理解していることを前提として、次のセクションで少し詳しく説明します。

カスタムストリームを実装する際に守るべきルール

ストリームの黄金律は、常にバックプレッシャーを尊重することです。ベストプラクティスとみなされるのは、矛盾しないプラクティスです。内部のバックプレッシャーサポートと矛盾する動作を避けるように注意すれば、適切なプラクティスに従っていることを確認できます。

一般的に、

  1. 要求されていない場合は、絶対に.push()を使用しないでください。
  2. .write()がfalseを返した後は、.write()を呼び出さないで、代わりに「drain」を待機してください。
  3. ストリームは、異なるNode.jsバージョンや使用するライブラリによって変更されます。注意してテストしてください。

3点目に関しては、ブラウザストリームを構築するための非常に便利なパッケージがreadable-streamです。Rodd Vaggは、このライブラリの有用性を説明する素晴らしいブログ記事を書いています。簡単に言うと、Readableストリームの自動的で正常な劣化の一種を提供し、古いバージョンのブラウザとNode.jsをサポートします。

Readableストリームに固有のルール

これまで、.write()がバックプレッシャーにどのように影響するかを見てきて、Writableストリームに焦点を当ててきました。Node.jsの機能により、データは技術的にはReadableからWritableにダウンストリームに流れています。ただし、データ、物質、またはエネルギーの伝送で観察できるとおり、ソースは宛先と同じくらい重要であり、Readableストリームはバックプレッシャーの処理方法にとって不可欠です。

これらのプロセスは、効果的に通信するために互いに依存しています。もしReadableストリームが、Writableストリームからデータの送信を停止するように求められた際にそれを無視した場合、.write()の戻り値が正しくない場合と同様に問題が発生する可能性があります。

したがって、.write()の戻り値を尊重するだけでなく、.push()._read()メソッドで使用された場合の戻り値も尊重する必要があります。.push()false値を返した場合、ストリームはソースからの読み取りを停止します。それ以外の場合は、一時停止せずに続行します。

以下は、.push()を使用した悪い例です。

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

さらに、カスタムストリームの外部から、バックプレッシャーを無視することにも落とし穴があります。この良い例に対する反例では、アプリケーションのコードは、データが利用可能になると('data'イベントによって通知される)、常にデータを強制的に送信します。

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));

以下は、Readableストリームで.push()を使用する例です。

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myReadableStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

この例では、.push()を使用して単一のオブジェクトをストリームにプッシュするカスタムReadableストリームを作成します。ストリームがデータを消費する準備ができると._read()メソッドが呼び出され、この場合は、すぐにデータをストリームにプッシュし、nullをプッシュしてストリームの終了をマークします。

次に、'data'イベントをリッスンし、ストリームにプッシュされた各データチャンクをログに記録することで、ストリームを消費します。この場合、ストリームにプッシュするデータチャンクは1つだけなので、ログメッセージが1つだけ表示されます。

Writableストリームに固有のルール

.write()は、いくつかの条件に応じてtrueまたはfalseを返す可能性があることを思い出してください。幸いなことに、独自のWritableストリームを構築する場合、ストリーム状態マシンがコールバックを処理し、バックプレッシャーを処理するタイミングを決定し、データフローを最適化します。

ただし、Writableを直接使用する場合は、.write()の戻り値を尊重し、これらの条件に注意を払う必要があります。

  • 書き込みキューがビジーの場合、.write()はfalseを返します。
  • データチャンクが大きすぎる場合、.write()はfalseを返します(制限は変数highWaterMarkで示されます)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback();
    else if (chunk.toString().indexOf('b') >= 0) callback();
    callback();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();

._writev()を実装する際に注意すべきこともあります。この関数は.cork()と組み合わされていますが、記述する際によくある間違いがあります。

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

.cork()は必要な回数だけ呼び出すことができます。ただし、再びフローさせるには、.uncork()を同じ回数だけ呼び出すように注意する必要があります。

結論

ストリームは、Node.jsでよく使用されるモジュールです。これらは内部構造にとって重要であり、開発者にとっては、Node.jsモジュールのエコシステム全体を拡張および接続するために重要です。

この記事を通じて、バックプレッシャーを念頭に置いて、独自のWritableおよびReadableストリームを安全にコード化し、トラブルシューティングできるようになり、同僚や友人と知識を共有できることを願っています。

Node.jsでアプリケーションを構築する際にストリーミング機能を改善および解放するのに役立つその他のAPI関数については、Streamの詳細を必ずお読みください。