2017年11月28日
100万回のWebSocket接続とGo
(2017-08-02)by Sergey Kamardin
本記事は、原著者の許諾のもとに翻訳・掲載しております。
こんにちは。私はSergey Kamardin(セルゲイ・カマルディン)です。Mail.Ru(ロシアの電子メールサービス会社)で開発者をしています。
この記事では、どのように私がGoを使って高負荷対応のWebSocketサーバを開発したかについて説明したいと思っています。
パフォーマンス最適化のアイデアやテクニックを通じて、WebSocketの知識はあるもののGoについてはほとんど知らないという方のお役に立てれば幸いです。
1. はじめに
まずは開発に至った経緯について、どうして私たちがこのサーバを必要としたのかを説明しておきましょう。
Mail.Ruには多くのステートフルなシステムがあります。ユーザのeメール保存もその1つです。システム内、およびシステムイベントの状態変更を追跡する方法にはいくつかの種類がありますが、それらは主に状態変更に関するシステム通知、または周期的なシステムのポーリングのいずれかを通じて行われます。
いずれの方法も一長一短ですが、ことメールに関する限りは、ユーザが新しいメールを受け取る速度が速ければ速い方が理想的です。
メールのポーリングでは毎秒約50,000回のHTTPクエリが行われますが、そのうち60%が304ステータスを返します。つまり、メールボックスに変更がないということです。
従って、サーバの負荷軽減とユーザへのメール配信速度向上のために、一方で状態の変更についての通知を、他方でそうした通知の購読を受け取るPub/Subサーバ(バス、メッセージブローカ、イベントチャネルとも呼ばれています)を書き直すという車輪の再発明をするという決断に至りました。
以前は以下のような感じでした。
こちらが現在です。
1つ目の図は以前の状態を示しています。ブラウザは定期的にAPIをポーリングし、Storage(メールボックスサービス)の変更について問い合わせをします。
2つ目の図が示しているのは新しいアーキテクチャです。ブラウザは通知API(Busサーバのクライアント)との間にWebSocket接続を確立します。新しいeメールを受信すると、Storageはそれに関する通知をBusに(1)、そしてBusはその購読者に送信します(2)。その後、APIは受信した通知を送信するための接続を確定し、それをユーザのブラウザに送信します(3)。
今回の記事では、このAPIやWebSocketサーバについて説明したいと思っています。なお、話を進めるについては、このサーバで約300万回のオンライン接続を実行した場合を想定しています。
2. 慣用的な手法
最初に見ていくのは、最適化をせずに、Goの機能だけを使ってサーバの特定の部分を実装する方法です。
net/http
に進む前に、まずはデータの送受信に関する話しから始めましょう。なお、WebSocketプロトコルの 上 にあるデータ(JSONオブジェクトなど)は、以降、 パケット と呼ぶことにします。
それでは、前述したパケットの送受信をWebSocket接続上で行うロジックを含んだ Channel
構造の実装をしてみましょう。
2.1. チャネル構造体
// Packet represents application level data.
type Packet struct {
...
}
// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
WebSocketチャネルの実装
まずは、読み書き用の2つのゴルーチンの立ち上げから始めましょう。それぞれのゴルーチンには、Goのバージョンと オペレーティングシステムに応じて 、2~8KBの初期サイズを持つメモリスタックが必要です。
上に挙げた300万回のオンライン接続に関しては、全ての接続を合わせて 24GBのメモリ (4KBのスタック)が必要になります。ただし、これには Channel
構造や送信パケットの ch.send
、およびその他の内部フィールドに割り当てられたメモリは含まれていません。
2.2. I/O用のゴルーチン
以下は、”reader”の実装です。
func (c *Channel) reader() {
// We make a buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}
チャネルの読み込み用ゴルーチン
ここでは、 read()
システムコールの回数を減らし、かつ buf
バッファサイズが許す限り多く読み込むために、 bufio.Reader
を使用しています。無限ループの中で、新しいデータを待つ状態です。この 新しいデータを待つ という言葉は、後にも出てくるので覚えておいてください。
なお、受信パケットの解析と処理については、今回の最適化にとってはそれほど重要ではないので触れずに進めていきます。一方で重要なのが buf
です。デフォルトでは4KBなので、今回の接続にとっては、更に 12GB のメモリが必要になってくるということになります。また、”writer”でも、これは同じです。
func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}
チャネルの書き出し用ゴルーチン
送信パケットのチャネル c.send
を反復し、それらをバッファに書き込みます。すでにお気付きの方もいると思いますが、こちらも300万回の接続において、更に4KBと 12GB のメモリが必要です。
2.3. HTTP
ここまでで簡単な Channel
の実装を見てきました。次は、これと接続するWebSocketを見ていきましょう。第2章のタイトルは 慣用的な手法 なので、ここもその流れに沿って進めたいと思います。
注:WebSocketの仕組みについて、クライアントはアップグレードというHTTPの特別なメカニズムを利用してWebSocketプロトコルに切り替えます。そしてアップグレード要求の処理が成功すると、サーバとクライアントはバイナリのWebSocketフレームを交換するためにTCP接続を利用します。なお、接続内のフレーム構造については こちら をご覧ください。
import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})
WebSocketにアップグレードする慣用的手法
なお http.ResponseWriter
は、 *http.Request
の初期化と以降の応答書き込みのために、 bufio.Reader
と bufio.Writer
(いずれも4KBバッファ)に対してメモリ割り当てを行うことに注意してください。
使用するWebSocketライブラリにかかわらず、アップグレード要求への応答が成功すると、サーバは responseWriter.Hijack()
呼び出し後のTCP接続とともにI/Oバッファを 受け取ります 。
ヒント:状況によっては、
go:linkname
を使うことで、net/http.putBufio{Reader,Writer}
の呼び出しを通じてnet/http
内のsync.Pool
にバッファを返すことができます。
結果として、300万回の接続には、更に24GBが必要です。
つまり、まだ何もしていないアプリケーションのためにトータル72GBのメモリが必要ということになります。
3. 最適化
“はじめに”でお伝えした内容を振り返ってみましょう。ユーザ接続の動作について、覚えていらっしゃるでしょうか。WebSocketに切り替えた後、クライアントは関連するイベントを含むパケットを送信、つまりイベントを購読します。その後( ping/pong
のような技術メッセージは考慮しません)、クライアントはConnection Lifetime(接続の最大存続期間(秒))の全期間において、何も送る必要がありません。
Connection Lifetimeは、数秒から数日まで持続させることができます。
そのようなわけで、 Channel.reader()
と Channel.writer()
はほとんどの間、受信または送信するデータの処理を待っています。また、それらと併せて、それぞれ4KBのI/Oバッファも待機しています。
これで、何を改善できるかというのが分かりますよね。
3.1. netpoll
bufio.Reader.Read()
内の conn.Read()
呼び出しでロックされることによって 新しいデータを待つ Channel.reader()
の実装を覚えていますか。接続内にデータがある場合、Goのランタイムがゴルーチンを”ウェイクアップ”し、次のパケットの読み込みを許可します。その後、ゴルーチンは再びロックされ、新規データを待ちます。ゴルーチンを”ウェイクアップ”する必要があることを、どのようにGoのランタイムは知るのでしょうか。それを見ていきましょう。
conn.Read()の実装 を見ると、その中に net.netFD.Read()呼び出し があることが分かります。
// net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}
非ブロッキング読み込みに関するGoの内部
Goは非ブロッキングモードでソケットを使用します。EAGAINによって、ソケット内にデータがなく空のソケットからの読み込みでロックされないことが示されると、OSが制御を返します。
接続のファイル記述子には read()
システムコールがあります。readが EAGAINエラー を返すと、ランタイムは pollDesc.waitRead()呼び出し を実行します。
// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}
netpoll使用に関するGoの内部
より詳しく見ていけば 、netpollの実装は、Linuxでは epoll を使って、BSDでは kqueue を使って行われていることが分かります。今回の接続でも同じ手法を使わない手はありません。本当に必要な時のみ、つまりソケットに本当に読み込み可能なデータがある時のみ、読み込みバッファを割り当て、読み込みのゴルーチンを開始することできます。
github.com/golang/goに、netpoll関数のエクスポートの トピック があります。
3.2. ゴルーチンの除去
Goのために netpoll実装 をするとしましょう。それにより内部バッファで Channel.reader()
ゴルーチンの開始を回避し、接続内の読み込み可能なデータのイベントを購読できます。
ch := NewChannel(conn)
// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go Receive(ch)
})
// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}
netpollの使用
パケット送信時だけゴルーチンを実行し、バッファを割り当てることができるので、 Channel.writer()
を使う方が容易です。
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}
必要な時だけwriterゴルーチンを開始する
write()
システムコールに対してオペレーティングシステムがEAGAIN
を返すようなケースを扱っているわけではないことに留意してください。そのような種類のサーバにとっては実に稀なケースなので、そうしたケースではGoランタイムを使います。ですが、必要であれば同様の方法で処理が可能です。
(1つないしいくつかの) ch.send
から発信パケットの読み込み後、writerは操作を終了し、ゴルーチンスタックと送信バッファを解放します。
これで完璧ですね。継続的に稼働している2つのゴルーチンのスタックとI/Oバッファを取り除くことで、48GB節約できました。
3.3. リソースの管理
接続数が非常に多くなると、メモリ消費量の増大を招くだけではありません。私は、サーバの開発時に、競合状態やデッドロックを何度も経験しました。またそれに続き、しばしば、いわゆるセルフDDoSという、アプリケーションクライアントがサーバに接続しようと猛烈に試行して、サーバへ侵入しようとする状態も経験しました。
例えば、何らかの理由で突然 ping/pong
メッセージを扱えなくなったものの、アイドル接続のハンドラはこうした接続をクローズし続けたとします(接続が切れたため、データを一切提供できないとします)。クライアント側はN秒毎の接続が切れたと思い、イベントを待たずに再接続を試みました。
もしロックや過負荷となったサーバが、新たな接続の受け入れをただちに中止し、バランサがその前に(例えばnginx)、次のサーバインスタンスに要求を渡すことができれば、それに越したことはありません。
更に、サーバに負荷をかけることになりますが、それでも何らかの理由で(恐らくバグのために)全てのクライアントがサーバに突然パケットを送信したいような場合は、接続毎のゴルーチンとバッファの初期状態に実際に回復するので、先ほど節約した 48GB が再利用されます。
ゴルーチンのプール
ゴルーチンのプールを使うことで、同時に扱われるパケットの数を制限できます。そうしたプールの単純な実装は、以下のようになります。
package gopool
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
ゴルーチンプールの単純な実装
netpoll
を用いたコードは以下のようになります。
pool := gopool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
Receive(ch)
})
})
ゴルーチンプール内のPollerイベントの処理
これにより、ソケット内に読み込み可能な状態のデータが出現した時だけでなく、プール内のフリーなゴルーチンを利用する最初の機会の到来時に、パケットを読み込みます。
同様に Send()
を変更します。
pool := gopool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
書き出しゴルーチンの再利用
go ch.writer()
の代わりに、再利用されたゴルーチンの1つに書き込みたいと思います。そうすれば、 N
のゴルーチンのプールのために、同時に処理された N
の要求を保証できます。そして、届いた N+1
の要求には、読み込み用の N+1
のバッファの割り当てを行いません。ゴルーチンプールにより、新たな接続の Accept()
と Upgrade()
を制限し、DDoSによるほとんどの状況を回避することができます。
3.4. ゼロコピーのアップグレード
WebSocketのプロトコルは、いったん脇に置きましょう。前に述べたようにクライアントはHTTPのアップグレード要求を使用して、WebSocketのプロトコルに切り替えます。それは次のようになります。
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
HTTPのアップグレードの例
私たちの場合、WebSocketプロトコルに切り替えるためだけに、HTTP要求とそのヘッダが必要でした。この知識や http.Request
の内部に 保存されているもの には、最適化のために、恐らく、HTTP要求を処理する際の不必要な割り当てやコピーの拒否が可能であり、標準の net/http
サーバを放棄できるだろうという示唆が含まれています。
例えば、
http.Request
には 同名のヘッダタイプを持ったフィールド があります。このフィールドには、文字列値への接続からデータをコピーすることにより、無条件に全ての要求ヘッダが詰まっています。例えば、大きなサイズのクッキーヘッダのために、このフィールド内に余計なデータがどのくらい保持されているか想像してみてください。
WebSocketの実装
残念ながら、私たちのサーバを最適化する際の既存のライブラリはどれも、標準の net/http
サーバだけをアップグレードできるものでした。更に、(2つの)ライブラリのどちらを使っても、前述した全ての読み込み、書き出しの最適化はできません。これらの最適化を有効にするには、WebSocketと稼動する、かなり低レベルなAPIがなければなりません。バッファを再利用するために、以下のようなプロトコル機能が必要です。
func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error
このようなAPIを備えたライブラリがあれば、以下のような接続からパケットを読み込めたでしょう(パケットの書き出しも同様です)。
// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}
求められるWebSocketの実装API
つまり、自分たちで何とかライブラリを作るしかなかったのです。
github.com/gobwas/ws
理想を言えば、 ws
ライブラリは、そのプロトコルの操作ロジックをユーザに強制しない形で記述されていることが望ましいでしょう。読み込みと書き出しメソッドは全て、標準の io.Reader
と io.Writer
インタフェースを受け入れます。それにより、バッファリングやその他のI/Oラッパを使っても使わなくても、どちらも可能となります。
標準の net/http
からのアップグレード要求に加えて、 ws
は ゼロコピーアップグレード をサポートします。これは、メモリの割り当てやコピーをせずにアップグレード要求とWebSocketへの切り替えを処理するものです。 ws.Upgrade()
は io.ReadWriter
( net.Conn
はこのインタフェースを実装)を受け入れます。言い換えると、私たちは標準の net.Listen()
を使用でき、更に ln.Accept()
から受け取った接続をすぐに ws.Upgrade()
に移すことができます。このライブラリにより、アプリケーション内で将来使用するために、あらゆるリクエストデータのコピーが可能になります(例えばセッションを確認する Cookie
など)。
以下に、アップグレード要求処理の ベンチマーク を記載しました。標準の net/http
サーバとゼロコピーアップグレードの net.Listen()
を比較したものです。
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
ws
への切り替えと ゼロコピーアップグレード により、 net/http
ハンドラが要求時に処理するI/Oバッファのために割り当てていた容量を更に 24GB を節約することができました。
3.5. まとめ
ここまでお伝えした最適化についてまとめてみましょう。
- 内部バッファを使った読み込みゴルーチンは高コストです。 解決策: netpoll(epoll, kqueue)、バッファの再利用。
- 内部バッファを使った書き出しゴルーチンは高コストです。 解決策: 必要な時にゴルーチンを開始。バッファの再利用。
- 接続の集中に対しては、netpollは有効ではありません。 解決策: 数を制限した上でのゴルーチンの再利用。
net/http
はWebSocketへアップグレードする最速の方法ではありません。 解決策: 単なるTCP接続については、ゼロコピーアップグレードを使用。
これにより、サーバのコードは以下のようになります。
import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}
Netpoll、ゴルーチンプールとゼロコピーアップグレードを用いたWebSocketサーバの例
4. 結論
時期尚早の最適化は諸悪の根源である(少なくともほとんどの場合)。Donald Knuth
もちろん、前述した数々の最適化は妥当なものですが、全ての場合においてそうだとは言えません。例えば、フリーリソース(メモリ、CPU)とオンライン接続数を比較して、フリーリソースの割合がかなり高い場合、恐らく最適化する意味はありません。しかし、どこで、どんな改善をするべきかという知識を持つことは大変有益なことです。
読んでいただきありがとうございました。
5. 参考文献等
株式会社リクルート プロダクト統括本部 プロダクト開発統括室 グループマネジャー 株式会社ニジボックス デベロップメント室 室長 Node.js 日本ユーザーグループ代表
- Twitter: @yosuke_furukawa
- Github: yosuke-furukawa