ストリーム[src]#

安定性: 2 - 安定

ソースコード: lib/stream.js

ストリームは、Node.jsでストリーミングデータを扱うための抽象的なインターフェースです。 node:streamモジュールは、ストリームインターフェースを実装するためのAPIを提供します。

Node.jsによって提供されるストリームオブジェクトは多数あります。たとえば、HTTPサーバーへのリクエストprocess.stdoutの両方がストリームインスタンスです。

ストリームは、読み取り可能、書き込み可能、またはその両方である可能性があります。すべてのストリームはEventEmitterのインスタンスです。

node:streamモジュールにアクセスするには

const stream = require('node:stream'); 

node:streamモジュールは、新しいタイプのストリームインスタンスを作成するのに役立ちます。通常、ストリームを消費するためにnode:streamモジュールを使用する必要はありません。

このドキュメントの構成#

このドキュメントには、主に2つのセクションと、ノート用の3番目のセクションが含まれています。最初のセクションでは、アプリケーション内で既存のストリームを使用する方法について説明します。2番目のセクションでは、新しいタイプのストリームを作成する方法について説明します。

ストリームの種類#

Node.jsには、4つの基本的なストリームの種類があります。

さらに、このモジュールには、ユーティリティ関数 stream.pipeline()stream.finished()stream.Readable.from() および stream.addAbortSignal() が含まれています。

ストリーム Promises API#

stream/promises APIは、コールバックを使用するのではなく、Promiseオブジェクトを返すストリーム用の代替の非同期ユーティリティ関数セットを提供します。 APIは、require('node:stream/promises')またはrequire('node:stream').promisesを介してアクセスできます。

stream.pipeline(source[, ...transforms], destination[, options])#

stream.pipeline(streams[, options])#

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');

AbortSignal を使用するには、オプションオブジェクトの最後の引数として渡します。シグナルが中止されると、基になるパイプラインで destroyAbortError とともに呼び出されます。

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setImmediate(() => ac.abort());
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortErrorimport { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}

pipeline API は非同期ジェネレーターもサポートしています

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

非同期ジェネレーターに渡された signal 引数を処理することを忘れないでください。特に、非同期ジェネレーターがパイプラインのソース(つまり、最初の引数)である場合、パイプラインは決して完了しません。

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
  async function* ({ signal }) {
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

pipeline API は コールバックバージョン を提供します

stream.finished(stream[, options])#

const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

finished API は コールバックバージョン も提供します。

オブジェクトモード#

Node.js API で作成されたすべてのストリームは、文字列と Buffer (または Uint8Array) オブジェクトのみを操作します。ただし、ストリームの実装で他の種類の JavaScript 値(ストリーム内で特別な目的を果たす null を除く)を操作することは可能です。このようなストリームは「オブジェクトモード」で動作すると見なされます。

ストリームインスタンスは、ストリームの作成時に objectMode オプションを使用してオブジェクトモードに切り替えられます。既存のストリームをオブジェクトモードに切り替えようとするのは安全ではありません。

バッファリング#

Writable ストリームと Readable ストリームは両方とも、内部バッファにデータを格納します。

バッファリングされる可能性のあるデータ量は、ストリームのコンストラクターに渡される highWaterMark オプションによって異なります。通常のストリームの場合、highWaterMark オプションは 合計バイト数 を指定します。オブジェクトモードで動作するストリームの場合、highWaterMark はオブジェクトの合計数を指定します。

データは、実装が stream.push(chunk) を呼び出すと、Readable ストリームにバッファリングされます。ストリームのコンシューマーが stream.read() を呼び出さない場合、データは消費されるまで内部キューに留まります。

内部読み取りバッファの合計サイズが highWaterMark で指定されたしきい値に達すると、ストリームは現在バッファリングされているデータが消費されるまで、基になるリソースからのデータの読み取りを一時的に停止します (つまり、ストリームは読み取りバッファを埋めるために使用される内部 readable._read() メソッドの呼び出しを停止します)。

データは、writable.write(chunk) メソッドが繰り返し呼び出されると、Writable ストリームにバッファリングされます。内部書き込みバッファの合計サイズが highWaterMark で設定されたしきい値より小さい間、writable.write() の呼び出しは true を返します。内部バッファのサイズが highWaterMark に達するか、超えると、false が返されます。

stream.pipe() メソッドを含む、stream API の重要な目標は、さまざまな速度のソースと宛先が利用可能なメモリを圧倒しないように、データのバッファリングを許容可能なレベルに制限することです。

highWaterMark オプションはしきい値であり、制限ではありません。ストリームがそれ以上のデータの要求を停止する前にバッファリングするデータの量を決定します。一般に、厳密なメモリ制限を強制するものではありません。特定のストリーム実装では、より厳格な制限を強制することを選択できますが、そうすることはオプションです。

Duplex ストリームと Transform ストリームは両方とも Readable および Writable であるため、それぞれが読み取りと書き込みに使用される *2 つの* 別個の内部バッファを保持し、各側がデータの適切な効率的な流れを維持しながら、互いに独立して動作できるようにします。たとえば、net.Socket インスタンスは、ソケット *から* 受信したデータの消費を可能にする Readable 側と、ソケット *へ* のデータの書き込みを可能にする Writable 側を備えた Duplex ストリームです。データは受信されるよりも高速または低速でソケットに書き込まれる可能性があるため、各側は互いに独立して動作 (およびバッファリング) する必要があります。

内部バッファリングのメカニズムは内部実装の詳細であり、いつでも変更される可能性があります。ただし、特定の上級実装では、writable.writableBuffer または readable.readableBuffer を使用して内部バッファを取得できます。これらの文書化されていないプロパティの使用は推奨されません。

ストリームコンシューマー用のAPI#

ほとんどすべての Node.js アプリケーションは、どれほど単純であっても、何らかの方法でストリームを使用しています。以下は、HTTP サーバーを実装する Node.js アプリケーションでストリームを使用する例です

const http = require('node:http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added.
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received.
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON 

Writable ストリーム (例の res など) は、ストリームにデータを書き込むために使用される write()end() などのメソッドを公開します。

Readable ストリームは、ストリームから読み取るデータが利用可能になったときにアプリケーションコードに通知するために EventEmitter API を使用します。利用可能なデータは、複数の方法でストリームから読み取ることができます。

Writable ストリームと Readable ストリームはどちらも、ストリームの現在の状態を伝達するために EventEmitter API をさまざまな方法で使用します。

Duplex ストリームと Transform ストリームはどちらも Writable および Readable です。

ストリームへのデータの書き込みまたはストリームからのデータの消費を行うアプリケーションは、ストリームインターフェイスを直接実装する必要はなく、通常は require('node:stream') を呼び出す理由はありません。

新しいタイプのストリームを実装したい開発者は、ストリーム実装者用のAPI のセクションを参照してください。

書き込み可能ストリーム#

書き込み可能ストリームは、データが書き込まれる *宛先* の抽象化です。

Writable ストリームの例を次に示します

これらの例の一部は、実際には Writable インターフェイスを実装する Duplex ストリームです。

すべての Writable ストリームは、stream.Writable クラスによって定義されたインターフェイスを実装します。

Writable ストリームの特定のインスタンスはさまざまな点で異なる場合がありますが、すべての Writable ストリームは、以下の例に示すように同じ基本的な使用パターンに従います

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); 
クラス: stream.Writable#
イベント: 'close'#

ストリームとその基になるリソース (たとえば、ファイル記述子) が閉じられたときに、'close' イベントが発行されます。このイベントは、これ以上のイベントが発行されず、これ以上の計算が行われないことを示します。

Writable ストリームは、emitClose オプションを使用して作成された場合、常に 'close' イベントを発行します。

イベント: 'drain'#

stream.write(chunk) の呼び出しが false を返した場合、ストリームへのデータ書き込みを再開するのが適切なタイミングになったときに 'drain' イベントが発生します。

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
} 
イベント: 'error'#

データの書き込みまたはパイプ処理中にエラーが発生した場合、'error' イベントが発生します。リスナーのコールバックには、呼び出される際に単一の Error 引数が渡されます。

ストリームの作成時に autoDestroy オプションが false に設定されていない限り、'error' イベントが発生するとストリームは閉じられます。

'error' の後、'close' 以外のイベント('error' イベントを含む)は発生しないはずです。

イベント: 'finish'#

stream.end() メソッドが呼び出され、すべてのデータが基礎となるシステムにフラッシュされた後、'finish' イベントが発生します。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n'); 
イベント: 'pipe'#
  • src <stream.Readable> この書き込み可能なストリームにパイプしているソースストリーム

stream.pipe() メソッドが Readable ストリームで呼び出され、この書き込み可能なストリームが宛先セットに追加されると、'pipe' イベントが発生します。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer); 
イベント: 'unpipe'#

stream.unpipe() メソッドが Readable ストリームで呼び出され、この Writable ストリームが宛先セットから削除されると、'unpipe' イベントが発生します。

この Writable ストリームに Readable ストリームがパイプ処理中にエラーを発生させた場合にも、このイベントが発生します。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer); 
writable.cork()#

writable.cork() メソッドは、書き込まれたすべてのデータをメモリにバッファリングすることを強制します。バッファリングされたデータは、stream.uncork() または stream.end() メソッドのいずれかが呼び出されたときにフラッシュされます。

writable.cork() の主な目的は、複数の小さなチャンクが高速にストリームに書き込まれる状況に対応することです。それらを即座に基礎となる宛先に転送するのではなく、writable.cork()writable.uncork() が呼び出されるまですべてのチャンクをバッファリングし、存在する場合はそれらすべてを writable._writev() に渡します。これにより、最初の小さなチャンクが処理されるのを待機している間、データがバッファリングされるヘッドオブラインブロッキング状況を防ぎます。ただし、writable._writev() を実装せずに writable.cork() を使用すると、スループットに悪影響を及ぼす可能性があります。

参照: writable.uncork(), writable._writev()

writable.destroy([error])#
  • error <Error> オプション、'error' イベントで発生するエラー。
  • 戻り値: <this>

ストリームを破棄します。オプションで 'error' イベントを発生させ、'close' イベントを発生させます(emitClosefalse に設定されていない場合)。この呼び出し後、書き込み可能なストリームは終了し、後続の write() または end() の呼び出しは ERR_STREAM_DESTROYED エラーになります。これは、ストリームを破棄するための破壊的で即時の方法です。以前の write() の呼び出しがドレインされていない場合があり、ERR_STREAM_DESTROYED エラーをトリガーする可能性があります。データをフラッシュしてから閉じる必要がある場合、またはストリームを破棄する前に 'drain' イベントを待機する場合は、destroy の代わりに end() を使用してください。

const { Writable } = require('node:stream');

const myStream = new Writable();

const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error 
const { Writable } = require('node:stream');

const myStream = new Writable();

myStream.destroy();
myStream.on('error', function wontHappen() {}); 
const { Writable } = require('node:stream');

const myStream = new Writable();
myStream.destroy();

myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED 

destroy() が呼び出されると、それ以上の呼び出しは no-op になり、_destroy() からのエラーを除いて、それ以上のエラーは 'error' として発生しない可能性があります。

実装者はこのメソッドをオーバーライドするのではなく、代わりに writable._destroy() を実装する必要があります。

writable.closed#

'close' が発生した後、true です。

writable.destroyed#

writable.destroy() が呼び出された後、true です。

const { Writable } = require('node:stream');

const myStream = new Writable();

console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true 
writable.end([chunk[, encoding]][, callback])#
  • chunk <string> | <Buffer> | <Uint8Array> | <any> 書き込むオプションのデータ。オブジェクトモードで動作していないストリームの場合、chunk は文字列、Buffer または Uint8Array である必要があります。オブジェクトモードのストリームの場合、chunknull 以外の任意の JavaScript 値にすることができます。
  • encoding <string> chunk が文字列の場合のエンコーディング
  • callback <Function> ストリームが終了したときに呼び出すコールバック。
  • 戻り値: <this>

writable.end() メソッドを呼び出すと、Writable にこれ以上データが書き込まれないことを通知します。オプションの chunk および encoding 引数を使用すると、ストリームを閉じる直前に追加の最後のデータチャンクを書き込むことができます。

stream.end() を呼び出した後に stream.write() メソッドを呼び出すと、エラーが発生します。

// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed! 
writable.setDefaultEncoding(encoding)#
  • encoding <string> 新しいデフォルトのエンコーディング
  • 戻り値: <this>

writable.setDefaultEncoding() メソッドは、Writable ストリームのデフォルトの encoding を設定します。

writable.uncork()#

writable.uncork() メソッドは、stream.cork() が呼び出されてからバッファリングされたすべてのデータをフラッシュします。

writable.cork() および writable.uncork() を使用してストリームへの書き込みのバッファリングを管理する場合は、process.nextTick() を使用して writable.uncork() の呼び出しを遅延させます。これにより、特定の Node.js イベントループフェーズ内で発生するすべての writable.write() の呼び出しをバッチ処理できます。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork()); 

ストリームで writable.cork() メソッドが複数回呼び出された場合は、バッファリングされたデータをフラッシュするために同じ回数だけ writable.uncork() を呼び出す必要があります。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
}); 

参照: writable.cork()

writable.writable#

ストリームが破棄、エラー発生、または終了していないことを意味し、writable.write() を呼び出すのが安全な場合、true です。

writable.writableAborted#

安定性: 1 - 実験的

'finish' を発生させる前にストリームが破棄されたか、エラーが発生したかを返します。

writable.writableEnded#

writable.end() が呼び出された後、true です。このプロパティは、データがフラッシュされたかどうかを示すものではありません。これには、代わりに writable.writableFinished を使用してください。

writable.writableCorked#

ストリームのコルクを完全に取り外すために writable.uncork() を呼び出す必要がある回数。

writable.errored#

ストリームがエラーで破棄された場合、エラーを返します。

writable.writableFinished#

'finish' イベントが発生する直前に true に設定されます。

writable.writableHighWaterMark#

この Writable の作成時に渡された highWaterMark の値を返します。

writable.writableLength#

このプロパティには、書き込み準備が整ったキュー内のバイト数(またはオブジェクト数)が含まれます。この値は、highWaterMark の状態に関するイントロスペクションデータを提供します。

writable.writableNeedDrain#

ストリームのバッファが満杯になり、ストリームが 'drain' を発行する場合は true です。

writable.writableObjectMode#

指定された Writable ストリームの objectMode プロパティのゲッター。

writable.write(chunk[, encoding][, callback])#
  • chunk <string> | <Buffer> | <Uint8Array> | <any> 書き込むオプションのデータ。オブジェクトモードで動作していないストリームの場合、chunk は文字列、Buffer または Uint8Array である必要があります。オブジェクトモードのストリームの場合、chunknull 以外の任意の JavaScript 値にすることができます。
  • encoding <string> | <null> chunk が文字列の場合のエンコーディング。デフォルト: 'utf8'
  • callback <Function> このデータのチャンクがフラッシュされたときのコールバック。
  • 戻り値: <boolean> 追加のデータの書き込みを続行する前に、ストリームが 'drain' イベントが発行されるのを待つことを呼び出し元のコードに要求する場合は false。それ以外の場合は true

writable.write() メソッドはストリームにデータを書き込み、データが完全に処理されたら、指定された callback を呼び出します。エラーが発生した場合、callback はエラーを最初の引数として呼び出されます。callback は非同期的に呼び出され、'error' が発行される前に呼び出されます。

戻り値は、chunk を受け入れた後、内部バッファがストリームの作成時に構成された highWaterMark より小さい場合は true です。false が返された場合、'drain' イベントが発行されるまで、ストリームへの追加のデータの書き込みを停止する必要があります。

ストリームがドレインされていない間、write() の呼び出しは chunk をバッファリングし、false を返します。現在バッファリングされているすべてのチャンクがドレインされる(オペレーティングシステムによる配信が受け入れられる)と、'drain' イベントが発行されます。write() が false を返したら、'drain' イベントが発行されるまで、それ以上のチャンクを書き込まないでください。ドレインされていないストリームで write() を呼び出すことは許可されていますが、Node.js は最大メモリ使用量が発生するまですべての書き込まれたチャンクをバッファリングし、その時点で無条件に中止します。中止する前であっても、メモリ使用量が多いと、ガベージコレクターのパフォーマンスが低下し、高い RSS (通常、メモリが不要になった後でもシステムに解放されない) が発生します。TCPソケットはリモートピアがデータを読み取らない場合、ドレインされない可能性があるため、ドレインされていないソケットを書き込むと、リモートから悪用可能な脆弱性につながる可能性があります。

ストリームがドレインされていない間にデータを書き込むことは、Transform にとって特に問題があります。これは、Transform ストリームは、パイプ処理されるか、'data' または 'readable' イベントハンドラーが追加されるまで、デフォルトで一時停止されるためです。

書き込むデータがオンデマンドで生成またはフェッチできる場合は、ロジックを Readable にカプセル化し、stream.pipe() を使用することをお勧めします。ただし、write() を呼び出すことを優先する場合は、'drain' イベントを使用してバックプレッシャーを尊重し、メモリの問題を回避することが可能です。

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
}); 

オブジェクトモードの Writable ストリームは、常に encoding 引数を無視します。

読み取り可能ストリーム#

読み取り可能ストリームは、データが消費されるソースの抽象化です。

Readable ストリームの例としては、次のようなものがあります。

すべての Readable ストリームは、stream.Readable クラスによって定義されたインターフェイスを実装します。

2つの読み取りモード#

Readable ストリームは、事実上、フローモードと一時停止モードの2つのモードのいずれかで動作します。これらのモードは、オブジェクトモードとは異なります。Readable ストリームは、フローモードまたは一時停止モードのどちらであるかに関係なく、オブジェクトモードであるか、そうでない場合があります。

  • フローモードでは、データは基盤となるシステムから自動的に読み取られ、EventEmitter インターフェイスを介してイベントを使用して可能な限り迅速にアプリケーションに提供されます。

  • 一時停止モードでは、ストリームからデータのチャンクを読み取るために、stream.read() メソッドを明示的に呼び出す必要があります。

すべての Readable ストリームは一時停止モードで開始されますが、次のいずれかの方法でフローモードに切り替えることができます

Readable は、次のいずれかを使用して一時停止モードに戻すことができます

  • パイプの宛先がない場合は、stream.pause() メソッドを呼び出すこと。
  • パイプの宛先がある場合は、すべてのパイプの宛先を削除すること。複数のパイプの宛先は、stream.unpipe() メソッドを呼び出すことで削除できます。

覚えておくべき重要な概念は、Readable は、そのデータを消費または無視するためのメカニズムが提供されるまで、データを生成しないということです。消費メカニズムが無効または取り外された場合、Readable はデータの生成を停止する試行を行います。

後方互換性の理由から、'data' イベントハンドラーを削除しても、ストリームは自動的に一時停止されません。また、パイプされた宛先がある場合、stream.pause() を呼び出しても、それらの宛先がドレインされて、さらにデータを要求した後、ストリームが一時停止されたままになることは保証されません。

Readable がフローモードに切り替えられ、データを処理できるコンシューマーがない場合、そのデータは失われます。これは、たとえば、'data' イベントにリスナーがアタッチされていない状態で readable.resume() メソッドが呼び出された場合や、'data' イベントハンドラーがストリームから削除された場合に発生する可能性があります。

'readable' イベントハンドラーを追加すると、ストリームが自動的にフローを停止し、データは readable.read() を介して消費する必要があります。'readable' イベントハンドラーが削除された場合、'data' イベントハンドラーがあれば、ストリームは再びフローを開始します。

3つの状態#

Readable ストリームの「2つのモード」の動作は、Readable ストリームの実装内で発生する、より複雑な内部状態管理を簡略化した抽象化です。

具体的には、任意の時点で、すべての Readable は、次の3つの可能な状態のいずれかにあります。

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

readable.readableFlowingnull の場合、ストリームのデータを消費するメカニズムは提供されていません。したがって、ストリームはデータを生成しません。この状態では、'data' イベントのリスナーをアタッチしたり、readable.pipe() メソッドを呼び出したり、readable.resume() メソッドを呼び出すと、readable.readableFlowingtrue に切り替わり、データが生成されると Readable が積極的にイベントを発行し始めます。

readable.pause()readable.unpipe() を呼び出すか、バックプレッシャーを受け取ると、readable.readableFlowingfalse に設定され、イベントのフローが一時的に停止しますが、データの生成は停止しません。この状態では、'data' イベントのリスナーをアタッチしても、readable.readableFlowingtrue に切り替わることはありません。

const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok');  // Will not emit 'data'.
pass.resume();     // Must be called to make stream emit 'data'.
// readableFlowing is now true. 

readable.readableFlowingfalse の間、データはストリームの内部バッファ内に蓄積される可能性があります。

1つのAPIスタイルを選択する#

Readable ストリームAPIは、複数のNode.jsバージョンにわたって進化し、ストリームデータを消費するための複数のメソッドを提供しています。一般的に、開発者はデータの消費方法の1つを選択し、単一のストリームからデータを消費するために複数のメソッドを使用すべきではありません。具体的には、on('data')on('readable')pipe()、または非同期イテレーターを組み合わせて使用すると、直感的でない動作につながる可能性があります。

クラス: stream.Readable#
イベント: 'close'#

ストリームとその基になるリソース (たとえば、ファイル記述子) が閉じられたときに、'close' イベントが発行されます。このイベントは、これ以上のイベントが発行されず、これ以上の計算が行われないことを示します。

Readable ストリームは、emitClose オプションを使用して作成された場合、常に 'close' イベントを発行します。

イベント: 'data'#
  • chunk <Buffer> | <string> | <any> データのチャンク。オブジェクトモードで動作していないストリームの場合、チャンクは文字列または Buffer のいずれかになります。オブジェクトモードのストリームの場合、チャンクは null 以外の任意の JavaScript 値にすることができます。

'data' イベントは、ストリームがデータのチャンクの所有権をコンシューマーに譲渡するたびに発行されます。これは、readable.pipe()readable.resume() を呼び出すか、'data' イベントにリスナーコールバックをアタッチしてストリームがフローモードに切り替えられるたびに発生する可能性があります。'data' イベントは、readable.read() メソッドが呼び出され、返されるデータのチャンクが利用可能な場合にも発行されます。

明示的に一時停止されていないストリームに 'data' イベントリスナーをアタッチすると、ストリームがフローモードに切り替わります。データは利用可能になるとすぐに渡されます。

リスナーのコールバックには、readable.setEncoding() メソッドを使用してストリームにデフォルトのエンコーディングが指定されている場合は、データのチャンクが文字列として渡されます。それ以外の場合は、データは Buffer として渡されます。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
}); 
イベント: 'end'#

'end' イベントは、ストリームから消費するデータがなくなったときに発行されます。

'end' イベントは、データが完全に消費されない限り **発行されません**。これは、ストリームをフローモードに切り替えるか、すべてのデータが消費されるまで stream.read() を繰り返し呼び出すことで実現できます。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
}); 
イベント: 'error'#

'error' イベントは、Readable の実装によっていつでも発行される可能性があります。通常、これは、基礎となるストリームが内部的なエラーのためにデータを生成できない場合、またはストリームの実装が無効なデータのチャンクをプッシュしようとした場合に発生する可能性があります。

リスナーのコールバックには、単一の Error オブジェクトが渡されます。

イベント: 'pause'#

'pause' イベントは、stream.pause() が呼び出され、readableFlowingfalse でない場合に発行されます。

イベント: 'readable'#

'readable' イベントは、ストリームから読み取るデータがある場合、またはストリームの終わりに達した場合に発行されます。事実上、'readable' イベントは、ストリームに新しい情報があることを示します。データが利用可能な場合、stream.read() はそのデータを返します。

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while ((data = this.read()) !== null) {
    console.log(data);
  }
}); 

ストリームの終わりに達した場合、stream.read() を呼び出すと null が返され、'end' イベントがトリガーされます。これは、読み取るデータがまったくなかった場合も同様です。たとえば、次の例では、foo.txt は空のファイルです。

const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
}); 

このスクリプトを実行した場合の出力は次のとおりです。

$ node test.js
readable: null
end 

場合によっては、'readable' イベントのリスナーをアタッチすると、ある程度のデータが内部バッファーに読み込まれることがあります。

一般に、readable.pipe() および 'data' イベントのメカニズムは、'readable' イベントよりも理解しやすいです。ただし、'readable' を処理すると、スループットが向上する可能性があります。

'readable''data' の両方が同時に使用される場合、フローの制御において 'readable' が優先されます。つまり、stream.read() が呼び出された場合にのみ 'data' が発行されます。readableFlowing プロパティは false になります。'readable' が削除されたときに 'data' リスナーがある場合、ストリームはフローを開始します。つまり、.resume() を呼び出さなくても 'data' イベントが発行されます。

イベント: 'resume'#

'resume' イベントは、stream.resume() が呼び出され、readableFlowingtrue でない場合に発行されます。

readable.destroy([error])#
  • error <Error> 'error' イベントでペイロードとして渡されるエラー
  • 戻り値: <this>

ストリームを破棄します。必要に応じて、'error' イベントを発行し、'close' イベントを発行します(emitClosefalse に設定されていない場合)。この呼び出しの後、読み取り可能ストリームは内部リソースを解放し、後続の push() の呼び出しは無視されます。

destroy() が呼び出されると、それ以上の呼び出しは no-op になり、_destroy() からのエラーを除いて、それ以上のエラーは 'error' として発生しない可能性があります。

実装者はこのメソッドをオーバーライドするべきではありません。代わりに readable._destroy() を実装する必要があります。

readable.closed#

'close' が発生した後、true です。

readable.destroyed#

readable.destroy() が呼び出された後、true になります。

readable.isPaused()#

readable.isPaused() メソッドは、Readable の現在の動作状態を返します。これは主に、readable.pipe() メソッドの基礎となるメカニズムで使用されます。ほとんどの一般的なケースでは、このメソッドを直接使用する理由はありません。

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false 
readable.pause()#

readable.pause() メソッドは、フローモードのストリームが 'data' イベントの発行を停止させ、フローモードから切り替えます。利用可能になったデータは、内部バッファーに残ります。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
}); 

readable.pause() メソッドは、'readable' イベントリスナーがある場合は効果がありません。

readable.pipe(destination[, options])#
  • destination <stream.Writable> データの書き込み先
  • options <Object> パイプオプション
    • end <boolean> リーダーが終了したときにライターを終了します。デフォルト: true
  • 戻り値: <stream.Writable> destination は、Duplex または Transform ストリームである場合、パイプのチェーンを可能にします。

readable.pipe() メソッドは、Writable ストリームを readable にアタッチし、自動的にフローモードに切り替えて、すべてのデータをアタッチされた Writable にプッシュします。データのフローは自動的に管理され、宛先 Writable ストリームがより高速な Readable ストリームに圧倒されないようにします。

次の例では、readable からのすべてのデータを file.txt という名前のファイルにパイプします。

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable); 

複数の Writable ストリームを単一の Readable ストリームにアタッチできます。

readable.pipe() メソッドは、destination ストリームへの参照を返し、パイプされたストリームのチェーンを設定することを可能にします。

const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w); 

デフォルトでは、ソース Readable ストリームが 'end' を発行したときに、宛先 Writable ストリームで stream.end() が呼び出され、宛先が書き込み可能でなくなります。このデフォルトの動作を無効にするには、end オプションを false として渡すと、宛先ストリームが開いたままになります。

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
}); 

1つの重要な注意点は、Readable ストリームが処理中にエラーを発行した場合、Writable 宛先は自動的に *閉じられない* ことです。エラーが発生した場合、メモリリークを防ぐために、各ストリームを *手動で* 閉じる必要があります。

process.stderr および process.stdout Writable ストリームは、指定されたオプションに関係なく、Node.js プロセスが終了するまで閉じられません。

readable.read([size])#

readable.read() メソッドは、内部バッファーからデータを読み取り、返します。読み取るデータがない場合、null が返されます。デフォルトでは、readable.setEncoding() メソッドを使用してエンコーディングが指定されているか、ストリームがオブジェクトモードで動作している場合を除き、データは Buffer オブジェクトとして返されます。

オプションの size 引数は、読み取る特定のバイト数を指定します。size バイトが読み取れない場合、ストリームが終了した場合 *を除き*、null が返されます。その場合は、内部バッファーに残っているすべてのデータが返されます。

size 引数が指定されていない場合、内部バッファーに含まれるすべてのデータが返されます。

size 引数は 1 GiB 以下である必要があります。

readable.read() メソッドは、ポーズモードで動作する Readable ストリームでのみ呼び出す必要があります。フローモードでは、内部バッファーが完全に排出されるまで、readable.read() が自動的に呼び出されます。

const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.');
}); 

readable.read() を呼び出すたびに、データのチャンクまたは null が返されます。チャンクは連結されません。バッファー内のすべてのデータを消費するには、while ループが必要です。大きなファイルを読み込むとき、.read() は、バッファリングされたコンテンツをすべて消費して null を返す場合がありますが、まだバッファリングされていないデータがまだあります。この場合、バッファーにさらにデータがある場合は、新しい 'readable' イベントが発行されます。最後に、データがなくなる と 'end' イベントが発行されます。

したがって、readable からファイルのすべてのコンテンツを読み取るには、複数の 'readable' イベントにわたってチャンクを収集する必要があります。

const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

readable.on('end', () => {
  const content = chunks.join('');
}); 

オブジェクトモードの Readable ストリームは、size 引数の値に関係なく、readable.read(size) の呼び出しから常に単一の項目を返します。

readable.read() メソッドがデータのチャンクを返した場合、'data' イベントも発行されます。

'end' イベントが発行された後に stream.read([size]) を呼び出すと、null が返されます。ランタイムエラーは発生しません。

readable.readable#

readable.read() を呼び出しても安全な場合、つまり、ストリームが破棄されていないか、'error' または 'end' が発行されていない場合は true になります。

readable.readableAborted#

安定性: 1 - 実験的

ストリームが 'end' を発行する前に破棄されたか、エラーが発生したかどうかを返します。

readable.readableDidRead#

安定性: 1 - 実験的

'data' が発行されたかどうかを返します。

readable.readableEncoding#

与えられたReadableストリームのプロパティencodingを取得するゲッターです。encodingプロパティは、readable.setEncoding()メソッドを使用して設定できます。

readable.readableEnded#

'end'イベントが発行されるとtrueになります。

readable.errored#

ストリームがエラーで破棄された場合、エラーを返します。

readable.readableFlowing#

このプロパティは、3つの状態セクションで説明されているReadableストリームの現在の状態を反映します。

readable.readableHighWaterMark#

このReadableの作成時に渡されたhighWaterMarkの値を返します。

readable.readableLength#

このプロパティには、読み取り準備ができたキュー内のバイト数(またはオブジェクト数)が含まれています。この値は、highWaterMarkの状態に関するイントロスペクションデータを提供します。

readable.readableObjectMode#

与えられたReadableストリームのプロパティobjectModeを取得するゲッターです。

readable.resume()#

readable.resume()メソッドは、明示的に一時停止されたReadableストリームに'data'イベントの発行を再開させ、ストリームをフローモードに切り替えます。

readable.resume()メソッドは、データを実際に処理することなく、ストリームからデータを完全に消費するために使用できます。

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  }); 

'readable'イベントリスナーがある場合、readable.resume()メソッドは効果がありません。

readable.setEncoding(encoding)#
  • encoding <string> 使用するエンコーディング。
  • 戻り値: <this>

readable.setEncoding()メソッドは、Readableストリームから読み取られたデータの文字エンコーディングを設定します。

デフォルトでは、エンコーディングは割り当てられず、ストリームデータはBufferオブジェクトとして返されます。エンコーディングを設定すると、ストリームデータはBufferオブジェクトではなく、指定されたエンコーディングの文字列として返されます。たとえば、readable.setEncoding('utf8')を呼び出すと、出力データはUTF-8データとして解釈され、文字列として渡されます。readable.setEncoding('hex')を呼び出すと、データは16進文字列形式でエンコードされます。

Readableストリームは、ストリームからBufferオブジェクトとして単純に取り出した場合には不適切にデコードされる可能性のある、ストリームを通じて配信されるマルチバイト文字を適切に処理します。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
}); 
readable.unpipe([destination])#

readable.unpipe()メソッドは、以前にstream.pipe()メソッドを使用してアタッチされたWritableストリームをデタッチします。

destinationが指定されていない場合、すべてのパイプがデタッチされます。

destinationが指定されているが、パイプが設定されていない場合、メソッドは何もしません。

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000); 
readable.unshift(chunk[, encoding])#
  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 読み取りキューにunshiftするデータのチャンク。オブジェクトモードで動作しないストリームの場合、chunkは文字列、BufferUint8Array、またはnullである必要があります。オブジェクトモードのストリームの場合、chunkは任意のJavaScript値にすることができます。
  • encoding <string> 文字列チャンクのエンコーディング。'utf8''ascii'などの有効なBufferエンコーディングである必要があります。

chunknullとして渡すと、ストリームの終了(EOF)が通知され、readable.push(null)と同じように動作し、その後はデータを書き込むことはできません。EOF信号はバッファの最後に追加され、バッファリングされたデータはすべてフラッシュされます。

readable.unshift()メソッドは、データのチャンクを内部バッファに戻します。これは、ストリームが、ソースから楽観的に取り出したデータを「アンコンシューム」する必要があるコードによって消費され、そのデータを他の関係者に渡す必要がある特定の状況で役立ちます。

stream.unshift(chunk)メソッドは、'end'イベントが発行された後は呼び出すことができず、ランタイムエラーがスローされます。

stream.unshift()を使用している開発者は、代わりにTransformストリームの使用に切り替えることを検討する必要があります。詳細については、ストリーム実装者のためのAPIセクションを参照してください。

// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
        return;
      }
      // Still reading the header.
      header += str;
    }
  }
} 

stream.push(chunk)とは異なり、stream.unshift(chunk)は、ストリームの内部読み取り状態をリセットすることによって読み取りプロセスを終了しません。これは、読み取り中にreadable.unshift()が呼び出された場合(つまり、カスタムストリームのstream._read()実装内から)、予期しない結果を引き起こす可能性があります。readable.unshift()の呼び出しに続いて、すぐにstream.push('')を呼び出すと、読み取り状態が適切にリセットされますが、読み取りを実行中にreadable.unshift()の呼び出しを避けるのが最善です。

readable.wrap(stream)#
  • stream <Stream> "旧式"の読み取り可能なストリーム
  • 戻り値: <this>

Node.js 0.10より前は、ストリームは現在定義されているnode:streamモジュールAPI全体を実装していませんでした。(詳細については、互換性を参照してください。)

'data'イベントを発行し、アドバイザリのみのstream.pause()メソッドを持つ古いNode.jsライブラリを使用する場合、readable.wrap()メソッドを使用して、古いストリームをデータソースとして使用するReadableストリームを作成できます。

readable.wrap()を使用する必要はめったにありませんが、このメソッドは古いNode.jsアプリケーションおよびライブラリと対話するための便利な手段として提供されています。

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
}); 
readable[Symbol.asyncIterator]()#
  • 戻り値: ストリームを完全に消費するための<AsyncIterator>
const fs = require('node:fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.error); 

ループがbreakreturn、またはthrowで終了した場合、ストリームは破棄されます。言い換えれば、ストリームを反復処理すると、ストリームが完全に消費されます。ストリームは、highWaterMarkオプションと同じサイズのチャンクで読み取られます。上記のコード例では、fs.createReadStream()highWaterMarkオプションが指定されていないため、ファイルに64 KiB未満のデータがある場合、データは1つのチャンクになります。

readable[Symbol.asyncDispose]()#

安定性: 1 - 実験的

AbortErrorを使用してreadable.destroy()を呼び出し、ストリームが完了したときに履行されるPromiseを返します。

readable.compose(stream[, options])#

安定性: 1 - 実験的

import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] 

詳細については、stream.composeを参照してください。

readable.iterator([options])#

安定性: 1 - 実験的

  • options <Object>
    • destroyOnReturn <boolean> falseに設定すると、非同期イテレータでreturnを呼び出すか、breakreturn、またはthrowを使用してfor await...of反復を終了しても、ストリームは破棄されません。デフォルト: true
  • 戻り値: ストリームを消費するための<AsyncIterator>

このメソッドで作成されたイテレータを使用すると、for await...ofループがreturnbreak、またはthrowによって終了した場合、またはストリームが反復処理中にエラーを発行した場合に、イテレータがストリームを破棄する必要がある場合に、ストリームの破棄をキャンセルするオプションがユーザーに提供されます。

const { Readable } = require('node:stream');

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // Will print 2 and then 3
  }

  console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]));
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth(); 
readable.map(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリーム内のすべてのチャンクをマップする関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • highWaterMark <number> マップされた項目のユーザー消費を待機中にバッファリングする項目の数。デフォルト: concurrency * 2 - 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: 関数fnでマップされたストリーム<Readable>

このメソッドを使用すると、ストリームをマップできます。fn関数は、ストリーム内のすべてのチャンクに対して呼び出されます。fn関数がPromiseを返す場合、そのPromiseは、結果のストリームに渡される前にawaitされます。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
  console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
  console.log(result); // Logs the DNS result of resolver.resolve4.
} 
readable.filter(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームからチャンクをフィルタリングするための関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • highWaterMark <number> フィルタリングされたアイテムのユーザーによる消費を待機している間にバッファリングするアイテムの数。デフォルト: concurrency * 2 - 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Readable> 述語fnでフィルタリングされたストリーム。

このメソッドはストリームをフィルタリングできます。ストリーム内の各チャンクに対して、fn関数が呼び出され、それが真値を返す場合、チャンクは結果ストリームに渡されます。fn関数がPromiseを返す場合、そのPromiseはawaitされます。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
} 
readable.forEach(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Promise> ストリームが終了したときに完了するPromise。

このメソッドはストリームを反復処理できます。ストリーム内の各チャンクに対して、fn関数が呼び出されます。fn関数がPromiseを返す場合、そのPromiseはawaitされます。

このメソッドは、チャンクを同時に処理できる点が、for await...ofループとは異なります。さらに、forEach反復処理は、signalオプションを渡して関連するAbortControllerを中止することによってのみ停止できますが、for await...ofbreakまたはreturnで停止できます。どちらの場合でも、ストリームは破棄されます。

このメソッドは、基盤のメカニズムでreadableイベントを使用し、同時fn呼び出しの数を制限できる点が、'data'イベントをリッスンすることとは異なります。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished 
readable.toArray([options])#

安定性: 1 - 実験的

  • options <Object>
    • signal <AbortSignal> シグナルが中止された場合に、toArray操作をキャンセルできます。
  • 戻り値: <Promise> ストリームの内容を含む配列を含むPromise。

このメソッドを使用すると、ストリームの内容を簡単に取得できます。

このメソッドはストリーム全体をメモリに読み込むため、ストリームの利点を打ち消します。これは、ストリームを消費する主な方法としてではなく、相互運用性と利便性を目的としています。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 }).toArray(); 
readable.some(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Promise> fnが少なくとも1つのチャンクに対して真値を返した場合、trueと評価されるPromise。

このメソッドはArray.prototype.someに似ており、awaitされた戻り値がtrue(または任意の真値)になるまで、ストリーム内の各チャンクでfnを呼び出します。チャンクに対するfn呼び出しのawaitされた戻り値が真値になると、ストリームは破棄され、Promiseはtrueで完了します。チャンクに対するfn呼び出しのいずれも真値を返さない場合、Promiseはfalseで完了します。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).some(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.find(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Promise> fnが真値で評価された最初のチャンクと評価されるPromise。要素が見つからなかった場合はundefined

このメソッドはArray.prototype.findに似ており、fnの真値を持つチャンクを見つけるために、ストリーム内の各チャンクでfnを呼び出します。fn呼び出しのawaitされた戻り値が真値になると、ストリームは破棄され、Promiseはfnが真値を返した値で完了します。チャンクに対するすべてのfn呼び出しが偽値を返す場合、Promiseはundefinedで完了します。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).find(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.every(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリームの各チャンクで呼び出す関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Promise> fnがすべてのチャンクに対して真値を返した場合、trueと評価されるPromise。

このメソッドはArray.prototype.everyに似ており、すべてのawaitされた戻り値がfnに対して真値であるかどうかを確認するために、ストリーム内の各チャンクでfnを呼び出します。チャンクに対するfn呼び出しのawaitされた戻り値が偽値になると、ストリームは破棄され、Promiseはfalseで完了します。チャンクに対するすべてのfn呼び出しが真値を返すと、Promiseはtrueで完了します。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
  'file1',
  'file2',
  'file3',
]).every(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished 
readable.flatMap(fn[, options])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncGeneratorFunction> | <AsyncFunction> ストリーム内のすべてのチャンクをマッピングする関数。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • options <Object>
    • concurrency <number> ストリームで一度に呼び出すfnの最大同時呼び出し数。デフォルト: 1
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Readable> 関数fnでフラットマップされたストリーム。

このメソッドは、指定されたコールバックをストリームの各チャンクに適用し、その結果をフラット化して、新しいストリームを返します。

fnからストリームまたは別のイテラブルまたは非同期イテラブルを返すことができ、結果のストリームは返されたストリームにマージ(フラット化)されます。

import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
  console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
  './1.mjs',
  './2.mjs',
  './3.mjs',
  './4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
  // This will contain the contents (all chunks) of all 4 files
  console.log(result);
} 
readable.drop(limit[, options])#

安定性: 1 - 実験的

  • limit <number> readableから削除するチャンクの数。
  • options <Object>
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Readable> limit個のチャンクが削除されたストリーム。

このメソッドは、最初のlimit個のチャンクが削除された新しいストリームを返します。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] 
readable.take(limit[, options])#

安定性: 1 - 実験的

  • limit <number> readableから取得するチャンクの数。
  • options <Object>
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Readable> limit個のチャンクが取得されたストリーム。

このメソッドは、最初のlimit個のチャンクを含む新しいストリームを返します。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] 
readable.reduce(fn[, initial[, options]])#

安定性: 1 - 実験的

  • fn <Function> | <AsyncFunction> ストリーム内のすべてのチャンクに対して呼び出すリデューサー関数。
    • previous <any> fnへの最後の呼び出しから取得した値、または指定されている場合はinitial値、またはそれ以外の場合はストリームの最初のチャンク。
    • data <any> ストリームからのデータのチャンク。
    • options <Object>
      • signal <AbortSignal> ストリームが破棄された場合に中断され、fnの呼び出しを早期に中断できるようにします。
  • initial <any> リダクションで使用する初期値。
  • options <Object>
    • signal <AbortSignal> シグナルが中断された場合にストリームを破棄できるようにします。
  • 戻り値: <Promise> リダクションの最終値のPromise。

このメソッドは、ストリームの各チャンクに対して順番にfnを呼び出し、前の要素の計算結果を渡します。リダクションの最終値のPromiseを返します。

initial値が指定されていない場合、ストリームの最初のチャンクが初期値として使用されます。ストリームが空の場合、PromiseはERR_INVALID_ARGSコードプロパティを持つTypeErrorで拒否されます。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .reduce(async (totalSize, file) => {
    const { size } = await stat(join(directoryPath, file));
    return totalSize + size;
  }, 0);

console.log(folderSize); 

リデューサー関数はストリームを要素ごとに反復処理するため、concurrencyパラメーターや並列処理はありません。reduceを同時に実行するには、非同期関数をreadable.mapメソッドに抽出できます。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize); 

二重ストリームと変換ストリーム#

クラス: stream.Duplex#

二重ストリームは、ReadableインターフェースとWritableインターフェースの両方を実装するストリームです。

Duplexストリームの例としては、以下のようなものがあります。

duplex.allowHalfOpen#

falseの場合、ストリームは読み取り側が終了すると、書き込み側を自動的に終了します。最初にallowHalfOpenコンストラクターオプションで設定されます。デフォルトはtrueです。

既存のDuplexストリームインスタンスのハーフオープン動作を変更するために、これを手動で変更できますが、'end'イベントが発行される前に変更する必要があります。

クラス: stream.Transform#

変換ストリームは、出力が何らかの形で入力に関連付けられているDuplexストリームです。すべてのDuplexストリームと同様に、Transformストリームは、ReadableインターフェースとWritableインターフェースの両方を実装します。

Transformストリームの例としては、以下のようなものがあります。

transform.destroy([error])#

ストリームを破棄し、オプションで'error'イベントを発行します。この呼び出し後、変換ストリームは内部リソースを解放します。実装者はこのメソッドをオーバーライドすべきではなく、代わりにreadable._destroy()を実装する必要があります。Transform_destroy()のデフォルト実装は、emitCloseがfalseに設定されていない限り、'close'も発行します。

destroy()が呼び出されると、それ以降の呼び出しは何も操作を行わず、_destroy()からのエラーを除き、'error'として発行されるエラーはなくなります。

stream.finished(stream[, options], callback)#

  • stream <Stream> | <ReadableStream> | <WritableStream> 可読および/または書込可能なストリーム/ウェブストリーム。
  • options <Object>
    • error <boolean> falseに設定すると、emit('error', err)の呼び出しは終了したと見なされません。デフォルト: true
    • readable <boolean> falseに設定すると、ストリームがまだ読み取り可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト: true
    • writable <boolean> falseに設定すると、ストリームがまだ書き込み可能であっても、ストリームが終了したときにコールバックが呼び出されます。デフォルト: true
    • signal <AbortSignal> ストリームの終了を待つのを中断できます。シグナルが中断された場合、基になるストリームは中断されません。コールバックはAbortErrorで呼び出されます。この関数によって追加されたすべての登録済みリスナーも削除されます。
    • cleanup <boolean> 登録されたすべてのストリームリスナーを削除します。デフォルト: false
  • callback <Function> オプションのエラー引数を取るコールバック関数。
  • 戻り値: <Function> 登録されたすべてのリスナーを削除するクリーンアップ関数。

ストリームが可読、書込可能でなくなった場合、またはエラーや早期クローズイベントが発生した場合に通知を受けるための関数。

const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream. 

ストリームが早期に破棄された(中断されたHTTPリクエストなど)、'end''finish'を発行しないエラー処理シナリオで特に役立ちます。

finished APIは、プロミスバージョンを提供します。

stream.finished()は、callbackが呼び出された後も、(特に'error''end''finish'、および'close')ぶら下がったイベントリスナーを残します。これは、予期しない'error'イベント(ストリームの実装が正しくないため)が予期しないクラッシュを引き起こさないようにするためです。これが不要な動作である場合は、返されたクリーンアップ関数をコールバックで呼び出す必要があります。

const cleanup = finished(rs, (err) => {
  cleanup();
  // ...
}); 

stream.pipeline(source[, ...transforms], destination, callback)#

stream.pipeline(streams, callback)#

ストリームとジェネレーターの間でパイプ処理を行い、エラーを転送し、適切にクリーンアップし、パイプラインが完了したときにコールバックを提供するモジュールメソッド。

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
); 

pipeline APIは、プロミスバージョンを提供します。

stream.pipeline()は、次を除くすべてのストリームでstream.destroy(err)を呼び出します。

  • 'end'または'close'を発行したReadableストリーム。
  • 'finish'または'close'を発行したWritableストリーム。

stream.pipeline()は、callbackが呼び出された後も、ストリームにぶら下がったイベントリスナーを残します。失敗後にストリームを再利用する場合、これによりイベントリスナーのリークやエラーの飲み込みが発生する可能性があります。最後のストリームが可読な場合、最後のストリームを後で消費できるように、ぶら下がったイベントリスナーが削除されます。

stream.pipeline()は、エラーが発生するとすべてのストリームを閉じます。pipelineを使用したIncomingRequestの使用は、予期される応答を送信せずにソケットを破棄するため、予期しない動作につながる可能性があります。以下の例を参照してください。

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
}); 

stream.compose(...streams)#

安定性: 1 - stream.composeは実験的です。

2つ以上のストリームを、最初のストリームに書き込み、最後のストリームから読み取るDuplexストリームに結合します。提供された各ストリームは、stream.pipelineを使用して次のストリームにパイプされます。いずれかのストリームがエラーになった場合、外側のDuplexストリームを含め、すべて破棄されます。

stream.composeは、順番に他のストリームにパイプできる(またパイプすべき)新しいストリームを返すため、構成が可能です。対照的に、ストリームをstream.pipelineに渡す場合、通常、最初のストリームは可読ストリーム、最後のストリームは書込可能ストリームであり、閉じた回路を形成します。

Functionを渡した場合、sourceIterableを取るファクトリーメソッドである必要があります。

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD' 

stream.composeを使用して、非同期イテラブル、ジェネレーター、関数をストリームに変換できます。

  • AsyncIterableは、可読なDuplexに変換されます。nullを生成することはできません。
  • AsyncGeneratorFunctionは、可読/書込可能な変換Duplexに変換されます。最初のパラメーターとしてソースのAsyncIterableを取る必要があります。nullを生成することはできません。
  • AsyncFunctionは、書込可能なDuplexに変換されます。nullまたはundefinedのいずれかを返す必要があります。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD' 

演算子としてのstream.composeについては、readable.compose(stream)を参照してください。

stream.Readable.from(iterable[, options])#

  • iterable <Iterable> Symbol.asyncIteratorまたはSymbol.iteratorイテラブルプロトコルを実装するオブジェクト。null値が渡された場合、'error'イベントを発行します。
  • options <Object> new stream.Readable([options])に提供されるオプション。デフォルトでは、Readable.from()は、options.objectModefalseに設定して明示的にオプトアウトしない限り、options.objectModetrueに設定します。
  • 戻り値: <stream.Readable>

イテレーターから可読ストリームを作成するためのユーティリティメソッド。

const { Readable } = require('node:stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
}); 

Readable.from(string) または Readable.from(buffer) を呼び出しても、パフォーマンス上の理由から、文字列やバッファが他のストリームのセマンティクスに合わせて反復処理されることはありません。

Promise を含む Iterable オブジェクトが引数として渡された場合、未処理の拒否が発生する可能性があります。

const { Readable } = require('node:stream');

Readable.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Readable.fromWeb(readableStream[, options])#

安定性: 1 - 実験的

stream.Readable.isDisturbed(stream)#

安定性: 1 - 実験的

ストリームが読み取り済みまたはキャンセル済みかどうかを返します。

stream.isErrored(stream)#

安定性: 1 - 実験的

ストリームでエラーが発生したかどうかを返します。

stream.isReadable(stream)#

安定性: 1 - 実験的

ストリームが読み取り可能かどうかを返します。

stream.Readable.toWeb(streamReadable[, options])#

安定性: 1 - 実験的

  • streamReadable <stream.Readable>
  • options <Object>
    • strategy <Object>
      • highWaterMark <number> 与えられた stream.Readable からの読み取りでバックプレッシャーが適用される前の、(作成された ReadableStream の)最大内部キューサイズ。値が提供されない場合、与えられた stream.Readable から取得されます。
      • size <Function> 与えられたデータのチャンクのサイズを計算する関数。値が提供されない場合、すべてのチャンクのサイズは 1 になります。
  • 戻り値: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])#

安定性: 1 - 実験的

stream.Writable.toWeb(streamWritable)#

安定性: 1 - 実験的

stream.Duplex.from(src)#

双方向ストリームを作成するためのユーティリティメソッドです。

  • Stream は書き込み可能ストリームを書き込み可能な Duplex に、読み取り可能ストリームを Duplex に変換します。
  • Blob は読み取り可能な Duplex に変換します。
  • string は読み取り可能な Duplex に変換します。
  • ArrayBuffer は読み取り可能な Duplex に変換します。
  • AsyncIterableは、可読なDuplexに変換されます。nullを生成することはできません。
  • AsyncGeneratorFunctionは、可読/書込可能な変換Duplexに変換されます。最初のパラメーターとしてソースのAsyncIterableを取る必要があります。nullを生成することはできません。
  • AsyncFunction は書き込み可能な Duplex に変換します。null または undefined のいずれかを返す必要があります。
  • Object ({ writable, readable }) は、readablewritableStream に変換し、それらを Duplex に結合します。Duplexwritable に書き込み、readable から読み取ります。
  • Promise は読み取り可能な Duplex に変換します。値 null は無視されます。
  • ReadableStream は読み取り可能な Duplex に変換します。
  • WritableStream は書き込み可能な Duplex に変換します。
  • 戻り値: <stream.Duplex>

Promise を含む Iterable オブジェクトが引数として渡された場合、未処理の拒否が発生する可能性があります。

const { Duplex } = require('node:stream');

Duplex.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Duplex.fromWeb(pair[, options])#

安定性: 1 - 実験的

import { Duplex } from 'node:stream';
import {
  ReadableStream,
  WritableStream,
} from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');

for await (const chunk of duplex) {
  console.log('readable', chunk);
}const { Duplex } = require('node:stream');
const {
  ReadableStream,
  WritableStream,
} = require('node:stream/web');

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));

stream.Duplex.toWeb(streamDuplex)#

安定性: 1 - 実験的

import { Duplex } from 'node:stream';

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

const { value } = await readable.getReader().read();
console.log('readable', value);const { Duplex } = require('node:stream');

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable.getReader().read().then((result) => {
  console.log('readable', result.value);
});

stream.addAbortSignal(signal, stream)#

AbortSignal を読み取り可能または書き込み可能なストリームにアタッチします。これにより、コードは AbortController を使用してストリームの破棄を制御できます。

渡された AbortSignal に対応する AbortControllerabort を呼び出すと、ストリームで .destroy(new AbortError()) を呼び出す場合と同じように動作し、Webストリームの場合は controller.error(new AbortError()) を呼び出す場合と同じように動作します。

const fs = require('node:fs');

const controller = new AbortController();
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort(); 

または、非同期イテラブルとして読み取り可能なストリームで AbortSignal を使用する

const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk);
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // The operation was cancelled
    } else {
      throw e;
    }
  }
})(); 

または、ReadableStream で AbortSignal を使用する

const controller = new AbortController();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
  if (err) {
    if (err.name === 'AbortError') {
      // The operation was cancelled
    }
  }
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  console.log(value); // hello
  console.log(done); // false
  controller.abort();
}); 

stream.getDefaultHighWaterMark(objectMode)#

ストリームで使用されるデフォルトの highWaterMark を返します。デフォルトは、objectMode の場合は 16384 (16 KiB)、または 16 です。

stream.setDefaultHighWaterMark(objectMode, value)#

ストリームで使用されるデフォルトの highWaterMark を設定します。

ストリーム実装者のためのAPI#

node:stream モジュール API は、JavaScript のプロトタイプ継承モデルを使用してストリームを簡単に実装できるように設計されています。

まず、ストリーム開発者は、4 つの基本ストリームクラス (stream.Writablestream.Readablestream.Duplex、または stream.Transform) のいずれかを拡張する新しい JavaScript クラスを宣言し、適切な親クラスコンストラクターを呼び出すようにします。

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
  }
} 

ストリームを拡張する場合、ユーザーが提供できるオプションと提供すべきオプションを念頭に置き、それらをベースコンストラクターに転送する前に確認してください。たとえば、実装で autoDestroy オプションと emitClose オプションに関して前提条件がある場合は、ユーザーがこれらをオーバーライドできないようにしてください。すべてのオプションを暗黙的に転送するのではなく、転送されるオプションについて明示的に記述してください。

新しいストリームクラスは、作成されるストリームのタイプに応じて、以下の表に示すように、1 つ以上の特定のメソッドを実装する必要があります。

ユースケースクラス実装するメソッド
読み取り専用Readable_read()
書き込み専用Writable_write()_writev()_final()
読み取りと書き込みDuplex_read()_write()_writev()_final()
書き込まれたデータを操作し、結果を読み取るTransform_transform()_flush()_final()

ストリームの実装コードは、コンシューマーで使用することを目的としたストリームの「パブリック」メソッドを決して呼び出すべきではありません (「ストリームコンシューマー向けのAPI」セクションで説明)。そうすると、ストリームを消費するアプリケーションコードで悪影響が発生する可能性があります。

write()end()cork()uncork()read()destroy() などのパブリックメソッドをオーバーライドしたり、'error''data''end''finish''close' などの内部イベントを .emit() を使用して発生させたりすることは避けてください。そうすると、現在および将来のストリームの不変性が損なわれ、他のストリーム、ストリームユーティリティ、およびユーザーの期待との間で動作や互換性の問題が発生する可能性があります。

簡略化された構築#

多くの場合、継承に頼らずにストリームを作成できます。これは、stream.Writablestream.Readablestream.Duplex、または stream.Transform オブジェクトのインスタンスを直接作成し、適切なメソッドをコンストラクターオプションとして渡すことで実現できます。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  construct(callback) {
    // Initialize state and load resources...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Free resources...
  },
}); 

書き込み可能ストリームの実装#

stream.Writable クラスは、Writable ストリームを実装するために拡張されます。

カスタムのWritableストリームは、必ずnew stream.Writable([options])コンストラクターを呼び出し、writable._write()および/またはwritable._writev()メソッドを実装する必要があります。

new stream.Writable([options])#
  • options <Object>
    • highWaterMark <number> stream.write()falseを返し始めるバッファーレベル。デフォルト: 16384 (16 KiB)、またはobjectModeストリームの場合は16
    • decodeStrings <boolean> stream.write()に渡されたstringを、stream._write()に渡す前に、Bufferにエンコードするかどうか(stream.write()呼び出しで指定されたエンコーディングを使用)。他の型のデータは変換されません(つまり、Bufferstringにデコードされません)。falseに設定すると、stringが変換されなくなります。デフォルト: true
    • defaultEncoding <string> stream.write()への引数としてエンコーディングが指定されていない場合に使用されるデフォルトのエンコーディング。デフォルト: 'utf8'
    • objectMode <boolean> stream.write(anyObj)が有効な操作かどうか。設定すると、ストリーム実装でサポートされている場合、string、Buffer、またはUint8Array以外のJavaScript値を書き込むことが可能になります。デフォルト: false
    • emitClose <boolean> ストリームが破棄された後に'close'を発行するかどうか。デフォルト: true
    • write <Function> stream._write()メソッドの実装。
    • writev <Function> stream._writev()メソッドの実装。
    • destroy <Function> stream._destroy()メソッドの実装。
    • final <Function> stream._final()メソッドの実装。
    • construct <Function> stream._construct()メソッドの実装。
    • autoDestroy <boolean> このストリームが終了後に自動的に.destroy()を自身に呼び出すかどうか。デフォルト: true
    • signal <AbortSignal> キャンセルされる可能性を表すシグナル。
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor.
    super(options);
    // ...
  }
} 

または、pre-ES6スタイルのコンストラクターを使用する場合

const { Writable } = require('node:stream');
const util = require('node:util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable); 

または、簡略化されたコンストラクターアプローチを使用する場合

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
}); 

渡されたAbortSignalに対応するAbortControllerabortを呼び出すと、書き込み可能ストリームで.destroy(new AbortError())を呼び出すのと同じように動作します。

const { Writable } = require('node:stream');

const controller = new AbortController();
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
writable._construct(callback)#
  • callback <Function> ストリームの初期化が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。

_construct()メソッドは直接呼び出してはなりません。これは子クラスによって実装される可能性があり、その場合、内部のWritableクラスメソッドのみによって呼び出されます。

このオプションの関数は、ストリームコンストラクターが返された後のティックで呼び出され、callbackが呼び出されるまで_write()_final()、および_destroy()の呼び出しを遅延させます。これは、ストリームを使用する前に、状態を初期化したり、リソースを非同期的に初期化したりするのに役立ちます。

const { Writable } = require('node:stream');
const fs = require('node:fs');

class WriteStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
writable._write(chunk, encoding, callback)#
  • chunk <Buffer> | <string> | <any> stream.write()に渡されたstringから変換された書き込むBuffer。ストリームのdecodeStringsオプションがfalseの場合、またはストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()に渡されたものがそのままになります。
  • encoding <string> チャンクが文字列の場合、encodingはその文字列の文字エンコーディングです。チャンクがBufferの場合、またはストリームがオブジェクトモードで動作している場合、encodingは無視される場合があります。
  • callback <Function> 提供されたチャンクの処理が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。

すべてのWritableストリーム実装は、基になるリソースにデータを送信するためのwritable._write()および/またはwritable._writev()メソッドを提供する必要があります。

Transformストリームは、writable._write()の独自の実装を提供します。

この関数は、アプリケーションコードから直接呼び出してはなりません。これは子クラスによって実装される必要があり、内部のWritableクラスメソッドのみによって呼び出されます。

callback関数は、writable._write()内で同期的に、または非同期的に(つまり、異なるティックで)呼び出して、書き込みが正常に完了したか、エラーが発生して失敗したかを通知する必要があります。callbackに渡される最初の引数は、呼び出しが失敗した場合はErrorオブジェクト、書き込みが成功した場合はnullである必要があります。

writable._write()が呼び出されてからcallbackが呼び出されるまでの間に発生するwritable.write()へのすべての呼び出しにより、書き込まれたデータがバッファリングされます。callbackが呼び出されると、ストリームは'drain'イベントを発行する場合があります。ストリーム実装が複数のデータチャンクを一度に処理できる場合は、writable._writev()メソッドを実装する必要があります。

コンストラクターオプションでdecodeStringsプロパティが明示的にfalseに設定されている場合、chunk.write()に渡されるのと同じオブジェクトのままであり、Bufferではなく文字列である場合があります。これは、特定の文字列データエンコーディングの最適化された処理をサポートするためです。その場合、encoding引数は文字列の文字エンコーディングを示します。それ以外の場合、encoding引数は安全に無視できます。

writable._write()メソッドには、それが定義するクラスの内部であり、ユーザープログラムから直接呼び出すべきではないため、アンダースコアが先頭に付いています。

writable._writev(chunks, callback)#
  • chunks <Object[]> 書き込むデータ。値は、書き込むデータの個別のチャンクを表す<Object>の配列です。これらのオブジェクトのプロパティは次のとおりです。
    • chunk <Buffer> | <string> 書き込むデータを含むバッファインスタンスまたは文字列。WritabledecodeStringsオプションをfalseに設定して作成され、文字列がwrite()に渡された場合、chunkは文字列になります。
    • encoding <string> chunkの文字エンコーディング。chunkBufferの場合、encoding'buffer'になります。
  • callback <Function> 提供されたチャンクの処理が完了したときに呼び出されるコールバック関数(オプションでエラー引数付き)。

この関数は、アプリケーションコードから直接呼び出してはなりません。これは子クラスによって実装される必要があり、内部のWritableクラスメソッドのみによって呼び出されます。

writable._writev()メソッドは、複数のデータチャンクを一度に処理できるストリーム実装で、writable._write()に加えて、または代替として実装できます。実装されており、以前の書き込みからのバッファリングされたデータがある場合、_write()の代わりに_writev()が呼び出されます。

writable._writev()メソッドには、それが定義するクラスの内部であり、ユーザープログラムから直接呼び出すべきではないため、アンダースコアが先頭に付いています。

writable._destroy(err, callback)#
  • err <Error> 考えられるエラー。
  • callback <Function> オプションのエラー引数を取るコールバック関数。

_destroy()メソッドはwritable.destroy()によって呼び出されます。子クラスによってオーバーライドできますが、直接呼び出してはなりません。さらに、プロミスが解決されたときに実行されると、callbackをasync/awaitと混同しないでください。

writable._final(callback)#
  • callback <Function> 残りのデータの書き込みが完了したら、この関数を(オプションでエラー引数付きで)呼び出します。

_final() メソッドは、直接呼び出してはいけません。子クラスによって実装される可能性があり、その場合は、内部の Writable クラスメソッドによってのみ呼び出されます。

このオプションの関数は、ストリームが閉じる前に呼び出され、callback が呼び出されるまで 'finish' イベントを遅延させます。これは、ストリームが終了する前にリソースを閉じたり、バッファリングされたデータを書き込んだりするのに便利です。

書き込み中のエラー#

writable._write()writable._writev()、および writable._final() メソッドの処理中に発生したエラーは、コールバックを呼び出し、エラーを最初の引数として渡すことで伝播させる必要があります。これらのメソッド内から Error をスローしたり、手動で 'error' イベントを発生させたりすると、未定義の動作になります。

Writable ストリームがエラーを発生させたときに、Readable ストリームが Writable ストリームにパイプされている場合、Readable ストリームはパイプ解除されます。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
}); 
書き込み可能ストリームの例#

以下は、かなり単純(でやや無意味)なカスタムの Writable ストリームの実装を示しています。この特定の Writable ストリームインスタンスは実際には特に有用ではありませんが、この例ではカスタムの Writable ストリームインスタンスに必要な各要素を示しています。

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
} 
書き込み可能ストリームでのバッファのデコード#

バッファのデコードは、たとえば、入力が文字列であるトランスフォーマーを使用する場合など、一般的なタスクです。これは、UTF-8 などのマルチバイト文字エンコーディングを使用する場合、簡単な処理ではありません。次の例は、StringDecoderWritable を使用してマルチバイト文字列をデコードする方法を示しています。

const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: € 

読み取り可能ストリームの実装#

Readable ストリームを実装するために、stream.Readable クラスが拡張されます。

カスタムの Readable ストリームは、必ず new stream.Readable([options]) コンストラクターを呼び出し、readable._read() メソッドを実装する必要があります。

new stream.Readable([options])#
  • options <Object>
    • highWaterMark <number> 基になるリソースからの読み取りを停止する前に、内部バッファに格納する最大バイト数デフォルト: 16384 (16 KiB) 、または objectMode ストリームの場合は 16
    • encoding <string> 指定した場合、バッファは指定されたエンコーディングを使用して文字列にデコードされます。デフォルト: null
    • objectMode <boolean> このストリームをオブジェクトのストリームとして動作させるかどうか。つまり、stream.read(n) は、サイズが nBuffer ではなく、単一の値を返します。デフォルト: false
    • emitClose <boolean> ストリームが破棄された後に'close'を発行するかどうか。デフォルト: true
    • read <Function> stream._read() メソッドの実装。
    • destroy <Function> stream._destroy() メソッドの実装。
    • construct <Function> stream._construct() メソッドの実装。
    • autoDestroy <boolean> このストリームが終了後に自動的に.destroy()を自身に呼び出すかどうか。デフォルト: true
    • signal <AbortSignal> キャンセルされる可能性を表すシグナル。
const { Readable } = require('node:stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor.
    super(options);
    // ...
  }
} 

または、pre-ES6スタイルのコンストラクターを使用する場合

const { Readable } = require('node:stream');
const util = require('node:util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable); 

または、簡略化されたコンストラクターアプローチを使用する場合

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    // ...
  },
}); 

渡された AbortSignal に対応する AbortControllerabort を呼び出すと、作成された読み取り可能ストリームで .destroy(new AbortError()) を呼び出すのと同じように動作します。

const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
readable._construct(callback)#
  • callback <Function> ストリームの初期化が完了したら、この関数を(オプションでエラー引数付きで)呼び出します。

_construct() メソッドは、直接呼び出してはなりません。子クラスによって実装される可能性があり、その場合は、内部の Readable クラスメソッドによってのみ呼び出されます。

このオプションの関数は、ストリームコンストラクターによって次のティックでスケジュールされ、_read() および _destroy() の呼び出しは、callback が呼び出されるまで遅延されます。これは、ストリームを使用する前に、状態を初期化したり、リソースを非同期で初期化したりするのに便利です。

const { Readable } = require('node:stream');
const fs = require('node:fs');

class ReadStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
readable._read(size)#
  • size <number> 非同期に読み取るバイト数

この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable クラスメソッドによってのみ呼び出されます。

すべての Readable ストリームの実装は、基になるリソースからデータを取得するための readable._read() メソッドの実装を提供する必要があります。

readable._read() が呼び出されたときに、リソースからデータが利用可能な場合、実装は this.push(dataChunk) メソッドを使用してそのデータを読み取りキューにプッシュを開始する必要があります。ストリームがより多くのデータを受け入れる準備が整うと、this.push(dataChunk) が呼び出されるたびに、_read() が再度呼び出されます。_read() は、readable.push()false を返すまで、リソースから読み取り、データをプッシュし続けることができます。_read() が停止した後、再度呼び出された場合にのみ、キューに追加のデータのプッシュを再開する必要があります。

readable._read() メソッドが呼び出されると、readable.push() メソッドを介してより多くのデータがプッシュされるまで、再度呼び出されることはありません。空のバッファや文字列などの空のデータは、readable._read() を呼び出しません。

size 引数は推奨です。「読み取り」がデータを返す単一の操作である実装の場合、size 引数を使用してフェッチするデータ量を決定できます。他の実装では、この引数を無視し、データが利用可能になったらいつでもデータを単純に提供できます。stream.push(chunk) を呼び出す前に、size バイトが利用可能になるまで「待つ」必要はありません。

readable._read() メソッドには、それが定義されているクラスの内部であり、ユーザープログラムによって直接呼び出すべきではないため、アンダースコアがプレフィックスとして付いています。

readable._destroy(err, callback)#
  • err <Error> 考えられるエラー。
  • callback <Function> オプションのエラー引数を取るコールバック関数。

_destroy() メソッドは、readable.destroy() によって呼び出されます。子クラスによってオーバーライドできますが、直接呼び出してはいけません

readable.push(chunk[, encoding])#
  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 読み取りキューにプッシュするデータのチャンク。オブジェクトモードで動作していないストリームの場合、chunk は文字列、Buffer、または Uint8Array である必要があります。オブジェクトモードのストリームの場合、chunk は任意の JavaScript 値にできます。
  • encoding <string> 文字列チャンクのエンコーディング。'utf8''ascii'などの有効なBufferエンコーディングである必要があります。
  • 戻り値: <boolean> 追加のデータチャンクのプッシュを続行できる場合は true。それ以外の場合は false

chunkBufferUint8Array、または string の場合、データの chunk は、ストリームのユーザーが消費するために内部キューに追加されます。chunknull として渡すと、ストリームの終わり (EOF) が通知され、その後はデータを書き込むことができなくなります。

Readable が一時停止モードで動作している場合、'readable' イベントが発生したときに、readable.read() メソッドを呼び出すことで、readable.push() で追加されたデータを読み出すことができます。

Readable がフローモードで動作している場合、readable.push() で追加されたデータは 'data' イベントを発生させることで配信されます。

readable.push() メソッドは、可能な限り柔軟になるように設計されています。たとえば、何らかの一時停止/再開メカニズムとデータコールバックを提供する下位レベルのソースをラップする場合、カスタムの Readable インスタンスで下位レベルのソースをラップできます。

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
} 

readable.push() メソッドは、コンテンツを内部バッファにプッシュするために使用されます。これは、readable._read() メソッドによって駆動できます。

オブジェクトモードで動作していないストリームの場合、readable.push()chunk パラメーターが undefined の場合、空の文字列またはバッファとして扱われます。詳細については、readable.push('') を参照してください。

読み取り中のエラー#

readable._read() の処理中に発生したエラーは、readable.destroy(err) メソッドを介して伝播する必要があります。readable._read() 内から Error をスローしたり、手動で 'error' イベントを発生させたりすると、未定義の動作になります。

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition();
    if (err) {
      this.destroy(err);
    } else {
      // Do some work.
    }
  },
}); 
カウントストリームの例#

以下は、1から1,000,000までの数値を昇順で出力し、その後終了するReadableストリームの基本的な例です。

const { Readable } = require('node:stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
} 

双方向ストリームの実装#

Duplexストリームは、TCPソケット接続のように、ReadableWritableの両方を実装するものです。

JavaScriptは多重継承をサポートしていないため、Duplexストリームを実装するためにstream.Duplexクラスが拡張されます(stream.Readablestream.Writableクラスを拡張するのではなく)。

stream.Duplexクラスは、プロトタイプ的にstream.Readableから継承し、寄生的にstream.Writableから継承しますが、instanceofは、stream.WritableSymbol.hasInstanceをオーバーライドすることにより、両方の基本クラスで正しく動作します。

カスタムDuplexストリームは、new stream.Duplex([options])コンストラクタを呼び出し、readable._read()メソッドとwritable._write()メソッドの両方を実装する必要があります。

new stream.Duplex(options)#
  • options <Object> WritableReadableの両方のコンストラクタに渡されます。また、以下のフィールドがあります。
    • allowHalfOpen <boolean> falseに設定すると、読み取り可能側が終了すると、ストリームは書き込み可能側を自動的に終了します。デフォルト: true
    • readable <boolean> Duplexを読み取り可能にするかどうかを設定します。デフォルト: true
    • writable <boolean> Duplexを書き込み可能にするかどうかを設定します。デフォルト: true
    • readableObjectMode <boolean> ストリームの読み取り可能側のobjectModeを設定します。objectModetrueの場合は効果がありません。デフォルト: false
    • writableObjectMode <boolean> ストリームの書き込み可能側のobjectModeを設定します。objectModetrueの場合は効果がありません。デフォルト: false
    • readableHighWaterMark <number> ストリームの読み取り可能側のhighWaterMarkを設定します。highWaterMarkが指定されている場合は効果がありません。
    • writableHighWaterMark <number> ストリームの書き込み可能側のhighWaterMarkを設定します。highWaterMarkが指定されている場合は効果がありません。
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
} 

または、pre-ES6スタイルのコンストラクターを使用する場合

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex); 

または、簡略化されたコンストラクターアプローチを使用する場合

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
}); 

パイプラインを使用する場合

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
); 
双方向ストリームの例#

以下は、データを書き込むことができ、データが読み取ることができる、Node.jsストリームと互換性のないAPIを使用しているものの、仮説的な低レベルソースオブジェクトをラップするDuplexストリームの簡単な例を示しています。以下は、Writableインターフェイスを介して書き込まれた入力データをバッファリングし、Readableインターフェイスを介して読み戻されるDuplexストリームの簡単な例を示しています。

const { Duplex } = require('node:stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
} 

Duplexストリームの最も重要な点は、Readable側とWritable側が、単一のオブジェクトインスタンス内に共存しているにもかかわらず、互いに独立して動作することです。

オブジェクトモードの双方向ストリーム#

Duplexストリームの場合、objectModeは、readableObjectModeオプションとwritableObjectModeオプションを使用して、Readable側またはWritable側のいずれか専用に設定できます。

たとえば、次の例では、JavaScriptの数値を入力として受け取り、Readable側で16進数の文字列に変換する、オブジェクトモードのWritable側を持つ新しいTransformストリーム(Duplexストリームの一種)が作成されます。

const { Transform } = require('node:stream');

// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary.
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  },
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64 

変換ストリームの実装#

Transformストリームは、出力が入力から何らかの方法で計算されるDuplexストリームです。例としては、データを圧縮、暗号化、または復号化するzlibストリームやcryptoストリームがあります。

出力が入力と同じサイズである必要も、同じ数のチャンクである必要も、同時に到着する必要もありません。たとえば、Hashストリームは、入力が終了したときに提供される単一の出力チャンクのみを持ちます。zlibストリームは、入力よりもはるかに小さいか、はるかに大きい出力を生成します。

stream.Transformクラスは、Transformストリームを実装するために拡張されます。

stream.Transformクラスは、プロトタイプ的にstream.Duplexから継承し、writable._write()メソッドとreadable._read()メソッドの独自のバージョンを実装します。カスタムTransformの実装では、transform._transform()メソッドを実装する必要がありtransform._flush()メソッドを実装してもよいです。

Transformストリームを使用する際には、ストリームに書き込まれたデータによって、Readable側の出力が消費されない場合、ストリームのWritable側が一時停止する可能性があることに注意する必要があります。

new stream.Transform([options])#
const { Transform } = require('node:stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
} 

または、pre-ES6スタイルのコンストラクターを使用する場合

const { Transform } = require('node:stream');
const util = require('node:util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform); 

または、簡略化されたコンストラクターアプローチを使用する場合

const { Transform } = require('node:stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
}); 
イベント: 'end'#

'end'イベントは、stream.Readableクラスからのものです。 transform._flush()のコールバックが呼び出された後、すべてのデータが出力された後に、'end'イベントが発行されます。エラーが発生した場合、'end'を発行しないでください。

イベント: 'finish'#

'finish'イベントは、stream.Writableクラスからのものです。 stream.end()が呼び出され、すべてのチャンクがstream._transform()によって処理された後、'finish'イベントが発行されます。エラーが発生した場合、'finish'を発行しないでください。

transform._flush(callback)#
  • callback <Function> 残りのデータがフラッシュされたときに呼び出されるコールバック関数(オプションでエラー引数とデータ付き)。

この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable クラスメソッドによってのみ呼び出されます。

場合によっては、変換操作でストリームの最後にさらに少しのデータを発行する必要がある場合があります。たとえば、zlib圧縮ストリームは、出力を最適に圧縮するために使用される内部状態の量を格納します。ただし、ストリームが終了すると、圧縮されたデータが完全になるように、その追加のデータをフラッシュする必要があります。

カスタムのTransform実装は、transform._flush()メソッドを実装してもよいです。これは、消費する書き込みデータがなくなったとき、ただし'end'イベントがReadableストリームの終わりを示すために発行される前に呼び出されます。

transform._flush()の実装内では、必要に応じて、transform.push()メソッドを0回以上呼び出すことができます。フラッシュ操作が完了したら、callback関数を呼び出す必要があります。

transform._flush()メソッドには、それを定義するクラスの内部のものであり、ユーザープログラムによって直接呼び出されるべきではないため、アンダースコアが付いています。

transform._transform(chunk, encoding, callback)#
  • chunk <Buffer> | <string> | <any> stream.write()に渡されたstringから変換される、変換されるBuffer。ストリームのdecodeStringsオプションがfalseであるか、ストリームがオブジェクトモードで動作している場合、チャンクは変換されず、stream.write()に渡されたものになります。
  • encoding <string> チャンクが文字列の場合、これはエンコーディングタイプです。チャンクがバッファの場合、これは特別な値'buffer'です。その場合は無視してください。
  • callback <Function> 指定されたchunkが処理された後に呼び出されるコールバック関数(オプションでエラー引数とデータ付き)。

この関数は、アプリケーションコードから直接呼び出してはなりません。子クラスによって実装される必要があり、内部の Readable クラスメソッドによってのみ呼び出されます。

すべてのTransformストリーム実装は、入力を受け入れて出力を生成する_transform()メソッドを提供する必要があります。 transform._transform()実装は、書き込まれるバイトを処理し、出力を計算し、transform.push()メソッドを使用して、その出力を読み取り可能な部分に渡します。

チャンクの結果として出力する量に応じて、transform.push()メソッドは、単一の入力チャンクから出力を生成するために、0回以上呼び出すことができます。

特定の入力データのチャンクから出力が生成されない可能性があります。

現在のチャンクが完全に消費された場合にのみ、callback関数を呼び出す必要があります。callbackに渡される最初の引数は、入力の処理中にエラーが発生した場合はErrorオブジェクトでなければならず、それ以外の場合はnullでなければなりません。2番目の引数がcallbackに渡された場合、最初の引数が偽の場合にのみ、transform.push()メソッドに転送されます。つまり、以下は同等です。

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
}; 

transform._transform()メソッドには、それを定義するクラスの内部のものであり、ユーザープログラムによって直接呼び出されるべきではないため、アンダースコアが付いています。

transform._transform()が並行して呼び出されることはありません。ストリームはキューメカニズムを実装しており、次のチャンクを受け取るには、callbackを同期または非同期のいずれかで呼び出す必要があります。

クラス: stream.PassThrough#

stream.PassThroughクラスは、入力を出力に単純に渡すTransformストリームの簡単な実装です。その目的は主に例とテストですが、stream.PassThroughが新しい種類のストリームの構成要素として役立つユースケースがいくつかあります。

追加の注意事項#

ストリームと非同期ジェネレーターおよび非同期イテレーターの互換性#

JavaScript における非同期ジェネレーターとイテレーターのサポートにより、非同期ジェネレーターは現時点で事実上、ファーストクラスの言語レベルのストリーム構成要素となっています。

以下に、Node.js ストリームを非同期ジェネレーターおよび非同期イテレーターとともに使用する一般的な相互運用ケースをいくつか示します。

非同期イテレーターによる読み取り可能なストリームの消費#
(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})(); 

非同期イテレーターは、破棄後の未処理エラーを防ぐために、ストリームに永続的なエラーハンドラーを登録します。

非同期ジェネレーターによる読み取り可能なストリームの作成#

Node.js の読み取り可能なストリームは、Readable.from() ユーティリティメソッドを使用して、非同期ジェネレーターから作成できます。

const { Readable } = require('node:stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
  yield 'a';
  await someLongRunningFn({ signal });
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
  ac.abort();
});

readable.on('data', (chunk) => {
  console.log(chunk);
}); 
非同期イテレーターからの書き込み可能なストリームへのパイプ処理#

非同期イテレーターから書き込み可能なストリームに書き込む場合、バックプレッシャーとエラーの正しい処理を確保してください。 stream.pipeline() は、バックプレッシャーとバックプレッシャー関連のエラーの処理を抽象化します。

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err);
  } else {
    console.log(value, 'value returned');
  }
}).on('close', () => {
  ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
  .then((value) => {
    console.log(value, 'value returned');
  })
  .catch((err) => {
    console.error(err);
    ac.abort();
  }); 

古い Node.js バージョンとの互換性#

Node.js 0.10 より前のバージョンでは、Readable ストリームのインターフェースはより単純でしたが、機能は劣り、使い勝手も良くありませんでした。

  • stream.read() メソッドの呼び出しを待つのではなく、'data' イベントはすぐに発行を開始していました。データを処理する方法を決定するために何らかの処理を実行する必要があるアプリケーションは、データが失われないように、読み取ったデータをバッファーに格納する必要がありました。
  • stream.pause() メソッドは保証されたものではなく、助言的なものでした。これは、ストリームが一時停止状態の場合でも、'data' イベントを受信する準備をする必要があったことを意味します。

Node.js 0.10 では、Readable クラスが追加されました。古い Node.js プログラムとの下位互換性のために、Readable ストリームは、'data' イベントハンドラーが追加されたとき、または stream.resume() メソッドが呼び出されたときに「フローモード」に切り替わります。その結果、新しい stream.read() メソッドと 'readable' イベントを使用していない場合でも、'data' チャンクが失われる心配はなくなりました。

ほとんどのアプリケーションは正常に機能し続けますが、これは次の条件でエッジケースを導入します。

  • 'data' イベントリスナーが追加されていません。
  • stream.resume() メソッドは決して呼び出されません。
  • ストリームは書き込み可能な宛先にパイプされていません。

例として、次のコードを考えてみましょう。

// WARNING!  BROKEN!
net.createServer((socket) => {

  // We add an 'end' listener, but never consume the data.
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337); 

Node.js 0.10 より前のバージョンでは、受信したメッセージデータは単純に破棄されていました。ただし、Node.js 0.10 以降では、ソケットは永久に一時停止したままになります。

この状況での回避策は、stream.resume() メソッドを呼び出してデータのフローを開始することです。

// Workaround.
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // Start the flow of data, discarding it.
  socket.resume();
}).listen(1337); 

新しい Readable ストリームがフローモードに切り替わるだけでなく、0.10 より前のスタイルのストリームは、readable.wrap() メソッドを使用して Readable クラスにラップできます。

readable.read(0)#

実際にはデータを消費せずに、基になる読み取り可能なストリームメカニズムのリフレッシュをトリガーする必要がある場合があります。そのような場合、常に null を返す readable.read(0) を呼び出すことができます。

内部の読み取りバッファーが highWaterMark を下回っていて、ストリームが現在読み取りを行っていない場合、stream.read(0) を呼び出すと、低レベルの stream._read() 呼び出しがトリガーされます。

ほとんどのアプリケーションではこれを行う必要はほとんどありませんが、Node.js 内、特に Readable ストリームクラスの内部でこれが行われる状況があります。

readable.push('')#

readable.push('') の使用は推奨されません。

ゼロバイトの文字列、Buffer、または Uint8Array をオブジェクトモードではないストリームにプッシュすると、興味深い副作用が発生します。これは readable.push() の呼び出しであるため、この呼び出しで読み取りプロセスが終了します。ただし、引数が空の文字列であるため、読み取り可能バッファーにデータは追加されず、ユーザーが消費するものはありません。

readable.setEncoding() を呼び出した後の highWaterMark の不一致#

readable.setEncoding() を使用すると、オブジェクトモードでない場合の highWaterMark の動作が変更されます。

通常、現在のバッファーのサイズは バイト 単位で highWaterMark と比較されます。ただし、setEncoding() が呼び出されると、比較関数はバッファーのサイズを 文字 単位で測定し始めます。

これは latin1ascii を使用する一般的なケースでは問題ありません。ただし、マルチバイト文字を含む可能性がある文字列を扱う場合は、この動作に注意することをお勧めします。