RxPY - Arbeiten mit Betreff

Ein Subjekt ist eine beobachtbare Sequenz sowie ein Beobachter, der Multicasting durchführen kann, dh mit vielen Beobachtern spricht, die sich angemeldet haben.

Wir werden die folgenden Themen zum Thema diskutieren -

  • Erstellen Sie ein Thema
  • Abonnieren Sie ein Thema
  • Übergabe von Daten an den Betreff
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Erstellen Sie ein Thema

Um mit einem Betreff zu arbeiten, müssen Sie den Betreff wie unten gezeigt importieren -

from rx.subject import Subject

Sie können ein Subjekt-Objekt wie folgt erstellen:

subject_test = Subject()

Das Objekt ist ein Beobachter, der drei Methoden hat -

  • on_next(value)
  • on_error (Fehler) und
  • on_completed()

Abonnieren Sie einen Betreff

Sie können mehrere Abonnements zu diesem Thema erstellen, wie unten gezeigt -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Übergabe von Daten an den Betreff

Sie können Daten an den Betreff übergeben, der mit der Methode on_next (value) erstellt wurde (siehe unten).

subject_test.on_next("A")
subject_test.on_next("B")

Die Daten werden an alle Abonnements weitergegeben, die zu diesem Thema hinzugefügt wurden.

Hier ist ein Arbeitsbeispiel für das Thema.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Das subject_test-Objekt wird durch Aufrufen eines Subject () erstellt. Das subject_test-Objekt verweist auf die Methoden on_next (Wert), on_error (Fehler) und on_completed (). Die Ausgabe des obigen Beispiels ist unten dargestellt -

Ausgabe

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Wir können die on_completed () -Methode verwenden, um die Betreffausführung wie unten gezeigt zu stoppen.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Sobald wir complete aufrufen, wird die nächste später aufgerufene Methode nicht mehr aufgerufen.

Ausgabe

E:\pyrx>python testrx.py
The value is A
The value is A

Lassen Sie uns nun sehen, wie die Methode on_error (error) aufgerufen wird.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Ausgabe

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject gibt Ihnen beim Aufruf den neuesten Wert. Sie können ein Verhaltensthema wie unten gezeigt erstellen -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Hier ist ein Arbeitsbeispiel für die Verwendung des Verhaltensthemas

Beispiel

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Ausgabe

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Betreff wiedergeben

Ein Wiederholungsobjekt ähnelt dem Verhaltensthema, wobei es die Werte puffern und sie den neuen Abonnenten wiedergeben kann. Hier ist ein funktionierendes Beispiel für ein Wiedergabethema.

Beispiel

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Der verwendete Pufferwert ist 2 für das Wiedergabethema. Die letzten beiden Werte werden also gepuffert und für die neu angerufenen Teilnehmer verwendet.

Ausgabe

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Im Fall von AsyncSubject wird der zuletzt aufgerufene Wert an den Abonnenten übergeben und erst nach dem Aufruf der Methode complete () ausgeführt.

Beispiel

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Ausgabe

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2