D言語はデータサイエンスのためにある

Andrew Pascoeは、部屋に座っているAdRollのシニアデータサイエンティストです。

プログラミング言語Dは、効率的にタスクをこなすためにデータサイエンスチームがすぐに好んで使うようになりました。今では重要なインフラストラクチャに欠かせない言語になっています。なぜでしょうか。それはD言語が多くを提供するからです。


簡単な紹介

他の典型的なデータサイエンスワークフローと比較して、D言語を使用する最も明確な利点の1つは、マシンコードにコンパイルできるという点です。インタプリタや仮想マシンレイヤがなければ、Java Hadoopフレームワーク、R、Pythonのような他のツールよりかなり高速にデータ中からリッピングできます。しかし、D言語のコンパイラは、多くの場合、まるでスクリプト言語のように高速に実行できます。100万の一様ランダム変量を生成し、ソートし、十分位数を見つける処理について、D言語とPythonを比較してみましょう。

from random import uniform

variates = [uniform(0.0, 1.0) for i in range(1000000)]
variates.sort()
for i in range(0, 1000000, 100000):
    print variates[i]

%> time python deciles.py > some_deciles

real    0m0.857s
user    0m0.825s
sys 0m0.032s

D言語では以下のようになります。

import std.random;
import std.stdio;

void main(string[] args) {
    double[] variates = new double[1_000_000];
    foreach(ulong i, double _; variates) {
        variates[i] = uniform(0.0, 1.0);
    }
    variates.sort;
    foreach(int i; 0..10) {
        writeln(variates[i * 100_000]);
    }
}

%> time rdmd deciles.d > some_deciles

real    0m0.929s
user    0m0.725s
sys 0m0.177s

待ってください。何ですって? 時間がかかっていますね。でもそれは、最初にコードを実行した際のコンパイル時間が含まれているからです。コードを変更せずに再度実行すると、次のようになります。

%> time rdmd deciles.d > some_deciles

real    0m0.293s
user    0m0.291s
sys 0m0.004s

よくなりましたね。rdmdは、コードの変更がなければ、わざわざ再コンパイルしません。コードの計算がより複雑になって、かなりの量のデータ上で多くの計算を行わなくてはならなくなったら、こういった節約がかなり重要になります。

しかし、もちろん本当に重要なのはそこではありません。超効率的なコードが必要な場合、コンパイラ型言語が一番だということは分かっています。よく推奨されるCやC++のような他の効率的な言語とD言語を区別する重要な点は、D言語なら一定の時間に最も快適に感じるスタイルで自由にプログラミングできるということです。

上記のコードは、よりシンプルなPythonでさらに頭を抱えることなく、D言語で素早く”スクリプト”を記述する方法を示しています。しかし、D言語はオブジェクト指向のコードを書き始める場合にもきれいに記述できます。

import std.random;
import std.range;
import std.stdio;

class Rectangle {
    double width;
    double height;

    this(double width, double height) {
        this.width = width;
        this.height = height;
    }

    double area() {
        return width * height;
    }
}

void main(string[] args) {
    Rectangle[] rs = new Rectangle[1_000_000];
    rs = rs.map!(x => new Rectangle(uniform(0.0, 1.0),
                                    uniform(0.0, 1.0))).array;
    rs.sort!((x, y) => x.width < y.width);
    foreach(int i; 0..10) {
        writefln("%0.4f\t%0.4f\t%0.4f",
                 rs[i * 100_000].width,
                 rs[i * 100_000].height,
                 rs[i * 100_000].area());
    }
}

%> rdmd rectangle_deciles.d
0.0000 0.9382   0.0000
0.1000 0.2020   0.0202
0.1996 0.3612   0.0721
0.2995 0.3947   0.1182
0.3994 0.7440   0.2972
0.4993 0.8733   0.4360
0.5997 0.0221   0.0132
0.6997 0.6624   0.4634
0.7997 0.6204   0.4961
0.9003 0.4640   0.4177

また、D言語ならハイパフォーマンスなニーズにいつでも応えられます。例えば、高速な逆平方根計算をしたい場合、(リンク先のWikipediaの記事から軽く修正された)何らかのポインタvoodooを使用できます。

import std.conv;
import std.stdio;

void main(string[] args) {
    //computes 1/sqrt(x)
    auto x = to!float(args[1]);
    int i;
    float x2, y;
    const float threehalves = 1.5f;

    x2 = x * 0.5f;
    y = x;
    i = *cast(int*)&y;
    i = 0x5f3759df - ( i >> 1 );
    y = *cast(float*)&i;
    y = y * (threehalves - (x2 * y * y));

    writeln(y);
}

%> rdmd invsqrt.d 1
0.998307
%> rdmd invsqrt.d 2
0.70693
%> rdmd invsqrt.d 4
0.499154
%> rdmd invsqrt.d 16
0.249577

最大限のパフォーマンスを引き出したい場合、D言語を使えばインラインアセンブリを書くことすらできるでしょう。しかし、これは全てただのお遊びです。D言語は現実世界のシナリオでどのように役立つでしょうか。


データ中からのリッピング

AdRollで仕事をしている過程で、D言語でうまく機能していたインフラストラクチャがいくつかありましたが、ある時点で、データの問題がコードで設計された範囲を超えた時、最適化しなければならなくなりました。そして、信じられないかもしれませんが、その問題は区切り形式のファイルからフィールドを引き出すくらいによくあることでした。以下は、私たちがやったことの要点です。

この特定のログファイルには、ASCIIレコードセパレータによって区切られた広告データがいくつか含まれています。タイムスタンプとそのデータがどの国から来たのかを引き出したいとしましょう。恐らくもう想像できるでしょうが、単純なD言語のソリューションはかなり読みやすいのですが、たぶん期待するほど動きはよくありません。

import std.stdio;
import std.string;

static immutable uint TIMESTAMP_INDEX = 19;
static immutable uint COUNTRY_INDEX = 42;
static immutable char DELIMITER = cast(char)30;

void main(string[] args) {
    auto file = File(args[1], "r");
    foreach(char[] line; file.byLine()) {
        char[][] fields = split(line, DELIMITER);
        writefln("%s\t%s",
                 fields[TIMESTAMP_INDEX],
                 fields[COUNTRY_INDEX]);
    }
    file.close();
}

%> time rdmd parser.d log.txt > country_info

real    0m13.421s
user    0m13.270s
sys 0m0.153s

13秒? 長すぎます! この1つのログファイルはAdRollで毎日生成されている約130TBのログファイルの単なるサンプルにすぎず、圧縮されていません。問題をばらまいてしまう可能性はありますが、クラスタをスピンアップするのに時間がかかりますし、私たちのモットーの1つは、”少ないものでより多くのことを行う”です。ここでパフォーマンスを向上させてスケーラビリティを抑えるにはどうすればよいでしょうか。

私たちができる最適な対処法の1つとして割り当てるメモリの量を極小化する方法があります。現在行っている手順として、行を読み込むごとに新たにchar[]を割り当ています。しかしこの場合、一度読み込んでchar[]に代入した行をレコードセパレータで分割するために、再度読み込むことになります。この分割によってその行にある全てのフィールドにおいて配列を生成し、その全てにメモリを割り当てるようになるのです。効率的な観点から見ると、単純なはずのコードが実際にはかなり乱雑なものなっていることは明らかです。

まずメモリの割り当てについて最初にできる対処法は、読み込むために既に割り当てられているバッファを設けるということです。ここでの問題は、前の行に続いて次の行が即座に同じバッファに入り、最終的にバッファに収まりきらなくなるということです。最悪、扱おうとしているフィールドを部分的にしか取得できないかもしれないのです。

この解決法として2つのバッファを用意して交換しながら使用する方法があります。扱うフィールドが両方のバッファにまたがっているかもしれないので、簡単に復元できるよう2倍長のバッファを構成するようにもなります。現行のバッファの終わりまで到達すると、それを”最終バッファ”とします。そして、もう一方のバッファにさらなるデータを読み込んでいき、それを現行のバッファにします。注意点として、両方のバッファを合わせたものより長い行がないことを確認する必要があります。

次に考えるべきことは非効率な分割についてです。完全に行を分離するのではなく、メモリに割り当てられるごとに、ただ順次それを読み込んでいきましょう。バッファ(配列のインデックス)と現在の行(それまでにあった区切り文字の数)の両方を通して進行をたどっていきます。ある数の区切り文字に遭遇したとき、次の区切り文字を探し出すだけで、それらの間にある内容がフィールドだと分かります。改行を読み込むことで行の最後に行きついたことが分かると、行の進行をリセットします。

最後に、全てのフィールドが集まると、次の改行に行きつくまでバッファリングするだけになります。

説明はこのくらいにして、コードを見ていきましょう。

import std.stdio;

immutable static ulong READ_BUFFER_SIZE = 32768;
immutable static char DELIMITER = cast(char)30;
immutable static char NEWLINE = cast(char)10;

immutable static uint TIMESTAMP_INDEX = 19;
immutable static uint COUNTRY_INDEX = 42;

immutable static char[] UNKNOWN_FIELD = "unknown";

class FastReader {
    File file;

    char[] bufferA;
    char[] bufferB;
    char[] double_buffer;
    char[] current_buffer;
    char[] last_buffer;
    ulong num_buf;

    ulong index;
    uint num_del;
    bool line_end;

    this(string filename) {
        file = File(filename, "r");
        bufferA = new char[READ_BUFFER_SIZE];
        bufferB = new char[READ_BUFFER_SIZE];
        double_buffer = new char[2 * READ_BUFFER_SIZE];
        current_buffer = bufferA;
        last_buffer = bufferB;
        num_buf = 0;

        index = 0;
        num_del = 0;
        line_end = false;
    }

    void reset_progress() {
        num_del = 0;
        line_end = false;
    }

    void swap_and_load() {
        last_buffer = current_buffer;
        num_buf++;
        if((num_buf & 1) == 0)
            current_buffer = bufferA;
        else
            current_buffer = bufferB;
        current_buffer = file.rawRead(current_buffer);
        index = 0;
    }

    void get_field(uint field_id, ref char[] field) {
        if(line_end) {
            field = UNKNOWN_FIELD.dup;
            return;
        }
        ulong start = index;
        while(num_del < field_id) {
            if(start == current_buffer.length) {
                swap_and_load();
                start = 0;
            }
            if(current_buffer[start] == NEWLINE) {
                line_end = true;
                break;
            }
            if(current_buffer[start] == DELIMITER)
                num_del++;
            start++;
        }
        if(start == current_buffer.length) {
            swap_and_load();
            start = 0;
        }
        if(line_end) {
            field = UNKNOWN_FIELD.dup;
            index = start + 1;
            return;
        }
        ulong end = start;
        bool swapped = false;
        while(current_buffer[end] != DELIMITER) {
            end++;
            if(end == current_buffer.length) {
                swap_and_load();
                swapped = true;
                end = 0;
            }
            if(current_buffer[end] == NEWLINE) {
                line_end = true;
                num_del--; //Don't count as delimiter
                break;
            }
        }
        num_del++;

        if(!swapped)
            field = current_buffer[start .. end];
        else {
            ulong size_end = last_buffer.length - start;
            double_buffer[0 .. size_end] = last_buffer[start .. $];
            double_buffer[size_end .. (size_end + end)] =
                current_buffer[0 .. end];
            field = double_buffer[0 .. (size_end + end)];
        }
        index = end + 1;
    }

    void advance_to_next() {
        ulong start = index;
        if(start == current_buffer.length) {
            swap_and_load();
            start = 0;
        }
        while(current_buffer[start] != NEWLINE) {
            start++;
            if(start == current_buffer.length) {
                swap_and_load();
                start = 0;
            }
        }
        index = start + 1;
        reset_progress();
    }

    bool eof() {
        return file.eof & (index == current_buffer.length);
    }

    void close() {
        file.close();
    }
}

void process_file(string file) {
    char[] timestamp;
    char[] country;
    FastReader frd = new FastReader(file);
    while(!frd.eof) {
        frd.get_field(TIMESTAMP_INDEX, timestamp);
        frd.get_field(COUNTRY_INDEX, country);
        frd.advance_to_next();
        writefln("%s\t%s", timestamp, country);
    }
    frd.close();
}

void main(string[] args) {
    process_file(args[1]);
}

同じ動作をするシンプルなプログラムですが、前よりかなり洗練されました。では、少し詳しく見ていきましょう。bufferAとbufferBは2つの補助的なバッファで、current_bufferまたはlast_bufferによって指定されます。これはnum_buffでたどったときの状態によって変わってきます。current_bufferによる進行をたどる場合はindexを使い、行による進行をたどる場合はそれまでにあった区切り文字の数を表すためにnum_delを使用します。そして最後にline_endで行の最後に行きついたかどうかを判断します。

FastReaderを使用した例が簡単です。それはbufferAから始めます。新しい行の最初に行きついたときにreset_progress()が呼び出され、そのことを反映させるために状態をアップデートします。swap_and_load()はトグリングを行うメソッドになります。ただし、current_bufferを決定するthenum_bufの動作がnum_buf % 2と同等であることを確認してください。

file.rawRead()メソッドを使用することで生じる問題もあります。char[]を用意し、そこにFileからデータを入れていくのですが、ファイルの最後まで行きつくと、char[]はその前の内容をEOFの先に入れてしまいます。例えば次のようになります。

import std.stdio;

void main(string[] args) {
    auto file = File("test.txt", "r");
    char[] buffer = "0123456789".dup;

    writeln("buffer length:\t", buffer.length);
    writeln("buffer content:\t", buffer);

    file.rawRead(buffer);

    writeln("buffer length:\t", buffer.length);
    writeln("buffer content:\t", buffer);
}
%> echo -n "test" > test.txt
%> rdmd test.d
buffer length:   10
buffer content:  0123456789
buffer length:   10
buffer content:  test456789

私たちがやろうとしている目的として、この動作はあまり好ましくありません。なぜなら、実際にいつFileや最終行の末尾に到達したか知りたいからです。結局、file.rawRead()でも新しくなった配列の内容の一部に前の内容を含めてchar[]で返すようになると分かります。これはコードをbuffer = file.rawRead(buffer)に変更するだけで解決します。

%> rdmd test.d
buffer length:  10
buffer content: 0123456789
buffer length:  4
buffer content: test

swap_and_load()では同じ構築を使用し、これにより最終的にcurrent_bufferの進行が0にリセットされます。

get_field()は特に複雑ですが、それでも分かりやすいメソッドです。すでに行の末尾に到達している場合は検索するフィールドがないため、これは不明であると記述されます。区切り文字のカウントはcurrent_bufferの場所から始めます。そしてcurrent_bufferの末尾に到達したらswap_and_load()を実行します。区切り文字が正しくカウントされたら、次のフィールドを検索する必要があります。基本的には同じコードを使用しますが、この処理でバッファをスワップするかどうかを把握している必要があります。改行に到達したら、その改行をフィールドの末尾としてカウントします。

フィールドの構築は単純です。スワップを行わなかった場合、フィールドは開始から終了までのインデックスのcurrent_bufferの一部分になります。それ以外の場合はlast_bufferとcurrent_bufferを組み合わせたものになります。

advance_to_next()メソッドも同じ方法で区切り文字を検索しますが、改行文字を検索する代わりに次の行に移動してreset_progres()を実行します。

ここで気を付けるべき点があります。それはeof()メソッドがあることです。EOFと検出されてもまだ読み取る行が残っている可能性があります。読み取る行が残っていない場合は、indexの長さがcurrent_bufferと同じになるはずです。最後に、process_file()の呼び出しで必要なフィールドが全て取得されてから次の行に移動していることを確認します。特に、フィールドの検索は番号順に行うことに注意してください。行の検索は順番に行われ、さかのぼることはないため、これは厳守する必要があります。

さて、コードに少しボリュームが出てきました。しかし、これは決して無駄な増量ではありません。こちらをご覧ください。

%> time rdmd parser_fast.d log.txt > country_info_fast

real   0m2.159s
user   0m2.093s
sys    0m0.068s

なんと、時間が6分の1に短縮されました。同じ時間でより多くのデータを検索できるようになりましたね。エラーがないか、念のため差分を確認してみましょう。

%> diff country_info country_info_fast
%>

問題ないようです。でも、せっかくマルチコアのマシンで実行しているなら、一度に複数行を読み取りたいと思いませんか。その場合、どうすればいいでしょう。

import std.parallelism;
import std.stdio;
import std.string;

/* Then it's all the same until main()... */

void main(string[] args) {
    auto files = split(args[1], ",");
    foreach(string file; parallel(files)) {
        process_file(file);
    }
}

意外と簡単でした。パフォーマンスはどう変化するでしょうか。

%> for i in 2 3 4; do cp log.txt log$i.txt; done
%> time rdmd parser_parallel.d log.txt > more_country_info

real    0m2.219s
user    0m2.153s
sys 0m0.070s
%> time rdmd parser_parallel.d log.txt,log2.txt > more_country_info

real    0m2.308s
user    0m4.340s
sys 0m0.228s
%> time rdmd parser_parallel.d log.txt,log2.txt,log3.txt > more_country_info

real    0m2.458s
user    0m6.821s
sys 0m0.354s
%> time rdmd parser_parallel.d log.txt,log2.txt,log3.txt,log4.txt > more_country_info

real    0m2.634s
user    0m9.213s
sys 0m0.781s

4倍の量のデータを実行しているのに、元の方法よりスピードが5分の1に短縮されました。つまりパフォーマンスが20倍も向上したということです。でも、まだ終わりではありません。

rdmdが実行する最適化の回数がこれまでよりも減っています。コードが最適化を必要とする状態になったら、スクリプト構文で実行するのではなくdmdでコンパイルすることが有効になります。公正を期すため、元の単純な方法と並列化されていない方法でもコンパイルを行います。

%> dmd -O -release -inline parser.d
%> time ./parser log.txt > country_info

real    0m8.352s
user    0m8.224s
sys 0m0.128s
%> dmd -O -release -inline parser_fast.d
%> time ./parser_fast log.txt > country_info_fast

real    0m0.697s
user    0m0.637s
sys 0m0.060s
%> dmd -O -release -inline parser_parallel.d
%> time ./parser_parallel log.txt > more_country_info
real    0m0.950s
user    0m0.759s
sys 0m0.181s
%> time ./parser_parallel log.txt,log2.txt > more_country_info

real    0m0.857s
user    0m1.467s
sys 0m0.196s
%> time ./parser_parallel log.txt,log2.txt,log3.txt > more_country_info

real    0m1.127s
user    0m2.469s
sys 0m0.617s
%> time ./parser_parallel log.txt,log2.txt,log3.txt,log4.txt > more_country_info

real    0m1.527s
user    0m3.792s
sys 0m1.264s

全てをコンパイルすることによって、この高速な方法は正統な方法と比べてパフォーマンスが12倍近く向上します。並列化された方法で4倍の量のデータを実行した場合には、22倍も向上しています。

AdRollデータサイエンスのメンバーは、すっかりD言語に夢中です。その理由はもうお分かりでしょう。新しいインフラストラクチャと分析タスクのプロトタイプは簡単に作成できます。そして効率が重視されるようになれば、同じコードベースを最大限のパフォーマンスが引き出されるようにリファクタリングできるのです。効率的で無駄のないコードを使ってビッグデータの問題を解決したいという方は、ぜひ私たちの仲間になってください