RXJS - Evita la sottoscrizione interna

Aug 23 2020

Sto cercando la ricetta che mi serve ma non la trovo da nessuna parte.

Ho un codice che assomiglia a questo.

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

Come posso costruire una pipeline che:

  • Per ognuno personche si unisce al flusso dei miei ascoltatori, li iscrivo al flusso di dati.
  • Ogni persona che attiva un data:leaveevento annulla l'iscrizione allo stream
  • Il lungo elenco di operatori di tubi di DataStream sotto il cofano spara solo una volta NON una volta per ogni persona che si unisce.

EDIT: qual è l'equivalente di questo in un modo sicuro per la memoria:

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

Risposte

1 MichaelD Aug 23 2020 at 13:18

Non sono esattamente sicuro del tuo comportamento osservabile, ma a livello generale potresti utilizzare uno qualsiasi degli operatori di mappatura di ordine superiore RxJS (come switchMap, concatMapecc. - differenze qui ) per mappare da un osservabile a un altro. E usa l'operatore RxJS takeUntilper completare/annullare l'iscrizione a un osservabile basato su un altro osservabile.

È possibile utilizzare takeUntilanche per chiudere tutte le sottoscrizioni aperte quando il componente è chiuso.

Prova quanto segue

import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>();

Listeners.pipe(
  switchMap((personListening) => {     // <-- switch to the `DataStream` observable
    return DataStream.pipe(
      tap((data) => personListening.send(data)),   // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))
    );
  }),
  takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook
).subscribe(
  _,                             // <-- do nothing on response
  (err) => console.log(err)      // <-- handle error
);

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
}
MoxxiManagarm Aug 23 2020 at 11:45

Penso che tu voglia guardare il salto e prendere gli operatori di rxjs.

Esempio:

const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);

dataè qui un flusso continuo di emissioni con un numero incrementale ogni secondo. started endemettere una volta dopo un tempo definito. Nella console vedrai una gamma limitata di dati trasmessi.

Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts