RXJS - Evita la sottoscrizione interna
Sto cercando la ricetta che mi serve ma non la trovo da nessuna parte.
Ho un codice che assomiglia a questo.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
Come posso costruire una pipeline che:
- Per ognuno
person
che si unisce al flusso dei miei ascoltatori, li iscrivo al flusso di dati. - Ogni persona che attiva un
data:leave
evento annulla l'iscrizione allo stream - Il lungo elenco di operatori di tubi di DataStream sotto il cofano spara solo una volta NON una volta per ogni persona che si unisce.
EDIT: qual è l'equivalente di questo in un modo sicuro per la memoria:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Risposte
Non sono esattamente sicuro del tuo comportamento osservabile, ma a livello generale potresti utilizzare uno qualsiasi degli operatori di mappatura di ordine superiore RxJS (come switchMap
, concatMap
ecc. - differenze qui ) per mappare da un osservabile a un altro. E usa l'operatore RxJS takeUntilper completare/annullare l'iscrizione a un osservabile basato su un altro osservabile.
È possibile utilizzare takeUntil
anche per chiudere tutte le sottoscrizioni aperte quando il componente è chiuso.
Prova quanto segue
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
Penso che tu voglia guardare il salto e prendere gli operatori di rxjs.
Esempio:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
è qui un flusso continuo di emissioni con un numero incrementale ogni secondo. start
ed end
emettere una volta dopo un tempo definito. Nella console vedrai una gamma limitata di dati trasmessi.
Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts