Node.js v21.7.2 ドキュメント
- Node.js v21.7.2
-
► 目次
- ストリーム
- このドキュメントの構成
- ストリームの種類
- ストリームコンシューマーのAPI
- 書き込み可能なストリーム
- クラス:
stream.Writable
- イベント:
'close'
- イベント:
'drain'
- イベント:
'error'
- イベント:
'finish'
- イベント:
'pipe'
- イベント:
'unpipe'
writable.cork()
writable.destroy([error])
writable.closed
writable.destroyed
writable.end([chunk[, encoding]][, callback])
writable.setDefaultEncoding(encoding)
writable.uncork()
writable.writable
writable.writableAborted
writable.writableEnded
writable.writableCorked
writable.errored
writable.writableFinished
writable.writableHighWaterMark
writable.writableLength
writable.writableNeedDrain
writable.writableObjectMode
writable.write(chunk[, encoding][, callback])
- イベント:
- クラス:
- 読み取り可能なストリーム
- 2つの読み取りモード
- 3つの状態
- 1つのAPIスタイルを選択する
- クラス:
stream.Readable
- イベント:
'close'
- イベント:
'data'
- イベント:
'end'
- イベント:
'error'
- イベント:
'pause'
- イベント:
'readable'
- イベント:
'resume'
readable.destroy([error])
readable.closed
readable.destroyed
readable.isPaused()
readable.pause()
readable.pipe(destination[, options])
readable.read([size])
readable.readable
readable.readableAborted
readable.readableDidRead
readable.readableEncoding
readable.readableEnded
readable.errored
readable.readableFlowing
readable.readableHighWaterMark
readable.readableLength
readable.readableObjectMode
readable.resume()
readable.setEncoding(encoding)
readable.unpipe([destination])
readable.unshift(chunk[, encoding])
readable.wrap(stream)
readable[Symbol.asyncIterator]()
readable[Symbol.asyncDispose]()
readable.compose(stream[, options])
readable.iterator([options])
readable.map(fn[, options])
readable.filter(fn[, options])
readable.forEach(fn[, options])
readable.toArray([options])
readable.some(fn[, options])
readable.find(fn[, options])
readable.every(fn[, options])
readable.flatMap(fn[, options])
readable.drop(limit[, options])
readable.take(limit[, options])
readable.reduce(fn[, initial[, options]])
- イベント:
- Duplexストリームと変換ストリーム
stream.finished(stream[, options], callback)
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
stream.compose(...streams)
stream.Readable.from(iterable[, options])
stream.Readable.fromWeb(readableStream[, options])
stream.Readable.isDisturbed(stream)
stream.isErrored(stream)
stream.isReadable(stream)
stream.Readable.toWeb(streamReadable[, options])
stream.Writable.fromWeb(writableStream[, options])
stream.Writable.toWeb(streamWritable)
stream.Duplex.from(src)
stream.Duplex.fromWeb(pair[, options])
stream.Duplex.toWeb(streamDuplex)
stream.addAbortSignal(signal, stream)
stream.getDefaultHighWaterMark(objectMode)
stream.setDefaultHighWaterMark(objectMode, value)
- 書き込み可能なストリーム
- ストリーム実装者のためのAPI
- 補足事項
- ストリーム
-
► インデックス
- アサーションテスト
- 非同期コンテキスト追跡
- 非同期フック
- バッファー
- C++アドオン
- Node-APIを使用したC/C++アドオン
- C++組み込みAPI
- 子プロセス
- クラスター
- コマンドラインオプション
- コンソール
- Corepack
- 暗号
- デバッガー
- 非推奨のAPI
- 診断チャネル
- DNS
- ドメイン
- エラー
- イベント
- ファイルシステム
- グローバル
- HTTP
- HTTP/2
- HTTPS
- インスペクター
- 国際化
- モジュール: CommonJS モジュール
- モジュール: ECMAScript モジュール
- モジュール:
node:module
API - モジュール: パッケージ
- ネット
- OS
- パス
- パフォーマンスフック
- パーミッション
- プロセス
- Punycode
- クエリ文字列
- Readline
- REPL
- レポート
- 単一実行可能アプリケーション
- ストリーム
- 文字列デコーダー
- テストランナー
- タイマー
- TLS/SSL
- トレースイベント
- TTY
- UDP/データグラム
- URL
- ユーティリティ
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- ワーカー スレッド
- Zlib
- ► その他のバージョン
- ► オプション
ストリーム[src]#
ソースコード: lib/stream.js
ストリームは、Node.jsでストリーミングデータを扱うための抽象的なインターフェースです。 node:stream
モジュールは、ストリームインターフェースを実装するためのAPIを提供します。
Node.jsによって提供されるストリームオブジェクトは多数あります。たとえば、HTTPサーバーへのリクエストとprocess.stdout
の両方がストリームインスタンスです。
ストリームは、読み取り可能、書き込み可能、またはその両方である可能性があります。すべてのストリームはEventEmitter
のインスタンスです。
node:stream
モジュールにアクセスするには
const stream = require('node:stream');
node:stream
モジュールは、新しいタイプのストリームインスタンスを作成するのに役立ちます。通常、ストリームを消費するためにnode:stream
モジュールを使用する必要はありません。
このドキュメントの構成#
このドキュメントには、主に2つのセクションと、ノート用の3番目のセクションが含まれています。最初のセクションでは、アプリケーション内で既存のストリームを使用する方法について説明します。2番目のセクションでは、新しいタイプのストリームを作成する方法について説明します。
ストリームの種類#
Node.jsには、4つの基本的なストリームの種類があります。
Writable
: データを書き込むことができるストリーム(たとえば、fs.createWriteStream()
)。Readable
: データを読み取ることができるストリーム(たとえば、fs.createReadStream()
)。Duplex
:Readable
とWritable
の両方であるストリーム(たとえば、net.Socket
)。Transform
: 書き込まれ、読み取られる際にデータを変更または変換できるDuplex
ストリーム(たとえば、zlib.createDeflate()
)。
さらに、このモジュールには、ユーティリティ関数 stream.pipeline()
、stream.finished()
、stream.Readable.from()
および stream.addAbortSignal()
が含まれています。
ストリーム Promises API#
stream/promises
APIは、コールバックを使用するのではなく、Promise
オブジェクトを返すストリーム用の代替の非同期ユーティリティ関数セットを提供します。 APIは、require('node:stream/promises')
またはrequire('node:stream').promises
を介してアクセスできます。
stream.pipeline(source[, ...transforms], destination[, options])
#
stream.pipeline(streams[, options])
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 戻り値: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 戻り値: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 戻り値: <Promise> | <AsyncIterable>
options
<Object>signal
<AbortSignal>end
<boolean>
- 戻り値: <Promise> パイプラインが完了したときに履行されます。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
AbortSignal
を使用するには、オプションオブジェクトの最後の引数として渡します。シグナルが中止されると、基になるパイプラインで destroy
が AbortError
とともに呼び出されます。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
pipeline
API は非同期ジェネレーターもサポートしています
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
非同期ジェネレーターに渡された signal
引数を処理することを忘れないでください。特に、非同期ジェネレーターがパイプラインのソース(つまり、最初の引数)である場合、パイプラインは決して完了しません。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
pipeline
API は コールバックバージョン を提供します
stream.finished(stream[, options])
#
stream
<Stream>options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
: <AbortSignal> | <undefined>
- 戻り値: <Promise> ストリームが読み取りまたは書き込み可能でなくなったときに履行されます。
const { finished } = require('node:stream/promises');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
finished
API は コールバックバージョン も提供します。
オブジェクトモード#
Node.js API で作成されたすべてのストリームは、文字列と Buffer
(または Uint8Array
) オブジェクトのみを操作します。ただし、ストリームの実装で他の種類の JavaScript 値(ストリーム内で特別な目的を果たす null
を除く)を操作することは可能です。このようなストリームは「オブジェクトモード」で動作すると見なされます。
ストリームインスタンスは、ストリームの作成時に objectMode
オプションを使用してオブジェクトモードに切り替えられます。既存のストリームをオブジェクトモードに切り替えようとするのは安全ではありません。
バッファリング#
Writable
ストリームと Readable
ストリームは両方とも、内部バッファにデータを格納します。
バッファリングされる可能性のあるデータ量は、ストリームのコンストラクターに渡される highWaterMark
オプションによって異なります。通常のストリームの場合、highWaterMark
オプションは 合計バイト数 を指定します。オブジェクトモードで動作するストリームの場合、highWaterMark
はオブジェクトの合計数を指定します。
データは、実装が stream.push(chunk)
を呼び出すと、Readable
ストリームにバッファリングされます。ストリームのコンシューマーが stream.read()
を呼び出さない場合、データは消費されるまで内部キューに留まります。
内部読み取りバッファの合計サイズが highWaterMark
で指定されたしきい値に達すると、ストリームは現在バッファリングされているデータが消費されるまで、基になるリソースからのデータの読み取りを一時的に停止します (つまり、ストリームは読み取りバッファを埋めるために使用される内部 readable._read()
メソッドの呼び出しを停止します)。
データは、writable.write(chunk)
メソッドが繰り返し呼び出されると、Writable
ストリームにバッファリングされます。内部書き込みバッファの合計サイズが highWaterMark
で設定されたしきい値より小さい間、writable.write()
の呼び出しは true
を返します。内部バッファのサイズが highWaterMark
に達するか、超えると、false
が返されます。
stream.pipe()
メソッドを含む、stream
API の重要な目標は、さまざまな速度のソースと宛先が利用可能なメモリを圧倒しないように、データのバッファリングを許容可能なレベルに制限することです。
highWaterMark
オプションはしきい値であり、制限ではありません。ストリームがそれ以上のデータの要求を停止する前にバッファリングするデータの量を決定します。一般に、厳密なメモリ制限を強制するものではありません。特定のストリーム実装では、より厳格な制限を強制することを選択できますが、そうすることはオプションです。
Duplex
ストリームと Transform
ストリームは両方とも Readable
および Writable
であるため、それぞれが読み取りと書き込みに使用される *2 つの* 別個の内部バッファを保持し、各側がデータの適切な効率的な流れを維持しながら、互いに独立して動作できるようにします。たとえば、net.Socket
インスタンスは、ソケット *から* 受信したデータの消費を可能にする Readable
側と、ソケット *へ* のデータの書き込みを可能にする Writable
側を備えた Duplex
ストリームです。データは受信されるよりも高速または低速でソケットに書き込まれる可能性があるため、各側は互いに独立して動作 (およびバッファリング) する必要があります。
内部バッファリングのメカニズムは内部実装の詳細であり、いつでも変更される可能性があります。ただし、特定の上級実装では、writable.writableBuffer
または readable.readableBuffer
を使用して内部バッファを取得できます。これらの文書化されていないプロパティの使用は推奨されません。
ストリームコンシューマー用のAPI#
ほとんどすべての Node.js アプリケーションは、どれほど単純であっても、何らかの方法でストリームを使用しています。以下は、HTTP サーバーを実装する Node.js アプリケーションでストリームを使用する例です
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable
ストリーム (例の res
など) は、ストリームにデータを書き込むために使用される write()
や end()
などのメソッドを公開します。
Readable
ストリームは、ストリームから読み取るデータが利用可能になったときにアプリケーションコードに通知するために EventEmitter
API を使用します。利用可能なデータは、複数の方法でストリームから読み取ることができます。
Writable
ストリームと Readable
ストリームはどちらも、ストリームの現在の状態を伝達するために EventEmitter
API をさまざまな方法で使用します。
Duplex
ストリームと Transform
ストリームはどちらも Writable
および Readable
です。
ストリームへのデータの書き込みまたはストリームからのデータの消費を行うアプリケーションは、ストリームインターフェイスを直接実装する必要はなく、通常は require('node:stream')
を呼び出す理由はありません。
新しいタイプのストリームを実装したい開発者は、ストリーム実装者用のAPI のセクションを参照してください。
書き込み可能ストリーム#
書き込み可能ストリームは、データが書き込まれる *宛先* の抽象化です。
Writable
ストリームの例を次に示します
- クライアントでのHTTPリクエスト
- サーバーでのHTTPレスポンス
- fs書き込みストリーム
- zlibストリーム
- cryptoストリーム
- TCPソケット
- 子プロセスのstdin
process.stdout
、process.stderr
これらの例の一部は、実際には Writable
インターフェイスを実装する Duplex
ストリームです。
すべての Writable
ストリームは、stream.Writable
クラスによって定義されたインターフェイスを実装します。
Writable
ストリームの特定のインスタンスはさまざまな点で異なる場合がありますが、すべての Writable
ストリームは、以下の例に示すように同じ基本的な使用パターンに従います
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
クラス: stream.Writable
#
イベント: 'close'
#
ストリームとその基になるリソース (たとえば、ファイル記述子) が閉じられたときに、'close'
イベントが発行されます。このイベントは、これ以上のイベントが発行されず、これ以上の計算が行われないことを示します。
Writable
ストリームは、emitClose
オプションを使用して作成された場合、常に 'close'
イベントを発行します。
イベント: 'drain'
#
stream.write(chunk)
の呼び出しが false
を返した場合、ストリームへのデータ書き込みを再開するのが適切なタイミングになったときに 'drain'
イベントが発生します。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
イベント: 'error'
#
データの書き込みまたはパイプ処理中にエラーが発生した場合、'error'
イベントが発生します。リスナーのコールバックには、呼び出される際に単一の Error
引数が渡されます。
ストリームの作成時に autoDestroy
オプションが false
に設定されていない限り、'error'
イベントが発生するとストリームは閉じられます。
'error'
の後、'close'
以外のイベント('error'
イベントを含む)は発生しないはずです。
イベント: 'finish'
#
stream.end()
メソッドが呼び出され、すべてのデータが基礎となるシステムにフラッシュされた後、'finish'
イベントが発生します。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
イベント: 'pipe'
#
src
<stream.Readable> この書き込み可能なストリームにパイプしているソースストリーム
stream.pipe()
メソッドが Readable ストリームで呼び出され、この書き込み可能なストリームが宛先セットに追加されると、'pipe'
イベントが発生します。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
イベント: 'unpipe'
#
src
<stream.Readable> この書き込み可能なストリームのパイプを解除したソースストリーム
stream.unpipe()
メソッドが Readable
ストリームで呼び出され、この Writable
ストリームが宛先セットから削除されると、'unpipe'
イベントが発生します。
この Writable
ストリームに Readable
ストリームがパイプ処理中にエラーを発生させた場合にも、このイベントが発生します。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()
#
writable.cork()
メソッドは、書き込まれたすべてのデータをメモリにバッファリングすることを強制します。バッファリングされたデータは、stream.uncork()
または stream.end()
メソッドのいずれかが呼び出されたときにフラッシュされます。
writable.cork()
の主な目的は、複数の小さなチャンクが高速にストリームに書き込まれる状況に対応することです。それらを即座に基礎となる宛先に転送するのではなく、writable.cork()
は writable.uncork()
が呼び出されるまですべてのチャンクをバッファリングし、存在する場合はそれらすべてを writable._writev()
に渡します。これにより、最初の小さなチャンクが処理されるのを待機している間、データがバッファリングされるヘッドオブラインブロッキング状況を防ぎます。ただし、writable._writev()
を実装せずに writable.cork()
を使用すると、スループットに悪影響を及ぼす可能性があります。
参照: writable.uncork()
, writable._writev()
。
writable.destroy([error])
#
ストリームを破棄します。オプションで 'error'
イベントを発生させ、'close'
イベントを発生させます(emitClose
が false
に設定されていない場合)。この呼び出し後、書き込み可能なストリームは終了し、後続の write()
または end()
の呼び出しは ERR_STREAM_DESTROYED
エラーになります。これは、ストリームを破棄するための破壊的で即時の方法です。以前の write()
の呼び出しがドレインされていない場合があり、ERR_STREAM_DESTROYED
エラーをトリガーする可能性があります。データをフラッシュしてから閉じる必要がある場合、またはストリームを破棄する前に 'drain'
イベントを待機する場合は、destroy の代わりに end()
を使用してください。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
destroy()
が呼び出されると、それ以上の呼び出しは no-op になり、_destroy()
からのエラーを除いて、それ以上のエラーは 'error'
として発生しない可能性があります。
実装者はこのメソッドをオーバーライドするのではなく、代わりに writable._destroy()
を実装する必要があります。
writable.closed
#
'close'
が発生した後、true
です。
writable.destroyed
#
writable.destroy()
が呼び出された後、true
です。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])
#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 書き込むオプションのデータ。オブジェクトモードで動作していないストリームの場合、chunk
は文字列、Buffer
またはUint8Array
である必要があります。オブジェクトモードのストリームの場合、chunk
はnull
以外の任意の JavaScript 値にすることができます。encoding
<string>chunk
が文字列の場合のエンコーディングcallback
<Function> ストリームが終了したときに呼び出すコールバック。- 戻り値: <this>
writable.end()
メソッドを呼び出すと、Writable
にこれ以上データが書き込まれないことを通知します。オプションの chunk
および encoding
引数を使用すると、ストリームを閉じる直前に追加の最後のデータチャンクを書き込むことができます。
stream.end()
を呼び出した後に stream.write()
メソッドを呼び出すと、エラーが発生します。
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
writable.setDefaultEncoding(encoding)
#
writable.setDefaultEncoding()
メソッドは、Writable
ストリームのデフォルトの encoding
を設定します。
writable.uncork()
#
writable.uncork()
メソッドは、stream.cork()
が呼び出されてからバッファリングされたすべてのデータをフラッシュします。
writable.cork()
および writable.uncork()
を使用してストリームへの書き込みのバッファリングを管理する場合は、process.nextTick()
を使用して writable.uncork()
の呼び出しを遅延させます。これにより、特定の Node.js イベントループフェーズ内で発生するすべての writable.write()
の呼び出しをバッチ処理できます。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
ストリームで writable.cork()
メソッドが複数回呼び出された場合は、バッファリングされたデータをフラッシュするために同じ回数だけ writable.uncork()
を呼び出す必要があります。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
参照: writable.cork()
。
writable.writable
#
ストリームが破棄、エラー発生、または終了していないことを意味し、writable.write()
を呼び出すのが安全な場合、true
です。
writable.writableAborted
#
'finish'
を発生させる前にストリームが破棄されたか、エラーが発生したかを返します。
writable.writableEnded
#
writable.end()
が呼び出された後、true
です。このプロパティは、データがフラッシュされたかどうかを示すものではありません。これには、代わりに writable.writableFinished
を使用してください。
writable.writableCorked
#
ストリームのコルクを完全に取り外すために writable.uncork()
を呼び出す必要がある回数。
writable.errored
#
ストリームがエラーで破棄された場合、エラーを返します。
writable.writableFinished
#
'finish'
イベントが発生する直前に true
に設定されます。
writable.writableHighWaterMark
#
この Writable
の作成時に渡された highWaterMark
の値を返します。
writable.writableLength
#
このプロパティには、書き込み準備が整ったキュー内のバイト数(またはオブジェクト数)が含まれます。この値は、highWaterMark
の状態に関するイントロスペクションデータを提供します。
writable.writableNeedDrain
#
ストリームのバッファが満杯になり、ストリームが 'drain'
を発行する場合は true
です。
writable.writableObjectMode
#
指定された Writable
ストリームの objectMode
プロパティのゲッター。
writable.write(chunk[, encoding][, callback])
#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 書き込むオプションのデータ。オブジェクトモードで動作していないストリームの場合、chunk
は文字列、Buffer
またはUint8Array
である必要があります。オブジェクトモードのストリームの場合、chunk
はnull
以外の任意の JavaScript 値にすることができます。encoding
<string> | <null>chunk
が文字列の場合のエンコーディング。デフォルト:'utf8'
callback
<Function> このデータのチャンクがフラッシュされたときのコールバック。- 戻り値: <boolean> 追加のデータの書き込みを続行する前に、ストリームが
'drain'
イベントが発行されるのを待つことを呼び出し元のコードに要求する場合はfalse
。それ以外の場合はtrue
。
writable.write()
メソッドはストリームにデータを書き込み、データが完全に処理されたら、指定された callback
を呼び出します。エラーが発生した場合、callback
はエラーを最初の引数として呼び出されます。callback
は非同期的に呼び出され、'error'
が発行される前に呼び出されます。
戻り値は、chunk
を受け入れた後、内部バッファがストリームの作成時に構成された highWaterMark
より小さい場合は true
です。false
が返された場合、'drain'
イベントが発行されるまで、ストリームへの追加のデータの書き込みを停止する必要があります。
ストリームがドレインされていない間、write()
の呼び出しは chunk
をバッファリングし、false を返します。現在バッファリングされているすべてのチャンクがドレインされる(オペレーティングシステムによる配信が受け入れられる)と、'drain'
イベントが発行されます。write()
が false を返したら、'drain'
イベントが発行されるまで、それ以上のチャンクを書き込まないでください。ドレインされていないストリームで write()
を呼び出すことは許可されていますが、Node.js は最大メモリ使用量が発生するまですべての書き込まれたチャンクをバッファリングし、その時点で無条件に中止します。中止する前であっても、メモリ使用量が多いと、ガベージコレクターのパフォーマンスが低下し、高い RSS (通常、メモリが不要になった後でもシステムに解放されない) が発生します。TCPソケットはリモートピアがデータを読み取らない場合、ドレインされない可能性があるため、ドレインされていないソケットを書き込むと、リモートから悪用可能な脆弱性につながる可能性があります。
ストリームがドレインされていない間にデータを書き込むことは、Transform
にとって特に問題があります。これは、Transform
ストリームは、パイプ処理されるか、'data'
または 'readable'
イベントハンドラーが追加されるまで、デフォルトで一時停止されるためです。
書き込むデータがオンデマンドで生成またはフェッチできる場合は、ロジックを Readable
にカプセル化し、stream.pipe()
を使用することをお勧めします。ただし、write()
を呼び出すことを優先する場合は、'drain'
イベントを使用してバックプレッシャーを尊重し、メモリの問題を回避することが可能です。
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
オブジェクトモードの Writable
ストリームは、常に encoding
引数を無視します。
読み取り可能ストリーム#
読み取り可能ストリームは、データが消費されるソースの抽象化です。
Readable
ストリームの例としては、次のようなものがあります。
- クライアント上の HTTP レスポンス
- サーバー上の HTTP リクエスト
- fs 読み取りストリーム
- zlibストリーム
- cryptoストリーム
- TCPソケット
- 子プロセス stdout および stderr
process.stdin
すべての Readable
ストリームは、stream.Readable
クラスによって定義されたインターフェイスを実装します。
2つの読み取りモード#
Readable
ストリームは、事実上、フローモードと一時停止モードの2つのモードのいずれかで動作します。これらのモードは、オブジェクトモードとは異なります。Readable
ストリームは、フローモードまたは一時停止モードのどちらであるかに関係なく、オブジェクトモードであるか、そうでない場合があります。
-
フローモードでは、データは基盤となるシステムから自動的に読み取られ、
EventEmitter
インターフェイスを介してイベントを使用して可能な限り迅速にアプリケーションに提供されます。 -
一時停止モードでは、ストリームからデータのチャンクを読み取るために、
stream.read()
メソッドを明示的に呼び出す必要があります。
すべての Readable
ストリームは一時停止モードで開始されますが、次のいずれかの方法でフローモードに切り替えることができます
'data'
イベントハンドラーの追加。stream.resume()
メソッドの呼び出し。stream.pipe()
メソッドを呼び出して、データをWritable
に送信します。
Readable
は、次のいずれかを使用して一時停止モードに戻すことができます
- パイプの宛先がない場合は、
stream.pause()
メソッドを呼び出すこと。 - パイプの宛先がある場合は、すべてのパイプの宛先を削除すること。複数のパイプの宛先は、
stream.unpipe()
メソッドを呼び出すことで削除できます。
覚えておくべき重要な概念は、Readable
は、そのデータを消費または無視するためのメカニズムが提供されるまで、データを生成しないということです。消費メカニズムが無効または取り外された場合、Readable
はデータの生成を停止する試行を行います。
後方互換性の理由から、'data'
イベントハンドラーを削除しても、ストリームは自動的に一時停止されません。また、パイプされた宛先がある場合、stream.pause()
を呼び出しても、それらの宛先がドレインされて、さらにデータを要求した後、ストリームが一時停止されたままになることは保証されません。
Readable
がフローモードに切り替えられ、データを処理できるコンシューマーがない場合、そのデータは失われます。これは、たとえば、'data'
イベントにリスナーがアタッチされていない状態で readable.resume()
メソッドが呼び出された場合や、'data'
イベントハンドラーがストリームから削除された場合に発生する可能性があります。
'readable'
イベントハンドラーを追加すると、ストリームが自動的にフローを停止し、データは readable.read()
を介して消費する必要があります。'readable'
イベントハンドラーが削除された場合、'data'
イベントハンドラーがあれば、ストリームは再びフローを開始します。
3つの状態#
Readable
ストリームの「2つのモード」の動作は、Readable
ストリームの実装内で発生する、より複雑な内部状態管理を簡略化した抽象化です。
具体的には、任意の時点で、すべての Readable
は、次の3つの可能な状態のいずれかにあります。
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
readable.readableFlowing
が null
の場合、ストリームのデータを消費するメカニズムは提供されていません。したがって、ストリームはデータを生成しません。この状態では、'data'
イベントのリスナーをアタッチしたり、readable.pipe()
メソッドを呼び出したり、readable.resume()
メソッドを呼び出すと、readable.readableFlowing
が true
に切り替わり、データが生成されると Readable
が積極的にイベントを発行し始めます。
readable.pause()
、readable.unpipe()
を呼び出すか、バックプレッシャーを受け取ると、readable.readableFlowing
が false
に設定され、イベントのフローが一時的に停止しますが、データの生成は停止しません。この状態では、'data'
イベントのリスナーをアタッチしても、readable.readableFlowing
が true
に切り替わることはありません。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
readable.readableFlowing
が false
の間、データはストリームの内部バッファ内に蓄積される可能性があります。
1つのAPIスタイルを選択する#
Readable
ストリームAPIは、複数のNode.jsバージョンにわたって進化し、ストリームデータを消費するための複数のメソッドを提供しています。一般的に、開発者はデータの消費方法の1つを選択し、単一のストリームからデータを消費するために複数のメソッドを使用すべきではありません。具体的には、on('data')
、on('readable')
、pipe()
、または非同期イテレーターを組み合わせて使用すると、直感的でない動作につながる可能性があります。
クラス: stream.Readable
#
イベント: 'close'
#
ストリームとその基になるリソース (たとえば、ファイル記述子) が閉じられたときに、'close'
イベントが発行されます。このイベントは、これ以上のイベントが発行されず、これ以上の計算が行われないことを示します。
Readable
ストリームは、emitClose
オプションを使用して作成された場合、常に 'close'
イベントを発行します。
イベント: 'data'
#
chunk
<Buffer> | <string> | <any> データのチャンク。オブジェクトモードで動作していないストリームの場合、チャンクは文字列またはBuffer
のいずれかになります。オブジェクトモードのストリームの場合、チャンクはnull
以外の任意の JavaScript 値にすることができます。
'data'
イベントは、ストリームがデータのチャンクの所有権をコンシューマーに譲渡するたびに発行されます。これは、readable.pipe()
、readable.resume()
を呼び出すか、'data'
イベントにリスナーコールバックをアタッチしてストリームがフローモードに切り替えられるたびに発生する可能性があります。'data'
イベントは、readable.read()
メソッドが呼び出され、返されるデータのチャンクが利用可能な場合にも発行されます。
明示的に一時停止されていないストリームに 'data'
イベントリスナーをアタッチすると、ストリームがフローモードに切り替わります。データは利用可能になるとすぐに渡されます。
リスナーのコールバックには、readable.setEncoding()
メソッドを使用してストリームにデフォルトのエンコーディングが指定されている場合は、データのチャンクが文字列として渡されます。それ以外の場合は、データは Buffer
として渡されます。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
イベント: 'end'
#
'end'
イベントは、ストリームから消費するデータがなくなったときに発行されます。
'end'
イベントは、データが完全に消費されない限り **発行されません**。これは、ストリームをフローモードに切り替えるか、すべてのデータが消費されるまで stream.read()
を繰り返し呼び出すことで実現できます。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
イベント: 'error'
#
'error'
イベントは、Readable
の実装によっていつでも発行される可能性があります。通常、これは、基礎となるストリームが内部的なエラーのためにデータを生成できない場合、またはストリームの実装が無効なデータのチャンクをプッシュしようとした場合に発生する可能性があります。
リスナーのコールバックには、単一の Error
オブジェクトが渡されます。
イベント: 'pause'
#
'pause'
イベントは、stream.pause()
が呼び出され、readableFlowing
が false
でない場合に発行されます。
イベント: 'readable'
#
'readable'
イベントは、ストリームから読み取るデータがある場合、またはストリームの終わりに達した場合に発行されます。事実上、'readable'
イベントは、ストリームに新しい情報があることを示します。データが利用可能な場合、stream.read()
はそのデータを返します。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
ストリームの終わりに達した場合、stream.read()
を呼び出すと null
が返され、'end'
イベントがトリガーされます。これは、読み取るデータがまったくなかった場合も同様です。たとえば、次の例では、foo.txt
は空のファイルです。
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
このスクリプトを実行した場合の出力は次のとおりです。
$ node test.js
readable: null
end
場合によっては、'readable'
イベントのリスナーをアタッチすると、ある程度のデータが内部バッファーに読み込まれることがあります。
一般に、readable.pipe()
および 'data'
イベントのメカニズムは、'readable'
イベントよりも理解しやすいです。ただし、'readable'
を処理すると、スループットが向上する可能性があります。
'readable'
と 'data'
の両方が同時に使用される場合、フローの制御において 'readable'
が優先されます。つまり、stream.read()
が呼び出された場合にのみ 'data'
が発行されます。readableFlowing
プロパティは false
になります。'readable'
が削除されたときに 'data'
リスナーがある場合、ストリームはフローを開始します。つまり、.resume()
を呼び出さなくても 'data'
イベントが発行されます。
イベント: 'resume'
#
'resume'
イベントは、stream.resume()
が呼び出され、readableFlowing
が true
でない場合に発行されます。
readable.destroy([error])
#
ストリームを破棄します。必要に応じて、'error'
イベントを発行し、'close'
イベントを発行します(emitClose
が false
に設定されていない場合)。この呼び出しの後、読み取り可能ストリームは内部リソースを解放し、後続の push()
の呼び出しは無視されます。
destroy()
が呼び出されると、それ以上の呼び出しは no-op になり、_destroy()
からのエラーを除いて、それ以上のエラーは 'error'
として発生しない可能性があります。
実装者はこのメソッドをオーバーライドするべきではありません。代わりに readable._destroy()
を実装する必要があります。
readable.closed
#
'close'
が発生した後、true
です。
readable.destroyed
#
readable.destroy()
が呼び出された後、true
になります。
readable.isPaused()
#
- 戻り値: <boolean>
readable.isPaused()
メソッドは、Readable
の現在の動作状態を返します。これは主に、readable.pipe()
メソッドの基礎となるメカニズムで使用されます。ほとんどの一般的なケースでは、このメソッドを直接使用する理由はありません。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
#
- 戻り値: <this>
readable.pause()
メソッドは、フローモードのストリームが 'data'
イベントの発行を停止させ、フローモードから切り替えます。利用可能になったデータは、内部バッファーに残ります。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
readable.pause()
メソッドは、'readable'
イベントリスナーがある場合は効果がありません。
readable.pipe(destination[, options])
#
destination
<stream.Writable> データの書き込み先options
<Object> パイプオプションend
<boolean> リーダーが終了したときにライターを終了します。デフォルト:true
。
- 戻り値: <stream.Writable> destination は、
Duplex
またはTransform
ストリームである場合、パイプのチェーンを可能にします。
readable.pipe()
メソッドは、Writable
ストリームを readable
にアタッチし、自動的にフローモードに切り替えて、すべてのデータをアタッチされた Writable
にプッシュします。データのフローは自動的に管理され、宛先 Writable
ストリームがより高速な Readable
ストリームに圧倒されないようにします。
次の例では、readable
からのすべてのデータを file.txt
という名前のファイルにパイプします。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
複数の Writable
ストリームを単一の Readable
ストリームにアタッチできます。
readable.pipe()
メソッドは、destination ストリームへの参照を返し、パイプされたストリームのチェーンを設定することを可能にします。
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
デフォルトでは、ソース Readable
ストリームが 'end'
を発行したときに、宛先 Writable
ストリームで stream.end()
が呼び出され、宛先が書き込み可能でなくなります。このデフォルトの動作を無効にするには、end
オプションを false
として渡すと、宛先ストリームが開いたままになります。
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
1つの重要な注意点は、Readable
ストリームが処理中にエラーを発行した場合、Writable
宛先は自動的に *閉じられない* ことです。エラーが発生した場合、メモリリークを防ぐために、各ストリームを *手動で* 閉じる必要があります。
process.stderr
および process.stdout
Writable
ストリームは、指定されたオプションに関係なく、Node.js プロセスが終了するまで閉じられません。
readable.read([size])
#
readable.read()
メソッドは、内部バッファーからデータを読み取り、返します。読み取るデータがない場合、null
が返されます。デフォルトでは、readable.setEncoding()
メソッドを使用してエンコーディングが指定されているか、ストリームがオブジェクトモードで動作している場合を除き、データは Buffer
オブジェクトとして返されます。
オプションの size
引数は、読み取る特定のバイト数を指定します。size
バイトが読み取れない場合、ストリームが終了した場合 *を除き*、null
が返されます。その場合は、内部バッファーに残っているすべてのデータが返されます。
size
引数が指定されていない場合、内部バッファーに含まれるすべてのデータが返されます。
size
引数は 1 GiB 以下である必要があります。
readable.read()
メソッドは、ポーズモードで動作する Readable
ストリームでのみ呼び出す必要があります。フローモードでは、内部バッファーが完全に排出されるまで、readable.read()
が自動的に呼び出されます。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
readable.read()
を呼び出すたびに、データのチャンクまたは null
が返されます。チャンクは連結されません。バッファー内のすべてのデータを消費するには、while
ループが必要です。大きなファイルを読み込むとき、.read()
は、バッファリングされたコンテンツをすべて消費して null
を返す場合がありますが、まだバッファリングされていないデータがまだあります。この場合、バッファーにさらにデータがある場合は、新しい 'readable'
イベントが発行されます。最後に、データがなくなる と 'end'
イベントが発行されます。
したがって、readable
からファイルのすべてのコンテンツを読み取るには、複数の 'readable'
イベントにわたってチャンクを収集する必要があります。
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
オブジェクトモードの Readable
ストリームは、size
引数の値に関係なく、readable.read(size)
の呼び出しから常に単一の項目を返します。
readable.read()
メソッドがデータのチャンクを返した場合、'data'
イベントも発行されます。
'end'
イベントが発行された後に stream.read([size])
を呼び出すと、null
が返されます。ランタイムエラーは発生しません。
readable.readable
#
readable.read()
を呼び出しても安全な場合、つまり、ストリームが破棄されていないか、'error'
または 'end'
が発行されていない場合は true
になります。
readable.readableAborted
#
ストリームが 'end'
を発行する前に破棄されたか、エラーが発生したかどうかを返します。
readable.readableDidRead
#
'data'
が発行されたかどうかを返します。
readable.readableEncoding
#
与えられたReadable
ストリームのプロパティencoding
を取得するゲッターです。encoding
プロパティは、readable.setEncoding()
メソッドを使用して設定できます。
readable.readableEnded
#
'end'
イベントが発行されるとtrue
になります。
readable.errored
#
ストリームがエラーで破棄された場合、エラーを返します。
readable.readableFlowing
#
このプロパティは、3つの状態セクションで説明されているReadable
ストリームの現在の状態を反映します。
readable.readableHighWaterMark
#
このReadable
の作成時に渡されたhighWaterMark
の値を返します。
readable.readableLength
#
このプロパティには、読み取り準備ができたキュー内のバイト数(またはオブジェクト数)が含まれています。この値は、highWaterMark
の状態に関するイントロスペクションデータを提供します。
readable.readableObjectMode
#
与えられたReadable
ストリームのプロパティobjectMode
を取得するゲッターです。
readable.resume()
#
- 戻り値: <this>
readable.resume()
メソッドは、明示的に一時停止されたReadable
ストリームに'data'
イベントの発行を再開させ、ストリームをフローモードに切り替えます。
readable.resume()
メソッドは、データを実際に処理することなく、ストリームからデータを完全に消費するために使用できます。
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
'readable'
イベントリスナーがある場合、readable.resume()
メソッドは効果がありません。
readable.setEncoding(encoding)
#
readable.setEncoding()
メソッドは、Readable
ストリームから読み取られたデータの文字エンコーディングを設定します。
デフォルトでは、エンコーディングは割り当てられず、ストリームデータはBuffer
オブジェクトとして返されます。エンコーディングを設定すると、ストリームデータはBuffer
オブジェクトではなく、指定されたエンコーディングの文字列として返されます。たとえば、readable.setEncoding('utf8')
を呼び出すと、出力データはUTF-8データとして解釈され、文字列として渡されます。readable.setEncoding('hex')
を呼び出すと、データは16進文字列形式でエンコードされます。
Readable
ストリームは、ストリームからBuffer
オブジェクトとして単純に取り出した場合には不適切にデコードされる可能性のある、ストリームを通じて配信されるマルチバイト文字を適切に処理します。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])
#
destination
<stream.Writable> オプションのアンパイプする特定のストリーム- 戻り値: <this>
readable.unpipe()
メソッドは、以前にstream.pipe()
メソッドを使用してアタッチされたWritable
ストリームをデタッチします。
destination
が指定されていない場合、すべてのパイプがデタッチされます。
destination
が指定されているが、パイプが設定されていない場合、メソッドは何もしません。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])
#
chunk
<Buffer> | <Uint8Array> | <string> | <null> | <any> 読み取りキューにunshiftするデータのチャンク。オブジェクトモードで動作しないストリームの場合、chunk
は文字列、Buffer
、Uint8Array
、またはnull
である必要があります。オブジェクトモードのストリームの場合、chunk
は任意のJavaScript値にすることができます。encoding
<string> 文字列チャンクのエンコーディング。'utf8'
や'ascii'
などの有効なBuffer
エンコーディングである必要があります。
chunk
をnull
として渡すと、ストリームの終了(EOF)が通知され、readable.push(null)
と同じように動作し、その後はデータを書き込むことはできません。EOF信号はバッファの最後に追加され、バッファリングされたデータはすべてフラッシュされます。
readable.unshift()
メソッドは、データのチャンクを内部バッファに戻します。これは、ストリームが、ソースから楽観的に取り出したデータを「アンコンシューム」する必要があるコードによって消費され、そのデータを他の関係者に渡す必要がある特定の状況で役立ちます。
stream.unshift(chunk)
メソッドは、'end'
イベントが発行された後は呼び出すことができず、ランタイムエラーがスローされます。
stream.unshift()
を使用している開発者は、代わりにTransform
ストリームの使用に切り替えることを検討する必要があります。詳細については、ストリーム実装者のためのAPIセクションを参照してください。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
stream.push(chunk)
とは異なり、stream.unshift(chunk)
は、ストリームの内部読み取り状態をリセットすることによって読み取りプロセスを終了しません。これは、読み取り中にreadable.unshift()
が呼び出された場合(つまり、カスタムストリームのstream._read()
実装内から)、予期しない結果を引き起こす可能性があります。readable.unshift()
の呼び出しに続いて、すぐにstream.push('')
を呼び出すと、読み取り状態が適切にリセットされますが、読み取りを実行中にreadable.unshift()
の呼び出しを避けるのが最善です。
readable.wrap(stream)
#
Node.js 0.10より前は、ストリームは現在定義されているnode:stream
モジュールAPI全体を実装していませんでした。(詳細については、互換性を参照してください。)
'data'
イベントを発行し、アドバイザリのみのstream.pause()
メソッドを持つ古いNode.jsライブラリを使用する場合、readable.wrap()
メソッドを使用して、古いストリームをデータソースとして使用するReadable
ストリームを作成できます。
readable.wrap()
を使用する必要はめったにありませんが、このメソッドは古いNode.jsアプリケーションおよびライブラリと対話するための便利な手段として提供されています。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()
#
- 戻り値: ストリームを完全に消費するための<AsyncIterator>。
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
ループがbreak
、return
、またはthrow
で終了した場合、ストリームは破棄されます。言い換えれば、ストリームを反復処理すると、ストリームが完全に消費されます。ストリームは、highWaterMark
オプションと同じサイズのチャンクで読み取られます。上記のコード例では、fs.createReadStream()
にhighWaterMark
オプションが指定されていないため、ファイルに64 KiB未満のデータがある場合、データは1つのチャンクになります。
readable[Symbol.asyncDispose]()
#
AbortError
を使用してreadable.destroy()
を呼び出し、ストリームが完了したときに履行されるPromiseを返します。
readable.compose(stream[, options])
#
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: ストリーム
stream
で構成されたストリーム<Duplex>。
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
詳細については、stream.compose
を参照してください。
readable.iterator([options])
#
options
<Object>destroyOnReturn
<boolean>false
に設定すると、非同期イテレータでreturn
を呼び出すか、break
、return
、またはthrow
を使用してfor await...of
反復を終了しても、ストリームは破棄されません。デフォルト:true
。
- 戻り値: ストリームを消費するための<AsyncIterator>。
このメソッドで作成されたイテレータを使用すると、for await...of
ループがreturn
、break
、またはthrow
によって終了した場合、またはストリームが反復処理中にエラーを発行した場合に、イテレータがストリームを破棄する必要がある場合に、ストリームの破棄をキャンセルするオプションがユーザーに提供されます。
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリーム内のすべてのチャンクをマップする関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。highWaterMark
<number> マップされた項目のユーザー消費を待機中にバッファリングする項目の数。デフォルト:concurrency * 2 - 1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: 関数
fn
でマップされたストリーム<Readable>。
このメソッドを使用すると、ストリームをマップできます。fn
関数は、ストリーム内のすべてのチャンクに対して呼び出されます。fn
関数がPromiseを返す場合、そのPromiseは、結果のストリームに渡される前にawait
されます。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
readable.filter(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリームからチャンクをフィルタリングするための関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。highWaterMark
<number> フィルタリングされたアイテムのユーザーによる消費を待機している間にバッファリングするアイテムの数。デフォルト:concurrency * 2 - 1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Readable> 述語
fn
でフィルタリングされたストリーム。
このメソッドはストリームをフィルタリングできます。ストリーム内の各チャンクに対して、fn
関数が呼び出され、それが真値を返す場合、チャンクは結果ストリームに渡されます。fn
関数がPromiseを返す場合、そのPromiseはawait
されます。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
readable.forEach(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Promise> ストリームが終了したときに完了するPromise。
このメソッドはストリームを反復処理できます。ストリーム内の各チャンクに対して、fn
関数が呼び出されます。fn
関数がPromiseを返す場合、そのPromiseはawait
されます。
このメソッドは、チャンクを同時に処理できる点が、for await...of
ループとは異なります。さらに、forEach
反復処理は、signal
オプションを渡して関連するAbortController
を中止することによってのみ停止できますが、for await...of
はbreak
またはreturn
で停止できます。どちらの場合でも、ストリームは破棄されます。
このメソッドは、基盤のメカニズムでreadable
イベントを使用し、同時fn
呼び出しの数を制限できる点が、'data'
イベントをリッスンすることとは異なります。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
readable.toArray([options])
#
options
<Object>signal
<AbortSignal> シグナルが中止された場合に、toArray操作をキャンセルできます。
- 戻り値: <Promise> ストリームの内容を含む配列を含むPromise。
このメソッドを使用すると、ストリームの内容を簡単に取得できます。
このメソッドはストリーム全体をメモリに読み込むため、ストリームの利点を打ち消します。これは、ストリームを消費する主な方法としてではなく、相互運用性と利便性を目的としています。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Promise>
fn
が少なくとも1つのチャンクに対して真値を返した場合、true
と評価されるPromise。
このメソッドはArray.prototype.some
に似ており、await
された戻り値がtrue
(または任意の真値)になるまで、ストリーム内の各チャンクでfn
を呼び出します。チャンクに対するfn
呼び出しのawait
された戻り値が真値になると、ストリームは破棄され、Promiseはtrue
で完了します。チャンクに対するfn
呼び出しのいずれも真値を返さない場合、Promiseはfalse
で完了します。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.find(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Promise>
fn
が真値で評価された最初のチャンクと評価されるPromise。要素が見つからなかった場合はundefined
。
このメソッドはArray.prototype.find
に似ており、fn
の真値を持つチャンクを見つけるために、ストリーム内の各チャンクでfn
を呼び出します。fn
呼び出しのawait
された戻り値が真値になると、ストリームは破棄され、Promiseはfn
が真値を返した値で完了します。チャンクに対するすべてのfn
呼び出しが偽値を返す場合、Promiseはundefined
で完了します。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.every(fn[, options])
#
fn
<Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Promise>
fn
がすべてのチャンクに対して真値を返した場合、true
と評価されるPromise。
このメソッドはArray.prototype.every
に似ており、すべてのawait
された戻り値がfn
に対して真値であるかどうかを確認するために、ストリーム内の各チャンクでfn
を呼び出します。チャンクに対するfn
呼び出しのawait
された戻り値が偽値になると、ストリームは破棄され、Promiseはfalse
で完了します。チャンクに対するすべてのfn
呼び出しが真値を返すと、Promiseはtrue
で完了します。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
readable.flatMap(fn[, options])
#
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> ストリーム内のすべてのチャンクをマッピングする関数。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
options
<Object>concurrency
<number> ストリームで一度に呼び出すfn
の最大同時呼び出し数。デフォルト:1
。signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Readable> 関数
fn
でフラットマップされたストリーム。
このメソッドは、指定されたコールバックをストリームの各チャンクに適用し、その結果をフラット化して、新しいストリームを返します。
fn
からストリームまたは別のイテラブルまたは非同期イテラブルを返すことができ、結果のストリームは返されたストリームにマージ(フラット化)されます。
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
readable.drop(limit[, options])
#
limit
<number> readableから削除するチャンクの数。options
<Object>signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Readable>
limit
個のチャンクが削除されたストリーム。
このメソッドは、最初のlimit
個のチャンクが削除された新しいストリームを返します。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])
#
limit
<number> readableから取得するチャンクの数。options
<Object>signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Readable>
limit
個のチャンクが取得されたストリーム。
このメソッドは、最初のlimit
個のチャンクを含む新しいストリームを返します。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])
#
fn
<Function> | <AsyncFunction> ストリーム内のすべてのチャンクに対して呼び出すリデューサー関数。previous
<any>fn
への最後の呼び出しから取得した値、または指定されている場合はinitial
値、またはそれ以外の場合はストリームの最初のチャンク。data
<any> ストリームからのデータのチャンク。options
<Object>signal
<AbortSignal> ストリームが破棄された場合に中断され、fn
の呼び出しを早期に中断できるようにします。
initial
<any> リダクションで使用する初期値。options
<Object>signal
<AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
- 戻り値: <Promise> リダクションの最終値のPromise。
このメソッドは、ストリームの各チャンクに対して順番にfn
を呼び出し、前の要素の計算結果を渡します。リダクションの最終値のPromiseを返します。
initial
値が指定されていない場合、ストリームの最初のチャンクが初期値として使用されます。ストリームが空の場合、PromiseはERR_INVALID_ARGS
コードプロパティを持つTypeError
で拒否されます。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
リデューサー関数はストリームを要素ごとに反復処理するため、concurrency
パラメーターや並列処理はありません。reduce
を同時に実行するには、非同期関数をreadable.map
メソッドに抽出できます。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
二重ストリームと変換ストリーム#
クラス: stream.Duplex
#
二重ストリームは、Readable
インターフェースとWritable
インターフェースの両方を実装するストリームです。
Duplex
ストリームの例としては、以下のようなものがあります。
duplex.allowHalfOpen
#
false
の場合、ストリームは読み取り側が終了すると、書き込み側を自動的に終了します。最初にallowHalfOpen
コンストラクターオプションで設定されます。デフォルトはtrue
です。
既存のDuplex
ストリームインスタンスのハーフオープン動作を変更するために、これを手動で変更できますが、'end'
イベントが発行される前に変更する必要があります。
クラス: stream.Transform
#
変換ストリームは、出力が何らかの形で入力に関連付けられているDuplex
ストリームです。すべてのDuplex
ストリームと同様に、Transform
ストリームは、Readable
インターフェースとWritable
インターフェースの両方を実装します。
Transform
ストリームの例としては、以下のようなものがあります。
transform.destroy([error])
#
ストリームを破棄し、オプションで'error'
イベントを発行します。この呼び出し後、変換ストリームは内部リソースを解放します。実装者はこのメソッドをオーバーライドすべきではなく、代わりにreadable._destroy()
を実装する必要があります。Transform
の_destroy()
のデフォルト実装は、emitClose
がfalseに設定されていない限り、'close'
も発行します。
destroy()
が呼び出されると、それ以降の呼び出しは何も操作を行わず、_destroy()
からのエラーを除き、'error'
として発行されるエラーはなくなります。
stream.finished(stream[, options], callback)
#
stream
<Stream> | <ReadableStream> | <WritableStream> 可読および/または書込可能なストリーム/ウェブストリーム。options
<Object>error
<boolean>false
に設定すると、emit('error', err)
の呼び出しは終了したと見なされません。デフォルト:true
。readable
<boolean>false
に設定すると、ストリームがまだ読み取り可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト:true
。writable
<boolean>false
に設定すると、ストリームがまだ書き込み可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト:true
。signal
<AbortSignal> ストリームの終了を待つのを中断できます。シグナルが中断された場合、基になるストリームは中断されません。コールバックはAbortError
で呼び出されます。この関数によって追加されたすべての登録済みリスナーも削除されます。cleanup
<boolean> 登録されたすべてのストリームリスナーを削除します。デフォルト:false
。
callback
<Function> オプションのエラー引数を取るコールバック関数。- 戻り値: <Function> 登録されたすべてのリスナーを削除するクリーンアップ関数。
ストリームが可読、書込可能でなくなった場合、またはエラーや早期クローズイベントが発生した場合に通知を受けるための関数。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
ストリームが早期に破棄された(中断されたHTTPリクエストなど)、'end'
や'finish'
を発行しないエラー処理シナリオで特に役立ちます。
finished
APIは、プロミスバージョンを提供します。
stream.finished()
は、callback
が呼び出された後も、(特に'error'
、'end'
、'finish'
、および'close'
)ぶら下がったイベントリスナーを残します。これは、予期しない'error'
イベント(ストリームの実装が正しくないため)が予期しないクラッシュを引き起こさないようにするためです。これが不要な動作である場合は、返されたクリーンアップ関数をコールバックで呼び出す必要があります。
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)
#
stream.pipeline(streams, callback)
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- 戻り値: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- 戻り値: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- 戻り値: <AsyncIterable> | <Promise>
callback
<Function> パイプラインが完全に完了したときに呼び出されます。err
<Error>val
destination
によって返されたPromise
の解決値。
- 戻り値: <Stream>
ストリームとジェネレーターの間でパイプ処理を行い、エラーを転送し、適切にクリーンアップし、パイプラインが完了したときにコールバックを提供するモジュールメソッド。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
pipeline
APIは、プロミスバージョンを提供します。
stream.pipeline()
は、次を除くすべてのストリームでstream.destroy(err)
を呼び出します。
'end'
または'close'
を発行したReadable
ストリーム。'finish'
または'close'
を発行したWritable
ストリーム。
stream.pipeline()
は、callback
が呼び出された後も、ストリームにぶら下がったイベントリスナーを残します。失敗後にストリームを再利用する場合、これによりイベントリスナーのリークやエラーの飲み込みが発生する可能性があります。最後のストリームが可読な場合、最後のストリームを後で消費できるように、ぶら下がったイベントリスナーが削除されます。
stream.pipeline()
は、エラーが発生するとすべてのストリームを閉じます。pipeline
を使用したIncomingRequest
の使用は、予期される応答を送信せずにソケットを破棄するため、予期しない動作につながる可能性があります。以下の例を参照してください。
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.compose(...streams)
#
stream.compose
は実験的です。streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 戻り値: <stream.Duplex>
2つ以上のストリームを、最初のストリームに書き込み、最後のストリームから読み取るDuplex
ストリームに結合します。提供された各ストリームは、stream.pipeline
を使用して次のストリームにパイプされます。いずれかのストリームがエラーになった場合、外側のDuplex
ストリームを含め、すべて破棄されます。
stream.compose
は、順番に他のストリームにパイプできる(またパイプすべき)新しいストリームを返すため、構成が可能です。対照的に、ストリームをstream.pipeline
に渡す場合、通常、最初のストリームは可読ストリーム、最後のストリームは書込可能ストリームであり、閉じた回路を形成します。
Function
を渡した場合、source
のIterable
を取るファクトリーメソッドである必要があります。
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose
を使用して、非同期イテラブル、ジェネレーター、関数をストリームに変換できます。
AsyncIterable
は、可読なDuplex
に変換されます。null
を生成することはできません。AsyncGeneratorFunction
は、可読/書込可能な変換Duplex
に変換されます。最初のパラメーターとしてソースのAsyncIterable
を取る必要があります。null
を生成することはできません。AsyncFunction
は、書込可能なDuplex
に変換されます。null
またはundefined
のいずれかを返す必要があります。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
演算子としてのstream.compose
については、readable.compose(stream)
を参照してください。
stream.Readable.from(iterable[, options])
#
iterable
<Iterable>Symbol.asyncIterator
またはSymbol.iterator
イテラブルプロトコルを実装するオブジェクト。null値が渡された場合、'error'イベントを発行します。options
<Object>new stream.Readable([options])
に提供されるオプション。デフォルトでは、Readable.from()
は、options.objectMode
をfalse
に設定して明示的にオプトアウトしない限り、options.objectMode
をtrue
に設定します。- 戻り値: <stream.Readable>
イテレーターから可読ストリームを作成するためのユーティリティメソッド。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
Readable.from(string)
または Readable.from(buffer)
を呼び出しても、パフォーマンス上の理由から、文字列やバッファが他のストリームのセマンティクスに合わせて反復処理されることはありません。
Promise を含む Iterable
オブジェクトが引数として渡された場合、未処理の拒否が発生する可能性があります。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Readable.fromWeb(readableStream[, options])
#
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 戻り値: <stream.Readable>
stream.Readable.isDisturbed(stream)
#
stream
<stream.Readable> | <ReadableStream>- 戻り値:
boolean
ストリームが読み取り済みまたはキャンセル済みかどうかを返します。
stream.isErrored(stream)
#
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 戻り値: <boolean>
ストリームでエラーが発生したかどうかを返します。
stream.isReadable(stream)
#
stream
<Readable> | <Duplex> | <ReadableStream>- 戻り値: <boolean>
ストリームが読み取り可能かどうかを返します。
stream.Readable.toWeb(streamReadable[, options])
#
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 与えられたstream.Readable
からの読み取りでバックプレッシャーが適用される前の、(作成されたReadableStream
の)最大内部キューサイズ。値が提供されない場合、与えられたstream.Readable
から取得されます。size
<Function> 与えられたデータのチャンクのサイズを計算する関数。値が提供されない場合、すべてのチャンクのサイズは1
になります。
- 戻り値: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
#
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 戻り値: <stream.Writable>
stream.Writable.toWeb(streamWritable)
#
streamWritable
<stream.Writable>- 戻り値: <WritableStream>
stream.Duplex.from(src)
#
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
双方向ストリームを作成するためのユーティリティメソッドです。
Stream
は書き込み可能ストリームを書き込み可能なDuplex
に、読み取り可能ストリームをDuplex
に変換します。Blob
は読み取り可能なDuplex
に変換します。string
は読み取り可能なDuplex
に変換します。ArrayBuffer
は読み取り可能なDuplex
に変換します。AsyncIterable
は、可読なDuplex
に変換されます。null
を生成することはできません。AsyncGeneratorFunction
は、可読/書込可能な変換Duplex
に変換されます。最初のパラメーターとしてソースのAsyncIterable
を取る必要があります。null
を生成することはできません。AsyncFunction
は書き込み可能なDuplex
に変換します。null
またはundefined
のいずれかを返す必要があります。Object ({ writable, readable })
は、readable
とwritable
をStream
に変換し、それらをDuplex
に結合します。Duplex
はwritable
に書き込み、readable
から読み取ります。Promise
は読み取り可能なDuplex
に変換します。値null
は無視されます。ReadableStream
は読み取り可能なDuplex
に変換します。WritableStream
は書き込み可能なDuplex
に変換します。- 戻り値: <stream.Duplex>
Promise を含む Iterable
オブジェクトが引数として渡された場合、未処理の拒否が発生する可能性があります。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.fromWeb(pair[, options])
#
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>- 戻り値: <stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}
const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)
#
streamDuplex
<stream.Duplex>- 戻り値: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)
#
signal
<AbortSignal> キャンセルされる可能性を表すシグナルstream
<Stream> | <ReadableStream> | <WritableStream> シグナルをアタッチするストリーム。
AbortSignal
を読み取り可能または書き込み可能なストリームにアタッチします。これにより、コードは AbortController
を使用してストリームの破棄を制御できます。
渡された AbortSignal
に対応する AbortController
で abort
を呼び出すと、ストリームで .destroy(new AbortError())
を呼び出す場合と同じように動作し、Webストリームの場合は controller.error(new AbortError())
を呼び出す場合と同じように動作します。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
または、非同期イテラブルとして読み取り可能なストリームで AbortSignal
を使用する
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
または、ReadableStream で AbortSignal
を使用する
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)
#
ストリームで使用されるデフォルトの highWaterMark を返します。デフォルトは、objectMode
の場合は 16384
(16 KiB)、または 16
です。
stream.setDefaultHighWaterMark(objectMode, value)
#
ストリームで使用されるデフォルトの highWaterMark を設定します。
ストリーム実装者のためのAPI#
node:stream
モジュール API は、JavaScript のプロトタイプ継承モデルを使用してストリームを簡単に実装できるように設計されています。
まず、ストリーム開発者は、4 つの基本ストリームクラス (stream.Writable
、stream.Readable
、stream.Duplex
、または stream.Transform
) のいずれかを拡張する新しい JavaScript クラスを宣言し、適切な親クラスコンストラクターを呼び出すようにします。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
ストリームを拡張する場合、ユーザーが提供できるオプションと提供すべきオプションを念頭に置き、それらをベースコンストラクターに転送する前に確認してください。たとえば、実装で autoDestroy
オプションと emitClose
オプションに関して前提条件がある場合は、ユーザーがこれらをオーバーライドできないようにしてください。すべてのオプションを暗黙的に転送するのではなく、転送されるオプションについて明示的に記述してください。
新しいストリームクラスは、作成されるストリームのタイプに応じて、以下の表に示すように、1 つ以上の特定のメソッドを実装する必要があります。
ユースケース | クラス | 実装するメソッド |
---|---|---|
読み取り専用 | Readable | _read() |
書き込み専用 | Writable | _write() 、_writev() 、_final() |
読み取りと書き込み | Duplex | _read() 、_write() 、_writev() 、_final() |
書き込まれたデータを操作し、結果を読み取る | Transform | _transform() 、_flush() 、_final() |
ストリームの実装コードは、コンシューマーで使用することを目的としたストリームの「パブリック」メソッドを決して呼び出すべきではありません (「ストリームコンシューマー向けのAPI」セクションで説明)。そうすると、ストリームを消費するアプリケーションコードで悪影響が発生する可能性があります。
write()
、end()
、cork()
、uncork()
、read()
、destroy()
などのパブリックメソッドをオーバーライドしたり、'error'
、'data'
、'end'
、'finish'
、'close'
などの内部イベントを .emit()
を使用して発生させたりすることは避けてください。そうすると、現在および将来のストリームの不変性が損なわれ、他のストリーム、ストリームユーティリティ、およびユーザーの期待との間で動作や互換性の問題が発生する可能性があります。
簡略化された構築#
多くの場合、継承に頼らずにストリームを作成できます。これは、stream.Writable
、stream.Readable
、stream.Duplex
、または stream.Transform
オブジェクトのインスタンスを直接作成し、適切なメソッドをコンストラクターオプションとして渡すことで実現できます。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
書き込み可能ストリームの実装#
stream.Writable
クラスは、Writable
ストリームを実装するために拡張されます。
カスタムのWritable
ストリームは、必ずnew stream.Writable([options])
コンストラクターを呼び出し、writable._write()
および/またはwritable._writev()
メソッドを実装する必要があります。
new stream.Writable([options])
#
options
<Object>highWaterMark
<number>stream.write()
がfalse
を返し始めるバッファーレベル。デフォルト:16384
(16 KiB)、またはobjectMode
ストリームの場合は16
。decodeStrings
<boolean>stream.write()
に渡されたstring
を、stream._write()
に渡す前に、Buffer
にエンコードするかどうか(stream.write()
呼び出しで指定されたエンコーディングを使用)。他の型のデータは変換されません(つまり、Buffer
はstring
にデコードされません)。falseに設定すると、string
が変換されなくなります。デフォルト:true
。defaultEncoding
<string>stream.write()
への引数としてエンコーディングが指定されていない場合に使用されるデフォルトのエンコーディング。デフォルト:'utf8'
。objectMode
<boolean>stream.write(anyObj)
が有効な操作かどうか。設定すると、ストリーム実装でサポートされている場合、string、Buffer
、またはUint8Array
以外のJavaScript値を書き込むことが可能になります。デフォルト:false
。emitClose
<boolean> ストリームが破棄された後に'close'
を発行するかどうか。デフォルト:true
。write
<Function>stream._write()
メソッドの実装。writev
<Function>stream._writev()
メソッドの実装。destroy
<Function>stream._destroy()
メソッドの実装。final
<Function>stream._final()
メソッドの実装。construct
<Function>stream._construct()
メソッドの実装。autoDestroy
<boolean> このストリームが終了後に自動的に.destroy()
を自身に呼び出すかどうか。デフォルト:true
。signal
<AbortSignal> キャンセルされる可能性を表すシグナル。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
または、pre-ES6スタイルのコンストラクターを使用する場合
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
または、簡略化されたコンストラクターアプローチを使用する場合
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
渡されたAbortSignal
に対応するAbortController
でabort
を呼び出すと、書き込み可能ストリームで.destroy(new AbortError())
を呼び出すのと同じように動作します。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)
#
callback
<Function> ストリームの初期化が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。
_construct()
メソッドは直接呼び出してはなりません。これは子クラスによって実装される可能性があり、その場合、内部のWritable
クラスメソッドのみによって呼び出されます。
このオプションの関数は、ストリームコンストラクターが返された後のティックで呼び出され、callback
が呼び出されるまで_write()
、_final()
、および_destroy()
の呼び出しを遅延させます。これは、ストリームを使用する前に、状態を初期化したり、リソースを非同期的に初期化したりするのに役立ちます。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any>stream.write()
に渡されたstring
から変換された書き込むBuffer
。ストリームのdecodeStrings
オプションがfalse
の場合、またはストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()
に渡されたものがそのままになります。encoding
<string> チャンクが文字列の場合、encoding
はその文字列の文字エンコーディングです。チャンクがBuffer
の場合、またはストリームがオブジェクトモードで動作している場合、encoding
は無視される場合があります。callback
<Function> 提供されたチャンクの処理が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。
すべてのWritable
ストリーム実装は、基になるリソースにデータを送信するためのwritable._write()
および/またはwritable._writev()
メソッドを提供する必要があります。
Transform
ストリームは、writable._write()
の独自の実装を提供します。
この関数は、アプリケーションコードから直接呼び出してはなりません。これは子クラスによって実装される必要があり、内部のWritable
クラスメソッドのみによって呼び出されます。
callback
関数は、writable._write()
内で同期的に、または非同期的に(つまり、異なるティックで)呼び出して、書き込みが正常に完了したか、エラーが発生して失敗したかを通知する必要があります。callback
に渡される最初の引数は、呼び出しが失敗した場合はError
オブジェクト、書き込みが成功した場合はnull
である必要があります。
writable._write()
が呼び出されてからcallback
が呼び出されるまでの間に発生するwritable.write()
へのすべての呼び出しにより、書き込まれたデータがバッファリングされます。callback
が呼び出されると、ストリームは'drain'
イベントを発行する場合があります。ストリーム実装が複数のデータチャンクを一度に処理できる場合は、writable._writev()
メソッドを実装する必要があります。
コンストラクターオプションでdecodeStrings
プロパティが明示的にfalse
に設定されている場合、chunk
は.write()
に渡されるのと同じオブジェクトのままであり、Buffer
ではなく文字列である場合があります。これは、特定の文字列データエンコーディングの最適化された処理をサポートするためです。その場合、encoding
引数は文字列の文字エンコーディングを示します。それ以外の場合、encoding
引数は安全に無視できます。
writable._write()
メソッドには、それが定義するクラスの内部であり、ユーザープログラムから直接呼び出すべきではないため、アンダースコアが先頭に付いています。
writable._writev(chunks, callback)
#
chunks
<Object[]> 書き込むデータ。値は、書き込むデータの個別のチャンクを表す<Object>の配列です。これらのオブジェクトのプロパティは次のとおりです。callback
<Function> 提供されたチャンクの処理が完了したときに呼び出されるコールバック関数(オプションでエラー引数付き)。
この関数は、アプリケーションコードから直接呼び出してはなりません。これは子クラスによって実装される必要があり、内部のWritable
クラスメソッドのみによって呼び出されます。
writable._writev()
メソッドは、複数のデータチャンクを一度に処理できるストリーム実装で、writable._write()
に加えて、または代替として実装できます。実装されており、以前の書き込みからのバッファリングされたデータがある場合、_write()
の代わりに_writev()
が呼び出されます。
writable._writev()
メソッドには、それが定義するクラスの内部であり、ユーザープログラムから直接呼び出すべきではないため、アンダースコアが先頭に付いています。
writable._destroy(err, callback)
#
err
<Error> 考えられるエラー。callback
<Function> オプションのエラー引数を取るコールバック関数。
_destroy()
メソッドはwritable.destroy()
によって呼び出されます。子クラスによってオーバーライドできますが、直接呼び出してはなりません。さらに、プロミスが解決されたときに実行されると、callback
をasync/awaitと混同しないでください。
writable._final(callback)
#
callback
<Function> 残りのデータの書き込みが完了したら、この関数を(オプションでエラー引数付きで)呼び出します。
_final()
メソッドは、直接呼び出してはいけません。子クラスによって実装される可能性があり、その場合は、内部の Writable
クラスメソッドによってのみ呼び出されます。
このオプションの関数は、ストリームが閉じる前に呼び出され、callback
が呼び出されるまで 'finish'
イベントを遅延させます。これは、ストリームが終了する前にリソースを閉じたり、バッファリングされたデータを書き込んだりするのに便利です。
書き込み中のエラー#
writable._write()
、writable._writev()
、および writable._final()
メソッドの処理中に発生したエラーは、コールバックを呼び出し、エラーを最初の引数として渡すことで伝播させる必要があります。これらのメソッド内から Error
をスローしたり、手動で 'error'
イベントを発生させたりすると、未定義の動作になります。
Writable
ストリームがエラーを発生させたときに、Readable
ストリームが Writable
ストリームにパイプされている場合、Readable
ストリームはパイプ解除されます。
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
書き込み可能ストリームの例#
以下は、かなり単純(でやや無意味)なカスタムの Writable
ストリームの実装を示しています。この特定の Writable
ストリームインスタンスは実際には特に有用ではありませんが、この例ではカスタムの Writable
ストリームインスタンスに必要な各要素を示しています。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
書き込み可能ストリームでのバッファのデコード#
バッファのデコードは、たとえば、入力が文字列であるトランスフォーマーを使用する場合など、一般的なタスクです。これは、UTF-8 などのマルチバイト文字エンコーディングを使用する場合、簡単な処理ではありません。次の例は、StringDecoder
と Writable
を使用してマルチバイト文字列をデコードする方法を示しています。
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options && options.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
読み取り可能ストリームの実装#
Readable
ストリームを実装するために、stream.Readable
クラスが拡張されます。
カスタムの Readable
ストリームは、必ず new stream.Readable([options])
コンストラクターを呼び出し、readable._read()
メソッドを実装する必要があります。
new stream.Readable([options])
#
options
<Object>highWaterMark
<number> 基になるリソースからの読み取りを停止する前に、内部バッファに格納する最大バイト数。デフォルト:16384
(16 KiB) 、またはobjectMode
ストリームの場合は16
。encoding
<string> 指定した場合、バッファは指定されたエンコーディングを使用して文字列にデコードされます。デフォルト:null
。objectMode
<boolean> このストリームをオブジェクトのストリームとして動作させるかどうか。つまり、stream.read(n)
は、サイズがn
のBuffer
ではなく、単一の値を返します。デフォルト:false
。emitClose
<boolean> ストリームが破棄された後に'close'
を発行するかどうか。デフォルト:true
。read
<Function>stream._read()
メソッドの実装。destroy
<Function>stream._destroy()
メソッドの実装。construct
<Function>stream._construct()
メソッドの実装。autoDestroy
<boolean> このストリームが終了後に自動的に.destroy()
を自身に呼び出すかどうか。デフォルト:true
。signal
<AbortSignal> キャンセルされる可能性を表すシグナル。
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
}
または、pre-ES6スタイルのコンストラクターを使用する場合
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
または、簡略化されたコンストラクターアプローチを使用する場合
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
渡された AbortSignal
に対応する AbortController
で abort
を呼び出すと、作成された読み取り可能ストリームで .destroy(new AbortError())
を呼び出すのと同じように動作します。
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
readable._construct(callback)
#
callback
<Function> ストリームの初期化が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。
_construct()
メソッドは、直接呼び出してはなりません。子クラスによって実装される可能性があり、その場合は、内部の Readable
クラスメソッドによってのみ呼び出されます。
このオプションの関数は、ストリームコンストラクターによって次のティックでスケジュールされ、_read()
および _destroy()
の呼び出しは、callback
が呼び出されるまで遅延されます。これは、ストリームを使用する前に、状態を初期化したり、リソースを非同期で初期化したりするのに便利です。
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)
#
size
<number> 非同期に読み取るバイト数
この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable
クラスメソッドによってのみ呼び出されます。
すべての Readable
ストリームの実装は、基になるリソースからデータを取得するための readable._read()
メソッドの実装を提供する必要があります。
readable._read()
が呼び出されたときに、リソースからデータが利用可能な場合、実装は this.push(dataChunk)
メソッドを使用してそのデータを読み取りキューにプッシュを開始する必要があります。ストリームがより多くのデータを受け入れる準備が整うと、this.push(dataChunk)
が呼び出されるたびに、_read()
が再度呼び出されます。_read()
は、readable.push()
が false
を返すまで、リソースから読み取り、データをプッシュし続けることができます。_read()
が停止した後、再度呼び出された場合にのみ、キューに追加のデータのプッシュを再開する必要があります。
readable._read()
メソッドが呼び出されると、readable.push()
メソッドを介してより多くのデータがプッシュされるまで、再度呼び出されることはありません。空のバッファや文字列などの空のデータは、readable._read()
を呼び出しません。
size
引数は推奨です。「読み取り」がデータを返す単一の操作である実装の場合、size
引数を使用してフェッチするデータ量を決定できます。他の実装では、この引数を無視し、データが利用可能になったらいつでもデータを単純に提供できます。stream.push(chunk)
を呼び出す前に、size
バイトが利用可能になるまで「待つ」必要はありません。
readable._read()
メソッドには、それが定義されているクラスの内部であり、ユーザープログラムによって直接呼び出すべきではないため、アンダースコアがプレフィックスとして付いています。
readable._destroy(err, callback)
#
err
<Error> 考えられるエラー。callback
<Function> オプションのエラー引数を取るコールバック関数。
_destroy()
メソッドは、readable.destroy()
によって呼び出されます。子クラスによってオーバーライドできますが、直接呼び出してはいけません。
readable.push(chunk[, encoding])
#
chunk
<Buffer> | <Uint8Array> | <string> | <null> | <any> 読み取りキューにプッシュするデータのチャンク。オブジェクトモードで動作していないストリームの場合、chunk
は文字列、Buffer
、またはUint8Array
である必要があります。オブジェクトモードのストリームの場合、chunk
は任意の JavaScript 値にできます。encoding
<string> 文字列チャンクのエンコーディング。'utf8'
や'ascii'
などの有効なBuffer
エンコーディングである必要があります。- 戻り値: <boolean> 追加のデータチャンクのプッシュを続行できる場合は
true
。それ以外の場合はfalse
。
chunk
が Buffer
、Uint8Array
、または string
の場合、データの chunk
は、ストリームのユーザーが消費するために内部キューに追加されます。chunk
を null
として渡すと、ストリームの終わり (EOF) が通知され、その後はデータを書き込むことができなくなります。
Readable
が一時停止モードで動作している場合、'readable'
イベントが発生したときに、readable.read()
メソッドを呼び出すことで、readable.push()
で追加されたデータを読み出すことができます。
Readable
がフローモードで動作している場合、readable.push()
で追加されたデータは 'data'
イベントを発生させることで配信されます。
readable.push()
メソッドは、可能な限り柔軟になるように設計されています。たとえば、何らかの一時停止/再開メカニズムとデータコールバックを提供する下位レベルのソースをラップする場合、カスタムの Readable
インスタンスで下位レベルのソースをラップできます。
// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
readable.push()
メソッドは、コンテンツを内部バッファにプッシュするために使用されます。これは、readable._read()
メソッドによって駆動できます。
オブジェクトモードで動作していないストリームの場合、readable.push()
の chunk
パラメーターが undefined
の場合、空の文字列またはバッファとして扱われます。詳細については、readable.push('')
を参照してください。
読み取り中のエラー#
readable._read()
の処理中に発生したエラーは、readable.destroy(err)
メソッドを介して伝播する必要があります。readable._read()
内から Error
をスローしたり、手動で 'error'
イベントを発生させたりすると、未定義の動作になります。
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
カウントストリームの例#
以下は、1から1,000,000までの数値を昇順で出力し、その後終了するReadable
ストリームの基本的な例です。
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
双方向ストリームの実装#
Duplex
ストリームは、TCPソケット接続のように、Readable
とWritable
の両方を実装するものです。
JavaScriptは多重継承をサポートしていないため、Duplex
ストリームを実装するためにstream.Duplex
クラスが拡張されます(stream.Readable
とstream.Writable
クラスを拡張するのではなく)。
stream.Duplex
クラスは、プロトタイプ的にstream.Readable
から継承し、寄生的にstream.Writable
から継承しますが、instanceof
は、stream.Writable
のSymbol.hasInstance
をオーバーライドすることにより、両方の基本クラスで正しく動作します。
カスタムDuplex
ストリームは、new stream.Duplex([options])
コンストラクタを呼び出し、readable._read()
メソッドとwritable._write()
メソッドの両方を実装する必要があります。
new stream.Duplex(options)
#
options
<Object>Writable
とReadable
の両方のコンストラクタに渡されます。また、以下のフィールドがあります。allowHalfOpen
<boolean>false
に設定すると、読み取り可能側が終了すると、ストリームは書き込み可能側を自動的に終了します。デフォルト:true
。readable
<boolean>Duplex
を読み取り可能にするかどうかを設定します。デフォルト:true
。writable
<boolean>Duplex
を書き込み可能にするかどうかを設定します。デフォルト:true
。readableObjectMode
<boolean> ストリームの読み取り可能側のobjectMode
を設定します。objectMode
がtrue
の場合は効果がありません。デフォルト:false
。writableObjectMode
<boolean> ストリームの書き込み可能側のobjectMode
を設定します。objectMode
がtrue
の場合は効果がありません。デフォルト:false
。readableHighWaterMark
<number> ストリームの読み取り可能側のhighWaterMark
を設定します。highWaterMark
が指定されている場合は効果がありません。writableHighWaterMark
<number> ストリームの書き込み可能側のhighWaterMark
を設定します。highWaterMark
が指定されている場合は効果がありません。
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
または、pre-ES6スタイルのコンストラクターを使用する場合
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
または、簡略化されたコンストラクターアプローチを使用する場合
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
パイプラインを使用する場合
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
双方向ストリームの例#
以下は、データを書き込むことができ、データが読み取ることができる、Node.jsストリームと互換性のないAPIを使用しているものの、仮説的な低レベルソースオブジェクトをラップするDuplex
ストリームの簡単な例を示しています。以下は、Writable
インターフェイスを介して書き込まれた入力データをバッファリングし、Readable
インターフェイスを介して読み戻されるDuplex
ストリームの簡単な例を示しています。
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex
ストリームの最も重要な点は、Readable
側とWritable
側が、単一のオブジェクトインスタンス内に共存しているにもかかわらず、互いに独立して動作することです。
オブジェクトモードの双方向ストリーム#
Duplex
ストリームの場合、objectMode
は、readableObjectMode
オプションとwritableObjectMode
オプションを使用して、Readable
側またはWritable
側のいずれか専用に設定できます。
たとえば、次の例では、JavaScriptの数値を入力として受け取り、Readable
側で16進数の文字列に変換する、オブジェクトモードのWritable
側を持つ新しいTransform
ストリーム(Duplex
ストリームの一種)が作成されます。
const { Transform } = require('node:stream');
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
変換ストリームの実装#
Transform
ストリームは、出力が入力から何らかの方法で計算されるDuplex
ストリームです。例としては、データを圧縮、暗号化、または復号化するzlibストリームやcryptoストリームがあります。
出力が入力と同じサイズである必要も、同じ数のチャンクである必要も、同時に到着する必要もありません。たとえば、Hash
ストリームは、入力が終了したときに提供される単一の出力チャンクのみを持ちます。zlib
ストリームは、入力よりもはるかに小さいか、はるかに大きい出力を生成します。
stream.Transform
クラスは、Transform
ストリームを実装するために拡張されます。
stream.Transform
クラスは、プロトタイプ的にstream.Duplex
から継承し、writable._write()
メソッドとreadable._read()
メソッドの独自のバージョンを実装します。カスタムTransform
の実装では、transform._transform()
メソッドを実装する必要があり、transform._flush()
メソッドを実装してもよいです。
Transform
ストリームを使用する際には、ストリームに書き込まれたデータによって、Readable
側の出力が消費されない場合、ストリームのWritable
側が一時停止する可能性があることに注意する必要があります。
new stream.Transform([options])
#
options
<Object>Writable
とReadable
の両方のコンストラクタに渡されます。また、以下のフィールドがあります。transform
<Function>stream._transform()
メソッドの実装。flush
<Function>stream._flush()
メソッドの実装。
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
または、pre-ES6スタイルのコンストラクターを使用する場合
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
または、簡略化されたコンストラクターアプローチを使用する場合
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
イベント: 'end'
#
'end'
イベントは、stream.Readable
クラスからのものです。 transform._flush()
のコールバックが呼び出された後、すべてのデータが出力された後に、'end'
イベントが発行されます。エラーが発生した場合、'end'
を発行しないでください。
イベント: 'finish'
#
'finish'
イベントは、stream.Writable
クラスからのものです。 stream.end()
が呼び出され、すべてのチャンクがstream._transform()
によって処理された後、'finish'
イベントが発行されます。エラーが発生した場合、'finish'
を発行しないでください。
transform._flush(callback)
#
callback
<Function> 残りのデータがフラッシュされたときに呼び出されるコールバック関数(オプションでエラー引数とデータ付き)。
この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable
クラスメソッドによってのみ呼び出されます。
場合によっては、変換操作でストリームの最後にさらに少しのデータを発行する必要がある場合があります。たとえば、zlib
圧縮ストリームは、出力を最適に圧縮するために使用される内部状態の量を格納します。ただし、ストリームが終了すると、圧縮されたデータが完全になるように、その追加のデータをフラッシュする必要があります。
カスタムのTransform
実装は、transform._flush()
メソッドを実装してもよいです。これは、消費する書き込みデータがなくなったとき、ただし'end'
イベントがReadable
ストリームの終わりを示すために発行される前に呼び出されます。
transform._flush()
の実装内では、必要に応じて、transform.push()
メソッドを0回以上呼び出すことができます。フラッシュ操作が完了したら、callback
関数を呼び出す必要があります。
transform._flush()
メソッドには、それを定義するクラスの内部のものであり、ユーザープログラムによって直接呼び出されるべきではないため、アンダースコアが付いています。
transform._transform(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any>stream.write()
に渡されたstring
から変換される、変換されるBuffer
。ストリームのdecodeStrings
オプションがfalse
であるか、ストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()
に渡されたものになります。encoding
<string> チャンクが文字列の場合、これはエンコーディングタイプです。チャンクがバッファの場合、これは特別な値'buffer'
です。その場合は無視してください。callback
<Function> 指定されたchunk
が処理された後に呼び出されるコールバック関数(オプションでエラー引数とデータ付き)。
この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable
クラスメソッドによってのみ呼び出されます。
すべてのTransform
ストリーム実装は、入力を受け入れて出力を生成する_transform()
メソッドを提供する必要があります。 transform._transform()
実装は、書き込まれるバイトを処理し、出力を計算し、transform.push()
メソッドを使用して、その出力を読み取り可能な部分に渡します。
チャンクの結果として出力する量に応じて、transform.push()
メソッドは、単一の入力チャンクから出力を生成するために、0回以上呼び出すことができます。
特定の入力データのチャンクから出力が生成されない可能性があります。
現在のチャンクが完全に消費された場合にのみ、callback
関数を呼び出す必要があります。callback
に渡される最初の引数は、入力の処理中にエラーが発生した場合はError
オブジェクトでなければならず、それ以外の場合はnull
でなければなりません。2番目の引数がcallback
に渡された場合、最初の引数が偽の場合にのみ、transform.push()
メソッドに転送されます。つまり、以下は同等です。
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform()
メソッドには、それを定義するクラスの内部のものであり、ユーザープログラムによって直接呼び出されるべきではないため、アンダースコアが付いています。
transform._transform()
が並行して呼び出されることはありません。ストリームはキューメカニズムを実装しており、次のチャンクを受け取るには、callback
を同期または非同期のいずれかで呼び出す必要があります。
クラス: stream.PassThrough
#
stream.PassThrough
クラスは、入力を出力に単純に渡すTransform
ストリームの簡単な実装です。その目的は主に例とテストですが、stream.PassThrough
が新しい種類のストリームの構成要素として役立つユースケースがいくつかあります。
追加の注意事項#
ストリームと非同期ジェネレーターおよび非同期イテレーターの互換性#
JavaScript における非同期ジェネレーターとイテレーターのサポートにより、非同期ジェネレーターは現時点で事実上、ファーストクラスの言語レベルのストリーム構成要素となっています。
以下に、Node.js ストリームを非同期ジェネレーターおよび非同期イテレーターとともに使用する一般的な相互運用ケースをいくつか示します。
非同期イテレーターによる読み取り可能なストリームの消費#
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
非同期イテレーターは、破棄後の未処理エラーを防ぐために、ストリームに永続的なエラーハンドラーを登録します。
非同期ジェネレーターによる読み取り可能なストリームの作成#
Node.js の読み取り可能なストリームは、Readable.from()
ユーティリティメソッドを使用して、非同期ジェネレーターから作成できます。
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
非同期イテレーターからの書き込み可能なストリームへのパイプ処理#
非同期イテレーターから書き込み可能なストリームに書き込む場合、バックプレッシャーとエラーの正しい処理を確保してください。 stream.pipeline()
は、バックプレッシャーとバックプレッシャー関連のエラーの処理を抽象化します。
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
古い Node.js バージョンとの互換性#
Node.js 0.10 より前のバージョンでは、Readable
ストリームのインターフェースはより単純でしたが、機能は劣り、使い勝手も良くありませんでした。
stream.read()
メソッドの呼び出しを待つのではなく、'data'
イベントはすぐに発行を開始していました。データを処理する方法を決定するために何らかの処理を実行する必要があるアプリケーションは、データが失われないように、読み取ったデータをバッファーに格納する必要がありました。stream.pause()
メソッドは保証されたものではなく、助言的なものでした。これは、ストリームが一時停止状態の場合でも、'data'
イベントを受信する準備をする必要があったことを意味します。
Node.js 0.10 では、Readable
クラスが追加されました。古い Node.js プログラムとの下位互換性のために、Readable
ストリームは、'data'
イベントハンドラーが追加されたとき、または stream.resume()
メソッドが呼び出されたときに「フローモード」に切り替わります。その結果、新しい stream.read()
メソッドと 'readable'
イベントを使用していない場合でも、'data'
チャンクが失われる心配はなくなりました。
ほとんどのアプリケーションは正常に機能し続けますが、これは次の条件でエッジケースを導入します。
'data'
イベントリスナーが追加されていません。stream.resume()
メソッドは決して呼び出されません。- ストリームは書き込み可能な宛先にパイプされていません。
例として、次のコードを考えてみましょう。
// WARNING! BROKEN!
net.createServer((socket) => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
Node.js 0.10 より前のバージョンでは、受信したメッセージデータは単純に破棄されていました。ただし、Node.js 0.10 以降では、ソケットは永久に一時停止したままになります。
この状況での回避策は、stream.resume()
メソッドを呼び出してデータのフローを開始することです。
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// Start the flow of data, discarding it.
socket.resume();
}).listen(1337);
新しい Readable
ストリームがフローモードに切り替わるだけでなく、0.10 より前のスタイルのストリームは、readable.wrap()
メソッドを使用して Readable
クラスにラップできます。
readable.read(0)
#
実際にはデータを消費せずに、基になる読み取り可能なストリームメカニズムのリフレッシュをトリガーする必要がある場合があります。そのような場合、常に null
を返す readable.read(0)
を呼び出すことができます。
内部の読み取りバッファーが highWaterMark
を下回っていて、ストリームが現在読み取りを行っていない場合、stream.read(0)
を呼び出すと、低レベルの stream._read()
呼び出しがトリガーされます。
ほとんどのアプリケーションではこれを行う必要はほとんどありませんが、Node.js 内、特に Readable
ストリームクラスの内部でこれが行われる状況があります。
readable.push('')
#
readable.push('')
の使用は推奨されません。
ゼロバイトの文字列、Buffer
、または Uint8Array
をオブジェクトモードではないストリームにプッシュすると、興味深い副作用が発生します。これは readable.push()
の呼び出しであるため、この呼び出しで読み取りプロセスが終了します。ただし、引数が空の文字列であるため、読み取り可能バッファーにデータは追加されず、ユーザーが消費するものはありません。
readable.setEncoding()
を呼び出した後の highWaterMark
の不一致#
readable.setEncoding()
を使用すると、オブジェクトモードでない場合の highWaterMark
の動作が変更されます。
通常、現在のバッファーのサイズは バイト 単位で highWaterMark
と比較されます。ただし、setEncoding()
が呼び出されると、比較関数はバッファーのサイズを 文字 単位で測定し始めます。
これは latin1
や ascii
を使用する一般的なケースでは問題ありません。ただし、マルチバイト文字を含む可能性がある文字列を扱う場合は、この動作に注意することをお勧めします。