100万回のWebSocket接続とGo

こんにちは。私はSergey Kamardin(セルゲイ・カマルディン)です。Mail.Ru(ロシアの電子メールサービス会社)で開発者をしています。

この記事では、どのように私がGoを使って高負荷対応のWebSocketサーバを開発したかについて説明したいと思っています。

パフォーマンス最適化のアイデアやテクニックを通じて、WebSocketの知識はあるもののGoについてはほとんど知らないという方のお役に立てれば幸いです。


1. はじめに

まずは開発に至った経緯について、どうして私たちがこのサーバを必要としたのかを説明しておきましょう。

Mail.Ruには多くのステートフルなシステムがあります。ユーザのeメール保存もその1つです。システム内、およびシステムイベントの状態変更を追跡する方法にはいくつかの種類がありますが、それらは主に状態変更に関するシステム通知、または周期的なシステムのポーリングのいずれかを通じて行われます。

いずれの方法も一長一短ですが、ことメールに関する限りは、ユーザが新しいメールを受け取る速度が速ければ速い方が理想的です。

メールのポーリングでは毎秒約50,000回のHTTPクエリが行われますが、そのうち60%が304ステータスを返します。つまり、メールボックスに変更がないということです。

従って、サーバの負荷軽減とユーザへのメール配信速度向上のために、一方で状態の変更についての通知を、他方でそうした通知の購読を受け取るPub/Subサーバ(バス、メッセージブローカ、イベントチャネルとも呼ばれています)を書き直すという車輪の再発明をするという決断に至りました。

以前は以下のような感じでした。

こちらが現在です。

1つ目の図は以前の状態を示しています。ブラウザは定期的にAPIをポーリングし、Storage(メールボックスサービス)の変更について問い合わせをします。

2つ目の図が示しているのは新しいアーキテクチャです。ブラウザは通知API(Busサーバのクライアント)との間にWebSocket接続を確立します。新しいeメールを受信すると、Storageはそれに関する通知をBusに(1)、そしてBusはその購読者に送信します(2)。その後、APIは受信した通知を送信するための接続を確定し、それをユーザのブラウザに送信します(3)。

今回の記事では、このAPIやWebSocketサーバについて説明したいと思っています。なお、話を進めるについては、このサーバで約300万回のオンライン接続を実行した場合を想定しています。

2. 慣用的な手法

最初に見ていくのは、最適化をせずに、Goの機能だけを使ってサーバの特定の部分を実装する方法です。

net/httpに進む前に、まずはデータの送受信に関する話しから始めましょう。なお、WebSocketプロトコルのにあるデータ(JSONオブジェクトなど)は、以降、パケットと呼ぶことにします。

それでは、前述したパケットの送受信をWebSocket接続上で行うロジックを含んだChannel構造の実装をしてみましょう。

2.1. チャネル構造体

// Packet represents application level data.
type Packet struct {
    ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}

WebSocketチャネルの実装

まずは、読み書き用の2つのゴルーチンの立ち上げから始めましょう。それぞれのゴルーチンには、Goのバージョンとオペレーティングシステムに応じて、2~8KBの初期サイズを持つメモリスタックが必要です。

上に挙げた300万回のオンライン接続に関しては、全ての接続を合わせて24GBのメモリ(4KBのスタック)が必要になります。ただし、これにはChannel構造や送信パケットのch.send、およびその他の内部フィールドに割り当てられたメモリは含まれていません。

2.2. I/O用のゴルーチン

以下は、”reader”の実装です。

func (c *Channel) reader() {
    // We make a buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}

チャネルの読み込み用ゴルーチン

ここでは、read()システムコールの回数を減らし、かつbufバッファサイズが許す限り多く読み込むために、bufio.Readerを使用しています。無限ループの中で、新しいデータを待つ状態です。この新しいデータを待つという言葉は、後にも出てくるので覚えておいてください。

なお、受信パケットの解析と処理については、今回の最適化にとってはそれほど重要ではないので触れずに進めていきます。一方で重要なのがbufです。デフォルトでは4KBなので、今回の接続にとっては、更に12GBのメモリが必要になってくるということになります。また、”writer”でも、これは同じです。

func (c *Channel) writer() {
    // We make buffered write to reduce write syscalls. 
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}

チャネルの書き出し用ゴルーチン

送信パケットのチャネルc.sendを反復し、それらをバッファに書き込みます。すでにお気付きの方もいると思いますが、こちらも300万回の接続において、更に4KBと12GBのメモリが必要です。

2.3. HTTP

ここまでで簡単なChannelの実装を見てきました。次は、これと接続するWebSocketを見ていきましょう。第2章のタイトルは慣用的な手法なので、ここもその流れに沿って進めたいと思います。

注:WebSocketの仕組みについて、クライアントはアップグレードというHTTPの特別なメカニズムを利用してWebSocketプロトコルに切り替えます。そしてアップグレード要求の処理が成功すると、サーバとクライアントはバイナリのWebSocketフレームを交換するためにTCP接続を利用します。なお、接続内のフレーム構造についてはこちらをご覧ください。

import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})

WebSocketにアップグレードする慣用的手法

なおhttp.ResponseWriterは、*http.Requestの初期化と以降の応答書き込みのために、bufio.Readerbufio.Writer(いずれも4KBバッファ)に対してメモリ割り当てを行うことに注意してください。

使用するWebSocketライブラリにかかわらず、アップグレード要求への応答が成功すると、サーバはresponseWriter.Hijack()呼び出し後のTCP接続とともにI/Oバッファを受け取ります

ヒント:状況によっては、go:linknameを使うことで、net/http.putBufio{Reader,Writer}の呼び出しを通じてnet/http内のsync.Poolにバッファを返すことができます。

結果として、300万回の接続には、更に24GBが必要です。

つまり、まだ何もしていないアプリケーションのためにトータル72GBのメモリが必要ということになります。

3. 最適化

“はじめに”でお伝えした内容を振り返ってみましょう。ユーザ接続の動作について、覚えていらっしゃるでしょうか。WebSocketに切り替えた後、クライアントは関連するイベントを含むパケットを送信、つまりイベントを購読します。その後(ping/pongのような技術メッセージは考慮しません)、クライアントはConnection Lifetime(接続の最大存続期間(秒))の全期間において、何も送る必要がありません。

Connection Lifetimeは、数秒から数日まで持続させることができます。

そのようなわけで、Channel.reader()Channel.writer()はほとんどの間、受信または送信するデータの処理を待っています。また、それらと併せて、それぞれ4KBのI/Oバッファも待機しています。

これで、何を改善できるかというのが分かりますよね。

3.1. netpoll

bufio.Reader.Read()内のconn.Read()呼び出しでロックされることによって新しいデータを待つChannel.reader()の実装を覚えていますか。接続内にデータがある場合、Goのランタイムがゴルーチンを”ウェイクアップ”し、次のパケットの読み込みを許可します。その後、ゴルーチンは再びロックされ、新規データを待ちます。ゴルーチンを”ウェイクアップ”する必要があることを、どのようにGoのランタイムは知るのでしょうか。それを見ていきましょう。

conn.Read()の実装を見ると、その中にnet.netFD.Read()呼び出しがあることが分かります。

// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}

非ブロッキング読み込みに関するGoの内部

Goは非ブロッキングモードでソケットを使用します。EAGAINによって、ソケット内にデータがなく空のソケットからの読み込みでロックされないことが示されると、OSが制御を返します。

接続のファイル記述子にはread()システムコールがあります。readがEAGAINエラーを返すと、ランタイムはpollDesc.waitRead()呼び出しを実行します。

// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}

netpoll使用に関するGoの内部

より詳しく見ていけば、netpollの実装は、Linuxではepollを使って、BSDではkqueueを使って行われていることが分かります。今回の接続でも同じ手法を使わない手はありません。本当に必要な時のみ、つまりソケットに本当に読み込み可能なデータがある時のみ、読み込みバッファを割り当て、読み込みのゴルーチンを開始することできます。

github.com/golang/goに、netpoll関数のエクスポートのトピックがあります。

3.2. ゴルーチンの除去

Goのためにnetpoll実装をするとしましょう。それにより内部バッファでChannel.reader()ゴルーチンの開始を回避し、接続内の読み込み可能なデータのイベントを購読できます。

ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
    // We spawn goroutine here to prevent poller wait loop
    // to become locked during receiving packet from ch.
    go Receive(ch)
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}

netpollの使用

パケット送信時だけゴルーチンを実行し、バッファを割り当てることができるので、Channel.writer()を使う方が容易です。

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}

必要な時だけwriterゴルーチンを開始する

write()システムコールに対してオペレーティングシステムがEAGAINを返すようなケースを扱っているわけではないことに留意してください。そのような種類のサーバにとっては実に稀なケースなので、そうしたケースではGoランタイムを使います。ですが、必要であれば同様の方法で処理が可能です。

(1つないしいくつかの)ch.sendから発信パケットの読み込み後、writerは操作を終了し、ゴルーチンスタックと送信バッファを解放します。

これで完璧ですね。継続的に稼働している2つのゴルーチンのスタックとI/Oバッファを取り除くことで、48GB節約できました。

3.3. リソースの管理

接続数が非常に多くなると、メモリ消費量の増大を招くだけではありません。私は、サーバの開発時に、競合状態やデッドロックを何度も経験しました。またそれに続き、しばしば、いわゆるセルフDDoSという、アプリケーションクライアントがサーバに接続しようと猛烈に試行して、サーバへ侵入しようとする状態も経験しました。

例えば、何らかの理由で突然ping/pongメッセージを扱えなくなったものの、アイドル接続のハンドラはこうした接続をクローズし続けたとします(接続が切れたため、データを一切提供できないとします)。クライアント側はN秒毎の接続が切れたと思い、イベントを待たずに再接続を試みました。

もしロックや過負荷となったサーバが、新たな接続の受け入れをただちに中止し、バランサがその前に(例えばnginx)、次のサーバインスタンスに要求を渡すことができれば、それに越したことはありません。

更に、サーバに負荷をかけることになりますが、それでも何らかの理由で(恐らくバグのために)全てのクライアントがサーバに突然パケットを送信したいような場合は、接続毎のゴルーチンとバッファの初期状態に実際に回復するので、先ほど節約した48GBが再利用されます。

ゴルーチンのプール

ゴルーチンのプールを使うことで、同時に扱われるパケットの数を制限できます。そうしたプールの単純な実装は、以下のようになります。

package gopool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}

ゴルーチンプールの単純な実装

netpollを用いたコードは以下のようになります。

pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // We will block poller wait loop when
    // all pool workers are busy.
    pool.Schedule(func() {
        Receive(ch)
    })
})

ゴルーチンプール内のPollerイベントの処理

これにより、ソケット内に読み込み可能な状態のデータが出現した時だけでなく、プール内のフリーなゴルーチンを利用する最初の機会の到来時に、パケットを読み込みます。

同様にSend()を変更します。

pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}

書き出しゴルーチンの再利用

go ch.writer()の代わりに、再利用されたゴルーチンの1つに書き込みたいと思います。そうすれば、Nのゴルーチンのプールのために、同時に処理されたNの要求を保証できます。そして、届いたN+1の要求には、読み込み用のN+1のバッファの割り当てを行いません。ゴルーチンプールにより、新たな接続のAccept()Upgrade()を制限し、DDoSによるほとんどの状況を回避することができます。

3.4. ゼロコピーのアップグレード

WebSocketのプロトコルは、いったん脇に置きましょう。前に述べたようにクライアントはHTTPのアップグレード要求を使用して、WebSocketのプロトコルに切り替えます。それは次のようになります。

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

HTTPのアップグレードの例

私たちの場合、WebSocketプロトコルに切り替えるためだけに、HTTP要求とそのヘッダが必要でした。この知識やhttp.Requestの内部に保存されているものには、最適化のために、恐らく、HTTP要求を処理する際の不必要な割り当てやコピーの拒否が可能であり、標準のnet/httpサーバを放棄できるだろうという示唆が含まれています。

例えば、http.Requestには同名のヘッダタイプを持ったフィールドがあります。このフィールドには、文字列値への接続からデータをコピーすることにより、無条件に全ての要求ヘッダが詰まっています。例えば、大きなサイズのクッキーヘッダのために、このフィールド内に余計なデータがどのくらい保持されているか想像してみてください。

WebSocketの実装

残念ながら、私たちのサーバを最適化する際の既存のライブラリはどれも、標準のnet/httpサーバだけをアップグレードできるものでした。更に、(2つの)ライブラリのどちらを使っても、前述した全ての読み込み、書き出しの最適化はできません。これらの最適化を有効にするには、WebSocketと稼動する、かなり低レベルなAPIがなければなりません。バッファを再利用するために、以下のようなプロトコル機能が必要です。

func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

このようなAPIを備えたライブラリがあれば、以下のような接続からパケットを読み込めたでしょう(パケットの書き出しも同様です)。

// getReadBuf, putReadBuf are intended to 
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}

求められるWebSocketの実装API

つまり、自分たちで何とかライブラリを作るしかなかったのです。

github.com/gobwas/ws

理想を言えば、wsライブラリは、そのプロトコルの操作ロジックをユーザに強制しない形で記述されていることが望ましいでしょう。読み込みと書き出しメソッドは全て、標準のio.Readerio.Writerインタフェースを受け入れます。それにより、バッファリングやその他のI/Oラッパを使っても使わなくても、どちらも可能となります。

標準のnet/httpからのアップグレード要求に加えて、wsゼロコピーアップグレードをサポートします。これは、メモリの割り当てやコピーをせずにアップグレード要求とWebSocketへの切り替えを処理するものです。ws.Upgrade()io.ReadWriternet.Connはこのインタフェースを実装)を受け入れます。言い換えると、私たちは標準のnet.Listen()を使用でき、更にln.Accept()から受け取った接続をすぐにws.Upgrade()に移すことができます。このライブラリにより、アプリケーション内で将来使用するために、あらゆるリクエストデータのコピーが可能になります(例えばセッションを確認するCookieなど)。

以下に、アップグレード要求処理のベンチマークを記載しました。標準のnet/httpサーバとゼロコピーアップグレードのnet.Listen()を比較したものです。

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op

wsへの切り替えとゼロコピーアップグレードにより、net/httpハンドラが要求時に処理するI/Oバッファのために割り当てていた容量を更に24GBを節約することができました。

3.5. まとめ

ここまでお伝えした最適化についてまとめてみましょう。

  • 内部バッファを使った読み込みゴルーチンは高コストです。解決策:netpoll(epoll, kqueue)、バッファの再利用。
  • 内部バッファを使った書き出しゴルーチンは高コストです。解決策:必要な時にゴルーチンを開始。バッファの再利用。
  • 接続の集中に対しては、netpollは有効ではありません。解決策:数を制限した上でのゴルーチンの再利用。
  • net/httpはWebSocketへアップグレードする最速の方法ではありません。解決策:単なるTCP接続については、ゼロコピーアップグレードを使用。

これにより、サーバのコードは以下のようになります。

import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // Try to accept incoming connection inside free pool worker.
    // If there no free workers for 1ms, do not accept anything and try later.
    // This will help us to prevent many self-ddos or out of resource limit cases.
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // Wrap WebSocket connection with our Channel struct.
        // This will help us to handle/send our app's packets.
        ch := NewChannel(conn)

        // Wait for incoming bytes from connection.
        poller.Start(conn, netpoll.EventRead, func() {
            // Do not cross the resource limits.
            pool.Schedule(func() {
                // Read and handle incoming packet(s).
                ch.Recevie()
            })
        })
    })
    if err != nil {   
        time.Sleep(time.Millisecond)
    }
}

Netpoll、ゴルーチンプールとゼロコピーアップグレードを用いたWebSocketサーバの例

4. 結論

時期尚早の最適化は諸悪の根源である(少なくともほとんどの場合)。Donald Knuth

もちろん、前述した数々の最適化は妥当なものですが、全ての場合においてそうだとは言えません。例えば、フリーリソース(メモリ、CPU)とオンライン接続数を比較して、フリーリソースの割合がかなり高い場合、恐らく最適化する意味はありません。しかし、どこで、どんな改善をするべきかという知識を持つことは大変有益なことです。

読んでいただきありがとうございました。

5. 参考文献等