この記事ではRxJSの「Cold Observable」と「Hot Observable」について
- 「Cold Observable」と「Hot Observable」の違い
- 「Cold Observable」の特徴とプログラム例
- 「Hot Observable」の特徴とプログラム例
などを図を用いて分かりやすく説明するように心掛けています。ご参考になれば幸いです。
「Cold Observable」と「Hot Observable」の違い
ObservableにはColdな性質とHotな性質があります。
Coldな性質を持つObservableは「Cold Observable」、Hotな性質を持つObservableは「Hot Observable」と呼ばれています。
ほとんどのObservableは「Cold Observable」です。「Cold Observable」を「Hot Observable」に変換しない限りは「Cold Observable」となります。
以下に「Cold Observable」と「Hot Observable」の違いを示します。
- Cold Observable
- subscribeされるまでデータを流さない受動的なObservable
- ストリームが枝分かれしない
- 1つの「Cold Observable」に対して複数subscribeした場合、それぞれに対して個別にストリーム(連続的なデータの流れ)が生成されて、割り当てられる。
- Hot Observable
- subscribeされなくてもデータを流す能動的なObservable
- ストリームを枝分かれさせる特徴を持つ
- 1つの「Hot Observable」に対して複数subscribeした場合、ストリームが分岐し、後続には同じデータを流す。
Cold Observableとは
Cold Observableの特徴を以下に示します。
Cold Observableの特徴
- subscribeされるまでデータを流さない
- それぞれのObserverに対して個別にデータを流す(ストリームが枝分かれしない)
上記の特徴について順番に説明します。
subscribeされるまでデータを流さない
「Cold Observable」はsubscribeされるまでデータを流しません。subscribeすることで、初めてデータが流れるようになります。subscribeしない場合、「Cold Observable」で流すデータは消滅してしまいます。
例えば、以下のプログラムでは「Cold Observable(cold$
)」をsubscribeしていないので、データが流れません。そのため、console.log
でのメッセージ(‘subscribe!!’
)も出力されないし、Math.random
メソッドによって流すデータも消滅してしまいます。
import { Observable } from 'rxjs';
// Cold Observable(cold$)を生成。
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// ログ出力結果
// ログには何も表示されない
Cold Observableのプログラム例(subscribeされるまでデータを流さない)
以下のプログラムでは、3つのランダムな数字を流す「Cold Observable(cold$
)」を生成しています。この「Cold Observable(cold$
)」をsubscribeすることによって、データが流れ始めます。流れたデータはconsole.log
により出力しています。
import { Observable } from 'rxjs';
// Cold Observable(cold$)を生成。
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
// Cold Observable(cold$)はobserverがsubscribeすると、データが流れ始める。
cold$.subscribe((value) => console.log('1st:' + value));
// ログ出力結果
// subscribed!
// 1st:0.4162071714889599
// 1st:0.2171136394504538
// 1st:0.5395479011489133
それぞれのObserverに対して個別にデータを流す
1つのCold Observableに対して複数subscribeした場合、それぞれに対して個別にストリームが生成されて、割り当てられます。すなわち、Cold Observableはストリームを分岐させる特徴を持っていません。
Cold Observableのプログラム例(それぞれのObserverに対して個別にデータを流す)
以下のプログラムでは、3つのランダムな数字を流す「Cold Observable(cold$
)」を生成しています。この「Cold Observable(cold$
)」に対して、2つのObserverがsubscribeしています。それぞれに対して個別にストリームが生成されて、割り当てられるので、それぞれのObserverがsubscribeしたデータが異なります。
import { Observable } from 'rxjs';
// Cold Observable(cold$)を生成
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
// Cold Observable(cold$)はobserverがsubscribeすると、データが流れ始める。
cold$.subscribe((value) => console.log('1st:' + value));
// 1つのCold Observableに対して複数subscribeした場合、それぞれに別々のストリームが生成されて、割り当てられるので、値が異なる。
cold$.subscribe((value) => console.log('2nd:' + value));
// ログ出力結果
// subscribed!
// 1st:0.3504699269808764
// 1st:0.6635557092959816
// 1st:0.08736549608011779
// subscribed!
// 2nd:0.33503948630486957
// 2nd:0.09301964393071249
// 2nd:0.8749606152565355
以下のプログラムも見てみましょう。以下のプログラムでも「Cold Observable(cold$
)」に対して、2つのObserverがsubscribeしています。「Cold Observable(cold$
)」は「0→1→2→3→4」のデータを1秒ごとに流すObservableです。1秒後に「0」が流れ、2秒後に「1」が流れ、3秒後に「2」が流れ、4秒後に「3」が流れ、5秒後に「4」が流れます。
ObserverBはObserberAより2.5秒遅くsubcribeしているので、ObserberBがsubscribeするデータは「2→3→4」になるように感じますが、ObserberBがsubscribeするデータは「0→1→2→3→4」になります。これは、「Cold Observable(cold$
)」は「subcribeされるとデータを流す」かつ「それぞれのObserverに対して個別にデータを流す」という特徴があるからです。
import { interval, take } from 'rxjs';
// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));
// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
cold$.subscribe((value) => console.log('1st:' + value));
// Cold Observable(cold$)が流すデータを2.5秒後にobserverがsubscribeする。
setTimeout(() => {
cold$.subscribe((value) => console.log(' 2st:' + value));
}, 2500)
// ログ出力結果
// 1st:0
// 1st:1
// 1st:2
// 2st:0
// 1st:3
// 2st:1
// 1st:4
// 2st:2
// 2st:3
// 2st:4
Hot Observableとは
Hot Observableの特徴を以下に示します。
Hot Observableの特徴
- subscribeされなくてもデータを流す
- ストリームを分岐させて、それぞれのObserverに対して同じデータを流す
上記の特徴について順番に説明します。
subscribeされなくてもデータを流す
「Hot Observable」はsubscribeされなくてもデータを流します。
subscribeしなければ、「Hot Observable」はデータを流し切って消滅します。例えば、以下のプログラムでは、「Hot Observable(hot$
)」をsubcribeしていませんが、データが流れています。そのため、console.log
でのメッセージ(‘subscribe!!’
)がログ出力されます。observer.next(Math.random())
により流れるデータは何にも使用されず、ただ流れて消滅してしまいます。
import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);
// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();
// ログ出力結果
// subscribed!
Hot Observableのプログラム例(subscribeされなくてもデータを流す)
以下のプログラムでは、publish
メソッドで「Cold Observable(cold$
)」を「Hot Observable(hot$
)」に変換しています。また、connect
メソッドで強制的に「Hot Observable(hot$
)」のデータを流しています。
ポイントは「Hot Observable(hot$
)」をsubscibeした後にconnect
メソッドを実行している点です。これは、connect
メソッドを実行したタイミングでデータが流れるからです。
import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);
// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// Hot Observable(hot$)はsubscribeしてもデータが流れない。
hot$.subscribe((value) => console.log('1st:' + value));
// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();
// ログ出力結果
// subscribed!
// 1st:0.2876292052350131
// 1st:0.3756210150071897
// 1st:0.3268416496599933
「Hot Observable(hot$
)」をsubscibeする前に、connect
メソッドを実行した場合のプログラム例を以下に示します。この場合、データが流れ終わった後にsubscribeしているので、「Hot Observable(hot$
)」が流すデータをsubscribeすることができません。
import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);
// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();
// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// データを流した後にsubcribeしているので、データをsubcribeできていない。
hot$.subscribe((value) => console.log('1st:' + value));
// ログ出力結果
// subscribed!
以下のプログラムも見てみましょう。以下のプログラムでも、「Hot Observable(hot$
)」が流すデータをsubscribeする前に、connect
メソッドで強制的に「Hot Observable(hot$
)」のデータを流しています。
「Hot Observable(hot$
)」は「0→1→2→3→4」のデータを1秒ごとに流すObservableです。1秒後に「0」が流れ、2秒後に「1」が流れ、3秒後に「2」が流れ、4秒後に「3」が流れ、5秒後に「4」が流れます。このプログラムでは、2.5秒後に「Hot Observable(hot$
)」をsubscibeしているので、subscribeするデータは「2→3→4」となります。
import { interval, take } from 'rxjs';
import { publish } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);
// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();
// Hot Observable(hot$)が流すデータを2.5秒後にobserverがsubscribeする。
setTimeout(() => {
hot$.subscribe((value) => console.log('1st:' + value));
}, 2500)
// ログ出力結果
// 1st:2
// 1st:3
// 1st:4
ストリームを分岐させて、それぞれのObserverに対して同じデータを流す
1つの「Hot Observable」に対して複数subscribeした場合、ストリームが分岐し、後続には同じデータを流します。
Hot Observableのプログラム例(ストリームを分岐させて、それぞれのObserverに対して同じデータを流す)
以下のプログラムでは、3つのランダムな数字を流す「Hot Observable(hot$
)」を生成しています。この「Hot Observable(hot$
)」に対して、2つのObserverがsubscribeしています。ストリームを分岐させて、それぞれのObserverに対して同じデータを流すので、それぞれのObserverがsubscribeしたデータは同じになります。
import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
console.log('subscribed!');
obserber.next(Math.random());
obserber.next(Math.random());
obserber.next(Math.random());
obserber.complete();
}
)
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);
// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// 同じストロームを見ているので、値が同じになる。
hot$.subscribe((value) => console.log('1st:' + value));
hot$.subscribe((value) => console.log('2nd:' + value));
// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();
// ログ出力結果
// 1st:0.280938957016291
// 2nd:0.280938957016291
// 1st:0.14415824216825102
// 2nd:0.14415824216825102
// 1st:0.038248231064267335
// 2nd:0.038248231064267335
Hot Observableのshare
「Hot Observable」はconnect
メソッドを実行することで、データが流れ始めます。
しかし、これだと「Hot Observable」がいつデータを流すのかを把握していなくてはいけません。これを解決するのがshare
です。share
を用いると、「Cold Observable(cold$
)」を「Hot Observable(hot$
)」に変換するとともに、「Cold Observable」のように、最初のsubscribeでデータを流すことができるようになります。また、「Cold Observable」と異なり、それぞれのObserverがsubscribeするデータは同じであるというHotな性質も持っています。
import { interval, take,Observable } from 'rxjs';
import { share } from 'rxjs/operators';
// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));
// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ =cold$.pipe(share());
// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// shareを使うと最初のsubscribeでデータが流れる。
hot$.subscribe((value) => console.log('1st:' + value));
// Hot Observable(hot$)が流すデータを2.5秒後にobserverがsubscribeする。
// 同じストロームを見ているので、値が同じになる。
setTimeout(() => {
hot$.subscribe((value) => console.log(' 2st:' + value));
}, 2500)
// ログ出力結果
// 1st:0
// 1st:1
// 1st:2
// 2st:2
// 1st:3
// 2st:3
// 1st:4
// 2st:4
本記事のまとめ
この記事ではRxJSの「Cold Observable」と「Hot Observable」について、以下の内容を説明しました。
- 「Cold Observable」と「Hot Observable」の違い
- 「Cold Observable」の特徴とプログラム例
- 「Hot Observable」の特徴とプログラム例
お読み頂きありがとうございました。