RXJS - Evitar suscripción interna
Tratando de buscar la receta que necesito pero no puedo encontrarla en ningún lado.
Tengo un código que se parece a esto.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
¿Cómo puedo construir una canalización que:
- Para cada uno
personque se une a mi flujo de oyentes, los suscribo al flujo de datos. - Cada persona que dispara un
data:leaveevento se da de baja de la transmisión. - La larga lista de operadores de tuberías bajo el capó de DataStream solo dispara una vez, NO una vez por cada persona que se une.
EDITAR: ¿Cuál es el equivalente a esto de una manera segura para la memoria?
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Respuestas
No estoy exactamente seguro del comportamiento de sus observables, pero en un nivel general podría usar cualquiera de los operadores de mapeo de orden superior de RxJS (como switchMap, concatMapetc. - diferencias aquí ) para mapear de un observable a otro. Y use el operador RxJS takeUntilpara completar/darse de baja de un observable basado en otro observable.
También puede usar takeUntilpara cerrar todas las suscripciones abiertas cuando el componente está cerrado.
Prueba lo siguiente
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
Creo que quieres mirar el salto y tomar operadores de rxjs.
Ejemplo:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
dataes aquí un flujo continuo de emisiones con un número incremental cada segundo. starty endemitir una vez después de un tiempo definido. En la consola, verá un rango limitado de los datos transmitidos.
Apilado:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts