POSTD PRODUCED BY NIJIBOX

POSTD PRODUCED BY NIJIBOX

ニジボックスが運営する
エンジニアに向けた
キュレーションメディア

本記事は、原著者の許諾のもとに翻訳・掲載しております。

(編注:2020/08/18、いただいたフィードバックをもとに記事を修正いたしました。)

本記事は、Tang Liu(tl@pingcap.com)が「Bay Area Rust Meetup August 2017」で行った講演内容をまとめたものです。 動画はこちらからご覧いただけます

講演者の自己紹介

皆さん、こんにちは。今回、この会議に参加できて大変うれしいです。Rustチームに感謝します。

本日は「RustにおけるfutureとgRPC」というテーマでお話しします。本題に入る前に、簡単な自己紹介をさせていただきます。私は、PingCAPで主任エンジニアを務めているSiddon Tangと申します。GitHubのユーザ名はsiddontangです。次世代SQLデータベース TiDB や、分散キーバリューストア TiKV の開発に携わっています。ちなみに、TiKVもRustで書かれています。私はオープンソース愛好家でもあり、LedisDB、go-mysql、go-mysql-elasticsearch、rust-prometheusなどのオープンソースプロジェクトを手掛けています。

今日はまず、非同期について簡単に触れた後、Rustにおけるfutureの話をします。もちろん、皆さんはどちらもよくご存じだと思いますので、手短に済ませるつもりです。その後gRPCについてお話しして、最後に、Rustでfutureを用いてgRPCをラップする方法をご説明します。

非同期プログラミング

では始めましょう。非同期についてです。

なぜ同期にしないのか

そもそも、なぜ同期にしないのでしょうか。私たちの誰もが承知しているように、同期プログラミングの方が簡単ですよね。サービスの負荷が低い場合は、同期を使った方がよいかもしれません。並行処理を扱うには、いくつかのスレッドを開始するだけです。

しかし、データベースなど高いパフォーマンスが求められるサービスをサポートしたい場合、同期では対応し切れません。同期I/Oは実行をブロックすることがあり、そうなるとパフォーマンスが低下してしまいます。スレッドを使うことはできますが、スレッドは重いのでシステムリソースを食います。その上、頻繁なスレッド切り替えは効率が悪く、パフォーマンスの大幅な低下につながるのです。

なぜ非同期にするのか

そこで、私たちは非同期を採用しました。

非同期プログラミングではブロッキングが発生しませんので、遅いI/Oを待つ必要がなく、他の処理を行うことができます。I/Oの準備ができたらシステムから通知が来て、I/Oの処理を再開できるのです。これは非常に効率が良いため、パフォーマンスは高くなります。しかし、ご覧のように、非同期方式の方がはるかに複雑であり、コードを正確に書くのが大変です。I/Oの準備ができていない時点でコードロジックを分割し、別の処理に切り替えなくてはなりません。

コールバック地獄

auto r = make_shared<int>();
do_async_foo() {
    do_foo_a(|| {
    do_finish(|| {
        *r = 10;
    })
  })
})

時には、I/Oや他の非同期通知を扱うのにコールバック機構を使う必要があり、上記のようなコールバック地獄に陥るかもしれません。コールバックの連鎖ですね。

コルーチンで簡単に書く

もちろん、コールバックだらけのコードを書かなければならないとしたら悲惨な作業になるでしょうが、幸い、それを回避する方法が少なくとも2つあります。1つはコルーチンです。

コルーチンは多くの言語でサポートされている軽量スレッドです。例えば、C++のboost coroutineライブラリやWebChat libcoライブラリ、Pythonのyieldやgreenlet、そしてもちろん、Goのgoroutineがあります。

上記はGoでgoroutineとチャネルを使う単純な例です。この2つの便利な機能が備わっているおかげで、パフォーマンスの高い並行処理コードを簡単に書くことができます。Goの人気が最近どんどん高まっている主な理由はそこにあると、個人的には思っています。

もう1つの方法、future

let future = do_async( future() )
    .then( future_a() )
    .then( future_b() )
    .then( future_c() );
future.wait();

コルーチンが提供されていない言語もありますが、その場合に役立つのがfutureです。futureとはpromiseのような機能です。futureを解決しようとする時、今はまだ結果が出ておらず読み取れなくても、futureがのちに実行されてから結果を取得することができます。

futureの実行が完了するまで待機することもできますし、複数のfutureを1つのチェーンにすることも可能です。

Rustにおけるfuture

では、Rustにおけるfutureはどうなっているでしょうか。

Rustでは、futureは Alex のおかげですでにサポートされています。Alexに感謝します。

Rustのfutureはトレイトがベースとなっており、ゼロコストです。つまり、余分なヒープ割り当てや動的ディスパッチの必要がないということです。futureは使いやすく、多数のfutureを1つのチェーンにすることができますが、その際にはIterator APIのようなコンビネータを使います。

futureはコールバック駆動型ではなく要求駆動型であり、futureの準備ができているかどうかはfutureに明示的にポーリングして確認します。コールバックがないのでヒープ割り当ても回避できますし、futureを簡単に取り消すことも可能です。

future

futureはトレイトであり、その中心となる関数はpollです。

pub trait Future {
    type Item;
    type Error;
    // check the future is resolved, return Ready or NotReady
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
    // wait until the future is resolved
    fn wait() -> result::Result<Self::Item, Self::Error>;
}

pub type Poll<T, E> = Result<Async<T>, E>;
pub enum Async<T> {
    Ready(T),
    NotReady,
}

カスタマイズしたfutureには、pollを実装しなくてはなりません。pollから NotReady が返されたら、futureは準備ができていないという意味なので、後でポーリングします。 Ready が返されたら、futureは完了しているので、その結果を取得できます。また、futureの完了を明示的に待機することも可能です。waitを呼び出した場合、実行はfutureが完了するまでブロックされます。

Futureの例

以下はfutureの非常に単純な例です。 ok futureでは、 waitReady を返し、その結果は 1 となります。 empty futureでは、 pollNotReady を返します。

let f = ok::<u32, u32>(1);
assert_eq!(f.wait().unwrap(), 1);

let mut f = empty::<u32, u32>();
assert_eq!(f.poll(), Ok(Async::NotReady));

コンビネータ

コンビネータを使って、futureをチェーンさせることができます。

let f = ok::<u32, u32>(1).map(|x| x + 1);
assert_eq!(f.wait().unwrap(), 2);

let f1 = ok::<u32, u32>(1);
let f2 = ok::<u32, u32>(2);
let (_, next) = f1.select(f2).wait().ok().unwrap();
assert_eq!(next.wait().unwrap(), 2);

例えば、 ok futureとmapコンビネータを組み合わせて使うことができ、その結果は 2 となります。 select を使って2つのfutureを待機することも可能です。どちらかのfutureの準備ができれば select futureは終了し、その結果と次に解決されるべきfutureを返します。

同期

futureライブラリでは2つの同期チャネルが提供されています。oneshotはシングルプロデューサ・シングルコンシューマ(SPSC)向け、mpscはマルチプロデューサ・シングルコンシューマ(MPSC)向けのチャネルです。どちらもスレッド間通信に使うことができます。

Stream

streamはfutureと似ており、streamの実行が完了するまで、値を1つずつ解決することができます。

pub trait Stream {
    type Item;
    type Error;
    // check the future is resolved, return Ready or NotReady
    // Ready(Some) means next value is on the stream
    // Ready(None) means the stream is finished
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

streamにポーリングして Ready Some が返されれば、streamの次の値を取得できるという意味です。 Ready None が返されれば、streamは完了したので再びポーリングすることはできないという意味です。

Sink

sinkは値を非同期的に送信する方法です。

pub trait Sink {
    type SinkItem;
    type SinkError;
    fn start_send(self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>;
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError>;
    fn close(&mut self) -> Poll<(), Self::SinkError>;
}

start_send を使うと1つの値を送信できます。 poll_complete ではsinkのバッファをクリアして、全ての値が送信されたかどうかを確認できます。 close はsinkを終了させる関数です。

Task

taskはfutureの処理を駆動するために使います。

# If the future is not ready?
let handle = task::current();

# If the event of interest occurs?
handle.notify();

# What to do after notify?
executor.poll(f);

futureの準備ができていない場合は、task current を使ってtaskハンドルを取得できます。task notify は準備ができたtaskを起床させるために使い、 executor はfutureに再びポーリングを行います。

以上が、Rustにおけるfutureの現状です。

gRPC

ではgRPCの話をしましょう。

サービスの開発をしたい時、まず決めるべきはクライアントとサービスの間の通信方法です。独自のプロトコルとRPCの実装も可能です。それは効率的ですが、一般的ではありません。HTTPプロトコルをベースにしたRESTful APIを直接使うこともできますが、パフォーマンスを優先し、多数のAPIを供給する必要がある場合には不便です。

なぜgRPCを使うのか

今では多くのユーザが、Googleが開発したRPCフレームワーク、gRPCを選ぶようになりました。多くの言語がgRPCをサポートしています。ユーザは容易に好きな言語を使ってサービスと通信できるのです。gRPCはプロトコルバッファを使ってバイナリデータを効率的にシリアライズ、デシリアライズします。gRPCにはリッチなインターフェースがあり、ケースによって単一通信、クライアントストリーミング、サーバストリーミング、双方向ストリーミングが使えます。gRPCはHTTP/2をベースにしていますので、利点が多いのです。データ暗号化にHTTP/2のSSLが利用できるのはその一例です。

HTTP/2

gRPCはHTTP/2をベースにしています。テキストプロトコルを使うHTTP/1と異なり、HTTP/2はバイナリプロトコルなのでパフォーマンスに優れていますが、直接読み取るのは困難です。幸い、Wiresharkなど多くのツールがすでにこのプロトコル解析をサポートしています。

HTTP/1では、リクエストを送信すると、応答を待ってから再度リクエストを同じ接続上で送らなければなりません。これは効率的ではありませんが、HTTP/2はstreamを使い、1度の接続で多重化を行います。

HTTP/2は優先度制御もサポートしていますので、重要なリクエストに先に対応できます。HTTP/2は制御フローもサポートしています。場合によってはいつ、どのようにデータを送信するかをコントロールできるのです。HTTP/2はHTTPヘッダの圧縮にHPACKを使い、ネットワーク転送サイズを削減しています。

gRPCのベースはHTTP/2

gRPCはHTTP/2ヘッダを使ってgRPC独自のヘッダを渡します。データ転送にはHTTP/2データフレームを使い、HTTP/2パス内にサービス名とメソッドをマッピングします。もちろん、HTTP/2のContent-Lengthヘッダはapplication/grpc + protoです。

以上がgRPCの簡単な概要です。次に進みましょう。

futureとgRPCを組み合わせる

言語によって様々なgRPC実装方法があります。残念ながらRust用のものは見当たりません。しかし、どうしてもTiKVでgRPCを使いたいので、自分たちで実装することにしました。

C gRPCのキーワード

Google gRPCにはC言語で書かれたコアライブラリがあります。他にC++、php、C#もCコアAPIを使用していますので、Rustで直接それを使うことにしました。

始める前に、Cコアのキーワードを知る必要があります。

  • コール: コールは、単一通信、クライアントストリーミング、サーバストリーミング、双方向ストリーミングといったgRPCの呼び出しを意味します。
  • チャネル: チャネルは接続を指します。
  • サーバ: サーバ内にサービスを登録できます。
  • 完了キュー: 最重要。Next関数で無限に完了キューを駆動します。

疑似フロー

C APIを使用した際のクライアント単一通信の疑似フローです。

  • 完了キューを作成。
  • クライアントチャネルを作成。
  • チャネルからコールを作成。
  • tag を使い、コールのバッチ内でオペレーション開始。
  • 完了キューを ポーリング してコールを実行、 tag を含むイベントを戻す。
  • tagを使って何らかの実装をする。

最初に完了キューを作ってからクライアントとサーバでチャネルを作成します。次にチャネルからコールを作り、関係するtagでコールのオペレーションを開始します。完了キューをポーリングし、コールが終了したところでイベントを取得します。そうすると、イベントから関連するtagを取得して、何らかの実装ができます。

見て分かるように、フローは非同期なので、futureを使ってgRPC C APIをラップし、外部利用の可能なエルゴノミックなAPIをサポートすることができます。

単一通信

単一通信については、クライアントがリクエストを送信し応答を待ちます。

クライアント:

let future = unary(service, method, request);
let response = future.wait();

クライアント側のAPIの定義はシンプルです。futureを直接返して後で応答を取得できます。

サーバハンドラ:

fn on_unary(context, request, response_sink) {
    context.spawn(|| {
        // do something with request
        response_sink.send(response)
    });
}

サーバ側では関連するサーバハンドラは上記のようになります。コンテキストがタスクを生成し、タスク内でリクエストを操作し、sinkを用いて応答を返します。

クライアントストリーミング

クライアントストリーミングでは、クライアントが多数のリクエストを送信して1つの応答を受け取ってからstreamingが終了します。

クライアント:

let (sink, future) = client_streaming(service, method);
sink = sink.send(request).wait().unwrap();
sink = sink.send(request).wait().unwrap();
……
let response = future.wait();

クライアント側では、sink + futureを返します。そしてsinkでリクエストを送信し、futureで応答を待ちます。

サーバハンドラ:

fn on_client_streaming(context, request_stream, response_sink) {
    context.spawn(||{
    request_stream.fold(|request| {
    // do something
}.and_then() {
    response_sink.send();
}})}

サーバ側ではstreamを使ってクライアントリクエストを受信し、sinkで応答を返します。

サーバストリーミング

サーバストリーミングでは、クライアントはまずリクエストを送信し、多数の応答を1つずつ受信します。

クライアント:

let stream = server_streaming(service, method, request);
stream.for_each(|response| {
    // do something
});

サーバハンドラ:

Server Handler:
fn on_server_streaming(context, request, response_sink) {
    context.spawn(||{
        response_sink.send();
    });
}

クライアント側では、streamを返して応答を受け取ります。サーバ側ではsinkを使って応答を返信します。

双方向ストリーミング

双方向ストリーミングでは、クライアントとサーバは多くのリクエストと応答を送信できます。

クライアント:

let (sink, stream) = duplex_streaming(service, method);
sink.send_all();
stream.for_each();

サーバハンドラ:

fn on_duplex_streaming(context, request_stream, response_sink) {
    context.spawn(||{
    let resps = request_stream.map().collect().flatten();
        response_sink.send_all(resps);
}
}

クライアント側ではsink + streamを返します。また、サーバ側でもstreamとsinkを使います。

単一通信futureの実装

Futureを使えば、sinkとstreamは容易にC gRPC コア APIをラップできます。詳しく見ていきましょう。ここではクライアント単一通信を例に取ってfutureとgRPCをどう組み合わせるかをやってみます。

クライアント単一通信

最初にチャネルからコールを作成してから、futureとtagを作ります。tagをC APIに渡す必要があるためで、ここではタグの寿命をgRPCで管理し、タグをBoxから解放できるよう Box::into_raw を使いました。次にCを呼び出してバッチAPIを開始させます。

let call = channel.create_call();
let (future, tag) = CallTag::batch_pair();
// create a tag and let gRPC manages its lifetime 
let tag_box = Box:new(tag);
let tag_ptr = Box::into_raw(tag_box) as _;
call_start_batch(…, tag_ptr);

単一通信future

futureの実装は簡単です。poll関数では、まず結果を持っているかどうかをチェックします。もしYesであれば、 Ready を返します。持っていない場合は、タスクがあるかどうかを確認します。タスクがない、あるいはタスクの will_notify_current がfalseの場合は、task currentを使ってハンドルを保存します。

fn poll(&mut self) -> Poll<T, Error> {
    let guard = self.inner.lock();
    if let Some(res) = guard.result.take() {
        let r = try!(res);
        return Ok(Async::Ready(r));
}
// Has not been finished yet, Add notification hook
if guard.task.is_none() || !guard.task.as_ref().unwrap().will_notify_current() {
    guard.task = task::current();
}
Ok(Async::NotReady)
}

futureを解決する

完了キュー内で、 next を使って次に完了したイベントを取得し、行から Box::from_raw を使い、再びtagを取ります。次にtagを解決し、コールの結果を設定します。タスクハンドルがあれば、通知するようにします。

完了キュー内:

let e = cq.next();
// Get the tag from gRPC again
let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) } ;
tag.resolve(&cq, e.success != 0);

resolve内:

let task = {
let mut guard = self.inner.lock();
guard.set_result(res) 
gurad.task.take();
};
task.map(|t| t.notify());

フローは非常に簡単で、クライアントストリーミング、サーバストリーミングと双方向ストリーミングも同様にできますので、ここでは説明しません。

ベンチマーク

futureでgRPCをサポートするととても効率的です。上記のとおり、多数のベンチマークを取ってみました。パフォーマンスの優秀さが分かるでしょう。

その他

すでにTiKVでgRPCを実用に使うようになって久しいです。順調に動いています。試したい方は、 github.com/pingcap/grpc-rs をチェックしてください。Cargoを使ってのgrpcioダウンロードも可能です。

以上です。ありがとうございました。