RXJS - Evite assinatura interna

Aug 23 2020

Estou tentando encontrar a receita que preciso, mas não consigo encontrá-la em lugar nenhum.

Eu tenho um código que se parece com isso.

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

Como posso construir um pipeline que:

  • Para cada um personque se junta ao fluxo de meus ouvintes, eu os inscrevo no fluxo de dados.
  • Cada pessoa que dispara um data:leaveevento cancela a inscrição no stream
  • A longa lista de operadores de tubulação sob o capô do DataStream é acionada apenas uma vez, NÃO uma vez para cada pessoa que se junta.

EDIT: Qual é o equivalente a isso de uma maneira segura para a memória:

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

Respostas

1 MichaelD Aug 23 2020 at 13:18

Não tenho certeza do comportamento de seus observáveis, mas em um nível geral, você pode usar qualquer um dos operadores de mapeamento de ordem superior RxJS (como switchMap, concatMapetc. - diferenças aqui ) para mapear de um observável para outro. E use o operador RxJS takeUntilpara concluir/cancelar a assinatura de um observável com base em outro observável.

Você também pode usar o takeUntilpara fechar todas as assinaturas abertas quando o componente for fechado.

Tente o seguinte

import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>();

Listeners.pipe(
  switchMap((personListening) => {     // <-- switch to the `DataStream` observable
    return DataStream.pipe(
      tap((data) => personListening.send(data)),   // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))
    );
  }),
  takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook
).subscribe(
  _,                             // <-- do nothing on response
  (err) => console.log(err)      // <-- handle error
);

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
}
MoxxiManagarm Aug 23 2020 at 11:45

Acho que você quer olhar no skip e pegar operadores de rxjs.

Exemplo:

const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);

dataé aqui um fluxo contínuo de emissões com um número incremental a cada segundo. starte endemitir uma vez após um tempo definido. No console, você verá um intervalo limitado dos dados transmitidos.

Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts