RXJS - Evitar suscripción interna

Aug 23 2020

Tratando de buscar la receta que necesito pero no puedo encontrarla en ningún lado.

Tengo un código que se parece a esto.

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

¿Cómo puedo construir una canalización que:

  • Para cada uno personque se une a mi flujo de oyentes, los suscribo al flujo de datos.
  • Cada persona que dispara un data:leaveevento se da de baja de la transmisión.
  • La larga lista de operadores de tuberías bajo el capó de DataStream solo dispara una vez, NO una vez por cada persona que se une.

EDITAR: ¿Cuál es el equivalente a esto de una manera segura para la memoria?

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

Respuestas

1 MichaelD Aug 23 2020 at 13:18

No estoy exactamente seguro del comportamiento de sus observables, pero en un nivel general podría usar cualquiera de los operadores de mapeo de orden superior de RxJS (como switchMap, concatMapetc. - diferencias aquí ) para mapear de un observable a otro. Y use el operador RxJS takeUntilpara completar/darse de baja de un observable basado en otro observable.

También puede usar takeUntilpara cerrar todas las suscripciones abiertas cuando el componente está cerrado.

Prueba lo siguiente

import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>();

Listeners.pipe(
  switchMap((personListening) => {     // <-- switch to the `DataStream` observable
    return DataStream.pipe(
      tap((data) => personListening.send(data)),   // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))
    );
  }),
  takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook
).subscribe(
  _,                             // <-- do nothing on response
  (err) => console.log(err)      // <-- handle error
);

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
}
MoxxiManagarm Aug 23 2020 at 11:45

Creo que quieres mirar el salto y tomar operadores de rxjs.

Ejemplo:

const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);

dataes aquí un flujo continuo de emisiones con un número incremental cada segundo. starty endemitir una vez después de un tiempo definido. En la consola, verá un rango limitado de los datos transmitidos.

Apilado:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts