RxPY - Arbeiten mit Betreff
Ein Subjekt ist eine beobachtbare Sequenz sowie ein Beobachter, der Multicasting durchführen kann, dh mit vielen Beobachtern spricht, die sich angemeldet haben.
Wir werden die folgenden Themen zum Thema diskutieren -
- Erstellen Sie ein Thema
- Abonnieren Sie ein Thema
- Übergabe von Daten an den Betreff
- BehaviorSubject
- ReplaySubject
- AsyncSubject
Erstellen Sie ein Thema
Um mit einem Betreff zu arbeiten, müssen Sie den Betreff wie unten gezeigt importieren -
from rx.subject import Subject
Sie können ein Subjekt-Objekt wie folgt erstellen:
subject_test = Subject()
Das Objekt ist ein Beobachter, der drei Methoden hat -
- on_next(value)
- on_error (Fehler) und
- on_completed()
Abonnieren Sie einen Betreff
Sie können mehrere Abonnements zu diesem Thema erstellen, wie unten gezeigt -
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
Übergabe von Daten an den Betreff
Sie können Daten an den Betreff übergeben, der mit der Methode on_next (value) erstellt wurde (siehe unten).
subject_test.on_next("A")
subject_test.on_next("B")
Die Daten werden an alle Abonnements weitergegeben, die zu diesem Thema hinzugefügt wurden.
Hier ist ein Arbeitsbeispiel für das Thema.
Beispiel
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
Das subject_test-Objekt wird durch Aufrufen eines Subject () erstellt. Das subject_test-Objekt verweist auf die Methoden on_next (Wert), on_error (Fehler) und on_completed (). Die Ausgabe des obigen Beispiels ist unten dargestellt -
Ausgabe
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
Wir können die on_completed () -Methode verwenden, um die Betreffausführung wie unten gezeigt zu stoppen.
Beispiel
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
Sobald wir complete aufrufen, wird die nächste später aufgerufene Methode nicht mehr aufgerufen.
Ausgabe
E:\pyrx>python testrx.py
The value is A
The value is A
Lassen Sie uns nun sehen, wie die Methode on_error (error) aufgerufen wird.
Beispiel
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))
Ausgabe
E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!
BehaviorSubject
BehaviorSubject gibt Ihnen beim Aufruf den neuesten Wert. Sie können ein Verhaltensthema wie unten gezeigt erstellen -
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
Hier ist ein Arbeitsbeispiel für die Verwendung des Verhaltensthemas
Beispiel
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
Ausgabe
E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject
Betreff wiedergeben
Ein Wiederholungsobjekt ähnelt dem Verhaltensthema, wobei es die Werte puffern und sie den neuen Abonnenten wiedergeben kann. Hier ist ein funktionierendes Beispiel für ein Wiedergabethema.
Beispiel
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
Der verwendete Pufferwert ist 2 für das Wiedergabethema. Die letzten beiden Werte werden also gepuffert und für die neu angerufenen Teilnehmer verwendet.
Ausgabe
E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5
AsyncSubject
Im Fall von AsyncSubject wird der zuletzt aufgerufene Wert an den Abonnenten übergeben und erst nach dem Aufruf der Methode complete () ausgeführt.
Beispiel
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
Ausgabe
E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2