【RxJS】「Cold Observable」・「Hot Observable」とは?違いなどを解説!

この記事ではRxJSの「Cold Observable」と「Hot Observable」について

  • 「Cold Observable」と「Hot Observable」の違い
  • 「Cold Observable」の特徴とプログラム例
  • 「Hot Observable」の特徴とプログラム例

などを図を用いて分かりやすく説明するように心掛けています。ご参考になれば幸いです。

「Cold Observable」と「Hot Observable」の違い

ObservableにはColdな性質Hotな性質があります。

Coldな性質を持つObservableは「Cold Observable」、Hotな性質を持つObservableは「Hot Observable」と呼ばれています。

ほとんどのObservableは「Cold Observable」です。「Cold Observable」を「Hot Observable」に変換しない限りは「Cold Observable」となります。

以下に「Cold Observable」と「Hot Observable」の違いを示します。

  • Cold Observable
    • subscribeされるまでデータを流さない受動的なObservable
    • ストリームが枝分かれしない
      • 1つの「Cold Observable」に対して複数subscribeした場合、それぞれに対して個別にストリーム(連続的なデータの流れ)が生成されて、割り当てられる。
  • Hot Observable
    • subscribeされなくてもデータを流す能動的なObservable
    • ストリームを枝分かれさせる特徴を持つ
      • 1つの「Hot Observable」に対して複数subscribeした場合、ストリームが分岐し、後続には同じデータを流す。

Cold Observableとは

Cold Observableの特徴を以下に示します。

Cold Observableの特徴

  • subscribeされるまでデータを流さない
  • それぞれのObserverに対して個別にデータを流す(ストリームが枝分かれしない)

上記の特徴について順番に説明します。

subscribeされるまでデータを流さない

Cold Observableの特徴(subscribeされるまでデータを流さない)

「Cold Observable」はsubscribeされるまでデータを流しません。subscribeすることで、初めてデータが流れるようになります。subscribeしない場合、「Cold Observable」で流すデータは消滅してしまいます。

例えば、以下のプログラムでは「Cold Observable(cold$)」をsubscribeしていないので、データが流れません。そのため、console.logでのメッセージ(‘subscribe!!’)も出力されないし、Math.randomメソッドによって流すデータも消滅してしまいます。

import { Observable } from 'rxjs';

// Cold Observable(cold$)を生成。
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// ログ出力結果
// ログには何も表示されない

Cold Observableのプログラム例(subscribeされるまでデータを流さない)

以下のプログラムでは、3つのランダムな数字を流す「Cold Observable(cold$)」を生成しています。この「Cold Observable(cold$)」をsubscribeすることによって、データが流れ始めます。流れたデータはconsole.logにより出力しています。

import { Observable } from 'rxjs';

// Cold Observable(cold$)を生成。
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
// Cold Observable(cold$)はobserverがsubscribeすると、データが流れ始める。
cold$.subscribe((value) => console.log('1st:' + value));

// ログ出力結果
// subscribed!
// 1st:0.4162071714889599
// 1st:0.2171136394504538
// 1st:0.5395479011489133

それぞれのObserverに対して個別にデータを流す

Cold Observableの特徴(それぞれのObserverに対して個別にデータを流す)

1つのCold Observableに対して複数subscribeした場合、それぞれに対して個別にストリームが生成されて、割り当てられます。すなわち、Cold Observableはストリームを分岐させる特徴を持っていません。

Cold Observableのプログラム例(それぞれのObserverに対して個別にデータを流す)

以下のプログラムでは、3つのランダムな数字を流す「Cold Observable(cold$)」を生成しています。この「Cold Observable(cold$)」に対して、2つのObserverがsubscribeしています。それぞれに対して個別にストリームが生成されて、割り当てられるので、それぞれのObserverがsubscribeしたデータが異なります。

import { Observable } from 'rxjs';

// Cold Observable(cold$)を生成
// Cold Observable(cold$)はobserverがsubscribeするまでデータが流れない。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
// Cold Observable(cold$)はobserverがsubscribeすると、データが流れ始める。
cold$.subscribe((value) => console.log('1st:' + value));

// 1つのCold Observableに対して複数subscribeした場合、それぞれに別々のストリームが生成されて、割り当てられるので、値が異なる。
cold$.subscribe((value) => console.log('2nd:' + value));

// ログ出力結果
// subscribed!
// 1st:0.3504699269808764
// 1st:0.6635557092959816
// 1st:0.08736549608011779
// subscribed!
// 2nd:0.33503948630486957
// 2nd:0.09301964393071249
// 2nd:0.8749606152565355

以下のプログラムも見てみましょう。以下のプログラムでも「Cold Observable(cold$)」に対して、2つのObserverがsubscribeしています。「Cold Observable(cold$)」は「0→1→2→3→4」のデータを1秒ごとに流すObservableです。1秒後に「0」が流れ、2秒後に「1」が流れ、3秒後に「2」が流れ、4秒後に「3」が流れ、5秒後に「4」が流れます。

ObserverBはObserberAより2.5秒遅くsubcribeしているので、ObserberBがsubscribeするデータは「2→3→4」になるように感じますが、ObserberBがsubscribeするデータは「0→1→2→3→4」になります。これは、「Cold Observable(cold$)」は「subcribeされるとデータを流す」かつ「それぞれのObserverに対して個別にデータを流す」という特徴があるからです。

import { interval, take } from 'rxjs';

// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));

// Cold Observable(cold$)が流すデータをobserverがsubscribeする。
cold$.subscribe((value) => console.log('1st:' + value));

// Cold Observable(cold$)が流すデータを2.5秒後にobserverがsubscribeする。
setTimeout(() => {
    cold$.subscribe((value) => console.log('       2st:' + value));
}, 2500)

// ログ出力結果
// 1st:0
// 1st:1
// 1st:2
//        2st:0
// 1st:3
//        2st:1
// 1st:4
//        2st:2
//        2st:3
//        2st:4

Hot Observableとは

Hot Observableの特徴を以下に示します。

Hot Observableの特徴

  • subscribeされなくてもデータを流す
  • ストリームを分岐させて、それぞれのObserverに対して同じデータを流す

上記の特徴について順番に説明します。

subscribeされなくてもデータを流す

Hot Observableの特徴(subscribeされなくてもデータを流す)

「Hot Observable」はsubscribeされなくてもデータを流します。

subscribeしなければ、「Hot Observable」はデータを流し切って消滅します。例えば、以下のプログラムでは、「Hot Observable(hot$)」をsubcribeしていませんが、データが流れています。そのため、console.logでのメッセージ(‘subscribe!!’)がログ出力されます。observer.next(Math.random())により流れるデータは何にも使用されず、ただ流れて消滅してしまいます。

import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);

// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();

// ログ出力結果
// subscribed!

Hot Observableのプログラム例(subscribeされなくてもデータを流す)

以下のプログラムでは、publishメソッドで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換しています。また、connectメソッドで強制的に「Hot Observable(hot$)」のデータを流しています。

ポイントは「Hot Observable(hot$)」をsubscibeした後にconnectメソッドを実行している点です。これは、connectメソッドを実行したタイミングでデータが流れるからです。

import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);

// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// Hot Observable(hot$)はsubscribeしてもデータが流れない。
hot$.subscribe((value) => console.log('1st:' + value));

// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();

// ログ出力結果
// subscribed!
// 1st:0.2876292052350131
// 1st:0.3756210150071897
// 1st:0.3268416496599933

「Hot Observable(hot$)」をsubscibeする前に、connectメソッドを実行した場合のプログラム例を以下に示します。この場合、データが流れ終わった後にsubscribeしているので、「Hot Observable(hot$)」が流すデータをsubscribeすることができません。

import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);

// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();

// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// データを流した後にsubcribeしているので、データをsubcribeできていない。
hot$.subscribe((value) => console.log('1st:' + value));

// ログ出力結果
// subscribed!

以下のプログラムも見てみましょう。以下のプログラムでも、「Hot Observable(hot$)」が流すデータをsubscribeする前に、connectメソッドで強制的に「Hot Observable(hot$)」のデータを流しています。

「Hot Observable(hot$)」は「0→1→2→3→4」のデータを1秒ごとに流すObservableです。1秒後に「0」が流れ、2秒後に「1」が流れ、3秒後に「2」が流れ、4秒後に「3」が流れ、5秒後に「4」が流れます。このプログラムでは、2.5秒後に「Hot Observable(hot$)」をsubscibeしているので、subscribeするデータは「2→3→4」となります。

import { interval, take } from 'rxjs';
import { publish } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);

// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();

// Hot Observable(hot$)が流すデータを2.5秒後にobserverがsubscribeする。
setTimeout(() => {
    hot$.subscribe((value) => console.log('1st:' + value));
}, 2500)

// ログ出力結果
// 1st:2
// 1st:3
// 1st:4

ストリームを分岐させて、それぞれのObserverに対して同じデータを流す

Hot Observableの特徴(ストリームを分岐させて、それぞれのObserverに対して同じデータを流す)a

1つの「Hot Observable」に対して複数subscribeした場合、ストリームが分岐し、後続には同じデータを流します。

Hot Observableのプログラム例(ストリームを分岐させて、それぞれのObserverに対して同じデータを流す)

以下のプログラムでは、3つのランダムな数字を流す「Hot Observable(hot$)」を生成しています。この「Hot Observable(hot$)」に対して、2つのObserverがsubscribeしています。ストリームを分岐させて、それぞれのObserverに対して同じデータを流すので、それぞれのObserverがsubscribeしたデータは同じになります。

import { Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = new Observable((obserber) => {
    console.log('subscribed!');
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.next(Math.random());
    obserber.complete();
}
)

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ = publish()(cold$);

// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// 同じストロームを見ているので、値が同じになる。
hot$.subscribe((value) => console.log('1st:' + value));
hot$.subscribe((value) => console.log('2nd:' + value));

// connectで強制的に「Hot Observable(hot$)」のデータを流す。
hot$.connect();

// ログ出力結果
// 1st:0.280938957016291
// 2nd:0.280938957016291
// 1st:0.14415824216825102
// 2nd:0.14415824216825102
// 1st:0.038248231064267335
// 2nd:0.038248231064267335

Hot Observableのshare

「Hot Observable」はconnectメソッドを実行することで、データが流れ始めます。

しかし、これだと「Hot Observable」がいつデータを流すのかを把握していなくてはいけません。これを解決するのがshareです。shareを用いると、「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換するとともに、「Cold Observable」のように、最初のsubscribeでデータを流すことができるようになります。また、「Cold Observable」と異なり、それぞれのObserverがsubscribeするデータは同じであるというHotな性質も持っています。

import { interval, take,Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Cold Observable(cold$)を生成。
const cold$ = interval(1000).pipe(take(5));

// publishで「Cold Observable(cold$)」を「Hot Observable(hot$)」に変換。
const hot$ =cold$.pipe(share());

// Hot Observable(hot$)が流すデータをobserverがsubscribeする。
// shareを使うと最初のsubscribeでデータが流れる。
hot$.subscribe((value) => console.log('1st:' + value));

// Hot Observable(hot$)が流すデータを2.5秒後にobserverがsubscribeする。
// 同じストロームを見ているので、値が同じになる。
setTimeout(() => {
    hot$.subscribe((value) => console.log('       2st:' + value));
}, 2500)

// ログ出力結果
// 1st:0
// 1st:1
// 1st:2
//        2st:2
// 1st:3
//        2st:3
// 1st:4
//        2st:4

本記事のまとめ

この記事ではRxJSの「Cold Observable」と「Hot Observable」について、以下の内容を説明しました。

  • 「Cold Observable」と「Hot Observable」の違い
  • 「Cold Observable」の特徴とプログラム例
  • 「Hot Observable」の特徴とプログラム例

お読み頂きありがとうございました。