RxJSのswitchMap
, mergeMap
, concatMap
, exhaustMap
とcatchError
を組み合わせて使う際には、少し注意が必要です。
特にcatchError
の配置場所を間違えた場合、エラーが1回発生しただけで、ストリーム(stream)が停止して、後続の処理が一切動かなくなるといった問題に直面することがあります。
この記事では、switchMap
を例にして「ストリームが停止しないcatchError
の正しい配置場所」を、サンプルコードを用いてわかりやすく解説します。ご参考になれば幸いです。
switchMapの外でcatchErrorを使った場合(ストリームが止まる)
switchMap
の外でcatchError
を使った場合、switchMap
の内部でエラーが発生すると、ストリーム全体が終了(停止)してしまいます。サンプルコードを以下に示します。
import { of, throwError, timer } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
// 1秒毎に値を流す外側のストリーム
const source$ = timer(0, 1000);
// 値に応じてObservableを返す関数
function handleValue(value) {
if (value === 2) {
return throwError(() => new Error('switchMapの中でエラーが発生しました!!'));
}
return of(value);
}
const example$ = source$.pipe(
switchMap((value) => {
return handleValue(value);
}),
// switchMapの外でcatchErrorを使っている
catchError((error) => {
console.log('catchErrorで補足:', error.message);
return of('エラーを復旧します');
})
);
// 購読して結果を表示
example$.subscribe({
next: (value) => console.log('Next:', value),
error: (error) => console.log('Error:', error),
complete: () => console.log('Complete'),
});
// 実行結果
// Next: 0
// Next: 1
// catchErrorで補足: switchMapの中でエラーが発生しました!!
// Next: エラーを復旧します
// Complete
上記のサンプルコードでは、switchMap
が生成する内部のObservable(handleValue(value)
)がthrowError
によって例外を発生させます。するとその例外はswitchMap
の外に流れ、パイプ全体のcatchError
によって捕捉されます。
外側のcatchError
はパイプの全体を対象にしているため、エラーを捕捉した時点で元のストリーム(source$
)が終了(停止)してしまいます。そのため、それ以降のsource$
の値は流れてきません。実行結果を見ると、3
が流れずに終わっていますね。
つまり、switchMap
の外でcatchError
を使った場合、一度でもエラーが発生すると、Observable全体が終了してしまうのです。
あわせて読みたい
今回はswitchMap
を例にして説明しましたが、mergeMap
, concatMap
, exhaustMap
でも同様の問題が生じます。
続きを見るswitchMap
, mergeMap
, concatMap
, exhaustMap
の違いについては下記の記事で詳しく説明しています。興味のある方は下記のリンクからぜひチェックをしてみてください。 mergeMap、concatMap、switchMap、exhaustMapの違い【RxJS】
switchMapの中でcatchErrorを使った場合(ストリームが継続)
catchError
はswitchMap
が生成する内部のObservableに対して直接適用する必要があります。これによりswitchMap
の内部のストリームでエラーをキャッチして処理を行い、外側のストリームsource$
が終了しないようにします。
import { of, throwError, timer } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
// 1秒毎に値を流す外側のストリーム
const source$ = timer(0, 1000);
// 値に応じてObservableを返す関数
function handleValue(value) {
if (value === 2) {
return throwError(() => new Error('switchMapの中でエラーが発生しました!!'));
}
return of(value);
}
const example$ = source$.pipe(
switchMap((value) => {
return handleValue(value).pipe(
catchError((error) => {
console.log('catchErrorで補足:', error.message);
return of('エラーを復旧します');
})
);
})
);
// 購読して結果を表示
example$.subscribe({
next: (value) => console.log('Next:', value),
error: (error) => console.log('Error:', error),
complete: () => console.log('Complete'),
});
// 実行結果
// Next: 0
// Next: 1
// catchErrorで補足: switchMapの中でエラーが発生しました!!
// Next: エラーを復旧します
// Next: 3
// Next: 4
// ...
上記のサンプルコードでは、switchMap
が生成する内部のObservable(handleValue(value)
)に対して直接catchError
を適用しています。そのため、handleValue(value)
がthrowError
によって例外を発生させたとしても、そのエラーはswitchMap
の中でキャッチされ、代わりに例外ではない値(ここでは'エラーを復旧します'
)を流すことができます。その結果、元のストリーム(source$
)はエラーの影響を受けずに、1秒ごとに次の値を処理し続けることができます。
本記事のまとめ
この記事ではcatchError
の正しい配置場所について、以下の内容を説明しました。
catchError
をswitchMapの外側に配置すると、内部のObservableでエラーが発生した時にストリーム全体が停止してしまう。- ストリームを継続させたい場合は、
switchMap
内部のObservableに対して直接catchError
を適用する必要がある。 - 正しく配置することで、エラー発生時も
"source$"
からの値を受け取り続けることができる。 mergeMap
、concatMap
、exhaustMap
でも同様の問題が起きるため、catchError
の位置には常に注意が必要。- ストリームの継続性を保つには、「どこで
catchError
しているか」を意識するのが重要。
お読み頂きありがとうございました。