Go言語の並行性を映像化する

Goというプログラミング言語の強みの1つは、Tony Hoare考案のCSPに基づくビルトインの並行性(Concurrency)です。Goは並行性を念頭にデザインされているため、複雑に並行したパイプラインの構築を可能にしています。でも、それぞれの並行性パターンがどのように見えるものなのか気になったことはありませんか。

もちろん、気になったことはあると思います。恐らくそれぞれ形は違っても、誰もが頭に描いているのではないでしょうか。もし、「1から100までの数字」について聞かれたら、無意識に頭の中で数字のイメージを思い浮かべると思います。例えば、私の場合、自分の前から1から20までがまっすぐに並び、21以降は90度右に曲がり1000以降まで続くイメージが浮かびます。これは多分私が幼稚園の時に教室の壁に沿って数字が貼られていて、ちょうど角に数字の20があったからなのだと思います。別の例えをすると、1年の四季を心にどう描くかです。箱のように描く人もいれば、円のように描く人もいます。

ここでは、Go言語とWebGLを使用して一般的な並行性パターンの映像化をしてみたいと思います。私の描く並行性のプログラムを表現しています。皆さんのイメージとどう異なるのか興味が湧きます。特にRob PikeとSameer Ajmaniがどのように並行性を想像しているか気になります。私の持つイメージも面白いと思います。

では、ここで私の意図することを理解してもらうためにも、基本的な例である「Hello,Concurrent world」から初めましょう。

Hello, Concurrent world

コード自体は、チャネル1つ、goroutine 1つ、書き込み1回、読み込み1回と、とても簡単です。

package main

func main() {
    // create new channel of type int
    ch := make(chan int)

    // start new anonymous goroutine
    go func() {
        // send 42 to channel
        ch <- 42
    }()
    // read from channel
    <-ch
}

インタラクティブWebGLアニメーションへ移動します

Hello, World
アニメーションの青い線は時間の経過と共にgoroutineが実行される様子を示しています。「main」と「go #19」を接続している青い線は親子関係を示すgoroutineの開始と終了で、赤い矢印は送信/受信の実行を表します。送信と受信は2つの異なる動きですが、「AからBへ送信する」1つのイベントとしてアニメーションにしました。「#19」のgoroutine名は実際のgouroutineの内部IDで、Scott Mansfieldの記事『Goroutine IDs』で提案された方法でランタイムから取得したものです。

タイマー

このアプローチを使用して簡単なタイマーを構築することもできます。チャネルを作成し、goroutineを開始します。このgoroutineは一定時間後に前述のチャネルに書き込みを行い、関数の呼び出しもとにこのチャネルを返すものにします。すると、呼び出し元はその間チャネルからの読み込みをブロックします。では、実際にタイマーを24回実行し、それを映像化しましょう。

package main

import "time"

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

func main() {
    for i := 0; i < 24; i++ {
        c := timer(1 * time.Second)
        <-c
    }
}

インタラクティブWebGLアニメーションへ移動します

Recurrent Timers
すごいですよね。では次に移りましょう。

ピンポン

GooglerであるSameer Ajmaniの『Advanced Go Concurrency Patterns(Go言語の並行性パターン上級編)』というプレゼンで、並行性の良い例が出されています。もちろん、高度なパターンではありませんが、まだGo言語の並行性に馴染みがない人には新鮮で興味をそそられると思います。

ここでは、チャネルをピンポン(卓球)用のテーブルとします。ピンポン玉は整数変数とし、打つプレーヤーはgoroutineとします。そして、ピンポン玉が打たれるたびに、値が増えます(カウンタの数が増えます)。

package main

import "time"

func main() {
    var Ball int
    table := make(chan int)
    go player(table)
    go player(table)

    table <- Ball
    time.Sleep(1 * time.Second)
    <-table
}

func player(table chan int) {
    for {
        ball := <-table
        ball++
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

インタラクティブWebGLアニメーションへ移動します

Ping-Pong
ここで、上のリンクをクリックして、インタラクティブWebGLアニメーションを見てください(Ctrl/Cmdキーを押しながらクリックすれば新しいタブで開くことができます)。そして、実際に遊んでみてください。アニメーションの速度を下げたり上げたりできますし、角度を変えて見たりすることもできます。

では、プレーヤーの数を2から3に増やしましょう。

    go player(table)
    go player(table)
    go player(table)

インタラクティブWebGLアニメーションへ移動します

Ping-Pong 3
それぞれのプレーヤーが順番に連続して打っているのが分かりますが、どうしてこうなのか不思議ですよね。なぜgoroutineでは、このように規則正しく球を受け取っているのでしょう。

これは、Go言語のランタイムは、FIFO(先入れ先出し)キューで受け取りを待つからです(goroutineは特定のチャネルから受信できるような状態になります)。上の例では、プレーヤーがボールを別のプレーヤーにパスした直後に受け取る準備をしています。少し複雑な例を使って確認しましょう。プレーヤーを100人に増やします。

for i := 0; i < 100; i++ {
    go player(table)
}

インタラクティブWebGLアニメーションへ移動します

Ping-Pong 100
上では、FIFOの順番は明確ですね。Goroutineを100万にも増やすことができます(安価なので)。しかし、目的は実行することで数を増やすことではないので、ここでは増やしません。では、別のもので遊んでみましょう。例えば、人気のあるメッセージングパターンを使ってみましょう。

ファンイン

並行性の世界で人気のあるメッセージングパターンはファンインパターンです。これはファンアウトパターンの逆になるのですが、ファンアウトパターンについては後ほど説明します。簡単に言うと、ファンインとは、複数の入力を読み取り、それらを1つのチャネルに集約し受信する機能です。

例:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration) {
    var i int
    for {
        ch <- i
        i++
        time.Sleep(d)
    }
}

func reader(out chan int) {
    for x := range out {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    out := make(chan int)
    go producer(ch, 100*time.Millisecond)
    go producer(ch, 250*time.Millisecond)
    go reader(out)
    for i := range ch {
        out <- i
    }
}

インタラクティブWebGLアニメーションへ移動します

Fan-In Pattern
このように、producer 1が毎100ミリ秒ごとに値を生成し、producer 2が毎250ミリ秒ごとに生成しますが、readerはすぐに両方のproducerから値を受け取っています。多重に送信されたものは図のmainのループ域で集約されます。

Workers

ファンインパターンの反対はファンアウトあるいはworkersパターンと言います。複数のgoroutineが1つのチャネルから読み取り、CPUコアにそれぞれ作業(work)を分配するため、workersパターンと呼ばれています。Go言語では、このパターンを実装するのは簡単です。チャネルをパラメータとした複数のgoroutineを起動し、複数の値をチャネルに送信するだけです。作業の分配や多重送信の集約はGoランタイムによって魔法がかかったように自動処理されます。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

インタラクティブWebGLアニメーションへ移動します

Workers
ここで、覚えておくと良いのが、並列化です。見たとおり、全てのgoroutineは並列に「実行」し、チャネルから「作業(work)」が来るのを待ちます。上のアニメーションを見ると分かるように、goroutineは次々と作業を受け取っています。残念ながら、上のアニメーションでgoroutineが作業中なのか作業待ちなのかをカラーで表していませんが、GOMAXPROCS=4で記録されているので、4つのgoroutineだけが効果的に並列して実行されています。これについてはまた後ほど触れます。

ここでは、さらに複雑なことをしましょう。workersの中にさらにworkers(subworkers)を作ります。

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    WORKERS    = 5
    SUBWORKERS = 3
    TASKS      = 20
    SUBTASKS   = 10
)

func subworker(subtasks chan int) {
    for {
        task, ok := <-subtasks
        if !ok {
            return
        }
        time.Sleep(time.Duration(task) * time.Millisecond)
        fmt.Println(task)
    }
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasks
        if !ok {
            return
        }

        subtasks := make(chan int)
        for i := 0; i < SUBWORKERS; i++ {
            go subworker(subtasks)
        }
        for i := 0; i < SUBTASKS; i++ {
            task1 := task * i
            subtasks <- task1
        }
        close(subtasks)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(WORKERS)
    tasks := make(chan int)

    for i := 0; i < WORKERS; i++ {
        go worker(tasks, &wg)
    }

    for i := 0; i < TASKS; i++ {
        tasks <- i
    }

    close(tasks)
    wg.Wait()
}

インタラクティブWebGLアニメーションへ移動します

Workers of workers
いいですね。もちろん、workersとsubworkersの数を大きくすることはできますが、ここでは、分かりやすいアニメーションにするために少なくしています。

workersやsubworkersを膨大な数にするとか、チャネルを多重送信するといった、もっとカッコいいファンアウトパターンは存在します。しかし、今はファンアウトパターンをなんとなく理解できれば十分です。

サーバ

次の一般的なパターンはファンアウトに似ていますが、同じことを達成するために短期間だけgoroutineを生成するパターンです。これは、サーバを実装する際に使用されるもので、リスナを作成し、accept関数をループで実行し、それぞれの接続のgoroutineを起動します。表現機能としては高く、サーバハンドラをできるだけ簡単に実装することができます。次の簡単な例を見てください。

package main

import "net"

func handler(c net.Conn) {
    c.Write([]byte("ok"))
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c)
    }
}

インタラクティブWebGLアニメーションへ移動します

Servers
あまり面白いとは言えません。並行性という意味では何も起きていないように見えます。もちろん水面下で複雑なことがたくさん起きていますが、意図的に隠されています。「Simplicity is complicated」(簡単は複雑)なのです。

では、並行性に戻りましょう。サーバにインタラクションを追加しましょう。例えば、それぞれのハンドラが非同期的にデータロガに書き込む必要があるとします。この例ではデータロガは、個別のgoroutineとして実行されます。

package main

import (
    "fmt"
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    ch <- c.RemoteAddr().String()
    c.Write([]byte("ok"))
    c.Close()
}

func logger(ch chan string) {
    for {
        fmt.Println(<-ch)
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go logger(ch)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}

インタラクティブWebGLアニメーションへ移動します

Servers 2
とてもわかりやすいですよね。しかし、リクエストの数が増加しロギングに時間がかかるようになった場合(データの準備や暗号化など)、データロガーのgoroutineがボトルネックになることが分かると思います。これを解消するために、先ほどのファンアウトパターンを使用することができます。では、やってみましょう。

サーバ + worker

サーバ + workerの例は少し高度なバージョンのデータロガです。作業するだけでなく、resultsチャネルを使用して作業結果をプールに返します。すごいことではありませんが、これの例を実用的なデータロガに近づけることができます。

では、コードとアニメーションを確認してみましょう。

package main

import (
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    addr := c.RemoteAddr().String()
    ch <- addr
    time.Sleep(100 * time.Millisecond)
    c.Write([]byte("ok"))
    c.Close()
}

func logger(wch chan int, results chan int) {
    for {
        data := <-wch
        data++
        results <- data
    }
}

func parse(results chan int) {
    for {
        <-results
    }
}

func pool(ch chan string, n int) {
    wch := make(chan int)
    results := make(chan int)
    for i := 0; i < n; i++ {
        go logger(wch, results)
    }
    go parse(results)
    for {
        addr := <-ch
        l := len(addr)
        wch <- l
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go pool(ch, 4)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}

インタラクティブWebGLアニメーションへ移動します

Server + Worker
作業を4つのgoroutineに分配し、効果的にデータロガのスループットを改善することができましたが、このアニメーションを見ると、データロガがまだ問題の原因になっている恐れがあることが分かります。何千もの接続が、分配以前は1つのチャネルに集約されており、結局はまたしてもデータロガがボトルネックとなってしまう可能性があります。しかし、実際に障害となるのは負荷がもっと高い場合だと思います。

並行性の素数検索アルゴリズム

ファンインとファンアウトは終りにして次に移りましょう。さらに洗練された並行性のアルゴリズムを見てみましょう。個人的にとても好きな例は、 『Go Concurrency Patterns(Go言語の並行性パターン)』というプレゼンに出てきた並行性の素数のふるいです。素数のふるい、またはエラトステネスの篩は、指定された範囲内で素数を探索する古いアルゴリズムです。素数を1つずつ取り出し、その素数の倍数を全て数列から順番に除外していきます。単純なアルゴリズムですが、特にマルチコア搭載の機械では効率を悪くします。

このアルゴリズムを並行化した応用では、goroutineを使用して値を素数フィルターにかけます。1つの素数の探索に1つのgoroutineが使用され、チャネルはジェネレータからフィルターへと値の送信に使用されます。素数が見つかると、チャネルを経てmainへと送信され、出力に用いられます。大きな素数を少ない計算複雑性のオーダーで探したい場合などは特にですが、当然このアルゴリズムは効率的ではありません。しかし、私はこれをとても優美だと思います。

// A concurrent prime sieve
package main

import "fmt"

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in // Receive value from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to 'out'.
        }
    }
}

// The prime sieve: Daisy-chain Filter processes.
func main() {
    ch := make(chan int) // Create a new channel.
    go Generate(ch)      // Launch Generate goroutine.
    for i := 0; i < 10; i++ {
        prime := <-ch
        fmt.Println(prime)
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}

インタラクティブWebGLアニメーションへ移動します

PrimeSieve
上のアニメーションをインタラクティブモードで自由に遊んでください。図自体が説明となっているため、このアルゴリズムを理解しやすいと思います。Goroutineのgenerate関数は2以上の自然数から順番に素数を取り出し、新しいgoroutineによって特定の素数(2、3、5、7など)の倍数をフィルターします。最初に見つかった素数はmainへと送信されます。アニメーションの角度を変え上から見てみると、gouroutineからmainへ送信された全ての数字が素数であることが見えます。特に3Dでは美しいアルゴリズムです。

GOMAXPROCS

では、前述のworkersの例に戻りましょう。GOMAXPROCS=4でアニメーションを実行したのを覚えていますよね。これは、ここのアニメーションが芸術ではなく、実際のプログラムを映し出すものだからなのです。

まず、GOMAXPROCSのおさらいをしましょう。

複数のCPUで同時処理させたい場合に、GOMAXPROCSでCPUの最大数を設定できる。

もちろんCPUとは論理CPUのことを意味しています。前述の例で実際のCPUランタイムを使用して作業が(スリープだけでなく)実行されるよう少し修正しました。GOMAXPROCSの値の変更以外は一切せずプログラムを実行しました。私のLinuxマシンには12コアのCPUが2つあり、合計24コアとなります。

では、まずプログラムのデモを1コアで実行してから、24コア全部を使用して実行してみましょう。

WebGLアニメーション – 1 | WebGLアニメーション – 24

GOMAXPROCS1
GOMAXPROCS24

2つのアニメーションの速度は異なります(他のアニメーションの時間と大きさを同じにしてます)ので、違いは一目瞭然です。GOMAXPROCS=1の場合、次のworkerは前のworkerが終了しないと開始しません。GOMAXPROCS=4の場合、速度が上がり、多重化のオーバーヘッドは気になりません。

しかし、ここで重要な点は、GOMAXPROCSを増やすことが必ずしもパフォーマンスの向上につながるわけではなく、悪化する場合があることです。

Goroutineのリーク

他にGoの並行性に関係するものでデモできるものは何かと考えたとき、頭に浮かんだのはgoroutineのリークです。例えば、実行してもgoroutineがスコープを外れた場合にリークが発生することがあります。あるいは、終了条件を忘れ、forループを実行した場合にも生じることがあります。

最初に自分のプログラムでgoroutineのリークを見た時、怖いことを想像してしまい、次の週末にはexpvarmonを書きました。想像したものを映像化してみました。

次を見てください。

インタラクティブWebGLアニメーションへ移動します

Leak
見ているだけで心が痛みます。描かれた線はすべて無駄となり、プログラムを破壊するいつ爆発するか分からない爆弾となります。

並列化は並行性ではない

最後に見せたいのは、並列化と並行性の違いです。これについては頻繁議論の対象となっています。Rob Pikeによる素晴らしいプレゼンがあるので、ぜひ見てください。#mustwatch(必見)ビデオの1つです。

簡単に言うと次のようになります。

並列化は単純に何かを同時で実行すること
並行性はプログラムを構成する方法

従って、並行性のプログラムは並列化している場合と並列化していない場合があります。ある意味並列化と並行性は別物とも言えます。GOMAXPROCSの設定の効果は上のデモで分かったと思います。

これに関する記事やビデオのリンクを貼り付けることもできますが、「百聞は一見にしかず」なので、ここでは、違いを映像化してみましょう。次のアニメーションは並列化の例です。多くのものを並列して実行しています。

インタラクティブWebGLアニメーションへ移動します

Parallelism 1
次も並列化です。

インタラクティブWebGLアニメーションへ移動します

Parallelism 2
しかし、次は並行性です。

PrimeSieve
そしてこれも。

Workers of workers
さらにこれも並行性です。

Ping-Pong 100

アニメーションの作成方法

この記事のアニメーションの作成にあたり、gotracergothree.js のライブラリの2つのプログラムを書きました。まず、gotracerですが、次のことをします。

  • GoプログラムのASTツリーを構文解析し、並行性関係のイベント(Gouroutineの開始と終了、チャネルの作成、チャネルへの送受信)に特別なコマンドと出力を挿入。
  • 生成されたプログラムを実行。
  • この特別な出力を分析し、イベントの説明とタイムスタンプのあるJSONファイルを作成。

作成されたJSONファイルは次のとおりです。

JSON sample
では、gothree.jsですが、これは、素晴らしいThree.jsの力を借りて3Dの線やオブジェクトを、WebGLを使用して描きました。1つのhtmlページに収まるようラッパーを使用すれば出来上がりです。

しかし、この方法には限界があります。正しいトレースを作成するためには、正確に例を選び、チャネルとgoroutineの名前を変更し複雑なコードにします。しかし、これでは、名前が異なってしまうため、チャネルとgoroutineを結び付けることができません。型を実装したチャネルで送信する場合も同じです。また、タイミングの問題があります。標準出力へ出力する場合、値を送信するよりも時間がかかってしまいます。そのため、アニメーションが正しく動かすためにtime.Sleep(ミリ秒単位設定が可能)を設定する必要がありました。

解決できていないことがあるため、私のコードはまだオープンソースにできません。私はDmitry Vyukovのexecution tracer(実行トレーサ)をいじっています。これは、ある程度イベントの詳細を教えてくれますが、送信されている値の情報は含まれていません。自分の望む結果を出せるもっといい方法があるのではないかと思います。アイデアをお持ちの方はぜひTwitterやコメントで教えてください。2週末を要して作成したツールを実用的なGo言語プログラム用のデバッグ、トレースツールに拡張できればと考えています。

もちろんここで紹介した並行性アルゴリズムやパターン以外で映像化できるものはあります。お気軽にリクエストしてください。

では、Go言語を使ってプログラミングを楽しんでください。