mergeMap、concatMap、switchMap、exhaustMapの違い【RxJS】

この記事ではRxJSのオペレーターであるmergeMapconcatMapswitchMapexhaustMapについて

  • mergeMap、concatMap、switchMap、exhaustMapの「特徴」・「違い」・「使い分け」

などを図を用いて分かりやすく説明するように心掛けています。ご参考になれば幸いです。

mergeMap、concatMap、switchMap、exhaustMapの特徴

mergeMapconcatMapswitchMapexhaustMapは以下の処理を行っています。

○○Mapの処理

  • Observebleから受け取ったデータを元に新たなObservebleを生成する
  • 各データごとに生成された複数のObservebleを合成して、1つの新たなObservebleを生成する

例えば、以下のプログラムでは、of(1, 2, 3)により、「1→2→3」のデータが流れるObservebleを生成し、そのObservebleをmergeMapオペレーターを用いて、「1A→1B→1C→2A→2B→2C→3A→3B→3C」のデータが流れるObservebleに変換しています。

プログラム1

import { of } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

of(1, 2, 3)
    .pipe(
        // mergeMap内で作成された複数のObserveble(「1A → 1B → 1C」、「2A → 2B → 2C」、「3A → 3B → 3C」}を合成して新たなObservebleを作成
        mergeMap((value) =>
            // 「1」というデータを受け取って「1A → 1B → 1C」が流れるObservebleを作成
            // 「2」というデータを受け取って「2A → 2B → 2C」が流れるObservebleを作成
            // 「3」というデータを受け取って「3A → 3B → 3C」が流れるObservebleを作成
            of(`${value}A`, `${value}B`, `${value}C`)
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((value) =>
        console.log(value)
    );
// ログ
// 1A
// 1B
// 1C
// 2A
// 2B
// 2C
// 3A
// 3B
// 3C

上記のプログラムのmergeMapconcatMapswitchMapexhaustMapに変えても、console.log関数の出力は同じになります。

ではこれらの違いはなんでしょうか?次にその違いについて説明します。

mergeMap、concatMap、switchMap、exhaustMapの違い

mergeMapconcatMapswitchMapexhaustMapの違いは、各データごとに生成された複数のObservebleを合成する時のルールです。

mergeMap

mergeMapオペレーターは、Observebleから受け取ったデータの順番に関わらず、非同期処理が解決した順番でObservebleを合成します。

例えば、mergeMapオペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。

mergeMapオペレーターの流れ

  • ObservebleからデータAを受け取ると、データAによる処理を実行する
  • データAによる処理が解決していなくても、Observebleから次のデータBを受け取ると、そのデータBによる処理も並列して実行する

mergeMapオペレーターを用いたプログラム例を以下に示します。

プログラム2

import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

of(9, 1, 5)
    .pipe(
        mergeMap((val) =>
            // 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
            // 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
            // 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
            of(`Value: ${val}`)
                .pipe(
                    delay(val * 1000)
                )
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// Value: 1 ← 処理を実行してから「1秒後」に出力される
// Value: 5 ← 処理を実行してから「5秒後」に出力される
// Value: 9 ← 処理を実行してから「9秒後」に出力される

上記のプログラムのログ出力を確認すると、mergeMapオペレーターはObservebleから受け取ったデータの順番に関わらず、非同期処理を実行していることが分かります。

上記のプログラムのログを図で説明すると、以下のようになります。

mergeMap

concatMap

concatMapオペレーターは、Observebleから受け取ったデータの順番で非同期処理を実行してObservebleを合成します。

例えば、concatMapオペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。

concatMapオペレーターの流れ

  • ObservebleからデータAを受け取ると、データAによる処理を実行する
  • Observebleから次のデータB受け取っても、データAによる処理が解決しないと、データBによる処理を実行しない

concatMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMapconcatMapに変えています。

import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

of(9, 1, 5)
    .pipe(
        concatMap((val) =>
            // 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
            // 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
            // 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
            of(`Value: ${val}`)
                .pipe(
                    delay(val * 1000)
                )
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// Value: 9 ← 処理を実行してから「9秒後」に出力される
// Value: 1 ← 処理を実行してから「10秒後(9 + 1)」に出力される
// Value: 5 ← 処理を実行してから「15秒後(9 + 1 + 5)」に出力される

上記のプログラムのログ出力を確認すると、concatMapオペレーターはObservebleから受け取ったデータの順番で非同期処理を実行していることが分かります。

上記のプログラムのログを図で説明すると、以下のようになります。

concatMap

switchMap

switchMapオペレーターは、非同期処理が解決する前に、Observebleから次のデータ受け取ると、未解決の非同期処理をキャンセルして、次の非同期処理を実行して、Observebleを合成します。

つまり、Observebleに流れている最新のデータを優先しているのがswitchMapオペレーターです。後ほど説明するexhaustMapオペレーターとは逆の動作をします。

例えば、switchMapオペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。

switchMapオペレーターの流れ

  • ObservebleからデータAを受け取ると、データAによる処理を実行する
  • データAによる処理が解決していなくても、Observebleから次のデータBを受け取ると、データAによる処理をキャンセルし、データBによる処理を実行する

switchMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMapswitchMapに変えています。

import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

of(9, 1, 5)
    .pipe(
        switchMap((val) =>
            // 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
            // 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
            // 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
            of(`Value: ${val}`)
                .pipe(
                    delay(val * 1000)
                )
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// Value: 5 ← 処理を実行してから「5秒後」に出力される

上記のプログラムのログ出力を確認すると、switchMapオペレーターは最新のデータを優先し、前の処理が未完了でも新しい処理に切り替えるという特性を持っていることが分かります。

上記のプログラムのログを図で説明すると、以下のようになります。

switchMap

exhaustMap

exhaustMapオペレーターは、非同期処理が解決する前に、Observebleから次のデータ受け取ると、受け取ったデータを破棄して、そのデータによる非同期処理を実行せずに、Observebleを合成します。

つまり、Observebleに流れている古いデータを優先しているのがexhaustMapオペレーターです。先ほど説明したswitchMapオペレーターとは逆の動作をします。

例えば、exhaustMapオペレーターに「A→B」のデータが流れるObservebleを渡した時の流れは以下のようになります。

exhaustMapオペレーターの流れ

  • ObservebleからデータAを受け取ると、データAによる処理を実行する
  • データAによる処理が解決していない状態で、Observebleから次のデータBを受け取ると、そのデータBを破棄する

exhaustMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム2」において、mergeMapexhaustMapに変えています。

import { of, delay } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

of(9, 1, 5)
    .pipe(
        exhaustMap((val) =>
            // 「9」というデータを受け取って、「(9 * 1000)ms後(9秒後)」に「Value: 9」が流れるObservebleを作成
            // 「1」というデータを受け取って、「(1 * 1000)ms後(1秒後)」に「Value: 1」が流れるObservebleを作成
            // 「5」というデータを受け取って、「(5 * 1000)ms後(5秒後)」に「Value: 5」が流れるObservebleを作成
            of(`Value: ${val}`)
                .pipe(
                    delay(val * 1000)
                )
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// Value: 9 ← 処理を実行してから「9秒後」に出力される

上記のプログラムのログ出力を確認すると、exhaustMapオペレーターは現在行っている処理を優先し、処理が解決していない状態でObservebleからデータを受け取っても、そのデータを破棄していることが分かります。

上記のプログラムのログを図で説明すると、以下のようになります。

exhaustMap

mergeMap、concatMap、switchMap、exhaustMapのプログラム例

もう少し複雑なプログラムでmergeMapconcatMapswitchMapexhaustMapの動きを確認してみましょう。

mergeMapを用いたプログラム例

以下に、mergeMapオペレーターを用いたプログラム例を以下に示します。

プログラム3

import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

interval(3500)
    .pipe(
        mergeMap(() =>
            interval(1000)
                .pipe(
                    take(5)
                )
        )
    )
    // mergeMapによって作成された新たなObservebleをコンソール出力
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// 0 ←interval(3500)が始まり、mergeMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
// 0 ←interval(3500)が「1」を出力し、mergeMapにより新たなinterval(1000)が開始され、「0」が出力
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
// 1 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「2」が出力
// というように続きます。

intervalは「指定した時間間隔で連続した整数値を流すObservable」を生成するオペレーターです。例えば、interval(1000)は1000ms(1s)ごとに、「0→1→2→3→…」という整数値を流します。

上記のプログラムのログを図で説明すると、以下のようになります。

mergeMapを用いたプログラム例

mergeMapオペレーターはObservebleから受け取ったデータの順番に関わらず、非同期処理を実行していることが分かります。

concatMapを用いたプログラム例

concatMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMapconcatMapに変えています。

import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

interval(3500)
    .pipe(
        concatMap(() =>
            interval(1000)
                .pipe(
                    take(5)
                )
        )
    )
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// 0 ←interval(3500)が始まり、concatMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
// 0 ←interval(3500)から渡される「1」によるinterval(1000)が開始。ただし、前のinterval(1000)が完了するまで待たされるため、「4」の後に「0」が出力される
// 1 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始されたinterval(1000)から「2」が出力
// というように続きます。

上記のプログラムのログを図で説明すると、以下のようになります。

concatMapを用いたプログラム例

concatMapオペレーターはObservebleから受け取ったデータの順番で非同期処理を実行していることが分かります。

switchMapを用いたプログラム例

switchMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMapswitchMapに変えています。

import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

interval(3500)
    .pipe(
        switchMap(() =>
            interval(1000)
                .pipe(
                    take(5)
                )
        )
    )
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// 0 ←interval(3500)が始まり、switchMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 0 ←interval(3500)から渡される「1」をトリガーに、新しいinterval(1000)が開始され、「0」が出力。同時に前のinterval(1000)はキャンセルされる。
// 1 ←interval(3500)から渡される「1」によって開始された新しいinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「1」によって開始された新しいinterval(1000)から「2」が出力
// というように続きます。

上記のプログラムのログを図で説明すると、以下のようになります。

switchMapを用いたプログラム例

switchMapオペレーターは最新のデータを優先し、前の処理が未完了でも新しい処理に切り替えるという特性を持っていることが分かります。

exhaustMapを用いたプログラム例

exhaustMapオペレーターを用いたプログラム例を以下に示します。先ほど示した「プログラム3」において、mergeMapexhaustMapに変えています。

import { interval, take } from 'rxjs';
import { mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

interval(3500)
    .pipe(
        exhaustMap(() =>
            interval(1000)
                .pipe(
                    take(5)
                )
        )
    )
    .subscribe((val) =>
        console.log(val)
    );
// ログ
// 0 ←interval(3500)が始まり、exhaustMapにより内側のinterval(1000)が開始され、「0」が出力
// 1 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「1」が出力
// 2 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「2」が出力
// 3 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「3」が出力
//    (この期間にinterval(3500)から渡される「1」による新しいinterval(1000)が開始しようとするが、前のinterval(1000)がまだ完了していないため無視される)
// 4 ←interval(3500)から渡される「0」によって開始されたinterval(1000)から「4」が出力
//    (interval(1000)が終了し、次にinterval(3500)から渡される値により新しいinterval(1000)が開始可能になる)
// 0 ←interval(3500)から渡される「2」による新しいinterval(1000)が開始、「0」が出力
// 1 ←interval(3500)から渡される「2」によって新しいinterval(1000)が開始され、「1」が出力
// 2 ←interval(3500)から渡される「2」によって新しいinterval(1000)が開始され、「2」が出力
// というように続きます。

上記のプログラムのログを図で説明すると、以下のようになります。

exhaustMapを用いたプログラム例

exhaustMapオペレーターは現在行っている処理を優先し、処理が解決していない状態でObservebleからデータを受け取っても、そのデータを破棄していることが分かります。

mergeMap、concatMap、switchMap、exhaustMapの使い分け

これまで説明したmergeMapconcatMapswitchMapexhaustMapは似ているので、どの場面でどのオペレーターを使うのが最適か分かりにくいことがあります。

以下に、各オペレーターの使用例を示します。

  • mergeMap
    • 非同期処理を並列に行うことが可能な場面に使用します。
    • 例:複数のAPIリクエストの並行処理
      • あるWebアプリでユーザーのプロフィール情報とそのユーザーの住所を取得したい場合、これらのリクエストは互いに依存していないので、並列に行うことができます。
  • concatMap
    • 受け取るデータに依存関係があり、データの順番が重要な場合に使用します。または、前の処理が終わるのを待つ必要がある場合に使用します。
    • 例:ファイルのアップロードなど
      • ユーザーがアップロードした画像やファイルの順番が重要な場合には、前の処理が終わってから次の処理をする必要があります。
  • switchMap
    • 最新のデータが重要で、以前のデータを無視することができる場合に使用します。
    • 例:ユーザーの検索入力
      • ユーザーが検索ボックスにテキストを入力するとき、新しいテキストが入力されると以前の検索結果は無関係になるため、以前の検索をキャンセルすることができます。
  • exhaustMap
    • 現在行っている処理が重要で、その処理が終了するまで新しいデータを無視する必要がある場合に使用します。
    • 例:送信ボタンのクリックによる処理
      • ユーザーが何度も送信ボタンをクリックすると、新たな送信が発生しますが、前の送信が完了するまで新しい送信は無視します。

本記事のまとめ

この記事ではRxJSのオペレーターであるmergeMapconcatMapswitchMapexhaustMapについて、以下の内容を説明しました。

  • mergeMap、concatMap、switchMap、exhaustMapの「特徴」・「違い」・「使い分け」

お読み頂きありがとうございました。