【RxJSとは】Observable, Observer, Operators, subscribeの使い方!

この記事では『RxJS』について、

  • RxJSとは
  • Observable, Observer, Operators, subscribeとは
  • Observable, Observer, Operators, subscribeの使い方
    • Observableの作成方法
    • Observableを返す関数の作成方法
    • Observerのコールバックメソッド(next, error, complete)について
    • Operators(オペレーター)でデータを操作する方法
    • unsubscribeでObservableのsubscribe(購読)を解除する方法

などを分かりやすく説明するように心掛けています。ご参考になれば幸いです。

RxJSとは

RxJSとは

RxJSは、Reactive Extensions for JavaScriptの略で、非同期のデータストリーム(連続的なデータの流れ)を扱うためのJavaScriptライブラリです。JavaScriptでリアクティブプログラミングを実現するために使用されます。

Observableと呼ばれる「データのストリーム(連続的なデータの流れ)」をObserverが処理することで、非同期処理やイベント処理をシンプルかつ簡潔にコーディングすることができます(ObservableObserverについては後ほど詳しく説明します)。

補足

  • JavaScriptのフレームワークである「Angular」では、開発に「RxJS」がスタンダードに使用されています。
  • 「RxJS」はJavaScript用のライブラリですが、JavaScript以外のプログラミング言語のバージョンもあります。例えば、C#向けのライブラリ「Rx.NET」やJava向けのライブラリ「Rx.Java」などがあります。
  • Reactive Extensions(Rx)の始まりはMicrosoftが開発したC#向けのライブラリ「Rx.NET」です。

RxTS(Reactive Extensions for TypeScript)もあるの?

「RxTS」というライブラリはないですが、「RxJS」はTypeScriptとの互換性があります。そのため、TypeScriptを使用してRxJSのコードを書くことが可能です。

Observable, Observer, Operators, subscribeとは

Observable, Observer, Operators, subscribeとは

上図にObservable, Observer, Operators, subscribeを説明するための図を示しています。

  • Observable
    • Observableは「データのストリーム(連続的なデータの流れ)」を表しており、1つ以上のデータが流れるオブジェクトです。
  • Observer
    • ObserverはObservableから流れてくるデータをsubscribe(購読)する関数またはオブジェクトです。
    • Observableから流れてくるデータをObserverがsubscribe(購読)することで、Observableからデータが流れ初めます。
    • 後ほど詳しく説明しますが、subscribeメソッドの引数がObserverです。
  • Operators(オペレーター)
    • Observableから流れてくるデータをObserverがsubscribe(購読)する際にOperators(オペレーター)による中間操作を行うこともできます。
    • オペレーターはObservableに対して操作を行う関数です。Observableからデータを取得し、そのデータを操作して新しいObservableを生成します。新しいObservableは元のObservableからのデータを操作するだけで、元のObservable自体は変更されません。
    • オペレーターを用いる場合、Observableから流れてくるデータをオペレーターが操作し、その結果をObserverがsubscribe(購読)します。
    • オペレーターは省略することも、複数組み合わせて使用することも可能です。
    • 後ほど詳しく説明しますが、Observableのpipeメソッドの引数に渡す様々な関数がオペレーターです。

例えば、Observableから「1,2,3,4,5」のデータが流れてくる場合、Operators(オペレーター)による中間操作で2の倍数のみをフィルタリングすると、Observersubscribe(購読)するのは「2,4」になります。

この後に説明するプログラム例などを見ると、Observableの仕組みが理解しやすくなると思います。

Observable, Observer, subscribeの使い方

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$.subscribe((value: number) => {
    console.log('Observer got a next value: ' + value);
});
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3

上記は、Observable, Observer, subscribeの基本的な使い方を示したプログラム例です。基本的な使い方を説明するために、上記のプログラムではOperators(オペレーター)を省略しています(この記事の後半でオペレーターの使い方を説明してます)。

上記のプログラムについて、簡単に説明すると、of(1, 2, 3)はof関数であり、「1,2,3」の3つのデータを順番に流すObservableを生成しており、このObservableを「numberObservable$」という定数に代入しています。

Observable(numberObservable$)に対して、Observer(subscribeメソッドの引数)が一度subscribe(購読)することで、初めてObservableからデータが流れます。そして、ObserverはObservableから流れてくるデータを順番に取得しています。上記のプログラムの場合、「1,2,3」が取得される度に「console.log関数」を使ってその値をコンソールに出力しています。

そのため、上記のプログラムの実行結果は以下のようになります。

//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3

イメージ的には、Observerがsubscribe(購読)すると、Observableから「1」というデータが流れてきて、そのデータをObserverが取得し、「console.log関数」を使って「Observer got a next value: 1」をコンソールに出力します。次にObservableから「2」が流れてきて、そのデータをObserverが取得し、「console.log関数」を使って「Observer got a next value: 2」をコンソールに出力します。この一連の流れがデータ全てに対して繰り返されます。

ではこれから、「Observable, Observer, subscribeの基本的な使い方」を以下の手順で説明します。

Observable, Observer, subscribeの基本的な使い方

  • 「Observable」と「of」をrxjsからインポートする
  • Observableを作成する
  • Observableをsubscribe(購読)する

step
1
「Observable」と「of」をrxjsからインポートする

import { Observable, of } from 'rxjs';

まず、「Observable」と「of関数」を用いるために、「rxjs」から「Observable」と「of」をインポートします。

step
2
Observableを作成する

const numberObservable$: Observable<number> = of(1, 2, 3);

次に、Observableを作成します。

上記のプログラムにおいて、of(1, 2, 3)はof関数であり、「1,2,3」の3つのデータが順番に流れるObservableを生成しています。このObservableを「numberObservable$」という定数に代入しています。

numberObservable$の末尾にあるドル記号($)について

numberObservable$の末尾にあるドル記号($)は、変数や定数がObservableであることを示すためのRxJSの一般的な慣習です。文法上の特別な意味はありません。

補足

上記のプログラムでは「1,2,3」の3つの数値が順番に流れるObservable(numberObservable$)を生成していました。ここで、「first,second,third」という3つの文字列が順番に流れるObservable(stringObservable$)を生成するためには、以下のようにデータ型をstring型に変更して、記述します。

const stringObservable$: Observable<string> = of('first', 'second', 'third');

また、データ型をany型にすると、文字列も数値も流すことができるようになります。例えば、「first,2,third」という3つのデータを順番が順番に流れるObservable(anyObservable$)を生成するためには、以下のように記述します。

const anyObservable$: Observable<any> = of('first', 2, 'third');

ユニオン型を用いて、「any」を「string | number」のように変更してもOKです。

const anyObservable$: Observable<string | number> = of('first', 2, 'third');

step
3
Observableをsubscribe(購読)する

numberObservable$.subscribe((value: number) => {
    console.log('Observer got a next value: ' + value);
});

Observable(numberObservable$)をsubscribe(購読)することで、Observableから初めてデータが流れます。

subscribeメソッドの引数には、「関数(メソッド)」または「オブジェクト」を渡すことができます。これらの引数はObserverと呼ばれ、Observableから流れてくるデータを取得しています。

上記のプログラムでは、subscribeメソッドの引数に「関数(メソッド)」を渡しています。

なお、以下のプログラムのように、関数宣言してその関数をsubscribeメソッドに渡しても、関数式を用いて関数を変数に代入してからその変数をsubscribeメソッドに渡しても同様の動作をします。

//関数宣言の場合
function observer(value: number): void {
    console.log('Observer got a next value: ' + value);
}
numberObservable$.subscribe(observer);

//関数式の場合
const observer = function (value: number): void {
    console.log('Observer got a next value: ' + value);
}
numberObservable$.subscribe(observer);

Observableを作成するのではなく、Observableを返す関数を作成した場合

import { Observable, of } from 'rxjs';

//Observableを作成する場合
//const numberObservable$: Observable<number> = of(1, 2, 3);

//Observableを返す関数を作成する場合
function createNumberObservable(): Observable<number> {
    return of(1, 2, 3);
}

//「numberObservable$」を「createNumberObservable()」に変更
createNumberObservable().subscribe((value: number) => {
    console.log('Observer got a next value: ' + value);
});
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3

上記のプログラムでは、Observableを直接作成するのではなく、Observableを返す関数createNumberObservableを作成しています。

関数createNumberObservableを実行するとObservableが生成され、そのObservableをsubscribe(購読)しています。

Observerのコールバックメソッド(next, error, complete)について

Observerはsubscribeメソッドの引数であり、「関数(メソッド)」または「オブジェクト」です。

このObserverは実際には、next, error, completeという3つの「コールバックメソッド(コールバッグ関数)」で構成されています。そして、これらのコールバッグメソッドはObservableがデータを流したり、エラーを通知したり、その完了を通知したりする際に呼び出されます。

各コールバッグメソッドの説明を以下に示します。

  • next
    • Observableがデータを流した時に実行されるメソッドです。
  • error
    • Observableでエラーが発生した時に実行されるメソッドです。
  • complete
    • Observableが完了した時に呼び出されるメソッドです。

これらのコールバッグメソッドは省略可能であり、必要に応じて、Observerを構成します。例えば、next, error, completeの3つのコールバッグメソッドで構成されているObserverでも良いですし、next, completeの2つのコールバッグメソッドで構成されているObserverでも良いです。この場合、Observerはオブジェクトで構成することが推奨されています。

なお、nextのコールバッグメソッドのみで構成されているObserverの場合、Observerをオブジェクトではなく、直接関数で構成することも可能です。

ではこれから、

  • next, error, completeの3つのコールバッグメソッドで構成されているObserver
  • next, completeの2つのコールバッグメソッドで構成されているObserver
  • nextのコールバッグメソッドのみで構成されているObserver

について順番にプログラム例を見てみましょう。

next, error, completeの3つのコールバッグメソッドで構成されているObserver

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$.subscribe({
    next: (value: number) => console.log('Observer got a next value: ' + value),
    error: (err: any) => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
});
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3
//Observer got a complete notification

上記はnext, error, completeの3つのコールバッグメソッドで構成されているObserver(subscribeメソッドの引数)のプログラム例です。

2つ以上のコールバッグメソッドでObserverを構成する場合、上記に示すようにオブジェクトで構成することが推奨されています。また、オブジェクトの各プロパティのキーには「next, error, complete」を記述します。

なお、上記のプログラムでは、オブジェクトの各プロパティのvalue(値)には、アロー関数を用いています。そのため、以下に示すように、関数式を用いることも可能です。

関数式を用いた場合

numberObservable$.subscribe({
    next: function (value: number) {
        console.log('Observer got a next value: ' + value);
    },
    error: function (err: any) {
        console.error('Observer got an error: ' + err);
    },
    complete: function () {
        console.log('Observer got a complete notification');
    },
});

また、ES2015(ES6)からは、オブジェクトリテラルの省略記法を用いることもできます。そのため、省略記法を用いて以下のように記述してもOKです。

省略記法を用いた場合

numberObservable$.subscribe({
    next(value: number) {
        console.log('Observer got a next value: ' + value);
    },
    error(err: any) {
        console.error('Observer got an error: ' + err);
    },
    complete() {
        console.log('Observer got a complete notification');
    },

推奨されていない記述方法

推奨されていませんが、以下のプログラムに示すようにObserver(subscribeメソッドの引数)に直接メソッドを渡すことも可能です。直接メソッドを渡す場合、第1引数がnext、第2引数がerror、第3引数がcompleteとして扱われます。

numberObservable$.subscribe(
    (value: number) => console.log('Observer got a next value: ' + value),
    (err: any) => console.error('Observer got an error: ' + err),
    () => console.log('Observer got a complete notification'),
);

以下のプログラムのように、必要ないコールバックメソッド(以下の場合は、error)をundefinedで省略することができます。

numberObservable$.subscribe(
    (value: number) => console.log('Observer got a next value: ' + value),
    undefined,
    () => console.log('Observer got a complete notification'),
);

これらの方法は、Observerオブジェクトを作成しない代わりに、各コールバックメソッドを直接subscribeメソッドの引数として渡しています。ただし、この方法は「RxJS 6.4」から非推奨になっています。また、私のエディタ上では「Subscription' は非推奨です。ts(6387)」とメッセージが表示されました。

next, completeの2つのコールバッグメソッドで構成されているObserver

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$.subscribe({
    next: (value: number) => console.log('Observer got a next value: ' + value),
    complete: () => console.log('Observer got a complete notification'),
});
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3
//Observer got a complete notification

上記はnext, completeの2つのコールバッグメソッドで構成されているObserver(subscribeメソッドの引数)のプログラム例です。上記のプログラム例ではerrorのコールバッグメソッドを省略しています。

このように、必要ないコールバックメソッドを省略することができます。

nextのコールバッグメソッドのみで構成されているObserver

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$.subscribe({
    next: (value: number) => console.log('Observer got a next value: ' + value),
});
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3

上記は、nextのみのコールバッグメソッドで構成されているObserver(subscribeメソッドの引数)のプログラム例です。上記のプログラム例ではerrorとcompleteのコールバッグメソッドを省略しています。

nextのコールバッグメソッドのみで構成されているObserverの場合、以下のプログラムのように直接メソッドを渡す方法は非推奨ではありません。そのため、書籍やネットでよく見かけます。

numberObservable$.subscribe(
    (value: number) => console.log('Observer got a next value: ' + value),
);

補足

書籍やネットで以下のプログラムを見かけることがあるかもしれませんが、以下の2つのプログラムは同じ動作をします。

//アロー関数がObserver(subscribeメソッドの引数)の場合
numberObservable$.subscribe(
    (value: number) => console.log(value),
);

//console.logの関数自体がObserver(subscribeメソッドの引数)の場合
numberObservable$.subscribe(console.log);

console.log関数自体をsubscribeメソッドの引数に渡すことができるのは、JavaScriptの関数がファーストクラスオブジェクトであり、他の変数やオブジェクトと同様に引数として渡すことができるためです。この場合、subscribeメソッドはObservableから流れてくる各データをconsole.log関数に直接渡しています。そして、console.log関数は引数を受け取り、それをコンソールに出力しています。

Observable, Observer, subscribeの使い方
(subscribeメソッドの引数に宣言したObserverを渡す場合)

import { Observable, of } from 'rxjs';

const numberObservable$: Observable<number> = of(1, 2, 3);

//オブジェクトの型エイリアス(Observer型)を定義
type Observer = {
    next: (value: number) => void;
    error: (err: any) => void;
    complete: () => void;
};

//Observer型のmyObserverを宣言する
const myObserver: Observer = {
    next: (value: number) => console.log('Observer got a next value: ' + value),
    error: (err: any) => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
}

numberObservable$.subscribe(myObserver);
//ログ
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3
//Observer got a complete notification

上記のプログラムのようにsubscribeメソッドの引数に宣言したObserverを渡すことも可能です。

上記のプログラムにおいて、以下の箇所では、Observerという型エイリアスを定義しており、next, error, completeというメソッドを持つオブジェクトの型を定義しています。

type Observer = {
    next: (value: number) => void;
    error: (err: any) => void;
    complete: () => void;
};

以下の箇所では、myObserverというObserver型の変数を宣言し、next, error, completeの各メソッドが実際に何を行うかを定義しています。

const myObserver: Observer = {
    next: (value: number) => console.log('Observer got a next value: ' + value),
    error: (err: any) => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
}

以下の箇所では、Observable(numberObservable$)のsubscribeメソッドの引数に、先ほど宣言したmyObserverを渡しています。これにより、Observableから流れてくる各データに対してmyObserverのnextメソッドが呼び出されます。Observableでエラーが発生した場合はerrorメソッドが呼び出されます。Observableが完了した場合はcompleteメソッドが呼び出されます。

numberObservable$.subscribe(myObserver);

Operators(オペレーター)の使い方

import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators'

const numberObservable$: Observable<number> = of(1, 2, 3);

numberObservable$
    .pipe(map((value: number) => value * 10))
    .subscribe((value: number) => {
        console.log('Observer got a next value: ' + value);
    });
//ログ
//Observer got a next value: 10
//Observer got a next value: 20
//Observer got a next value: 30

上記にObservableから流れてくるデータに対して、subscribe(購読)する際にOperators(オペレーター)による中間操作を行ったプログラム例を示しています。

Observableのpipeメソッドの引数に渡す様々な関数をOperators(オペレーター)といいます。上記のプログラムの場合ではmap関数がオペレーターです。

オペレーターはObservableからデータを取得し、そのデータを操作して、新しいObservableを生成します。この新しいObservableをObserver(subscribeメソッドの引数)がsubscribe(購読)しています。

そのため、上記のプログラムの場合、オペレーター(map関数)はObservable(numberObservable$)から流れてきたデータの値を10倍にして、新たなObservableを生成しています。この新たなObservableをsubscribe(購読)しているので、ログは以下のようになっています。

//Observer got a next value: 10
//Observer got a next value: 20
//Observer got a next value: 30

なお、オペレーターを使用する場合、以下に示すように「rxjs/operators」から使用する関数をimportする必要があります。

import { Observable, of } from 'rxjs';

オペレーターを組み合わせて値を操作する場合

import { Observable, of } from 'rxjs';
import { map, filter } from 'rxjs/operators'

const numberObservable$: Observable<number> = of(1, 2, 3, 4, 5);

numberObservable$
    .pipe(
        filter((value: number) => value % 2 == 0),
        map((value: number) => value * 10),
    )
    .subscribe((value: number) => {
        console.log('Observer got a next value: ' + value);
    });
//ログ
//Observer got a next value: 20
//Observer got a next value: 40

Operators(オペレーター)はメソッドチェーンのように複数組み合わせることもできます。上記のプログラムはオペレーターであるmap関数とfilter関数を組み合わせたプログラム例です。

上記のプログラムでは、pipeメソッドの引数にオペレーターであるfilter関数とmap関数を渡しています。

pipeメソッドに複数のオペレーターを渡した場合、第1引数のオペレーターから順番に適用されます。

そのため、上記のプログラムの場合、第1引数のオペレーター(filter関数)がObservable(numberObservable$)から流れてきたデータの値に対して、2で割り切れる数のみをフィルタリングして、「新たなObservable1」を生成しています。

その後、第2引数のオペレーター(map関数)は「新たなObservable1」から流れてきたデータの値を10倍にして、「新たなObservable2」を生成しています。この「新たなObservable2」をsubscribe(購読)しているので、ログは以下のようになっています。

//Observer got a next value: 20
//Observer got a next value: 40

Observable, Observer, subscribeの使い方
(unsubscribeで停止させる方法)

import { Observable, interval } from 'rxjs';

const countObservable$: Observable<number> = interval(1000)

const mysubscription = countObservable$
    .subscribe((value: number) => {
        console.log('Observer got a next value: ' + value);
    });

setTimeout(() => {
    mysubscription.unsubscribe();
}, 10000)
//ログ
//Observer got a next value: 0
//Observer got a next value: 1
//Observer got a next value: 2
//Observer got a next value: 3
//Observer got a next value: 4
//Observer got a next value: 5
//Observer got a next value: 6
//Observer got a next value: 7
//Observer got a next value: 8

上記にunsubscribeを用いてObservableのsubscribe(購読)を解除したプログラム例を示しています。

subscribeメソッドの返り値はSubscriptionオブジェクトとなっています。このSubscriptionオブジェクトはObservableの購読を解除するためのunsubscribeメソッドを持っています。

上記のプログラムでは「interval(1000)」で1秒ごとに1増えるObservableを生成し、このObservableを「countObservable$」という定数に代入しています。

Observable(countObservable$)に対して、subscribe(購読)すると、1000msごとにログが表示されます。しかし、setTimeout関数内で10000ms後にSubscriptionオブジェクト(mysubscription)のunsubscribeメソッドを呼び出しているため、10000ms後にObservableのsubscribe(購読)が解除され、ログの表示が停止します。

本記事のまとめ

この記事では『RxJS』について、以下の内容を説明しました。

  • RxJSとは
  • Observable, Observer, Operators, subscribeとは
  • Observable, Observer, Operators, subscribeの使い方
    • Observableの作成方法
    • Observableを返す関数の作成方法
    • Observerのコールバックメソッド(next, error, complete)について
    • Operators(オペレーター)でデータを操作する方法
    • unsubscribeでObservableのsubscribe(購読)を解除する方法

お読み頂きありがとうございました。