RxJS-クイックガイド

この章では、RxJSの機能、長所、短所について説明します。ここでは、RxJSをいつ使用するかも学びます。

RxJSの完全な形式は Reactive Extension for Javascript。これは、オブザーバブルを使用して、非同期データ呼び出し、コールバック、およびイベントベースのプログラムを処理するリアクティブプログラミングを処理するJavaScriptライブラリです。RxJSは、他のJavascriptライブラリおよびフレームワークで使用できます。javascriptとtypescriptでもサポートされています。

RxJSとは何ですか?

RxJSの公式ウェブサイトによると、これは、監視可能なシーケンスを使用して非同期およびイベントベースのプログラムを作成するためのライブラリとして定義されています。1つのコアタイプ、Observable、サテライトタイプ(オブザーバー、スケジューラー、サブジェクト)、およびArray#extras(map、filter、reduce、everyなど)に触発された演算子を提供して、非同期イベントをコレクションとして処理できるようにします。

RxJSの機能

RxJSでは、次の概念が非同期タスクの処理を処理します-

観察可能

オブザーバブルは、オブザーバーを作成し、値が期待されるソースにアタッチする関数です。たとえば、クリック、dom要素からのマウスイベント、またはHttpリクエストなどです。

観察者

これは、next()、error()、およびcomplete()メソッドを持つオブジェクトであり、オブザーバブルとの相互作用がある場合に呼び出されます。つまり、ソースは、ボタンのクリックやHttpリクエストなどの例で相互作用します。

サブスクリプション

オブザーバブルが作成されたら、オブザーバブルを実行するためにサブスクライブする必要があります。実行をキャンセルするためにも使用できます。

演算子

演算子は、オブザーバブルを入力として取り込む純粋関数であり、出力もオブザーバブルです。

件名

サブジェクトは、マルチキャストできる、つまり多くのオブザーバーと話すことができるオブザーバブルです。イベントリスナーのあるボタンを考えてみましょう。ユーザーがボタンをクリックするたびに、addlistenerを使用してイベントにアタッチされた関数が呼び出され、同様の機能がサブジェクトにも適用されます。

スケジューラー

スケジューラーは、サブスクリプションを開始して通知する必要がある場合の実行を制御します。

RxJSをいつ使用するのですか?

プロジェクトが多くの非同期タスク処理で構成されている場合は、RxJSが適しています。デフォルトでは、Angularプロジェクトにロードされます。

RxJSを使用する利点

RxJSを使用する利点は次のとおりです。

  • RxJSは、他のJavascriptライブラリおよびフレームワークで使用できます。javascriptとtypescriptでもサポートされています。いくつかの例は、Angular、ReactJS、Vuejs、nodejsなどです。

  • RxJSは、非同期タスクの処理に関しては素晴らしいライブラリです。RxJSは、オブザーバブルを使用して、非同期データ呼び出し、コールバック、およびイベントベースのプログラムを処理するリアクティブプログラミングを処理します。

  • RxJSは、数学、変換、フィルタリング、ユーティリティ、条件付き、エラー処理、結合カテゴリの演算子の膨大なコレクションを提供し、リアクティブプログラミングで使用すると作業が楽になります。

RxJSを使用することのデメリット

以下は、RxJSを使用することの欠点です。

  • オブザーバブルを使用してコードをデバッグすることは少し難しいです。

  • Observablesの使用を開始すると、完全なコードがObservablesの下にラップされる可能性があります。

この章では、RxJSをインストールします。RxJSを使用するには、次の設定が必要です。

  • NodeJS
  • Npm
  • RxJSパッケージのインストール

NODEJSとNPMのインストール

npmを使用してRxJSをインストールするのは非常に簡単です。システムにnodejsとnpmをインストールする必要があります。NodeJSとnpmがシステムにインストールされているかどうかを確認するには、コマンドプロンプトで次のコマンドを実行してみてください。

E:\>node -v && npm -v
v10.15.1
6.4.1

バージョンを取得している場合は、nodejsとnpmがシステムにインストールされており、現在システムにバージョンが10と6であることを意味します。

何も出力されない場合は、nodejsをシステムにインストールしてください。nodejsをインストールするには、ホームページにアクセスしてくださいhttps://nodejs.org/en/download/ nodejsを使用して、OSに基づいてパッケージをインストールします。

nodejsのダウンロードページは次のようになります-

OSに基づいて、必要なパッケージをインストールします。nodejsがインストールされると、npmも一緒にインストールされます。npmがインストールされているかどうかを確認するには、ターミナルでnpm –vと入力します。npmのバージョンが表示されます。

RxJSパッケージのインストール

RxJSのインストールを開始するには、最初にというフォルダーを作成します。 rxjsproj/ ここでは、すべてのRxJSの例を練習します。

一度フォルダ rxjsproj/ 作成されたら、コマンドを実行します npm init、以下に示すプロジェクト設定用

E:\>mkdir rxjsproj
E:\>cd rxjsproj
E:\rxjsproj>npm init

Npm initコマンドは実行中にいくつかの質問をします。Enterキーを押して続行してください。npm initの実行が完了すると、作成されますpackage.json 以下に示すようにrxjsproj /内-

rxjsproj/
   package.json

これで、以下のコマンドを使用してrxjsをインストールできます-

npm install ---save-dev rxjs

E:\rxjsproj>npm install --save-dev rxjs
npm notice created a lockfile as package-lock.json. You should commit this file.

npm WARN [email protected] No description
npm WARN [email protected] No repository field.

+ [email protected]
added 2 packages from 7 contributors and audited 2 packages in 21.89s
found 0 vulnerabilities

RxJSのインストールは完了です。ここで、RxJSを使用して、フォルダーを作成してみましょう。src/ 内部 rxjsproj/

これで、次のようなフォルダ構造になります-

rxjsproj/
   node_modules/
   src/
   package.json

内部 src/ ファイルを作成する testrx.js、次のコードを記述します-

testrx.js

import { of } from 'rxjs;
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`Output is: ${v}`));

コマンドプロンプトで上記のコードを実行する場合は、コマンド-を使用します。 node testrx.js、nodejsはインポートをどう処理するかわからないため、インポートのエラーが表示されます。

nodejsでインポートを機能させるには、以下に示すように、npmを使用してES6モジュールパッケージをインストールする必要があります。

E:\rxjsproj\src>npm install --save-dev esm
npm WARN [email protected] No description
npm WARN [email protected] No repository field.

+ [email protected]
added 1 package from 1 contributor and audited 3 packages in 9.32s
found 0 vulnerabilities

パッケージがインストールされると、実行できるようになります testrx.js 以下に示すファイル-

E:\rxjsproj\src>node -r esm testrx.js
Output is: 1
Output is: 4
Output is: 9

これで、RxJSがインストールされ、使用する準備ができていることを示す出力が表示されます。上記の方法は、コマンドラインでRxJSをテストするのに役立ちます。ブラウザでRxJSをテストする場合は、いくつかの追加パッケージが必要になります。

ブラウザでのRxJSのテスト

rxjsproj /フォルダー内に次のパッケージをインストールします-

npm install --save-dev babel-loader @babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

E:\rxjsproj>npm install --save-dev babel-loader 
@babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

npm WARN [email protected] No description
npm WARN [email protected] No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: [email protected]
(node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@
1.2.9: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})

+ [email protected]
+ [email protected]
+ @babel/[email protected]
+ @babel/[email protected]
+ [email protected]
+ [email protected]
added 675 packages from 373 contributors and audited 10225 packages in 255.567s
found 0 vulnerabilities

サーバーを起動してHtmlファイルを実行するには、webpack-serverを使用します。package.jsonのコマンド「publish」は、webpackを使用してすべてのjsファイルを起動およびパックするのに役立ちます。使用する最後のjsファイルであるパックされたjsファイルは、パス/ devフォルダーに保存されます。

webpackを使用するには、実行する必要があります npm run publish 以下に示すように、コマンドとコマンドがpackage.jsonに追加されます-

Package.json

{
   "name": "rxjsproj",
   "version": "1.0.0",
   "description": "",
   "main": "index.js",
   "scripts": {
      "publish":"webpack && webpack-dev-server --output-public=/dev/",
      "test": "echo \"Error: no test specified\" && exit 1"
   },
   "author": "",
   "license": "ISC",
   "devDependencies": {
      "@babel/core": "^7.6.0",
      "@babel/preset-env": "^7.6.0",
      "babel-loader": "^8.0.6",
      "esm": "^3.2.25",
      "rxjs": "^6.5.3",
      "webpack": "^4.39.3",
      "webpack-cli": "^3.3.8",
      "webpack-dev-server": "^3.8.0"
   }
}

webpackを使用するには、最初にwebpack.config.jsというファイルを作成する必要があります。このファイルには、webpackを機能させるための構成の詳細が含まれています。

ファイルの詳細は次のとおりです-

var path = require('path');

module.exports = {
   entry: {
      app: './src/testrx.js'
   },
   output: {
      path: path.resolve(__dirname, 'dev'),
      filename: 'main_bundle.js'
   },
   mode:'development',
   module: {
      rules: [
         {
            test:/\.(js)$/,
            include: path.resolve(__dirname, 'src'),
            loader: 'babel-loader',
            query: {
               presets: ['@babel/preset-env']
            }
         }
      ]
   }
};

ファイルの構造は上記のとおりです。現在のパスの詳細を示すパスから始まります。

var path = require('path'); //gives the current path

次は、プロパティentry、output、およびmoduleを持つmodule.exportsオブジェクトです。エントリーが出発点です。ここでは、コンパイルする開始jsファイルを指定する必要があります。

entry: {
   app: './src/testrx.js'
},

path.resolve(_dirname、 'src / testrx.js')-ディレクトリ内のsrcフォルダーと、そのフォルダー内のtestrx.jsを検索します。

出力

output: {
   path: path.resolve(__dirname, 'dev'),
   filename: 'main_bundle.js'
},

出力は、パスとファイル名の詳細を持つオブジェクトです。パスは、コンパイルされたファイルが保存されるフォルダーを保持し、ファイル名は、.htmlファイルで使用される最終ファイルの名前を示します。

モジュール

module: {
   rules: [
      {
         test:/\.(js)$/,
         include: path.resolve(__dirname, 'src'),
         loader: 'babel-loader',
         query: {
            presets: ['@babel/preset-env']
         }
      }
   ]
}

Moduleは、テスト、インクルード、ローダー、クエリなどのプロパティを持つルールの詳細を持つオブジェクトです。テストでは、.jsと.jsxで終わるすべてのjsファイルの詳細が保持されます。指定されたエントリポイントの最後で.jsを検索するパターンがあります。

Include ファイルの表示に使用するフォルダーを指示します。

The loader コードのコンパイルにbabel-loaderを使用します。

The query値が「@babel / preset-env」の配列であるプロパティプリセットがあります。必要なES環境に従ってコードをトランスパイルします。

最終的なフォルダ構造は次のようになります-

rxjsproj/
   node_modules/
   src/
      testrx.js
   index.html
   package.json
   webpack.config.js

コマンドを実行

npm run publishmain_bundle.jsファイルを含むdev /フォルダーを作成します。サーバーが起動し、以下に示すようにブラウザでindex.htmlをテストできます。

ブラウザを開き、URLを押します- http://localhost:8080/

出力はコンソールに表示されます。

このチュートリアルでは、RxJSバージョン6を使用しています。RxJSは、リアクティブプログラミングを処理するために一般的に使用され、Angular、ReactJSでより頻繁に使用されます。Angular6はデフォルトでrxjs6をロードします。

RxJSバージョン5は、バージョン6とは異なる方法で処理されました。RxJS5を6に更新すると、コードが壊れます。この章では、バージョン更新の処理方法の違いについて説明します。

RxJSを6に更新していて、コードを変更したくない場合は、それも可能であり、次のパッケージをインストールする必要があります。

npm install --save-dev rxjs-compact

このパッケージは下位互換性を提供し、古いコードはRxJSバージョン6で正常に機能します。RxJS6で正常に機能するコード変更を行う場合は、以下の変更を行う必要があります。

オペレーター、オブザーバブル、サブジェクトのパッケージが再構築されたため、インポートに大きな変更が加えられました。以下で説明します。

オペレーター向けの輸入

バージョン5と同様に、オペレーターには次のインポートステートメントを含める必要があります-

import 'rxjs/add/operator/mapTo'
import 'rxjs/add/operator/take'
import 'rxjs/add/operator/tap'
import 'rxjs/add/operator/map'

RxJSバージョン6では、インポートは次のようになります-

import {mapTo, take, tap, map} from "rxjs/operators"

オブザーバブルを作成するためのメソッドのインポート

バージョン5と同様に、Observablesを使用する場合は、次のインポートメソッドを含める必要があります-

import "rxjs/add/observable/from";
import "rxjs/add/observable/of";
import "rxjs/add/observable/fromEvent";
import "rxjs/add/observable/interval";

RxJSバージョン6では、インポートは次のようになります-

import {from, of, fromEvent, interval} from 'rxjs';

オブザーバブルのインポート

RxJSバージョン5では、Observablesを操作する際に、次のインポートステートメントを含める必要があります-

import { Observable } from 'rxjs/Observable'

RxJSバージョン6では、インポートは次のようになります-

import { Observable } from 'rxjs'

件名のインポート

RxJSバージョン5では、件名は次のように含める必要があります-

import { Subject} from 'rxjs/Subject'

RxJSバージョン6では、インポートは次のようになります-

import { Subject } from 'rxjs'

RxJS 6で演算子を使用する方法は?

pipe() method作成されたオブザーバブルで利用できます。バージョン5.5からRxJSに追加されています。pipe()を使用すると、複数の演算子を順番に一緒に操作できるようになりました。これは、RxJSバージョン5で演算子が使用された方法です。

import "rxjs/add/observable/from";
import 'rxjs/add/operator/max'

let list1 = [1, 6, 15, 10, 58, 2, 40];
from(list1).max((a,b)=>a-b).subscribe(x => console.log("The Max value is "+x));

RxJSバージョン5.5以降では、pipe()を使用して演算子を実行する必要があります-

import { from } from 'rxjs';
import { max } from 'rxjs/operators';

from(list1).pipe(max((a,b)=>a-b)).subscribe(x => console.log(
   "The Max value is "+x)
);

演算子の名前が変更されました

パッケージの再構築中に、一部の演算子は、javascriptキーワードと競合または一致していたため、名前が変更されました。リストは以下のとおりです-

オペレーター 名前が変更されました
行う() tap()
キャッチ() catchError()
スイッチ() switchAll()
最終的に() finalize()
スロー() throwError()

オブザーバブルは、オブザーバーを作成し、それをソースにアタッチする関数です。ソースには、クリック、dom要素からのマウスイベント、Httpリクエストなどの値が期待されます。

Observer is an object with callback functions、Observableとの相互作用がある場合、つまり、ソースがボタンクリック、Httpリクエストなどの例で相互作用した場合に呼び出されます。

この章では、次のトピックについて説明します。

  • Observableを作成する
  • Observableを購読する
  • Observableを実行する

Observableを作成する

observableは、observableコンストラクターを使用し、observable createメソッドを使用し、以下に示すように、サブスクライブ関数を引数として渡すことで作成できます。

testrx.js

import { Observable } from 'rxjs';

var observable = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

オブザーバブルを作成し、「MyFirstObservable」というメッセージを追加しました。 subscriber.next Observable内で利用可能なメソッド。

以下に示すように、Observable.create()メソッドを使用してObservableを作成することもできます。

testrx.js

import { Observable } from 'rxjs';
var observer = Observable.create(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

Observableを購読する

次のようにオブザーバブルをサブスクライブできます-

testrx.js

import { Observable } from 'rxjs';

var observer = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);
observer.subscribe(x => console.log(x));

オブザーバーがサブスクライブされると、オブザーバブルの実行が開始されます。

これは、ブラウザコンソールに表示されるものです-

Observableを実行する

オブザーバブルは、サブスクライブされると実行されます。オブザーバーは、通知される3つのメソッドを持つオブジェクトです。

next() −このメソッドは、数値、文字列、オブジェクトなどの値を送信します。

complete() −このメソッドは値を送信せず、オブザーバブルが完了したことを示します。

error() −このメソッドは、エラーがある場合はそれを送信します。

3つの通知すべてを使用してオブザーバブルを作成し、同じように実行しましょう。

testrx.js

import { Observable } from 'rxjs';
var observer = new Observable(
   function subscribe(subscriber) {
      try {
         subscriber.next("My First Observable");
         subscriber.next("Testing Observable");
         subscriber.complete();
      } catch(e){
         subscriber.error(e);
      }
   }
);
observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

上記のコードでは、次に、completeメソッドとerrorメソッドを追加しました。

try{
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
} catch(e){
   subscriber.error(e);
}

next、complete、errorを実行するには、以下に示すようにsubscribeメソッドを呼び出す必要があります-

observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

errorメソッドは、エラーが発生した場合にのみ呼び出されます。

これはブラウザに表示される出力です-

演算子はRxJSの重要な部分です。演算子は、オブザーバブルを入力として取り込む純粋関数であり、出力もオブザーバブルです。

オペレーターとの協力

演算子は、オブザーバブルを入力として取り込む純粋関数であり、出力もオブザーバブルです。

演算子を操作するには、pipe()メソッドが必要です。

pipe()の使用例

let obs = of(1,2,3); // an observable
obs.pipe(
   operator1(),
   operator2(),
   operator3(),
   operator3(),
)

上記の例では、を使用してオブザーバブルを作成しました of()値1、2、および3を受け取るメソッド。これで、このobservableで、上記のように、pipe()メソッドを使用して任意の数の演算子を使用してさまざまな操作を実行できます。演算子の実行は、指定されたオブザーバブルで順番に実行されます。

以下は実際の例です-

import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';

let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
   filter(x => x % 2 === 0),
   reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));

出力

30

上記の例では、偶数をフィルタリングするフィルター演算子を使用し、次に使用しました reduce() 偶数の値を追加し、サブスクライブ時に結果を提供する演算子。

これが、これから説明するObservableのリストです。

  • Creation
  • Mathematical
  • Join
  • Transformation
  • Filtering
  • Utility
  • Conditional
  • Multicasting
  • エラー処理

生成演算子

以下は、作成演算子カテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 ajax

このオペレーターは、指定されたURLに対してajaxリクエストを行います。

2 から

この演算子は、配列、配列のようなオブジェクト、promise、反復可能なオブジェクト、またはobservableのようなオブジェクトからobservableを作成します。

3 fromEvent

この演算子は、ボタンやクリックなどのイベントを発行する要素で使用されるオブザーバブルとして出力を提供します。

4 fromEventPattern

この演算子は、イベントハンドラーの登録に使用される入力関数からオブザーバブルを作成します。

5 間隔

この演算子は、指定された時間の間、毎回Observableを作成します。

6

この演算子は、渡された引数を受け取り、それらをobservableに変換します。

7 範囲

この演算子は、提供された範囲に基づいて一連の数値を提供するObservableを作成します。

8 throwError

この演算子は、エラーを通知するオブザーバブルを作成します。

9 タイマー

この演算子は、タイムアウト後に値を発行するオブザーバブルを作成し、呼び出しごとに値が増加し続けます。

10 iif

このオペレーターは、どのObservableをサブスクライブするかを決定します。

数学演算子

以下は、数学演算子のカテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 カウント

count()演算子は、値を持つObservableを受け取り、それを単一の値を与えるObservableに変換します

2 マックス

Maxメソッドは、すべての値でオブザーバブルを取り込み、最大値でオブザーバブルを返します

3 最小

Minメソッドは、すべての値でオブザーバブルを取り込み、最小値でオブザーバブルを返します。

4 減らす

reduce演算子では、入力オブザーバブルでアキュムレータ関数が使用され、アキュムレータ関数は、オプションのシード値がアキュムレータ関数に渡された状態で、累積値をオブザーバブルの形式で返します。

reduce()関数は、2つの引数、1つのアキュムレータ関数、2番目のシード値を取ります。

オペレーターに参加する

以下は、結合演算子カテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 concat

このオペレーターは、入力として指定されたObservableを順次発行し、次のObservableに進みます。

2 forkJoin

この演算子は、配列またはdictオブジェクトに入力として取り込まれ、オブザーバブルが完了するのを待って、指定されたオブザーバブルから発行された最後の値を返します。

3 マージ

この演算子は、入力オブザーバブルを取り込み、オブザーバブルからすべての値を出力し、単一の出力オブザーバブルを出力します。

4 人種

これは、最初のソースオブザーバブルのミラーコピーとなるオブザーバブルを返します。

変換演算子

以下は、変換演算子カテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 バッファ

バッファはオブザーバブルで動作し、引数をオブザーバブルとして受け取ります。配列内の元のオブザーバブルで発行された値のバッファリングを開始し、引数として取得されたオブザーバブルが発行するときに同じ値を発行します。引数として取得されたオブザーバブルが発行されると、バッファーがリセットされ、入力オブザーバブルが発行されて同じシナリオが繰り返されるまで、元のバッファーが再度開始されます。

2 bufferCount

buffercount()演算子の場合、呼び出されたオブザーバブルから値を収集し、buffercountに指定されたバッファーサイズが一致すると同じ値を出力します。

3 bufferTime

これはbufferCountに似ているため、ここでは、呼び出されたオブザーバブルから値を収集し、bufferTimeSpanを発行します。1つの引数、つまりbufferTimeSpanを取ります。

4 bufferToggle

bufferToggle()の場合、openingsとcloseingSelectorの2つの引数を取ります。開始引数はサブスクライブ可能であるか、バッファーを開始することを約束し、2番目の引数closeingSelectorは再びサブスクライブ可能であるか、バッファーを閉じて収集された値を発行するインジケーターを約束します。

5 bufferWhen

この演算子は、配列形式で値を指定します。これは、バッファーをいつ閉じ、発行し、リセットするかを決定する関数として1つの引数を取ります。

6 展開

展開演算子は、関数を引数として受け取ります。この関数は、ソースのオブザーバブルと出力のオブザーバブルに適用されます。最終的な値は観察可能です。

7 groupBy

groupBy演算子では、出力は特定の条件に基づいてグループ化され、これらのグループ項目はGroupedObservableとして出力されます。

8 地図

マップ演算子の場合、プロジェクト関数がソースObservableの各値に適用され、同じ出力がObservableとして出力されます。

9 mapTo

ソースObservableが値を発行するたびに、Observableとともに一定の値が出力として提供されます。

10 mergeMap

mergeMapオペレーターの場合、プロジェクト関数が各ソース値に適用され、その出力が出力Observableとマージされます。

11 switchMap

switchMap演算子の場合、プロジェクト関数が各ソース値に適用され、その出力が出力Observableとマージされ、指定された値が最新の予測されたObservableです。

12

これは、observableである引数windowboundariesを取り、指定されたwindowboundariesが発行するたびにネストされたobservableを返します。

フィルタリング演算子

以下は、フィルタリング演算子のカテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 デバウンス

しばらくしてソースObservableから放出された値であり、放出はObservableまたはpromiseとして指定された別の入力によって決定されます。

2 debounceTime

時間が完了した後にのみ、観測可能なソースから値を放出します。

3 明確な

この演算子は、前の値と比較したときに異なる、ソースオブザーバブルからのすべての値を提供します。

4 elementAt

この演算子は、指定されたインデックスに基づいて、監視可能なソースから単一の値を提供します。

5 フィルタ

この演算子は、指定された述語関数に基づいて、ソースObservableからの値をフィルタリングします。

6 最初

この演算子は、ソースObservableによって発行された最初の値を提供します。

7 最終

この演算子は、ソースObservableによって発行された最後の値を提供します。

8 ignoreElements

この演算子は、ソースObservableからのすべての値を無視し、コールバック関数を完了するかエラーにするための呼び出しのみを実行します。

9 サンプル

この演算子は、ソースObservableからの最新の値を提供し、出力は、渡された引数が出力するかどうかによって異なります。

10 スキップ

この演算子は、入力として取得されたカウント項目の最初の出現をスキップするオブザーバブルを返します。

11 スロットル

この演算子は、引数として使用される入力関数によって決定された時間、監視可能なソースからの値を出力するだけでなく無視し、同じプロセスが繰り返されます。

ユーティリティオペレーター

以下は、ユーティリティ演算子のカテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 タップ

この演算子は、ソースのオブザーバブルと同じ出力を持ち、オブザーバブルからユーザーに値をログに記録するために使用できます。主な値、エラーがある場合、またはタスクが完了した場合。

2 ディレイ

この演算子は、指定されたタイムアウトに基づいて、ソースObservableから発行された値を遅延させます。

3 delayWhen

この演算子は、入力として取得された別のオブザーバブルからのタイムアウトに基づいて、ソースオブザーバブルから発行された値を遅延させます。

4 observeOn

入力スケジューラに基づくこの演算子は、ソースObservableからの通知を再送信します。

5 subscribeOn

この演算子は、入力として取得されたスケジューラーに基づいて、ソースObservableへの非同期サブスクライブを支援します。

6 時間間隔

この演算子は、現在の値と、取得したスケジューラ入力を使用して計算された現在の値と前の値の間の経過時間を含むオブジェクトを返します。

7 タイムスタンプ

タイムスタンプと、値が発行された時刻を示すソースObservableから発行された値を返します。

8 タイムアウト

指定されたタイムアウト後にソースObservableが値を出力しない場合、この演算子はエラーをスローします。

9 toArray

Observableからすべてのソース値を累積し、ソースが完了するとそれらを配列として出力します。

条件付き演算子

以下は、条件演算子のカテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 defaultIfEmpty

ソースオブザーバブルが空の場合、この演算子はデフォルト値を返します。

2 すべて

入力関数がソースObservableの各値の条件を満たすことに基づいてObservableを返します。

3 見つける

これにより、ソースObservableの最初の値が、入力として取得された述語関数の条件を満たす場合にobservableが返されます。

4 findIndex

入力スケジューラに基づくこの演算子は、ソースObservableからの通知を再送信します。

5 isEmpty

この演算子は、入力オブザーバブルが値を出力せずに完全なコールバックに進む場合はtrueとして出力を返し、入力オブザーバブルが値を出力する場合はfalseとして出力を返します。

マルチキャスト演算子

以下は、マルチキャスト演算子カテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 マルチキャスト

マルチキャストオペレーターは、作成された単一のサブスクリプションを他のサブスクライバーと共有します。マルチキャストが取り込むパラメータは、connect()メソッドを持つConnectableObservableを返すサブジェクトまたはファクトリメソッドです。サブスクライブするには、connect()メソッドを呼び出す必要があります。

2 公開する

この演算子はConnectableObservableを返し、connect()メソッドを使用してオブザーバブルをサブスクライブする必要があります。

3 publishBehavior

publishBehaviourはBehaviourSubjectを利用し、ConnectableObservableを返します。作成されたオブザーバブルをサブスクライブするには、connect()メソッドを使用する必要があります。

4 publishLast

publishBehaviourはAsyncSubjectを利用し、ConnectableObservableを返します。作成されたオブザーバブルをサブスクライブするには、connect()メソッドを使用する必要があります。

5 publishReplay

publishReplayは、動作サブジェクトを利用して、値をバッファリングし、同じものを新しいサブスクライバーに再生して、ConnectableObservableを返します。作成されたオブザーバブルをサブスクライブするには、connect()メソッドを使用する必要があります。

6 シェア

これはmutlicast()演算子のエイリアスですが、唯一の違いは、サブスクリプションを開始するためにconnect()メソッドを手動で呼び出す必要がないことです。

エラー処理演算子

以下は、エラー処理演算子のカテゴリで説明する演算子です。

シニア番号 オペレーターと説明
1 catchError

この演算子は、新しいObservableまたはエラーを返すことにより、ソースObservableのエラーをキャッチします。

2 リトライ

このオペレーターは、エラーが発生した場合にソースObservableで再試行し、指定された入力カウントに基づいて再試行が行われます。

オブザーバブルが作成されたら、オブザーバブルを実行するためにサブスクライブする必要があります。

count()演算子

これは、オブザーバブルをサブスクライブする方法の簡単な例です。

例1

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
final_val.subscribe(x => console.log("The count is "+x));

出力

The count is 6

サブスクリプションには、unsubscribe()と呼ばれる1つのメソッドがあります。unsubscribe()メソッドを呼び出すと、そのオブザーバブルに使用されているすべてのリソースが削除されます。つまり、オブザーバブルはキャンセルされます。これは、unsubscribe()メソッドを使用する実際の例です。

例2

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
let test = final_val.subscribe(x => console.log("The count is "+x));
test.unsubscribe();

サブスクリプションは変数testに保存されます。test.unsubscribe()オブザーバブルを使用しました。

出力

The count is 6

サブジェクトは、マルチキャストできる、つまり多くのオブザーバーと話すことができるオブザーバブルです。イベントリスナーのあるボタンを考えてみましょう。ユーザーがボタンをクリックするたびに、リスナーの追加を使用してイベントにアタッチされた関数が呼び出されます。同様の機能が件名にも適用されます。

この章では、次のトピックについて説明します。

  • 件名を作成する
  • ObservableとSubjectの違いは何ですか?
  • 行動主体
  • 件名を再生
  • AsyncSubject

件名を作成する

サブジェクトを操作するには、以下に示すようにサブジェクトをインポートする必要があります-

import { Subject } from 'rxjs';

次のようにサブジェクトオブジェクトを作成できます-

const subject_test = new Subject();

オブジェクトは、3つのメソッドを持つオブザーバーです-

  • next(v)
  • error(e)
  • complete()

件名を購読する

以下に示すように、件名に複数のサブスクリプションを作成できます-

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

サブスクリプションは、前に説明したaddlistenerと同じように、サブジェクトオブジェクトに登録されます。

サブジェクトへのデータの受け渡し

next()メソッドを使用して作成されたサブジェクトにデータを渡すことができます。

subject_test.next("A");

データは、サブジェクトに追加されたすべてのサブスクリプションに渡されます。

ここに、主題の実用的な例があります-

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

subject_testオブジェクトは、新しいSubject()を呼び出すことによって作成されます。subject_testオブジェクトには、next()、error()、およびcomplete()メソッドへの参照があります。上記の例の出力を以下に示します-

出力

以下に示すように、complete()メソッドを使用してサブジェクトの実行を停止できます。

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

completeを呼び出すと、後で呼び出される次のメソッドは呼び出されません。

出力

error()メソッドを呼び出す方法を見てみましょう。

以下は実際の例です-

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

出力

ObservableとSubjectの違いは何ですか?

オブザーバブルは、サブスクライバーと1対1で話します。observableをサブスクライブするときはいつでも、実行は最初から開始されます。ajaxを使用して行われたHttp呼び出しを行い、2人のサブスクライバーがobservableを呼び出します。ブラウザのネットワークタブに2つのHttpHttpリクエストが表示されます。

これが同じの実例です-

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));

出力

ここで問題となるのは、同じデータを共有したいのですが、2回のHttp呼び出しを犠牲にして共有したくないということです。1回のHttp呼び出しを行い、サブスクライバー間でデータを共有したいと考えています。

これは、サブジェクトを使用して可能になります。マルチキャストできる、つまり多くのオブザーバーと話すことができるオブザーバブルです。サブスクライバー間で値を共有できます。

これはSubjectsを使用した実際の例です-

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

出力

これで、1つのHttp呼び出しのみが表示され、呼び出されたサブスクライバー間で同じデータが共有されます。

行動主体

行動サブジェクトは、呼び出されたときに最新の値を提供します。

以下に示すように行動サブジェクトを作成できます-

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject"); 
// initialized the behaviour subject with value:Testing Behaviour Subject

これは、BehaviorSubjectを使用するための実用的な例です-

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); 
// 0 is the initial value

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

出力

件名を再生

replaysubjectは動作subjectに似ており、値をバッファリングして、新しいサブスクライバーに対して同じものを再生できます。

これがリプレイサブジェクトの実例です-

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2); 
// buffer 2 values but new subscribers

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

使用されるバッファ値は、再生対象で2です。したがって、最後の2つの値はバッファリングされ、呼び出される新しいサブスクライバーに使用されます。

出力

AsyncSubject

AsyncSubjectの場合、最後に呼び出された値がサブスクライバーに渡され、complete()メソッドが呼び出された後にのみ実行されます。

これは同じの実用的な例です-

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

ここで、completeが呼び出される前に、サブジェクトに渡される最後の値は2であり、サブスクライバーに渡される値と同じです。

出力

スケジューラーは、サブスクリプションを開始して通知する必要がある場合の実行を制御します。

スケジューラーを利用するには、以下が必要です。

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

これは実際の例です。ここでは、実行を決定するスケジューラーを使用します。

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable(function subscribe(subscriber) {
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
}).pipe(
   observeOn(asyncScheduler)
);
console.log("Observable Created");
observable.subscribe(
   x => console.log(x),
   (e)=>console.log(e),
   ()=>console.log("Observable is complete")
);

console.log('Observable Subscribed');

出力

スケジューラがない場合、出力は次のようになります。

この章では、AngularでRxJを使用する方法を説明します。ここではAngularのインストールプロセスには触れません。Angularのインストールについては、このリンクを参照してください-https://www.tutorialspoint.com/angular7/angular7_environment_setup.htm

RxJSのAjaxを使用してデータをロードする例に直接取り組みます。

app.component.ts

import { Component } from '@angular/core';
import { environment } from './../environments/environment';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators'

@Component({
   selector: 'app-root',
   templateUrl: './app.component.html',
   styleUrls: ['./app.component.css']
})
export class AppComponent {
   title = '';
   data;
   constructor() {
      this.data = "";
      this.title = "Using RxJs with Angular";
      let a = this.getData();
   }
   getData() {
      const response =
      ajax('https://jsonplaceholder.typicode.com/users')
         .pipe(map(e => e.response));
      response.subscribe(res => {
         console.log(res);
         this.data = res;
      });
   }
}

app.component.html

<div>
   <h3>{{title}}</h3>
   <ul *ngFor="let i of data">
      <li>{{i.id}}: {{i.name}}</li>
   </ul>
</div>

<router-outlet></router-outlet>

このURLからデータをロードするRxJSのajaxを使用しました-https://jsonplaceholder.typicode.com/users。

コンパイルすると、以下のように表示されます。

この章では、ReactJSでRxJを使用する方法を説明します。ここでは、Reactjsのインストールプロセスについては説明しません。ReactJSのインストールについては、次のリンクを参照してください。https://www.tutorialspoint.com/reactjs/reactjs_environment_setup.htm

以下の例で直接作業します。ここでは、RxJSのAjaxを使用してデータをロードします。

index.js

import React, { Component } from "react";
import ReactDOM from "react-dom";
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
class App extends Component {
   constructor() {
      super();
      this.state = { data: [] };
   }
   componentDidMount() {
      const response = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
      response.subscribe(res => {
         this.setState({ data: res });
      });
   }
   render() {
      return (
         <div>
            <h3>Using RxJS with ReactJS</h3>
            <ul>
               {this.state.data.map(el => (
                  <li>
                     {el.id}: {el.name}
                  </li>
               ))}
            </ul>
         </div>
      );
   }
}
ReactDOM.render(<App />, document.getElementById("root"));

index.html

<!DOCTYPE html>
<html>
   <head>
      <meta charset = "UTF-8" />
      <title>ReactJS Demo</title>
   <head>
   <body>
      <div id = "root"></div>
   </body>
</html>

このURLからデータをロードするRxJSのajaxを使用しました-https://jsonplaceholder.typicode.com/users。

コンパイルすると、表示は以下のようになります-