RXJS-内部サブスクリプションを回避する
Aug 23 2020
必要なレシピを探し出そうとしていますが、どこにも見つかりません。
私はこのようなコードを持っています。
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
次のようなパイプラインを構築するにはどうすればよいですか。
person
リスナーストリームに参加するそれぞれについて、データストリームにサブスクライブします。data:leave
イベントを発生させる各人は、ストリームから退会します- DataStreamの内部のパイプオペレーターの長いリストは、参加するすべての人に対して1回ではなく、1回だけ起動します。
編集:メモリセーフな方法でこれに相当するものは何ですか:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
回答
1 MichaelD Aug 23 2020 at 13:18
私は正確に確認してくださいあなたの観測行動のではないですが、一般的なレベルであなたは(のようなRxJS高次のマッピング演算子のいずれかを使用することができswitchMap
、concatMap
など-の違いここでは)別の観測可能なものからマップすること。また、RxJStakeUntilオペレーターを使用して、別のオブザーバブルに基づいてオブザーバブルを完了/サブスクライブ解除します。
takeUntil
コンポーネントが閉じられたときに、を使用して、開いているすべてのサブスクリプションを閉じることもできます。
次を試してください
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>(); Listeners.pipe( switchMap((personListening) => { // <-- switch to the `DataStream` observable return DataStream.pipe( tap((data) => personListening.send(data)), // <-- call `send()` here takeUntil(fromEvent(personListening, 'data:leave')) ); }), takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook ).subscribe( _, // <-- do nothing on response (err) => console.log(err) // <-- handle error ); ngOnDestroy() { this.complete$.next(); // <-- close any open subscriptions
}
MoxxiManagarm Aug 23 2020 at 11:45
rxjsのスキップアンドテイク演算子を見てみたいと思います。
例:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
ここでは、毎秒増分数の放出の連続ストリームです。start
そして、end
定義された時間後にいったん放出します。コンソールには、ストリーミングされるデータの限られた範囲が表示されます。
Stackblitz: https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts