RXJS-内部サブスクリプションを回避する

Aug 23 2020

必要なレシピを探し出そうとしていますが、どこにも見つかりません。

私はこのようなコードを持っています。

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

次のようなパイプラインを構築するにはどうすればよいですか。

  • personリスナーストリームに参加するそれぞれについて、データストリームにサブスクライブします。
  • data:leaveイベントを発生させる各人は、ストリームから退会します
  • DataStreamの内部のパイプオペレーターの長いリストは、参加するすべての人に対して1回ではなく、1回だけ起動します。

編集:メモリセーフな方法でこれに相当するものは何ですか:

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

回答

1 MichaelD Aug 23 2020 at 13:18

私は正確に確認してくださいあなたの観測行動のではないですが、一般的なレベルであなたは(のようなRxJS高次のマッピング演算子のいずれかを使用することができswitchMapconcatMapなど-の違いここでは)別の観測可能なものからマップすること。また、RxJStakeUntilオペレーターを使用して、別のオブザーバブルに基づいてオブザーバブルを完了/サブスクライブ解除します。

takeUntilコンポーネントが閉じられたときに、を使用して、開いているすべてのサブスクリプションを閉じることもできます。

次を試してください

import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>(); Listeners.pipe( switchMap((personListening) => { // <-- switch to the `DataStream` observable return DataStream.pipe( tap((data) => personListening.send(data)), // <-- call `send()` here takeUntil(fromEvent(personListening, 'data:leave')) ); }), takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook ).subscribe( _, // <-- do nothing on response (err) => console.log(err) // <-- handle error ); ngOnDestroy() { this.complete$.next();         // <-- close any open subscriptions
}
MoxxiManagarm Aug 23 2020 at 11:45

rxjsのスキップアンドテイク演算子を見てみたいと思います。

例:

const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);

dataここでは、毎秒増分数の放出の連続ストリームです。startそして、end定義された時間後にいったん放出します。コンソールには、ストリーミングされるデータの限られた範囲が表示されます。

Stackblitz: https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts