RxPY - Trabalhando com Assunto

Um assunto é uma sequência observável, bem como um observador que pode fazer multicast, ou seja, falar com muitos observadores que se inscreveram.

Vamos discutir os seguintes tópicos sobre o assunto -

  • Crie um assunto
  • Assine um assunto
  • Passando dados para o assunto
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Crie um assunto

Para trabalhar com um assunto, precisamos importar Assunto conforme mostrado abaixo -

from rx.subject import Subject

Você pode criar um objeto-sujeito da seguinte maneira -

subject_test = Subject()

O objeto é um observador que possui três métodos -

  • on_next(value)
  • on_error (erro) e
  • on_completed()

Inscrever-se em um assunto

Você pode criar várias assinaturas sobre o assunto, conforme mostrado abaixo -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passando Dados para o Assunto

Você pode passar dados para o assunto criado usando o método on_next (valor) como mostrado abaixo -

subject_test.on_next("A")
subject_test.on_next("B")

Os dados serão repassados ​​a todas as inscrições, agregadas no assunto.

Aqui está um exemplo prático do assunto.

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

O objeto subject_test é criado chamando um Subject (). O objeto subject_test faz referência aos métodos on_next (value), on_error (error) e on_completed (). O resultado do exemplo acima é mostrado abaixo -

Resultado

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Podemos usar o método on_completed (), para parar a execução do assunto como mostrado abaixo.

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Assim que chamarmos complete, o próximo método chamado posteriormente não será invocado.

Resultado

E:\pyrx>python testrx.py
The value is A
The value is A

Vamos agora ver como chamar o método on_error (error).

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Resultado

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject fornecerá o valor mais recente quando chamado. Você pode criar um assunto de comportamento conforme mostrado abaixo -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Aqui está um exemplo prático para usar o assunto de comportamento

Exemplo

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Resultado

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Assunto Repetir

Um sujeito de replay é semelhante ao sujeito de comportamento, em que ele pode armazenar os valores e reproduzir os mesmos para os novos assinantes. Aqui está um exemplo prático do assunto de repetição.

Exemplo

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

O valor do buffer usado é 2 no assunto de reprodução. Portanto, os dois últimos valores serão armazenados em buffer e usados ​​para os novos assinantes chamados.

Resultado

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

No caso de AsyncSubject, o último valor chamado é passado para o assinante, e isso será feito somente depois que o método complete () for chamado.

Exemplo

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Resultado

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2