入力側のストリームが複数ある場合に、それを順に切り替えながら、一つのストリームに出力するという処理を考えます。

例として、3つのファイル(a.txt, b.txt, c.txt)の内容を順に連結し、out.txt に出力するプログラムを Stream を使って書いてみます。Unix のコマンドで表現すると、概ね、次のようなものに対応します。

$ cat a.txt b.txt c.txt > out.txt

テスト用のファイル(a.txt, b.txt, c.txt)を作っておきます。

const fs = require("fs"),
const size = 4 * 1024 * 1024;  // 4MB

fs.writeFileSync("a.txt", "a".repeat(size));
fs.writeFileSync("b.txt", "b".repeat(size));
fs.writeFileSync("c.txt", "c".repeat(size));

各 4MB のファイルを作ります。テストデータが十分大きくない場合は、偶然うまく動作してしまうことがあります。ストリームのデフォルトのバッファサイズは 64kB ですから、十分大きいと言えるでしょう。

今回作成したコードは次のようになります。入力側のストリームが複数ある場合に、それを順に切り替えながら、一つのストリームに出力することがテーマですので、それ以外の細かい部分はざっくり切り捨ててあります。

"use strict";

const fs = require("fs"),
      async = require("async");

// input files
const files = [ "a.txt", "b.txt", "c.txt" ];

// output stream
const writer = fs.createWriteStream("out.txt");

// concatenate input files
async.mapSeries(files, (fn, callback) => {

  const reader = fs.createReadStream(fn);

  reader.on("end", () => {
    reader.unpipe();
    callback();
  });
  reader.pipe(writer, { end: false });

}, () => {

  // finally, close the WritableStream
  writer.end();

});

ポイントは、次の点です。

pipe() する時に、{ end: false } を指定しましょう。これを忘れると、1番目のファイルの読み込みが終了した時点で writer も一緒にクローズされて、以後のファイルの内容が出力されません。 unpipe() しなくても動きますが、やっておくと、メモリ消費量が若干減ります。お行儀の良いプログラムを心掛けましょう。

async.js のうち、finally の処理が書けるものが使えます。当然のことながら、writer をクローズする処理を書かないといけません。eachSeries() ではなく、mapSeries() を選んだのは、この理由です。

async.js のうち、〜Series() 系のものを使いましょう。〜Series() が付いていないものは、各処理が並列に動いてしまうので、意図した順で動きません。 この記事のコードは Node.js 4.4.0 で実行しています。