RxPY - praca z tematem

Podmiot jest obserwowalną sekwencją, a także obserwatorem, który może przesyłać strumieniowo, tj. Rozmawiać z wieloma subskrybentami.

Omówimy następujące tematy na temat -

  • Utwórz temat
  • Zapisz się do tematu
  • Przekazywanie danych podmiotowi
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Utwórz temat

Aby pracować z tematem, musimy zaimportować temat, jak pokazano poniżej -

from rx.subject import Subject

Możesz utworzyć podmiot-obiekt w następujący sposób -

subject_test = Subject()

Obiekt jest obserwatorem, który ma trzy metody -

  • on_next(value)
  • on_error (błąd) i
  • on_completed()

Zasubskrybuj temat

Możesz utworzyć wiele subskrypcji na ten temat, jak pokazano poniżej -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Przekazywanie danych podmiotowi

Możesz przekazać dane podmiotowi utworzonemu za pomocą metody on_next (wartość), jak pokazano poniżej -

subject_test.on_next("A")
subject_test.on_next("B")

Dane zostaną przekazane do wszystkich abonamentów, dodanych w temacie.

Oto roboczy przykład tematu.

Przykład

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Obiekt subject_test jest tworzony przez wywołanie metody Subject (). Obiekt subject_test odwołuje się do metod on_next (wartość), on_error (błąd) i on_completed (). Dane wyjściowe z powyższego przykładu pokazano poniżej -

Wynik

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Możemy użyć metody on_completed (), aby zatrzymać wykonywanie podmiotu, jak pokazano poniżej.

Przykład

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Po wywołaniu complete następna metoda wywoływana później nie jest wywoływana.

Wynik

E:\pyrx>python testrx.py
The value is A
The value is A

Zobaczmy teraz, jak wywołać metodę on_error (error).

Przykład

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Wynik

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject poda najnowszą wartość po wywołaniu. Możesz stworzyć temat zachowania, jak pokazano poniżej -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Oto działający przykład użycia tematu zachowania

Przykład

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Wynik

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Powtórz temat

Obiekt powtórki jest podobny do tematu zachowania, w którym może buforować wartości i odtwarzać to samo nowym subskrybentom. Oto działający przykład tematu powtórki.

Przykład

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Wartość bufora dla tematu powtórki to 2. Zatem ostatnie dwie wartości zostaną zbuforowane i użyte dla nowych wywoływanych abonentów.

Wynik

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

W przypadku AsyncSubject ostatnia wywołana wartość jest przekazywana do subskrybenta i zostanie to zrobione dopiero po wywołaniu metody complete ().

Przykład

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Wynik

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2