Node.jsフロー制御 Part 1 – コールバック地獄 vs. Async vs. Highland

(このシリーズのPart 2はこちら: Node.jsフロー制御 Part 2 – FiberとGenerator

今回は、JavaScript/node.jsアプリケーションのフロー制御に対するアプローチを、いくつか取り上げて比較してみたいと思います。

Expressフレームワークを使った以下のルート処理(お粗末ですが)を例に見てみましょう。

  • ファイルから読み込む
  • いくつかのプロセスを実行する(ステップの数は3つ)
    プロセスとは、単に拡張データをコールバックする任意の非同期処理を指します
  • ファイルに結果を書き出す
  • リクエストに対して成功またはエラーのメッセージを返す

アプローチ1 – 継続渡し

var express = require('express');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  fs.readFile(inputFile, function(err, data) {
    if (err) return res.status(500).send(err);

    process1(data, function(err, data) {
      if (err) return res.status(500).send(err);

      process2(data, function(err, data) {
        if (err) return res.status(500).send(err);

        process3(data, function(err, data) {
          if (err) return res.status(500).send(err);

          fs.writeFile(outputFile, data, function(err) {
            if (err) return res.status(500).send(err);

            res.status(200).send('processed successfully using callback hell');
          });
        });
      });
    });
  });
});

たったこれだけの(比較的)単純なステップで、すでに読みづらく、かつ論理的に考察しづらくなっています(少なくとも私は)。このルート処理が何をしているのかすぐに把握できません。

実行ステップ数が増えれば増えるほど、ひどいコードになり、アプリケーションの理解や保守、拡張がとても困難になっていくでしょう。

その上、各ステップのエラー処理で同じコードを使いまわすのはDRY原則から外れています。

アプローチ2 – 名前付き継続渡し

この例はpmuellr(@pmuellr)が提供してくれました。

var express = require('express')  
var fs = require('fs')  
var app = express()

app.post('/process-file', onProcessFile)

function onProcessFile(req, res) {  
  var inputFile = 'input.txt'
  var outputFile = 'output.txt'

  fs.readFile(inputFile, onReadFile);

  function onReadFile(err, data) {
    if (err) return res.status(500).send(err)
    process1(data, onProcess1)
  }  

  function onProcess1(err, data) {
    if (err) return res.status(500).send(err)
    process2(data, onProcess2)
  }  

  function onProcess2(err, data) {
    if (err) return res.status(500).send(err)
    process3(data, onProcess3)
  }  

  function onProcess3(err, data) {
    if (err) return res.status(500).send(err)
    fs.writeFile(outputFile, data, onWriteFile)
  }

  function onWriteFile(err) {
    if (err) return res.status(500).send(err)
    res.status(200).send('processed successfully using callback hell')
  }
}

少なくともこのアプローチでは、継続が深くネストされることはありません。しかし、フローの各ステップでエラー処理がDRYの原則に反することで生じる問題があります。また、ステップが増えるほど、フローを追うのが大変になる可能性もあります。

アプローチ3 – async.jsを使う

以下は、asyncライブラリとwaterfallメソッドを使用した同じルート処理です。

var express = require('express');  
var async = require('async');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  async.waterfall([
    function(callback) {
      fs.readFile(inputFile, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      fs.writeFile(outputFile, data, function(err) {
        callback(err, data);
      });
    }
  ], function(err, result) {
    if (err) return res.status(500).send(err);
    res.status(200).send('processed successfully using async lib');
  });
});

このアプローチだと、処理の手順として何をしているかが分かりやすくなり、少なくともエラー処理での重複を防ぐことができます。

また、フロー中で処理関数を直接使用することもでき(これは引数がフロー関数のものと同じためです)、最終処理のための名前付きコールバックを作成することもできます。

var express = require('express');  
var async = require('async');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var done = function(err, result) {
    if (err) return res.status(500).send(err);
    res.status(200).send('processed successfully with async');
  };

  async.waterfall([
    fs.readFile.bind(fs, inputFile),
    process1,
    process2,
    process3,
    fs.writeFile.bind(fs, outputFile)
  ], done);
});

“コールバック地獄”に比べれば、格段に良くなりました。でも、まだ十分とは言えません。

アプローチ4 – highland.js(stream)を使う

最近は、@caolanhighlandライブラリを活用しています。これはネイティブのnode.jsのstreamとの完全な互換性を目指した、高度なstreamのライブラリです。今回はこのライブラリについて触れませんが、以下はhighland streamを使用した同じルート処理です。

var express = require('express');  
var _ = require('highland');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  // create a highland stream
  var data = _([inputFile]);

  data
    .flatMap(_.wrapCallback(fs.readFile))
    .flatMap(_.wrapCallback(process1))
    .flatMap(_.wrapCallback(process2))
    .flatMap(_.wrapCallback(process3))
    .flatMap(_.wrapCallback(fs.writeFile.bind(fs, outputFile)))
    .stopOnError(function(err) {
      res.status(500).send(err);
    })
    .apply(function(data) {
      res.status(200).send('processed successfully using highland streams')
    });
});

このアプローチは読みやすく拡張性があると思います。これは関数的アプローチで、アプリケーションはさらに保守しやすく考察しやすいものになります。あと50ステップ追加しても、アプリケーションのフローを簡単に素早く把握できるでしょう。JavaScriptのアプリケーションを作るなら、このライブラリを、また一般に、node.js streamを一読されることを強くお勧めします。

アプローチ5 – Promise(bluebird)

Esailija – @PetkaAntonovbluebirdの作者)の、promiseを使用した例に賛同し、感謝します。

var express = require('express');  
var Promise = require("bluebird");  
var fs = Promise.promisifyAll(require('fs'));  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  fs.readFileAsync(inputFile)
    .then(Promise.promisify(process1))
    .then(Promise.promisify(process2))
    .then(Promise.promisify(process3))
    .then(fs.writeFileAsync.bind(fs, outputFile))
    .then(function(data) {
      res.status(200).send('processed successfully using bluebird promises');
    })
    .catch(function(err) {
      res.status(500).send(err);
    });
});

この例から分かるように、promiseはstreamとよく似ていますが、単一の値だけを出すという点で異なります。promiseの.then()メソッドはnode streamの.pipe()と同等と考えられます。

その結果、私は先週、勉強のためにpromises(A+)を一から実装することに時間を費やしましたが、これがとても面白くて役に立ちました。

おまけ:highlandと.reduce()を使用した例

Lewis Ellis – @LewisJEllishighlandのcontributorの1人)の巧みな例に感謝します。

var express = require('express');  
var _ = require('highland');  
var fs = require('fs');  
var app = express();

function chain(s, f) {  
  return s.flatMap(_.wrapCallback(f))
}

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var data = _([inputFile]);

  _([
    fs.readFile,
    process1,
    process2,
    process3,
    writeToFileName(outputFile)
  ]).reduce(data, chain).flatten()
    .stopOnError(function (err) {
      return res.status(500).send(err);
    }).apply(function (data) {
      return res.status(200).send('processed');
    });
});

おまけ2:highlandとasync.composeを使用した例

var express = require('express');  
var _ = require('highland');  
var async = require('async');  
var fs = require('fs');  
var app = express();

function chain(s, f) {  
  return s.flatMap(_.wrapCallback(f))
}

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var data = _([inputFile]);

  data
    .flatMap(_.wrapCallback(
      async.compose(
        fs.writeFile.bind(fs, outputFile),
        process3,
        process2,
        process1,
        fs.readFile
      )
    ))
    .errors(function(err) {
      res.status(500).send(err);
    })
    .each(function(data) {
      res.status(200).send('processed successfully using highland streams and async.compose');
    });

この記事のパート2(Node.jsフロー制御 Part 2 – FiberとGenerator)には、さらに、generatorとfiberを使った非同期フローの例があります。