RxJavaのレシピ:Androidのイベントをストリームに変換する

問題

まず、これから解決していく問題を先に述べます。

イベント(例えばOSの通知やBluetoothの状態)を受け取る部分がアプリ内に複数あり、各部分に関係がある時にのみその情報を受け取りたい場合を想像してみてください。

  • アプリの最初のクライアントが関心を持った時にのみイベントリスナーが登録され、誰も関心を持たなくなれば、すぐに登録を解除する必要がある。
  • 新しいクライアントがイベントを購読する際、既存のクライアントに送信済みの前回のイベントを新しいクライアントがすぐに受け取れるようにして、全クライアントが常に同期するようにする。

今回の記事では、RxJavaの様々なコンポーネントを組み合わせることで、この問題を解決する道を探ろうと思います。本題に入る前に、SubjectReplay、そしてConnectable Observablesなどのコンセプトを理解しておいてください。今回の記事は、各コンポーネントのニュアンスを理解するのに役立つと思います。

ネタバレ注意:答えにたどり着くには、複数のオペレータが必要です。

例え話

ここで、例え話を使って問題を示してみます。


画像著作者:vectomart – Fotolia

次のような機能を持つ自動サウンドシステムがリビングリームにあるとしましょう。

  • 最初の1人が部屋に入ると、音楽がノンストップの状態で再生される。
  • 部屋に人がいる限り、音楽は再生され続け、全員が同じ曲順で同じ音楽を聴く。
  • 部屋から1人が立ち去っても音楽の再生は残った人たちのために続けられる。つまり立ち去った1人は単純にそれ以上、音楽を聴くことはない。
  • 最後の1人が部屋を出ると音楽は再生を止め、サウンドシステムの電源は自動で切れる。

解決策を調理するための準備

解決策は(クッキングレシピの要領で)順を追って紹介し、具材を追加する理由や、それぞれが解決しようとしている問題を説明していきます。

TL;DR 単純に解決策が知りたい場合は最後までスクロールし、内容を飛ばしてオペレータの順序を確認してください。

オーブンの予熱

RxJavaの最初の具材はRxRelayのBehaviorRelay(またの名を安全なBehaviorSubject)です。これは今回の料理完成に必要な機能の大半をカバーするもので、購読/停止イベントを提供し、Hot Observableと同じように複数のobserverに対してマルチキャストし、前回に発行された値を新しいobserverにリプレイします。

この具材を準備する手順は以下のとおりです。

  • doOnSubscribe()でListener、EventBus、あるいはその他のイベントプロデューサを登録する。
  • doOnUnsubscribe()でListener、EventBus、あるいはその他のイベントプロデューサを登録解除する。
  • プロデューサからの新規イベントを受け取ったら、全てのobserverに対してイベントを発行するようリレーを呼び出す。

onCompleteまたはonErrorを呼び出せないことを別にすれば、リレーは基本的にSubjectです。


画像著作者:reactivex.io

仕組み自体はこれで大丈夫そうですが、1つだけ問題があります。それはObservableを新たに購読する度にListenerに余分な登録が行われ、Observableの購読を停止する度に、たとえ他にも多数のobserverが依然としてObservableを購読中でも、Listenerから余分な登録解除が行われてしまうことです。

つまり、このレシピは並列する複数のobserverには使えないということになります。


上記のコードは次の出力を生成します。

BehaviorRelayProblem -> observer-1 subscribes
BehaviorRelayProblem -> doOnSubscribe
BehaviorRelayProblem -> observer-1 -> onNext with 1
BehaviorRelayProblem -> observer-2 subscribes
BehaviorRelayProblem -> doOnSubscribe <= 問題:不要な呼び出し
BehaviorRelayProblem -> observer-2 -> onNext with 1
BehaviorRelayProblem -> observer-1 -> onNext with 2
BehaviorRelayProblem -> observer-2 -> onNext with 2
BehaviorRelayProblem -> observer-1 unsubscribes
BehaviorRelayProblem -> doOnUnsubscribe <= 問題:不要な呼び出し
BehaviorRelayProblem -> observer-2 unsubscribes
BehaviorRelayProblem -> doOnUnsubscribe

新たなobserverの購読が、Listenerへの余分な登録を引き起こしています。

スパイスの追加

この問題に対するシンプルな対策はBehaviorRelayにshare()オペレータを組み合わせることです。これによりソースのObservableのdoOnSubscribe()メソッドは、最初のobserverがストリームを購読した時に1回だけ呼び出され、同様にdoOnUnsubscribe()は、最後のobserverがストリームの購読を停止した時に1回だけ呼び出されます。

share()オペレータはpublish().refCount()の別名です。


画像著作者:Netflixオリジナル作品

上記のコードは次の出力を生成します。

share -> observer-1 subscribes
share -> doOnSubscribe
share -> observer-1 -> onNext with 1
share -> observer-2 subscribes
<= 問題:呼び出しがないobserver-2 -> onNext with 1
share -> observer-1 -> onNext with 2
share -> observer-2 -> onNext with 2
share -> observer-1 unsubscribes
share -> observer-2 unsubscribes
share -> doOnUnsubscribe

前回に発行された値が新規のobserverにリプレイされていません。

完璧な料理

完成にかなり近づいていますが、あと一歩です。上記では2番目のobserverが購読した際、そのobserverは新しいイベントが発行されるまで何の値も取得できません。新たなobserverが購読をする時、それ以前のobserverに対してイベントは発行済みであることを考慮すると、新しいobserverに前回の値をすぐに発行できた方がいいでしょう。

この問題の答えは.share()をRxReplayingShareに置き換えることです。これでリプレイのキャッシュはReplayingShareオブジェクトの一部となるため、BehaviorRelayではなくPublishRelayを使うことになります。

ReplayingShareはRxJavaの変換器であり、replay(1)、publish()、そしてrefCount()オペレータを組み合わせます。


画像著作者:Jake Wharton 2016年


上記のコードは次の出力を生成します。

ReplayingShare -> observer-1 subscribes
ReplayingShare -> doOnSubscribe
ReplayingShare -> observer-1 -> onNext with 1
ReplayingShare -> observer-2 subscribes
ReplayingShare -> observer-2 -> onNext with 1
ReplayingShare -> observer-1 -> onNext with 2
ReplayingShare -> observer-2 -> onNext with 2
ReplayingShare -> observer-1 unsubscribes
ReplayingShare -> observer-2 unsubscribes
ReplayingShare -> doOnUnsubscribe

最終的な解決策はBehaviorSubjectにreplay(1)、publish()、そしてrefCount()を加えて構成することです。この他にもベターな代替手段としてPublishRelayにReplayingShareを組み合わせる方法もあります。

最後に

この方法が適用できるのは、例えばサービスによって生成されたイベントにアプリの一部が関係する時にのみ実行する必要があるようなAndroidのサービスで、それが誰も関心を持たなくなるまで要素を生成し続けた後、シャットダウンするような場合です。

個人的に、この問題を解決するために他の人がどういうレシピでRxJavaのコンポーネントを組み合わせるかというのにとても興味があります。RxJavaの具材を別のやり方でミックスしても、同様の結果を得ることはできるはずです。同じ結果にたどり着く別の調理法があれば、ぜひコメントでお聞かせください。

グルメ向けのコツ:最近導入されたObservable.fromEmitter()は、Listenerのregister/unregisterのユースケースをカバーするために設計されており、ここで述べた問題もfromAsyncで解決して同様の結果を得ることが可能です。

この記事を査読してくれたJolanda VerhoefOlaf Achthovenに感謝します。