RXJS - Evite assinatura interna
Estou tentando encontrar a receita que preciso, mas não consigo encontrá-la em lugar nenhum.
Eu tenho um código que se parece com isso.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
Como posso construir um pipeline que:
- Para cada um
personque se junta ao fluxo de meus ouvintes, eu os inscrevo no fluxo de dados. - Cada pessoa que dispara um
data:leaveevento cancela a inscrição no stream - A longa lista de operadores de tubulação sob o capô do DataStream é acionada apenas uma vez, NÃO uma vez para cada pessoa que se junta.
EDIT: Qual é o equivalente a isso de uma maneira segura para a memória:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Respostas
Não tenho certeza do comportamento de seus observáveis, mas em um nível geral, você pode usar qualquer um dos operadores de mapeamento de ordem superior RxJS (como switchMap, concatMapetc. - diferenças aqui ) para mapear de um observável para outro. E use o operador RxJS takeUntilpara concluir/cancelar a assinatura de um observável com base em outro observável.
Você também pode usar o takeUntilpara fechar todas as assinaturas abertas quando o componente for fechado.
Tente o seguinte
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
Acho que você quer olhar no skip e pegar operadores de rxjs.
Exemplo:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
dataé aqui um fluxo contínuo de emissões com um número incremental a cada segundo. starte endemitir uma vez após um tempo definido. No console, você verá um intervalo limitado dos dados transmitidos.
Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts