読者です 読者をやめる 読者になる 読者になる

ninjinkun's diary

ninjinkunの日記

【翻訳】あなたが求めていたリアクティブプログラミング入門

あなたはリアクティブプログラミングと呼ばれる新しい方法が気になっている。 勉強するのは大変で、良い教材がないのでさらに難しい。私が勉強を始めたときは、まずチュートリアルを探した。見つけたのは一握りの実践的なガイドだけ、しかもそれらは表面をなぞっているだけで、リアクティブプログラミングのアーキテクチャ全体像を構築しようとしてはいなかった。ある関数を理解するのに、ライブラリのドキュメントは役に立たないことがある。 これを見て欲しい。

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element's index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

なんじゃこりゃ。

私は2冊の本を読んだが、片方は全体像のみを描き、もう片方は特定のFRPライブラリの使い方に注力していた。私は作りながら理解するという、困難な方法でリアクティブプログラミングを学習する羽目になった。Futuriceでの仕事では実際のプロジェクトで使用し、問題に当たったときは同僚にサポートしてもらった

学習の上で一番難しい部分は、FRPで考えるということだ。従来の典型的な命令型でステートフルなプログラミングから離れて、違うパラダイムで考えるように脳を強制しなくてはならない。私はこのようなガイドはインターネットで見つけられなかったので、FRPで考えるための実践的なガイドは世界にとって価値があると考えた。ライブラリのドキュメントは後から道を照らしてくれるだろう。このガイドが役に立つことを願う。

"関数型リアクティブプログラミング (FRP) とは何か?"

インターネットには良くない説明と定義が溢れている。Wikipediaは案の定一般的で理論的すぎる。 Stackoverflowの模範的解答は新人向きではない。Reactive Manifestoはプロジェクトマネージャやビジネス系の人間に見せるもののようだ。"Rx = Observables + LINQ + Schedulers" はとても重たく、我々を混乱させるマイクロソフト的なものだ。"リアクティブ" や "変更の伝搬" といった言葉は、あなたの知っている典型的なMV*や、あなたのお気に入りの言語が既にやっていることとFRPとは何が違うのかを伝えていない。もちろん私のフレーワークのビューはモデルに反応する。もちろん変更は伝搬される。そうでなければ何も描画されないはずだ。

戯言はやめにしよう。

FRPは非同期データストリームを用いるプログラミングである ( FRP is programming with asynchronous data streams)

見方によれば、これは別に新しいものではない。イベントバスやクリックイベントは本物の非同期イベントストリームで、観測したり、副作用を実行したりできる。FRPはそのようなアイデアステロイド剤なのだ。データストリームはクリックやホバーイベントだけではなく、何からでも作れる。ストリームは安価でどこにでもある。変数、ユーザー入力、プロパティ、キャッシュ、データ構造などなど何でもストリームにできる。例えば、Twitterのフィードをクリックと同じ方法でデータストリームにできると想像してみてほしい。ストリームをlistenして、それに応じて反応することができる。

その上、ストリームを合成、作成、フィルタする驚くべき関数の道具箱が手に入る。そこには "関数型" のマジックが働いている。ストリームは他のストリームの入力としても使う事ができる。複数のストリームを入力に使う事も可能だ。二つのストリームをmergeできる。あなたの興味があるイベントだけをfilterして違うストリームを得ることもできる。あるストリームからデータの値を他の新しいストリームにmapすることもできる。

ストリームがFRPの中心だとして、注意深く見ていこう。おなじみの "ボタンのクリック" イベントストリームから始めることになる。

Click event stream

ストリームとは、時間順に並んだ進行中のイベントの列だ。ストリームは、(何かの型の) 値、エラー、"完了" という3つのシグナルを発行できる。"完了" が発行される場合を考えると、例えばボタンを含んだウィンドウやビューが閉じられる場合が考えられる。

発行される3つのイベントは、値が出力された際に実行される関数、エラーが出力された際に実行される関数、'完了'が出力された際に実行される関数を定義することによって、非同期にキャプチャする。値のみに注目している場合には後の二つは割愛する場合もある。ストリームを "listening" することは subscribing と呼ばれる。この関数群をobserverと定義する。ストリームはobserveされるsubject (もしくは "observable") である。これの正確な定義はObserver Design Patternにある。

ダイアグラムを描画する他の手段としてはASCIIを使う方法があり、このチュートリアルのいくつかの部分で使っていく。

--a---b-c---d---X---|->

a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline

既に見たものと似ているので退屈しないで欲しいのだが、新しい要素として、オリジナルのクリックイベントストリームから変換された新しいクリックイベントストリームを作るつもりだ。

始めに、何回クリックされたかを表すカウンターストリームを作ろう。普通のFRPライブラリでは、mapfilterscanなどのように、ストリームに対して使える関数が沢山用意されている。この関数群からclickStream.map(f)のように一つを適用すると、関数はクリックストリームをベースとした新しいストリームを返す。これはどんな場合でもオリジナルのクリックストリームを変更することはない。これは不変性と呼ばれる、FRPストリームにつきものな性質だ。美味しいパンケーキにシロップがつきものなようにね。これにより、clickStream.map(f).scan(g)のようにチェーンできる。

  clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

map(f)関数は定義した関数fに従って出力された値を (新しいストリームの中で) 置き換える。我々のケースでは、各クリックを1という数字にマップすることになる。scan(g)関数はストリームのそれまでの値をまとめ上げて、 x = g(accumulated, current) として値を返す。この例ではgは単純な加算である。そして、クリックが発生した際に、counterStreamが総クリック数を出力する。

FRPの本当の力を見せるために、"ダブルクリック" イベントのストリームについて考えよう。この話でより面白いのは、トリプルクリックのストリームについても、ダブルクリックや他の複数回クリック (2回かそれ以上) と同じように考えられることだ。息を深く吸い込んで、伝統的な命令的でステートフルな方法でどう実現するかを想像してみよう。扱いづらく、状態を保持するために幾つかの変数を必要とし、さらにタイムインターバルを操るものになると思う。

FRPのそれはもっとシンプルだ。実際、ロジックはたった4行のコードなのだ。しかし今はコードのことは置いておこう。あなたが初心者でもエキスパートでも、ストリームの構築を理解するためには、ダイアグラムで考えるのが一番だ。

Multiple clicks stream

グレーのボックスはストリームを他のストリームに変換する関数だ。まずクリックをリストに積み上げていくのだが、250ミリ秒の "イベントが無い時間" が発生する (これがbuffer(stream.throttle(250ms))のやっていることだ。ここでは詳細を理解しなくていい。ただのFRPのデモなのだ)。結果はリストのストリームなので、そこに map()を適用して、対応するリストの長さを整数にして、リストにmapする。最後にfilter(x >= 2)関数で1の場合を無視するようにする。3つの操作で欲しかったストリームが得られた。これをsubscribe ("listen")して、望むように反応する処理を書ける。

このアプローチの美しさを楽しんで欲しい。この例は氷山の一角だ。例えば、同じ操作をAPIレスポンスのストリームなど違う種類のストリームに適用できるし、他にもたくさんの関数が使える。

"なぜFRPの導入を検討するべきなのか"

FRPはコードの抽象のレベルを引き上げる。巨大な実装の詳細に常にうんざりさせられることなく、ビジネスロジックを定義するイベントの相互関係に集中できる。FRPを使ったコードはより簡潔になるだろう。

モダンなWebアプリやモバイルアプリは、データイベントに関連したUIイベントが多数あり、高度にインタラクティブなので、この利点がより明確だ。10年前、Webページのインタラクションと言えば、長いフォームをバックエンドに送って、フロントエンドはシンプルに描画するだけだった。アプリはよりリアルタイム性を持つように進化している。一つのフォームフィールドの変更は、自動的にバックエンドでの保存を引き起こす。コンテンツを "いいね" すると、リアルタイムで接続している他のユーザーに反映される。

現代のアプリは、高度にインタラクティブな体験ユーザーに与えるために、多数のリアルタイムイベントを扱っている。我々はこれを適切に取り扱うツールを探しており、リアクティブプログラミングがその答えなのだ。

例で見るFRPの考え方

現実の事柄に飛び込んでみよう。FRPでどう考えるかについてステップ・バイ・ステップでガイドしてくれる、現実世界の実例だ。模造の例では無く、中途半端なコンセプトでも無い。このチュートリアルが終わる頃には、どうしてそうするのかを理解しながら、実際に動くコードを生み出すことになるだろう。

私はツールとしてJavaScriptRxJSを選んだ。なぜなら、JavaScriptはこの時点で最もよく知られている言語で、Rx*ライブラリファミリー は多くの言語とプラットフォームで広く利用可能だからだ (.NET, JavaScala, ClojureJavaScript, Ruby, PythonC++Objective-C/Cocoa, Groovy、等々)。使うツールがなんであれ、次のチュートリアルは具体的に役立つだろう。

"Who to follow" 推薦ボックスを実装する

Twitterには、フォローするアカウントを推薦するUIエレメントがある。

Twitter Who to follow suggestions box

我々はこのコア機能を真似することに集中する。コア機能は

  • 起動時にAPIからアカウントのデータをロードし、3人の候補を表示する
  • "更新" が押されたら、3人の他の候補をロードして、3つの行に挿入する
  • 'x'ボタンが押されたら、その候補だけをクローズして別の候補を表示する
  • それぞれの行は候補のアバターを表示して、彼らのページにリンクする

となる。他の機能とボタンは、重要でないので考えないことにする。そしてTwitterの代わりに、似たようなAPIで認証なしで使えるもの使う。GithubのユーザーをフォローするUIを作ろう。ユーザーを取得するGithub APIがあるのだ。

先に試してみたければ、完全なコードは http://jsfiddle.net/staltz/8jFJH/48/ にある。

リクエストとレスポンス

この問題にどうやってFRPで取り組むのか? そう、これから始めよう。全てがストリームにできる。これがFRPマントラだ。簡単な機能から始めよう。"起動時にAPIから3人のアカウントデータをロードする"。特別なものは何もない。単純に (1) リクエストを発行し (2) レスポンスを受けとり (3) レスポンスを描画する。ではリクエストをストリームとして表現してみよう。初めの方では、これはやり過ぎだと感じるかもしれないが、我々はまず基本から始めたいと思う。いいね?

最初は1つのリクエストからスタートする。これを1つのデータストリームとしてモデル化すると、1つの出力される値を持つストリームとなる。後でまた複数のリクエストを取り扱うが、まずは1つだけだ。

--a------|->

Where a is the string 'https://api.github.com/users'

これがリクエストしたいURLのストリームだ。リクエストイベントが発生すると、このストリームはいつ実行するかと、何をリクエストするかを問い合わせてくる。いつ実行するかは、イベントが発行された時だ。そして何をリクエストするかは出力された値、すなわちURLを含む文字列だ。

単一の値のストリームを作るのは、Rx*ではとてもシンプルだ。公式な用語ではこのようなストリームは "Observable" と言い、事実これは観測可能なものだ。しかし私はこれを馬鹿げた名前だと感じるので、ストリームと呼ぶ。

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

しかしこのままではただの文字列のストリームなので、値が出力された時に何かしなければならない。これはストリームをsubscribingすることで実現できる。

非同期リクエストをハンドリングするために、jQuery Ajax callbackを使っていることに注意して欲しい。しかしちょっと待ってもらいたい。FRP非同期データストリームを取り扱うものだ。リクエストに対するレスポンスもストリームにできるのではないか?そう、できる。コンセプトレベルではこのようになる。さあやってみよう。

requestStream.subscribe(function(requestUrl) {
  // execute the request
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });

  responseStream.subscribe(function(response) {
    // do something with the response
  });
}

Rx.Observable.create() は、observer (他の言葉で言えば "subscriber") にデータイベント (onNext()) やエラー (onError()) の情報を伝えることによって、カスタムストリームを作り出す。これはjQuery Ajax Promiseをラップしているだけではないのか。もしかしてPromiseはObservableという意味なのか?

         

Amazed

イエス。

ObservableはPromise++だ。Rxではvar stream = Rx.Observable.fromPromise(promise)のように、Promiseを簡単にObservableに変換できる。唯一の違いは、ObservableはPromises/A+規約に則っていないことだが、コンセプトレベルでは衝突していない。Promiseは一つの発行される値のみを扱う、シンプルなObservableなのだ。FRPストリームはpromiseの枠を超えて、複数の返値を扱うことができる。

これは素晴らしいことだ。そしてFRPが少なくともPromiseと同じくらい強力であることを示している。もしあなたがPromise厨なら、FRPにできることをよく見ておくといい。

元の例に戻ろう。気がつきやすい人ならsubscribe()コールを他の内部でも使うと、コールバック地獄と同じことが起こるのに気づくだろう。responseStreamの生成もまたrequestStreamに依存している。前に聞いたように、FRPは変換したり新しいストリームを生成するシンプルなメカニズムを持っているので、そのようにしてみよう。

基本的な関数の中で今最も知るべきなのはmap(f)だ。これはstream Aから値を受けとって、f()を適用して、stream Bの値を生み出す。リクエストとレスポンスに対して行おうとするなら、リクエストURLを (ストリームに偽装した) レスポンスPromiseにmapできる。

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

そして我々は "メタストリーム" と呼ばれる獣、ストリームのストリームを作り出すことになる。慌ててはいけない。メタストリームは出力された値それぞれが別のストリームになっている。これはポインターとして考えればよい。出力された値それぞれが別のストリームへのポインターなのだ。この例では、リクエストURLそれぞれが、対応するレスポンスを含むpromiseストリームにmapされている。

Response metastream

レスポンスに対するメタストリームはややこしくて、役に立たないように見える。欲しいのはレスポンスに対するシンプルなストリームで、JSONに対する'Promise'等ではなく、出力される値がJSONオブジェクトになっているものだ。ここでミスターFlatmapにご登場願おう。これはメタストリームを "平滑化する" ようなmap()の変形で、"幹" のストリームで出力されたすべてを、"枝" のストリームで出力されるようにする。メタストリームはバグなどではないし、Flatmapもそれを取り繕って修正するものではない。FRPで非同期レスポンスを扱うためのツールなのだ。

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Response stream

いいね。レスポンスストリームがリクエストストリームに対応して定義されているので、もしこの後で複数のイベントがリクエストストリームで発生しても、レスポンスストリームでは対応したレスポンスイベントが、期待通りに発生する。

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(lowercase is a request, uppercase is its response)

ついにレスポンスストリームを手に入れた。受けとったデータを描画しよう。

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

今までのコードを繋げるとこうなる。

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

更新ボタン

まだレスポンスJSONには100人のユーザーが含まれていることを伝えていなかった。APIはページオフセットのみを許可していて、ページサイズは指定できないので、我々は3つだけデータオブジェクトを使って、残りの97個は捨てることになる。この問題については今は目をつぶろう。後でレスポンスをキャッシュする方法についても見ていく。

更新ボタンがクリックされる度、リクエストストリームは新しいURLを出力して、新しいレスポンスが得られる。このためには2つのことが必要だ。それは、更新ボタンのクリックイベントのストリーム (マントラ: 全てがストリームにできる) と、更新クリックストリームによってリクエストストリームを変更することだ。喜ばしいことに、RxJSはイベントリスナーからObservableを生成するツールがある。

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

更新クリックイベントは、それ自体がAPI URLを運んでくれるわけでない。クリックを実際のURLにmapする必要がある。リクエストストリームを変更して、これを更新クリックストリームにする。これは毎回ランダムなオフセットパラメーターが付与されたAPIエンドポイントを返すようにmapされている。

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

私は愚かで自動テストもしていなかったので、前に作った機能を壊してしまった。起動時には何のリクエストも発生せず、更新ボタンをクリックしたときだけ発生するようになってしまった。ううむ。更新がクリックされた際のリクエストと、Webページが開かれた際のリクエスト、両方の振る舞いが必要だ。

それぞれのケースに対応した、別々のストリームを作る方法は既に知っている。

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');

しかしこの2つどのように1つに "マージ" すれば良いのだろう?そう、merge()があるのだ。ダイアグラム表現で説明するとこのようになる。

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

今やこれは簡単にできる。

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

中間ストリームを抜きにして、違った方法でより綺麗な書き方だと、このようになる。

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .merge(Rx.Observable.returnValue('https://api.github.com/users'));

もっと短くて読みやすくするとこうだ。

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');

startWith() 関数はあなたが思っている通りの関数だ。入力ストリームが何であれ、startWith(x)が実行された出力ストリームは、最初はxから始まる。私はまだ充分にDRYではなく、APIエンドポイントの文字列を繰り返してしまっている。これを修正する一つの方法は、startWith()refreshClickStreamの近くに動かして、更新クリックを起動時に"エミューレート"することだ。

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

いいね。"自動テストを壊してしまった" ところまで戻って考えると、最後のアプローチとの唯一の違いは、startWith()を追加したことだとわかるはずだ。

3つの候補をストリームでモデリングする

これまで、responseStreamのsubscribe()した際に描画する推薦 UIエレメントのみ扱ってきた。今更新ボタンについて考えてみると、問題がある。それは'更新'ボタンをクリックしてすぐには、現在の3つの候補が消えないことだ。新しい候補はレスポンスが到着してからやって来る。しかしUIを良い感じにするためには、現在の候補を更新が押された時に消す必要がある。

refreshClickStream.subscribe(function() {
  // clear the 3 suggestion DOM elements
});

違う。急いではいけないよ、相棒。これは良くない。なぜなら、候補DOMエレメントに影響する (他の1つもresponseStream.subscribe()を実行しているため)、2つのsubscriberがあり、これは関心の分離に反しているように見える。FRPマントラを覚えているかい?

       

Mantra

出力される値が候補データを含むJSONオブジェクトのストリームになるように、候補をモデル化してみよう。これを3つの候補それぞれに行う。候補のストリーム #1 はこんな感じだ。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

他のsuggestion2Streamsuggestion3Streamは単純にsuggestion1Streamをコピペすれば良い。これはDRYではないが、このチュートリアルの例をシンプルに保つためと、こういうケースで繰り返しをどう避けるかを考える良い問題になると思うので、このままにしておく。

responseStreamのsubscribe()で描画する方法に代わって、こうやるようにする。

suggestion1Stream.subscribe(function(suggestion) {
  // render the 1st suggestion to the DOM
});

"更新の際に候補を消す" というところに戻ると、シンプルに更新クリックを'null'候補データにmapすれば良く、suggestion1Streamに含めるとこのようになる。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

そして描画の際には、nullを "データが無い" ものとして実装し、UIエレメントを非表示にする。

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

今の全体像はこうだ。

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

Nnullを表している。

おまけとして、起動時に "空の" 候補を表示してみたい。これは候補ストリームにstartWith(null)を追加することで実現できる。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

その結果はこうなる。

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

候補のクローズとレスポンスキャッシュの使用

まだ実装していない機能が一つ残っていた。それぞれの候補は、自身をクローズして他の候補を同じ位置に表示するための、'x'ボタンを持っている。まず、どのクローズボタンがクリックされても新しいリクエストが作られるものを考える。

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button

var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // we added this
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

これは動かない。これは1つをクリックしただけなのに、クローズして全ての候補を再読み込みする。この問題を解決する方法は色々あるが、面白さを保つためにも、先ほどのレスポンスを再利用して解決してみる。APIレスポンスのページサイズは100人のユーザー分あるが、我々は3人分しか使っていない。そこにはまだ豊富な新しいデータがある。追加のリクエストをする必要は無い。

もう一度、ストリームで考えてみよう。'close1'クリックイベントが発生したとき、レスポンスのリストの中から1人のランダムなユーザーを得るため、このようにresponseStream最も最近出力されたレスポンスを使いたい。

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

Rx*ではcombineLatestという、ストリームを組み合わせるための関数がある。これこそ我々が欲しかったもののようだ。これは2つのストリームAとBを入力として受けとり、どちらかのストリームが値を出力すると、combineLatestabから最も最近出力された2つの値を合わせて、定義した関数fを適用して1つの値c = f(x,y)を出力する。これはダイアグラムで説明するのが良い。

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

where f is the uppercase function

combineLatest()をclose1ClickStreamresponseStreamに適用して、close 1ボタンがクリックされた時、suggestion1Streamの最新のレスポンス出力を受けとって新しい値を生成するようにする。一方combineLatest()は対称性を持っている。それは、responseStreamから新しいレスポンスが出力されたとき、combineLatest()は最新の'close 1'クリックと新しい候補の生成を結びつけるというものだ。これは面白い。なぜならこのように先ほどのsuggestion1Streamのコードもシンプルになるからだ。

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

パズルの中で1ピースだけ欠けているものがある。combineLatest()は2つのソースの最新の値を使う。しかし片方のソースがまだ値を出力していなかったら、combineLatest()は出力ストリームにデータイベントを生成できなくなってしまう。上記のASCIIダイアグラムを見れば、1つ目のストリームが値aを出力した時点では、出力は何もないことがわかるだろう。2つ目のストリームがbを出力したときのみ、出力の値が生成されるのだ。

この問題を解決するためには異なる方法がある。そして最もシンプルなものを選んでいきたい。それは、起動時に'close 1'ボタンのクリックをシミュレートすることだ。

まとめ

これでやり終えた。完全なコードはこのようになった。

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

var suggestion1Stream = close1ClickStream.startWith('startup click')
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

動作する例はここで見ることができる http://jsfiddle.net/staltz/8jFJH/48/

このコードのかけらは小さいが、密度は高い。複数のイベントを、関心の分離とレスポンスキャッシュによって管理しているのが特徴だ。関数型のスタイルによって、コードは命令的ではなく、より宣言的なものになる。実行する命令列を与えるのではなく、ストリーム間の関係を定義することにより、これは何であるかを伝えるだけで良くなる。例えば、FRPではコンピューターにsuggestion1Streamは'close 1'ストリームと最新のレスポンスから、1人のユーザーを組み合わせるように伝える。再読み込み時とプログラムの起動時にはnullが入る。

さらに印象的なのは、ifforwhileなどの制御フローの要素と、JavaScriptアプリケーションでよく見られる典型的なコールバックベースの制御フローが存在しないのだ。subscribe()の中では、ifelsefilter()を使って取り除くことができる (実装の詳細は課題とするので省く)。FRPには、mapfilterscanmergecombineLatest, startWithや、イベント駆動プログラムのフローを制御する、多数のストリーム関数がある。このツールセットは、少ないコードでより多くのパワーを与えてくれる。

次に来るもの

もしRx*がリアクティブプログラミングに適したライブラリと考えるなら、ぜひ時間をとって、Observableを変換、組み合わせ、生成するために、この長い関数のリストに精通して欲しい。ストリームのダイアグラムでこれらの関数を学びたいなら、マーブルダイアグラムを使ったRxJavaのとても有益なドキュメントを見るといい。どの様にすれば良いのかわからなくなったときは、ダイアグラムを描いて、その上で考え、関数の長いリストを眺め、また考えるのだ。このワークフローは私の経験上効果的だった。

Rxでのプログラミングのコツを理解したければ、Cold vs Hot Observablesのコンセプトの理解が絶対に欠かせない。これは無視しても戻ってきて、容赦なく噛みつくだろう。あなたは警告を受けたのだ。本当の関数型プログラミングを学んでスキルを磨き、Rx*で影響の出る副作用などの問題に精通することだ。

しかし関数型リアクティブプログラミング(FRP)はRxそのものではない。直感的に動き、Rxで直面した変な挙動を取り除いたBacon.jsというものがある。Elm言語は独自の世界を築いており、これはJavaScript+HTML+CSSコンパイルでき、タイムトラベリングデバッガーを備えているFRP言語なのだ。めっちゃ凄いね。

FRPはイベントだらけのフロントエンドとアプリでとても有用だ。しかしこれはクライアントサイドの話で、FRPはバックエンドとデータベース周りでも有用だ。実際、RxJavaはNetflixのAPIで、サーバーサイドの平行処理を可能にするキーコンポーネントになっている。FRPは特定のアプリケーションや言語に制限されるフレームワークではない。これは実のところ、イベント駆動なソフトウェアのプログラミングならどこでも使えるパラダイムなのだ。

もしこのチュートリアルが役に立ったら、Tweetしてみてくれ

         

staltz commented on Jul 1

関数型リアクティブプログラミングリアクティブプログラミングという言葉の周辺にはたくさんの混乱がある。[1][2]

すまない、私の落ち度だ。この種の混乱は新しいコンピューティングのパラダイムでは容易に起こり得ると思っている。このチュートリアルで "FRP" が出現している所は全て "RP" に置き換えて欲しい。関数型リアクティブプログラミングはリアクティブプログラミングの派生で、参照透過性と純粋関数型の追求ような関数型プログラミングの原則に従っているものだ。他の人々が私より良い説明をしてくれている。[3][4][5]