Observableの作成方法!エラー処理や非同期処理も解説!

この記事ではRxJSの『Observable』について、

  • Observableを作成する方法
  • Observableを作成する際に「エラー処理」を行う方法
  • Observableを作成して「非同期処理」ができることを確認する方法

などを分かりやすく説明するように心掛けています。ご参考になれば幸いです。

そもそも「Observableって何?」という方は、「Observableの役割や意味など」を以下の記事で詳しく説明していますので、ご参考にしてください。

【RxJSとは】Observable, Observer, Operators, subscribeの使い方!
【RxJSとは】Observable, Observer, Operators, subscribeの使い方!

続きを見る

Observableを作成する方法

RxJSにはof関数やfrom関数のようにObservableを生成してくれる関数が用意されていますが、Observableのコンストラクタを使用すると、Observableを作成することができます。

まず、of関数を使ったプログラム例から説明します。

of関数を使ったプログラム例

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$.subscribe((value: number) => {
    console.log('Observer got a next value: ' + value);
});
// ログ
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3

上記は、Observableの使い方を示したプログラム例です。

of(1, 2, 3)はof関数であり、「1,2,3」の3つのデータを順番に流すObservableを生成しており、このObservableを「numberObservable$」という定数に代入しています。

Observable(numberObservable$)に対して、Observer(subscribeメソッドの引数)が一度subscribe(購読)することで、初めてObservableからデータが流れます。そして、ObserverはObservableから流れてくるデータを順番に取得しています。上記のプログラムの場合、「1,2,3」が取得される度に「console.log関数」を使ってその値をコンソールに出力しています。

そのため、上記のプログラムの実行結果は以下のようになります。

// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3

上記のプログラムをObservableのコンストラクタを使用して実装すると、以下のようになります。

Observableのコンストラクタを使ったプログラム例

次に、Observableのコンストラクタを使ったプログラム例を説明します。

import { Observable } from 'rxjs';  // of関数を使わないので、ofを削除

// Observableのコンストラクタを使った場合
const numberObservable$: Observable<number> = new Observable<number>((subscriber) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
});

numberObservable$.subscribe((value: number) => {
    console.log('Observer got a next value: ' + value);
});
// ログ
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3

上記のプログラムのように、Observableのコンストラクタを使用して、Observableを生成することも可能です。

Observable内部でsubscriber.next(value)を呼び出すと、そのvalueのデータが流れます。また、subscriber.complete()を呼び出すとデータの流れが終了したことを通知します。つまり、上記のプログラムでは「1,2,3」の3つのデータを順番に流すObservableを生成しています。

そのため、以下のプログラムはどちらも同じになります。

// Observableのコンストラクタを使った場合
const numberObservable$: Observable<number> = new Observable<number>((subscriber) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
});

// of関数を使った場合
const numberObservable$: Observable<number> = of(1, 2, 3);

ポイント

Observableのコンストラクタは、そのObservableがどのように動作するか(つまり、どのようなデータがどのようなタイミングで流れるか)を定義するために使用されます。

Observableのコンストラクタには、「Subscriberオブジェクトを引数とする関数」を渡します(上記のプログラムでは「subscriber」という名前のSubscriberオブジェクトを引数とした関数を渡しています)。

この関数内部で「subscriber.next()」を呼び出すことでデータが流れ、「subscriber.error()」を呼び出すことでエラーが流れ、そして「subscriber.complete()」を呼び出すことでデータの流れが終了したことを通知します(subscriber.error()については後ほど説明します)。

したがって、上記のコードは以下のように解釈することができます。

  • new Observable<number>で、number型のデータが流れる新しいObservableを作成します。
  • コンストラクタの引数として渡された関数により、このObservableがどのようにデータが流れるかを定義します。

なお、一般的なクラスのコンストラクタは、オブジェクトの作成時に実行される処理を実装することで、そのクラスの新しく生成されるインスタンスの初期状態を設定するために使用されます。

例えば、以下のプログラムを見てみましょう。

class MyClass {
    public value: number;

    constructor(value: number) {
        this.value = value;
    }
}

const obj = new MyClass(5);
console.log(obj.value);  // ログ: 5

上記のプログラムにおいて、「MyClassクラス」のコンストラクタは、新しい「MyClassクラスのインスタンス(オブジェクト)」のvalueプロパティを初期化します。したがって、「新しいMyClassのインスタンス(オブジェクト)obj」が作成されるとき(const obj = new MyClass(5);の箇所)、「MyClassのインスタンス(オブジェクト)obj」のvalueプロパティはコンストラクタに渡された値(ここでは5)に設定されます。

このように、RxJSのObservableのコンストラクタは、一般的なクラスのコンストラクタとは少し異なるので注意しましょう。

Observableを作成する際に「エラー処理」を行う方法

Observableはデータ流れるだけなく、エラーが流れることもあります。エラーを流すためには、subscriber.error()を用います。

以下にsubscriber.error()を用いたプログラム例を示しています。

import { Observable } from 'rxjs';  //of関数を使わないので、ofを削除

// Observableのコンストラクタを使用してObservableを作成
const numberObservable$: Observable<number> = new Observable<number>((subscriber) => {
    try {
        // 流れるデータ
        subscriber.next(1);
        subscriber.next(2);
        subscriber.next(3);
        // データの流れが完了したらcompleteを呼び出す
        subscriber.complete();
    } catch (err) {
        // エラーが流れる場合
        subscriber.error(err);
    }
});

numberObservable$.subscribe({
    next: (value: number) => console.log('Observer got a next value: ' + value),
    error: (err: any) => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
});

// ログ
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

上記のプログラムでは、try-catchブロックを使用しており、「next()メソッド」の呼び出しによって、引き起こされるエラーをキャッチしています。

このように、エラー処理には、try-catchブロックを使用して、データの送出中(「next()メソッド」の呼び出し中)に発生する可能性のあるエラーをキャッチすることが一般的です。

エラーが発生した場合、「error()メソッド」が呼び出され、そのObservableはすぐに終了します。そのため、エラー発生後の「next()メソッド」と「complete()メソッド」の呼び出しは無視されます。

エラーが発生しなかった場合、「complete()メソッド」が呼び出され、正常に完了します。

なお、Observableのコンストラクタを以下のように作成した場合、常にエラーが流れてしまいます。「error()メソッド」を呼び出した後、「complete()メソッド」を呼び出しても、その「complete()メソッド」の呼び出しは無視されますので注意してください。これは、Observableが一度エラーを発出した時点で終了するというRxJSの仕様によるものです。

const numberObservable$ = new Observable<number>((subscriber) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.error(new Error('Something went wrong'));
    subscriber.complete();
});

Observableを作成して「非同期処理」ができることを確認する方法

Observableのメリットの一つは非同期処理ができることです。

自分自身でObservableを作成することにより、非同期処理がどのように行われているかを直接確認することができます。

以下のプログラムは、Observableを使った非同期処理の一例です。「ここ重要!!」と書いてあることろに注目してください。

import { Observable } from 'rxjs';

const numberObservable$: Observable<number> = new Observable<number>((subscriber) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {            // ここ重要!!
        subscriber.next(4);       // ここ重要!!
        subscriber.complete();    // ここ重要!!
    }, 1000);
});


console.log('before subscribe');
numberObservable$.subscribe({
    next: (value: number) => console.log('Observer got a next value: ' + value),
    error: (err: any) => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
});
console.log('after subscribe');
// ログ:
// before subscribe
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// after subscribe
// Observer got a next value: 4
// Observer got a complete notification

プログラムの実行結果(ログ)を見ると、「4」が出力される前に「after subscribe」が出力されていることが分かります。

setTimeout(関数,時間)では、時間(上記のプログラムの場合だと1000ms)が経過してから、関数の処理を実行しています。

非同期処理を行っているので「subscriber.next(4)」と「subscriber.complete()」の実行をする前に「console.log('after subscribe')」が実行されています。

非同期処理とは

非同期処理とは、一つの処理を実行中であっても他の処理を実行できる実行方式をいいます。

時間がかかる処理(ネットワークリクエストやタイマーなど)を行う際に重要となります。

非同期処理により、これらの時間がかかる処理がアプリケーション全体の処理をブロックすることを防ぐことができます。

本記事のまとめ

この記事ではRxJSの『Observable』について、以下の内容を説明しました。

  • Observableを作成する方法
  • Observableを作成する際に「エラー処理」を行う方法
  • Observableを作成して「非同期処理」ができることを確認する方法

お読み頂きありがとうございました。