この記事ではRxJSのオペレーターであるmergeMap
、concatMap
、switchMap
、exhaustMap
について
- mergeMap、concatMap、switchMap、exhaustMapの「特徴」・「違い」・「使い分け」
などを図を用いて分かりやすく説明するように心掛けています。ご参考になれば幸いです。
mergeMap、concatMap、switchMap、exhaustMapの特徴
mergeMap
、concatMap
、switchMap
、exhaustMap
は以下の処理を行っています。
○○Mapの処理
- Observebleから受け取ったデータを元に新たなObservebleを生成する
- 各データごとに生成された複数のObservebleを合成して、1つの新たなObservebleを生成する
例えば、以下のプログラムでは、of(1, 2, 3)
により、「1→2→3」のデータが流れるObservebleを生成し、そのObservebleをmergeMap
オペレーターを用いて、「1A→1B→1C→2A→2B→2C→3A→3B→3C」のデータが流れるObservebleに変換しています。
プログラム1
import { of } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
of(1, 2, 3)
.pipe(
// mergeMap内で作成された複数のObserveble(「1A → 1B → 1C」、「2A → 2B → 2C」、「3A → 3B → 3C」}を合成して新たなObservebleを作成
mergeMap((value) =>
// 「1」というデータを受け取って「1A → 1B → 1C」が流れるObservebleを作成
// 「2」というデータを受け取って「2A → 2B → 2C」が流れるObservebleを作成
// 「3」というデータを受け取って「3A → 3B → 3C」が流れるObservebleを作成
of(`${value}A`, `${value}B`, `${value}C`)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((value) =>
console.log(value)
);
// ログ
// 1A
// 1B
// 1C
// 2A
// 2B
// 2C
// 3A
// 3B
// 3C
上記のプログラムのmergeMap
をconcatMap
やswitchMap
やexhaustMap
に変えても、console.log
関数の出力は同じになります。
ではこれらの違いはなんでしょうか?次にその違いについて説明します。
mergeMap、concatMap、switchMap、exhaustMapの違い
mergeMap
、concatMap
、switchMap
、exhaustMap
の違いは、各データごとに生成された複数のObservebleを合成する時のルールです。
mergeMap
mergeMap
オペレーターは、Observebleから受け取ったデータの順番に関わらず、非同期処理が解決した順番でObservebleを合成します。
例えば、mergeMap
オペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。
mergeMapオペレーターの流れ
- ObservebleからデータAを受け取ると、データAによる処理を実行する
- データAによる処理が解決していなくても、Observebleから次のデータBを受け取ると、そのデータBによる処理も並列して実行する
mergeMap
オペレーターを用いたプログラム例を以下に示します。
プログラム2
import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
of(9, 1, 5)
.pipe(
mergeMap((val) =>
// 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
// 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
// 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
of(`Value: ${val}`)
.pipe(
delay(val * 1000)
)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((val) =>
console.log(val)
);
// ログ
// Value: 1 ← 処理を実行してから「1秒後」に出力される
// Value: 5 ← 処理を実行してから「5秒後」に出力される
// Value: 9 ← 処理を実行してから「9秒後」に出力される
上記のプログラムのログ出力を確認すると、mergeMap
オペレーターはObservebleから受け取ったデータの順番に関わらず、非同期処理を実行していることが分かります。
上記のプログラムのログを図で説明すると、以下のようになります。
concatMap
concatMap
オペレーターは、Observebleから受け取ったデータの順番で非同期処理を実行してObservebleを合成します。
例えば、concatMap
オペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。
concatMapオペレーターの流れ
- ObservebleからデータAを受け取ると、データAによる処理を実行する
- Observebleから次のデータB受け取っても、データAによる処理が解決しないと、データBによる処理を実行しない
concatMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMap
をconcatMap
に変えています。
import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
of(9, 1, 5)
.pipe(
concatMap((val) =>
// 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
// 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
// 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
of(`Value: ${val}`)
.pipe(
delay(val * 1000)
)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((val) =>
console.log(val)
);
// ログ
// Value: 9 ← 処理を実行してから「9秒後」に出力される
// Value: 1 ← 処理を実行してから「10秒後(9 + 1)」に出力される
// Value: 5 ← 処理を実行してから「15秒後(9 + 1 + 5)」に出力される
上記のプログラムのログ出力を確認すると、concatMap
オペレーターはObservebleから受け取ったデータの順番で非同期処理を実行していることが分かります。
上記のプログラムのログを図で説明すると、以下のようになります。
switchMap
switchMap
オペレーターは、非同期処理が解決する前に、Observebleから次のデータ受け取ると、未解決の非同期処理をキャンセルして、次の非同期処理を実行して、Observebleを合成します。
つまり、Observebleに流れている最新のデータを優先しているのがswitchMap
オペレーターです。後ほど説明するexhaustMap
オペレーターとは逆の動作をします。
例えば、switchMap
オペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。
switchMapオペレーターの流れ
- ObservebleからデータAを受け取ると、データAによる処理を実行する
- データAによる処理が解決していなくても、Observebleから次のデータBを受け取ると、データAによる処理をキャンセルし、データBによる処理を実行する
switchMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMap
をswitchMap
に変えています。
import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
of(9, 1, 5)
.pipe(
switchMap((val) =>
// 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
// 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
// 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
of(`Value: ${val}`)
.pipe(
delay(val * 1000)
)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((val) =>
console.log(val)
);
// ログ
// Value: 5 ← 処理を実行してから「5秒後」に出力される
上記のプログラムのログ出力を確認すると、switchMap
オペレーターは最新のデータを優先し、前の処理が未完了でも新しい処理に切り替えるという特性を持っていることが分かります。
上記のプログラムのログを図で説明すると、以下のようになります。
exhaustMap
exhaustMap
オペレーターは、非同期処理が解決する前に、Observebleから次のデータ受け取ると、受け取ったデータを破棄して、そのデータによる非同期処理を実行せずに、Observebleを合成します。
つまり、Observebleに流れている古いデータを優先しているのがexhaustMap
オペレーターです。先ほど説明したswitchMap
オペレーターとは逆の動作をします。
例えば、exhaustMap
オペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。
exhaustMapオペレーターの流れ
- ObservebleからデータAを受け取ると、データAによる処理を実行する
- データAによる処理が解決していない状態で、Observebleから次のデータBを受け取ると、そのデータBを破棄する
exhaustMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMap
をexhaustMap
に変えています。
import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
of(9, 1, 5)
.pipe(
exhaustMap((val) =>
// 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
// 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
// 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
of(`Value: ${val}`)
.pipe(
delay(val * 1000)
)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((val) =>
console.log(val)
);
// ログ
// Value: 9 ← 処理を実行してから「9秒後」に出力される
上記のプログラムのログ出力を確認すると、exhaustMap
オペレーターは現在行っている処理を優先し、処理が解決していない状態でObservebleからデータを受け取っても、そのデータを破棄していることが分かります。
上記のプログラムのログを図で説明すると、以下のようになります。
mergeMap、concatMap、switchMap、exhaustMapのプログラム例
もう少し複雑なプログラムでmergeMap
、concatMap
、switchMap
、exhaustMap
の動きを確認してみましょう。
mergeMapを用いたプログラム例
以下に、mergeMap
オペレーターを用いたプログラム例を以下に示します。
プログラム3
import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
interval(3500)
.pipe(
mergeMap(() =>
interval(1000)
.pipe(
take(5)
)
)
)
// mergeMapによって作成された新たなObservebleをコンソール出力
.subscribe((val) =>
console.log(val)
);
// ログ
// 0 ←interval(3500)が始まり、mergeMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
// 0 ←interval(3500)が「1」を出力し、mergeMapにより新たなinterval(1000)が開始され、「0」が出力
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
// 1 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「2」が出力
// というように続きます。
interval
は「指定した時間間隔で連続した整数値を流すObservable」を生成するオペレーターです。例えば、interval(1000)
は1000ms(1s)ごとに、「0→1→2→3→…」という整数値を流します。
上記のプログラムのログを図で説明すると、以下のようになります。
mergeMap
オペレーターはObservebleから受け取ったデータの順番に関わらず、非同期処理を実行していることが分かります。
concatMapを用いたプログラム例
concatMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMap
をconcatMap
に変えています。
import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
interval(3500)
.pipe(
concatMap(() =>
interval(1000)
.pipe(
take(5)
)
)
)
.subscribe((val) =>
console.log(val)
);
// ログ
// 0 ←interval(3500)が始まり、concatMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
// 0 ←interval(3500)から渡される「1」によるinterval(1000)が開始。ただし、前のinterval(1000)が完了するまで待たされるため、「4」の後に「0」が出力される
// 1 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「2」が出力
// というように続きます。
上記のプログラムのログを図で説明すると、以下のようになります。
concatMap
オペレーターはObservebleから受け取ったデータの順番で非同期処理を実行していることが分かります。
switchMapを用いたプログラム例
switchMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMap
をswitchMap
に変えています。
import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
interval(3500)
.pipe(
switchMap(() =>
interval(1000)
.pipe(
take(5)
)
)
)
.subscribe((val) =>
console.log(val)
);
// ログ
// 0 ←interval(3500)が始まり、switchMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 0 ←interval(3500)から渡される「1」をトリガーに、新しいinterval(1000)が開始され、「0」が出力。同時に前のinterval(1000)はキャンセルされる。
// 1 ←interval(3500)から渡される「1」によって開始された新しいinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始された新しいinterval(1000)から「2」が出力
// というように続きます。
上記のプログラムのログを図で説明すると、以下のようになります。
switchMap
オペレーターは最新のデータを優先し、前の処理が未完了でも新しい処理に切り替えるという特性を持っていることが分かります。
exhaustMapを用いたプログラム例
exhaustMap
オペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMap
をexhaustMap
に変えています。
import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';
interval(3500)
.pipe(
exhaustMap(() =>
interval(1000)
.pipe(
take(5)
)
)
)
.subscribe((val) =>
console.log(val)
);
// ログ
// 0 ←interval(3500)が始まり、exhaustMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
// (この期間にinterval(3500)から渡される「1」による新しいinterval(1000)が開始しようとするが、前のinterval(1000)がまだ完了していないため無視される)
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
// (interval(1000)が終了し、次にinterval(3500)から渡される値により新しいinterval(1000)が開始可能になる)
// 0 ←interval(3500)から渡される「2」による新しいinterval(1000)が開始、「0」が出力
// 1 ←interval(3500)から渡される「2」によって新しいinterval(1000)が開始され、「1」が出力
// 2 ←interval(3500)から渡される「2」によって新しいinterval(1000)が開始され、「2」が出力
// というように続きます。
上記のプログラムのログを図で説明すると、以下のようになります。
exhaustMap
オペレーターは現在行っている処理を優先し、処理が解決していない状態でObservebleからデータを受け取っても、そのデータを破棄していることが分かります。
mergeMap、concatMap、switchMap、exhaustMapの使い分け
これまで説明したmergeMap
、concatMap
、switchMap
、exhaustMap
は似ているので、どの場面でどのオペレーターを使うのが最適か分かりにくいことがあります。
以下に、各オペレーターの使用例を示します。
- mergeMap
- 非同期処理を並列に行うことが可能な場面に使用します。
- 例:複数のAPIリクエストの並行処理
- あるWebアプリでユーザーのプロフィール情報とそのユーザーの住所を取得したい場合、これらのリクエストは互いに依存していないので、並列に行うことができます。
- concatMap
- 受け取るデータに依存関係があり、データの順番が重要な場合に使用します。または、前の処理が終わるのを待つ必要がある場合に使用します。
- 例:ファイルのアップロードなど
- ユーザーがアップロードした画像やファイルの順番が重要な場合には、前の処理が終わってから次の処理をする必要があります。
- switchMap
- 最新のデータが重要で、以前のデータを無視することができる場合に使用します。
- 例:ユーザーの検索入力
- ユーザーが検索ボックスにテキストを入力するとき、新しいテキストが入力されると以前の検索結果は無関係になるため、以前の検索をキャンセルすることができます。
- exhaustMap
- 現在行っている処理が重要で、その処理が終了するまで新しいデータを無視する必要がある場合に使用します。
- 例:送信ボタンのクリックによる処理
- ユーザーが何度も送信ボタンをクリックすると、新たな送信が発生しますが、前の送信が完了するまで新しい送信は無視します。
本記事のまとめ
この記事ではRxJSのオペレーターであるmergeMap
、concatMap
、switchMap
、exhaustMap
について、以下の内容を説明しました。
- mergeMap、concatMap、switchMap、exhaustMapの「特徴」・「違い」・「使い分け」
お読み頂きありがとうございました。