ストリームの使い方
Node.jsアプリケーションで大量のデータを扱うことは、諸刃の剣です。大量のデータを処理できる能力は非常に便利ですが、パフォーマンスのボトルネックやメモリの枯渇につながる可能性もあります。従来、開発者はデータセット全体を一度にメモリに読み込むことでこの課題に取り組んできました。このアプローチは、小さいデータセットには直感的ですが、大きなデータ(ファイル、ネットワークリクエストなど)には非効率でリソースを大量に消費します。
ここでNode.jsのストリームが登場します。ストリームは根本的に異なるアプローチを提供し、データを逐次的に処理してメモリ使用量を最適化できます。データを管理可能なチャンクで処理することにより、ストリームは最も困難なデータセットでも効率的に対処できるスケーラブルなアプリケーションを構築する力を与えます。よく引用されるように、「ストリームは時間経過に伴う配列」です。
このガイドでは、ストリームの概念、歴史、APIの概要と、それらの使用・操作方法に関するいくつかの推奨事項を説明します。
Node.jsのストリームとは?
Node.jsのストリームは、アプリケーションのデータフローを管理するための強力な抽象化を提供します。ファイルからの読み書きやネットワークリクエストなどの大きなデータセットを、パフォーマンスを損なうことなく処理することに優れています。
このアプローチは、データセット全体を一度にメモリに読み込む方法とは異なります。ストリームはデータをチャンク単位で処理するため、メモリ使用量を大幅に削減します。Node.jsのすべてのストリームは EventEmitter クラスを継承しており、データ処理のさまざまな段階でイベントを発行できます。これらのストリームは、読み取り可能、書き込み可能、またはその両方が可能で、さまざまなデータ処理シナリオに柔軟に対応します。
イベント駆動型アーキテクチャ
Node.jsはイベント駆動型アーキテクチャで動作するため、リアルタイムのI/Oに理想的です。これは、入力が利用可能になったらすぐに消費し、アプリケーションが生成したらすぐに出力を送信することを意味します。ストリームはこのアプローチとシームレスに統合され、継続的なデータ処理を可能にします。
これは、主要な段階でイベントを発行することによって実現されます。これらのイベントには、受信したデータ(data イベント)やストリームの完了(end イベント)のシグナルが含まれます。開発者はこれらのイベントをリッスンし、それに応じてカスタムロジックを実行できます。このイベント駆動型の性質により、ストリームは外部ソースからのデータ処理に非常に効率的です。
なぜストリームを使うのか?
ストリームは、他のデータ処理方法に比べて3つの主要な利点を提供します
- メモリ効率: ストリームはデータを逐次的に処理し、データセット全体をメモリに読み込むのではなく、チャンク単位で消費・処理します。これは、大量のデータセットを扱う際に大きな利点となり、メモリ使用量を大幅に削減し、メモリ関連のパフォーマンス問題を防止します。
- 応答時間の改善: ストリームは即時のデータ処理を可能にします。データのチャンクが到着すると、ペイロード全体やデータセット全体が受信されるのを待たずに処理できます。これにより、レイテンシが短縮され、アプリケーション全体の応答性が向上します。
- リアルタイム処理のスケーラビリティ: データをチャンク単位で処理することで、Node.jsのストリームは限られたリソースで大量のデータを効率的に処理できます。このスケーラビリティにより、ストリームはリアルタイムで大量のデータを処理するアプリケーションに最適です。
これらの利点により、ストリームは高性能でスケーラブルなNode.jsアプリケーションを構築するための強力なツールとなり、特に大規模なデータセットやリアルタイムのデータ処理を扱う場合に有効です。
パフォーマンスに関する注意
アプリケーションがすでにすべてのデータをメモリ上で利用可能な場合、ストリームを使用すると不要なオーバーヘッドや複雑さが加わり、アプリケーションの速度が低下する可能性があります。
ストリームの歴史
このセクションは、Node.jsにおけるストリームの歴史のリファレンスです。Node.jsバージョン0.11.5(2013年)以前のコードベースを扱っているのでなければ、古いバージョンのストリームAPIに遭遇することはほとんどありませんが、用語はまだ使われているかもしれません。
Streams 0
ストリームの最初のバージョンはNode.jsと同時にリリースされました。まだStreamクラスはありませんでしたが、さまざまなモジュールがこの概念を使用し、read/write関数を実装していました。util.pump()関数は、ストリーム間のデータフローを制御するために利用できました。
Streams 1 (Classic)
2011年のNode v0.4.0のリリースに伴い、Streamクラスとpipe()メソッドが導入されました。
Streams 2
2012年、Node v0.10.0のリリースに伴い、Streams 2が発表されました。このアップデートでは、Readable、Writable、Duplex、Transformを含む新しいストリームサブクラスが導入されました。さらに、readableイベントが追加されました。後方互換性を維持するため、dataイベントリスナーを追加するか、pause()またはresume()メソッドを呼び出すことで、ストリームを旧モードに切り替えることができました。
Streams 3
2013年、Node v0.11.5でStreams 3がリリースされました。これは、ストリームがdataとreadableの両方のイベントハンドラを持つ問題に対処するためでした。これにより、「現行」モードと「旧」モードのどちらかを選択する必要がなくなりました。Streams 3は、Node.jsにおける現在のストリームのバージョンです。
ストリームのタイプ
読み取り可能(Readable)
Readableは、データソースを順次読み込むために使用するクラスです。Node.js APIにおけるReadableストリームの典型的な例は、ファイルの読み込み時のfs.ReadStream、HTTPリクエストの読み込み時のhttp.IncomingMessage、標準入力からの読み込み時のprocess.stdinなどです。
主要なメソッドとイベント
読み取り可能ストリームは、データ処理を細かく制御するためのいくつかのコアメソッドとイベントで動作します。
on('data'): このイベントは、ストリームからデータが利用可能になるたびにトリガーされます。ストリームが処理できる限り速くデータをプッシュするため、非常に高速で、高スループットのシナリオに適しています。on('end'): ストリームから読み取るデータがなくなったときに発行されます。これはデータ配信の完了を示します。このイベントは、ストリームからのすべてのデータが消費されたときにのみ発行されます。on('readable'): このイベントは、ストリームから読み取るデータが利用可能になったとき、またはストリームの終わりに達したときにトリガーされます。必要に応じて、より制御されたデータ読み取りを可能にします。on('close'): このイベントは、ストリームとその基盤となるリソースが閉じられたときに発行され、これ以上イベントが発行されないことを示します。on('error'): このイベントはいつでも発行される可能性があり、処理中にエラーが発生したことを示します。このイベントのハンドラを使用して、キャッチされない例外を回避できます。
これらのイベントの使用例は、以降のセクションで示します。
基本的な読み取り可能ストリーム
以下は、動的にデータを生成する単純な読み取り可能ストリーム実装の例です。
class extends Readable {
#count = 0;
() {
this.push(':-)');
if (++this.#count === 5) {
this.push(null);
}
}
}
const = new ();
.on('data', => {
.(.toString());
});
このコードでは、MyStreamクラスがReadableを拡張し、_read()メソッドをオーバーライドして内部バッファに文字列「:-) 」をプッシュします。文字列を5回プッシュした後、nullをプッシュしてストリームの終わりを通知します。on('data')イベントハンドラは、各チャンクを受信するたびにコンソールにログを出力します。
readableイベントによる高度な制御
データフローをさらに細かく制御するために、readableイベントを使用できます。このイベントはより複雑ですが、ストリームからいつデータを読み取るかを明示的に制御できるため、特定のアプリケーションでより良いパフォーマンスを提供します。
const = new MyStream({
: 1,
});
.on('readable', () => {
.('>> readable event');
let ;
while (( = .read()) !== null) {
.(.toString()); // Process the chunk
}
});
.on('end', () => .('>> end event'));
ここでは、readableイベントを使用して、必要に応じてストリームから手動でデータを引き出しています。readableイベントハンドラ内のループは、ストリームバッファからnullが返されるまでデータを読み続けます。nullが返されると、バッファが一時的に空であるか、ストリームが終了したことを示します。highWaterMarkを1に設定すると、バッファサイズが小さく保たれ、readableイベントがより頻繁にトリガーされ、データフローをより細かく制御できます。
上記のコードでは、次のような出力が得られます。
>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event
これを詳しく見てみましょう。on('readable')イベントをアタッチすると、readableイベントの発行をトリガーする可能性があるため、最初にread()を呼び出します。そのイベントの発行後、whileループの最初の反復でreadを呼び出します。これが、最初の2つのスマイリーが1行に表示される理由です。その後、nullがプッシュされるまでreadを呼び続けます。readを呼び出すたびに新しいreadableイベントの発行がプログラムされますが、「フロー」モード(つまり、readableイベントを使用している状態)にいるため、発行はnextTickにスケジュールされます。これが、ループの同期コードが終了した後に、すべてのスマイリーがまとめて表示される理由です。
注意: NODE_DEBUG=stream を付けてコードを実行すると、各 push の後に emitReadable がトリガーされることが確認できます。
各スマイリーの前にreadableイベントが呼び出されるのを見たい場合は、pushをsetImmediateまたはprocess.nextTickでラップすることができます。
class extends Readable {
#count = 0;
() {
(() => {
this.push(':-)');
if (++this.#count === 5) {
return this.push(null);
}
});
}
}
すると、以下のようになります。
>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event
書き込み可能 (Writable)
Writableストリームは、ファイルの作成、データのアップロード、またはデータを順次出力するタスクに便利です。読み取り可能ストリームがデータソースを提供するのに対し、Node.jsの書き込み可能ストリームはデータの宛先として機能します。Node.js APIにおける書き込み可能ストリームの典型的な例は、fs.WriteStream、process.stdout、およびprocess.stderrです。
書き込み可能ストリームの主要なメソッドとイベント
.write(): このメソッドは、データのチャンクをストリームに書き込むために使用されます。定義された制限(highWaterMark)までデータをバッファリングし、さらにデータをすぐに書き込めるかどうかを示すブール値を返します。.end(): このメソッドは、データ書き込みプロセスの終了を示します。ストリームに書き込み操作を完了させ、必要に応じてクリーンアップを実行するよう指示します。
書き込み可能ストリームの作成
以下は、すべての受信データを大文字に変換してから標準出力に書き込む書き込み可能ストリームを作成する例です。
const { } = ('node:events');
const { } = ('node:stream');
class extends {
constructor() {
super({ : 10 /* 10 bytes */ });
}
(, , ) {
..(.toString().toUpperCase() + '\n', );
}
}
async function () {
const = new ();
for (let = 0; < 10; ++) {
const = !.('hello');
if () {
.('>> wait drain');
await (, 'drain');
}
}
.('world');
}
// Call the async function
().(.);
このコードでは、MyStreamはバッファ容量(highWaterMark)が10バイトのカスタムWritableストリームです。データを大文字に変換してから書き出すために、_writeメソッドをオーバーライドしています。
ループは、ストリームに「hello」を10回書き込もうとします。バッファがいっぱいになると(waitDrainがtrueになる)、drainイベントを待ってから続行し、ストリームのバッファを圧倒しないようにします。
出力は以下のようになります。
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD
Duplex
Duplexストリームは、読み取り可能インターフェースと書き込み可能インターフェースの両方を実装します。
Duplexストリームの主要なメソッドとイベント
Duplexストリームは、読み取り可能ストリームと書き込み可能ストリームで説明したすべてのメソッドとイベントを実装します。
duplexストリームの良い例は、netモジュールのSocketクラスです。
const = ('node:net');
// Create a TCP server
const = .( => {
.('Hello from server!\n');
.('data', => {
.(`Client says: ${.()}`);
});
// Handle client disconnection
.('end', () => {
.('Client disconnected');
});
});
// Start the server on port 8080
.(8080, () => {
.('Server listening on port 8080');
});
上記のコードは、ポート8080でTCPソケットを開き、接続してきたクライアントにHello from server!を送信し、受信したデータをログに出力します。
const = ('node:net');
// Connect to the server at localhost:8080
const = .({ : 8080 }, () => {
.('Hello from client!\n');
});
.('data', => {
.(`Server says: ${.()}`);
});
// Handle the server closing the connection
.('end', () => {
.('Disconnected from server');
});
上記のコードは、TCPソケットに接続し、Hello from clientメッセージを送信し、受信したデータをログに出力します。
Transform
Transformストリームは、入力に基づいて出力が計算されるduplexストリームです。その名前が示すように、通常は読み取り可能ストリームと書き込み可能ストリームの間でデータを通過させながら変換するために使用されます。
Transformストリームの主要なメソッドとイベント
Duplexストリームのすべてのメソッドとイベントに加えて、次のものがあります。
_transform: この関数は、読み取り可能部分と書き込み可能部分の間のデータフローを処理するために内部的に呼び出されます。これはアプリケーションコードから呼び出してはなりません。
Transformストリームの作成
新しいtransformストリームを作成するには、Transformコンストラクタにoptionsオブジェクトを渡します。これには、pushメソッドを使用して入力データから出力データをどのように計算するかを処理するtransform関数を含めます。
const { } = ('node:stream');
const = new ({
(, , ) {
this.(.toString().toUpperCase());
();
},
});
このストリームは、任意の入力を受け取り、それを大文字で出力します。
ストリームの操作方法
ストリームを扱う際には、通常、ソースから読み取り、宛先に書き込み、その間でデータの変換が必要になることがあります。以下のセクションでは、これを実現するさまざまな方法について説明します。
.pipe()
.pipe()メソッドは、1つの読み取り可能ストリームを書き込み可能(またはtransform)ストリームに連結します。これは目標を達成するための簡単な方法のように見えますが、すべてのエラー処理をプログラマに委ねるため、正しく実装するのが困難です。
以下の例は、現在のファイルを大文字でコンソールに出力しようとするpipeの例です。
const = ('node:fs');
const { } = ('node:stream');
let = 0;
const = new ({
(, , ) {
if ( === 10) {
return (new ('BOOM!'));
}
++;
this.(.toString().toUpperCase());
();
},
});
const = .(, { : 1 });
const = .;
.().();
.('close', () => {
.('Readable stream closed');
});
.('close', () => {
.('Transform stream closed');
});
.('error', => {
.('\nError in transform stream:', .);
});
.('close', () => {
.('Writable stream closed');
});
10文字書き込んだ後、upperはコールバックでエラーを返し、ストリームが閉じられます。しかし、他のストリームには通知されないため、メモリリークが発生します。出力は以下のようになります。
CONST FS =
Error in transform stream: BOOM!
Transform stream closed
pipeline()
.pipe()メソッドの落とし穴や低レベルの複雑さを避けるために、ほとんどの場合、pipeline()メソッドを使用することが推奨されます。このメソッドは、ストリームを安全かつ堅牢にパイプでつなぐ方法であり、エラー処理とクリーンアップを自動的に行います。
次の例は、pipeline()を使用することで、前の例の落とし穴をどのように防ぐかを示しています。
const = ('node:fs');
const { , } = ('node:stream');
let = 0;
const = new ({
(, , ) {
if ( === 10) {
return (new ('BOOM!'));
}
++;
this.(.toString().toUpperCase());
();
},
});
const = .(, { : 1 });
const = .;
.('close', () => {
.('Readable stream closed');
});
.('close', () => {
.('\nTransform stream closed');
});
.('close', () => {
.('Writable stream closed');
});
(, , , => {
if () {
return .('Pipeline error:', .);
}
.('Pipeline succeeded');
});
この場合、すべてのストリームは以下の出力とともに閉じられます。
CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed
pipeline()メソッドには、async pipeline()バージョンもあります。これはコールバックを受け付けず、代わりにパイプラインが失敗した場合に拒否されるプロミスを返します。
非同期イテレータ
非同期イテレータは、Streams APIとやり取りするための標準的な方法として推奨されます。WebとNode.jsの両方のすべてのストリームプリミティブと比較して、非同期イテレータは理解しやすく、使いやすいため、バグが少なく、より保守しやすいコードになります。最近のNode.jsのバージョンでは、非同期イテレータはストリームとやり取りするためのよりエレガントで読みやすい方法として登場しました。イベントの基盤の上に構築された非同期イテレータは、ストリームの消費を簡素化するより高レベルの抽象化を提供します。
Node.jsでは、すべての読み取り可能ストリームは非同期イテラブルです。これは、for await...of構文を使用して、ストリームのデータが利用可能になるにつれてループ処理し、各データを非同期コードの効率性とシンプルさで処理できることを意味します。
ストリームで非同期イテレータを使用する利点
ストリームで非同期イテレータを使用すると、いくつかの点で非同期データフローの処理が簡素化されます。
- 可読性の向上: 特に複数の非同期データソースを扱う場合、コード構造がよりクリーンで読みやすくなります。
- エラー処理: 非同期イテレータは、通常の非同期関数と同様に、try/catchブロックを使用した簡単なエラー処理を可能にします。
- フロー制御: コンシューマが次のデータチャンクを待機することでフローを制御するため、バックプレッシャーを本質的に管理し、より効率的なメモリ使用と処理を可能にします。
非同期イテレータは、読み取り可能ストリームを扱うためのより現代的で、多くの場合より読みやすい方法を提供します。特に非同期データソースを扱う場合や、より逐次的でループベースのデータ処理アプローチを好む場合に適しています。
以下は、読み取り可能ストリームで非同期イテレータを使用する例です。
const = ('node:fs');
const { } = ('node:stream/promises');
async function () {
await (
.(),
async function* () {
for await (let of ) {
yield .toString().toUpperCase();
}
},
.
);
}
().(.);
このコードは、新しいtransformストリームを定義する必要なく、前の例と同じ結果を達成します。簡潔にするため、前の例からのエラーは削除されています。パイプラインの非同期バージョンが使用されており、起こりうるエラーを処理するためにtry...catchブロックでラップする必要があります。
オブジェクトモード
デフォルトでは、ストリームは文字列、Buffer、TypedArray、またはDataViewで動作します。これら以外の任意の(オブジェクトなどの)値がストリームにプッシュされると、TypeErrorがスローされます。ただし、objectModeオプションをtrueに設定することで、オブジェクトを扱うことが可能です。これにより、ストリームはnullを除く任意のJavaScript値で動作できるようになります。nullはストリームの終了を通知するために使用されます。つまり、読み取り可能ストリームでは任意の値をpushおよびreadでき、書き込み可能ストリームでは任意の値をwriteできます。
const { } = ('node:stream');
const = ({
: true,
() {
this.push({ : 'world' });
this.push(null);
},
});
オブジェクトモードで作業する場合、highWaterMarkオプションはバイト数ではなく、オブジェクトの数を指すことを覚えておくことが重要です。
バックプレッシャー
ストリームを使用する場合、プロデューサーがコンシューマーを圧倒しないようにすることが重要です。このため、Node.js APIのすべてのストリームでバックプレッシャーメカニズムが使用されており、実装者はその動作を維持する責任があります。
データバッファがhighWaterMarkを超えたり、書き込みキューが現在ビジーな状況では、.write()はfalseを返します。
false値が返されると、バックプレッシャーシステムが作動します。これにより、入力側のReadableストリームからのデータ送信が一時停止され、コンシューマーが再び準備できるまで待機します。データバッファが空になると、'drain'イベントが発行され、入力データフローが再開されます。
バックプレッシャーについてさらに詳しく知りたい場合は、バックプレッシャーガイドを参照してください。
Streams vs Web Streams
ストリームの概念はNode.jsに限定されるものではありません。実際、Node.jsにはWeb Streamsと呼ばれるストリーム概念の異なる実装があり、これはWHATWG Streams Standardを実装しています。それらの背後にある概念は似ていますが、APIが異なり、直接互換性がないことに注意することが重要です。
Web Streamsは、Node.jsのReadable、Writable、およびTransformストリームに相当するReadableStream、WritableStream、およびTransformStreamクラスを実装しています。
StreamsとWeb Streamsの相互運用性
Node.jsは、Web StreamsとNode.jsストリーム間の変換を行うユーティリティ関数を提供しています。これらの関数は、各ストリームクラスにtoWebおよびfromWebメソッドとして実装されています。
Duplexクラスの次の例は、Web Streamsに変換された読み取り可能ストリームと書き込み可能ストリームの両方を扱う方法を示しています。
const { } = ('node:stream');
const = ({
() {
this.push('world');
this.push(null);
},
(, , ) {
.('writable', );
();
},
});
const { , } = .();
.().('hello');
.()
.()
.( => {
.('readable', .);
});
このヘルパー関数は、Node.jsモジュールからWeb Streamを返す必要がある場合やその逆の場合に便利です。通常のストリーム消費では、非同期イテレータによりNode.jsとWeb Streamsの両方とシームレスにやり取りできます。
const { } = ('node:stream/promises');
async function () {
const { } = await ('https://node.dokyumento.jp/api/stream.html');
await (
,
new (),
async function* () {
for await (const of ) {
yield .toString().toUpperCase();
}
},
.
);
}
().(.);
fetchのボディはReadableStream<Uint8Array>であり、そのためチャンクを文字列として扱うにはTextDecoderStreamが必要であることに注意してください。
この成果物は、Matteo CollinaがPlatformaticのブログで公開したコンテンツから派生したものです。