RxJSのストリームが止まる?switchMapとcatchErrorの正しい配置場所とは?

RxJSのswitchMap, mergeMap, concatMap, exhaustMapcatchErrorを組み合わせて使う際には、少し注意が必要です。

特にcatchErrorの配置場所を間違えた場合、エラーが1回発生しただけで、ストリーム(stream)が停止して、後続の処理が一切動かなくなるといった問題に直面することがあります。

この記事では、switchMapを例にして「ストリームが停止しないcatchErrorの正しい配置場所」を、サンプルコードを用いてわかりやすく解説します。ご参考になれば幸いです。

switchMapの外でcatchErrorを使った場合(ストリームが止まる)

switchMapcatchErrorを使った場合、switchMapの内部でエラーが発生すると、ストリーム全体が終了(停止)してしまいます。サンプルコードを以下に示します。

import { of, throwError, timer } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// 1秒毎に値を流す外側のストリーム
const source$ = timer(0, 1000);

// 値に応じてObservableを返す関数
function handleValue(value) {
  if (value === 2) {
    return throwError(() => new Error('switchMapの中でエラーが発生しました!!'));
  }
  return of(value);
}

const example$ = source$.pipe(
  switchMap((value) => {
    return handleValue(value);
  }),
  // switchMapの外でcatchErrorを使っている
  catchError((error) => {
    console.log('catchErrorで補足:', error.message);
    return of('エラーを復旧します');
  })
);

// 購読して結果を表示
example$.subscribe({
  next: (value) => console.log('Next:', value),
  error: (error) => console.log('Error:', error),
  complete: () => console.log('Complete'),
});

// 実行結果
// Next: 0
// Next: 1
// catchErrorで補足: switchMapの中でエラーが発生しました!!
// Next: エラーを復旧します
// Complete

上記のサンプルコードでは、switchMapが生成する内部のObservable(handleValue(value))がthrowErrorによって例外を発生させます。するとその例外はswitchMapの外に流れ、パイプ全体のcatchErrorによって捕捉されます。

外側のcatchErrorはパイプの全体を対象にしているため、エラーを捕捉した時点で元のストリーム(source$)が終了(停止)してしまいます。そのため、それ以降のsource$の値は流れてきません。実行結果を見ると、3が流れずに終わっていますね。

つまり、switchMapの外でcatchErrorを使った場合、一度でもエラーが発生すると、Observable全体が終了してしまうのです。

あわせて読みたい

今回はswitchMapを例にして説明しましたが、mergeMap, concatMap, exhaustMapでも同様の問題が生じます。

switchMap, mergeMap, concatMap, exhaustMapの違いについては下記の記事で詳しく説明しています。興味のある方は下記のリンクからぜひチェックをしてみてください。

switchMapの中でcatchErrorを使った場合(ストリームが継続)

catchErrorswitchMapが生成する内部のObservableに対して直接適用する必要があります。これによりswitchMapの内部のストリームでエラーをキャッチして処理を行い、外側のストリームsource$が終了しないようにします。

import { of, throwError, timer } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// 1秒毎に値を流す外側のストリーム
const source$ = timer(0, 1000);

// 値に応じてObservableを返す関数
function handleValue(value) {
  if (value === 2) {
    return throwError(() => new Error('switchMapの中でエラーが発生しました!!'));
  }
  return of(value);
}

const example$ = source$.pipe(
  switchMap((value) => {
    return handleValue(value).pipe(
      catchError((error) => {
        console.log('catchErrorで補足:', error.message);
        return of('エラーを復旧します');
      })
    );
  })
);

// 購読して結果を表示
example$.subscribe({
  next: (value) => console.log('Next:', value),
  error: (error) => console.log('Error:', error),
  complete: () => console.log('Complete'),
});

// 実行結果
// Next: 0
// Next: 1
// catchErrorで補足: switchMapの中でエラーが発生しました!!
// Next: エラーを復旧します
// Next: 3
// Next: 4
// ...

上記のサンプルコードでは、switchMapが生成する内部のObservable(handleValue(value))に対して直接catchErrorを適用しています。そのため、handleValue(value)throwErrorによって例外を発生させたとしても、そのエラーはswitchMapの中でキャッチされ、代わりに例外ではない値(ここでは'エラーを復旧します')を流すことができます。その結果、元のストリーム(source$)はエラーの影響を受けずに、1秒ごとに次の値を処理し続けることができます。

本記事のまとめ

この記事ではcatchErrorの正しい配置場所について、以下の内容を説明しました。

  • catchErrorswitchMapの外側に配置すると、内部のObservableでエラーが発生した時にストリーム全体が停止してしまう
  • ストリームを継続させたい場合は、switchMap内部のObservableに対して直接catchErrorを適用する必要がある。
  • 正しく配置することで、エラー発生時も"source$"からの値を受け取り続けることができる。
  • mergeMapconcatMapexhaustMapでも同様の問題が起きるため、catchErrorの位置には常に注意が必要。
  • ストリームの継続性を保つには、「どこでcatchErrorしているか」を意識するのが重要。

お読み頂きありがとうございました。