RXJS - Éviter l'abonnement interne
J'essaie de trouver la recette dont j'ai besoin mais je ne la trouve nulle part.
J'ai un code qui ressemble à ceci.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
Comment puis-je construire un pipeline qui :
- Pour chacun
personqui rejoint le flux de mes auditeurs, je les inscris au flux de données. - Chaque personne qui déclenche un
data:leaveévénement se désabonne du flux - La longue liste d'opérateurs de pipe de DataStream sous le capot ne se déclenche qu'une seule fois PAS une fois pour chaque personne qui se joint.
EDIT: Quel est l'équivalent de ceci d'une manière sûre pour la mémoire:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Réponses
Je ne suis pas exactement sûr de votre comportement observable, mais de manière générale, vous pouvez utiliser n'importe lequel des opérateurs de mappage d'ordre supérieur RxJS (comme switchMap, concatMapetc. - différences ici ) pour mapper d'un observable à un autre. Et utilisez l'opérateur RxJS takeUntilpour compléter/désabonner d'un observable basé sur un autre observable.
Vous pouvez takeUntilégalement utiliser le pour fermer tous les abonnements ouverts lorsque le composant est fermé.
Essayez ce qui suit
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>(); Listeners.pipe( switchMap((personListening) => { // <-- switch to the `DataStream` observable return DataStream.pipe( tap((data) => personListening.send(data)), // <-- call `send()` here takeUntil(fromEvent(personListening, 'data:leave')) ); }), takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook ).subscribe( _, // <-- do nothing on response (err) => console.log(err) // <-- handle error ); ngOnDestroy() { this.complete$.next(); // <-- close any open subscriptions
}
Je pense que vous voulez regarder sur le saut et prendre les opérateurs de rxjs.
Exemple:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
dataest ici un flux continu d'émissions avec un nombre incrémental chaque seconde. startet endémettre une fois après un temps défini. Dans la console, vous verrez une plage limitée des données diffusées.
Stackblitz :https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts