RxPY - Kurzanleitung

In diesem Kapitel wird erläutert, was reaktive Programmierung ist, was RxPY ist, seine Operatoren, Funktionen, Vor- und Nachteile.

Was ist reaktive Programmierung?

Reaktive Programmierung ist ein Programmierparadigma, das sich mit dem Datenfluss und der Ausbreitung von Veränderungen befasst. Dies bedeutet, dass, wenn ein Datenfluss von einer Komponente ausgegeben wird, die Änderung von einer reaktiven Programmierbibliothek auf andere Komponenten übertragen wird. Die Weitergabe von Änderungen wird fortgesetzt, bis sie den endgültigen Empfänger erreichen.

Durch die Verwendung von RxPY haben Sie eine gute Kontrolle über die asynchronen Datenströme. Beispielsweise kann eine an die URL gesendete Anforderung mithilfe von Observable verfolgt werden und mithilfe des Beobachters abhören, wenn die Anforderung auf Antwort oder Fehler abgeschlossen ist.

RxPY bietet Ihnen die Möglichkeit, asynchrone Datenströme mit zu verarbeiten ObservablesFragen Sie die Datenströme mit ab Operators dh filtern, summieren, concat, map und nutzen auch die Parallelität für die Datenströme mit Schedulers. Wenn Sie ein Observable erstellen, erhalten Sie ein Observer-Objekt mit den Methoden on_next (v), on_error (e) und on_completed (), die benötigt werdensubscribed damit wir eine Benachrichtigung erhalten, wenn ein Ereignis eintritt.

Das Observable kann mithilfe des Pipe-Operators mit mehreren Operatoren in einem Kettenformat abgefragt werden.

RxPY bietet Betreibern in verschiedenen Kategorien wie: -

  • Mathematische Operatoren

  • Transformationsoperatoren

  • Filteroperatoren

  • Fehlerbehandlungsoperatoren

  • Versorgungsunternehmen

  • Bedingte Operatoren

  • Erstellungsoperatoren

  • Anschließbare Bediener

Diese Operatoren werden in diesem Lernprogramm ausführlich erläutert.

Was ist RxPy?

RxPY ist definiert als a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python gemäß der offiziellen Website von RxPy, die ist https://rxpy.readthedocs.io/en/latest/.

RxPY ist eine Python-Bibliothek zur Unterstützung der reaktiven Programmierung. RxPy steht fürReactive Extensions for Python. Es ist eine Bibliothek, die Observables verwendet, um mit reaktiver Programmierung zu arbeiten, die sich mit asynchronen Datenaufrufen, Rückrufen und ereignisbasierten Programmen befasst.

Funktionen von RxPy

In RxPy kümmern sich die folgenden Konzepte um die Behandlung der asynchronen Aufgabe:

Beobachtbar

Ein Observable ist eine Funktion, die einen Beobachter erstellt und an die Quelle mit Datenströmen anfügt, die Datenströme enthalten, die beispielsweise von Tweets, computerbezogenen Ereignissen usw. erwartet werden.

Beobachter

Es ist ein Objekt mit den Methoden on_next (), on_error () und on_completed (), das aufgerufen wird, wenn eine Interaktion mit dem Observable stattfindet, dh die Quelle interagiert für ein Beispiel für eingehende Tweets usw.

Abonnement

Wenn das Observable erstellt wird, müssen wir es abonnieren, um das Observable auszuführen.

Betreiber

Ein Operator ist eine reine Funktion, die Observable als Eingabe aufnimmt, und die Ausgabe ist auch Observable. Sie können mehrere Operatoren für beobachtbare Daten verwenden, indem Sie den Pipe-Operator verwenden.

Gegenstand

Ein Subjekt ist eine beobachtbare Sequenz sowie ein Beobachter, der Multicasting durchführen kann, dh mit vielen Beobachtern spricht, die sich angemeldet haben. Das Thema ist kalt beobachtbar, dh die Werte werden zwischen den abonnierten Beobachtern geteilt.

Scheduler

Ein wichtiges Merkmal von RxPy ist die Parallelität, dh die parallele Ausführung der Aufgabe. Um dies zu erreichen, verfügt RxPy über zwei Operatoren subscribe_on () und compare_on (), die mit Schedulern zusammenarbeiten und über die Ausführung der abonnierten Aufgabe entscheiden.

Vorteile der Verwendung von RxPY

Das Folgende sind die Vorteile von RxPy -

  • RxPY ist eine großartige Bibliothek, wenn es um die Behandlung von asynchronen Datenströmen und Ereignissen geht. RxPY verwendet Observables, um mit reaktiver Programmierung zu arbeiten, die asynchrone Datenaufrufe, Rückrufe und ereignisbasierte Programme behandelt.

  • RxPY bietet eine große Sammlung von Operatoren in den Kategorien Mathematik, Transformation, Filterung, Nützlichkeit, Bedingung, Fehlerbehandlung und Verknüpfung, die das Leben bei Verwendung mit reaktiver Programmierung erleichtern.

  • Die gleichzeitige Bearbeitung mehrerer Aufgaben wird mithilfe von Schedulern in RxPY erreicht.

  • Die Leistung wird mithilfe von RxPY verbessert, da die Handhabung asynchroner Aufgaben und die parallele Verarbeitung vereinfacht werden.

Nachteil der Verwendung von RxPY

  • Das Debuggen des Codes mit Observablen ist etwas schwierig.

In diesem Kapitel werden wir an der Installation von RxPy arbeiten. Um mit RxPY arbeiten zu können, müssen wir zuerst Python installieren. Also werden wir an Folgendem arbeiten:

  • Installieren Sie Python
  • Installieren Sie RxPy

Python installieren

Gehen Sie zur offiziellen Python-Website: https://www.python.org/downloads/.Klicken Sie auf die neueste Version für Windows, Linux / Unix und Mac OS. Laden Sie Python gemäß Ihrem 64- oder 32-Bit-Betriebssystem herunter, das Ihnen zur Verfügung steht.

Klicken Sie nach dem Herunterladen auf .exe file und befolgen Sie die Schritte, um Python auf Ihrem System zu installieren.

Der Python-Paketmanager, dh pip, wird bei der obigen Installation ebenfalls standardmäßig installiert. Damit es global auf Ihrem System funktioniert, fügen Sie der PATH-Variablen direkt den Speicherort von Python hinzu. Dies wird zu Beginn der Installation angezeigt. Denken Sie daran, das Kontrollkästchen ADD to PATH zu aktivieren. Falls Sie vergessen haben, dies zu überprüfen, befolgen Sie bitte die unten angegebenen Schritte, um PATH hinzuzufügen.

Führen Sie die folgenden Schritte aus, um PATH hinzuzufügen:

Klicken Sie mit der rechten Maustaste auf Ihr Computersymbol und klicken Sie auf Eigenschaften → Erweiterte Systemeinstellungen.

Es wird der Bildschirm wie unten gezeigt angezeigt -

Klicken Sie wie oben gezeigt auf Umgebungsvariablen. Es wird der Bildschirm wie unten gezeigt angezeigt -

Wählen Sie Pfad und klicken Sie auf die Schaltfläche Bearbeiten. Fügen Sie am Ende den Speicherortpfad Ihrer Python hinzu. Lassen Sie uns nun die Python-Version überprüfen.

Nach Python-Version suchen

E:\pyrx>python --version
Python 3.7.3

Installieren Sie RxPY

Nachdem wir Python installiert haben, werden wir RxPy installieren.

Sobald Python installiert ist, wird auch der Python-Paketmanager, dh pip, installiert. Es folgt der Befehl zum Überprüfen der Pip-Version:

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Wir haben Pip installiert und die Version ist 19.1.1. Jetzt werden wir pip verwenden, um RxPy zu installieren

Der Befehl lautet wie folgt:

pip install rx

In diesem Tutorial verwenden wir RxPY Version 3 und Python Version 3.7.3. Die Funktionsweise von RxPY Version 3 unterscheidet sich geringfügig von der früheren Version, dh RxPY Version 1.

In diesem Kapitel werden die Unterschiede zwischen den beiden Versionen und die Änderungen erläutert, die vorgenommen werden müssen, wenn Sie Python- und RxPY-Versionen aktualisieren.

Beobachtbar in RxPY

In RxPy Version 1 war Observable eine separate Klasse -

from rx import Observable

Um das Observable zu verwenden, müssen Sie es wie folgt verwenden:

Observable.of(1,2,3,4,5,6,7,8,9,10)

In RxPy Version 3 ist Observable direkt Teil des RX-Pakets.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Operatoren in RxPy

In Version 1 war der Operator Methoden in der Observable-Klasse. Um beispielsweise Operatoren zu verwenden, müssen wir Observable wie unten gezeigt importieren -

from rx import Observable

Die Operatoren werden beispielsweise als Observable.operator verwendet, wie unten gezeigt -

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Im Fall von RxPY Version 3 sind Operatoren funktionsfähig und werden wie folgt importiert und verwendet:

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Verketten von Operatoren mit der Pipe () -Methode

In RxPy Version 1 musste für den Fall, dass Sie mehrere Operatoren für ein Observable verwenden mussten, wie folgt vorgegangen werden:

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Im Fall von RxPY Version 3 können Sie jedoch die pipe () -Methode und mehrere Operatoren verwenden, wie unten gezeigt -

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Ein Observable ist eine Funktion, die einen Beobachter erstellt und an die Quelle anfügt, an der Werte erwartet werden, z. B. Klicks, Mausereignisse von einem dom-Element usw.

Die unten genannten Themen werden in diesem Kapitel ausführlich behandelt.

  • Observables erstellen

  • Abonnieren Sie ein Observable und führen Sie es aus

Erstellen Sie Observablen

Um ein Observable zu erstellen, werden wir verwenden create() Methode und übergeben Sie die Funktion an sie, die die folgenden Elemente enthält.

  • on_next() - Diese Funktion wird aufgerufen, wenn das Observable ein Element ausgibt.

  • on_completed() - Diese Funktion wird aufgerufen, wenn das Observable abgeschlossen ist.

  • on_error() - Diese Funktion wird aufgerufen, wenn auf dem Observable ein Fehler auftritt.

Um mit der Methode create () zu arbeiten, importieren Sie zuerst die Methode wie unten gezeigt -

from rx import create

Hier ist ein Arbeitsbeispiel, um ein Observable zu erstellen -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Abonnieren Sie ein Observable und führen Sie es aus

Um ein Observable zu abonnieren, müssen wir die Funktion subscribe () verwenden und die Rückruffunktion on_next, on_error und on_completed übergeben.

Hier ist ein Arbeitsbeispiel -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Die subscribe () -Methode sorgt für die Ausführung des Observable. Die Rückruffunktionon_next, on_error und on_completedmuss an die subscribe-Methode übergeben werden. Die Methode call to subscribe führt wiederum die Funktion test_observable () aus.

Es ist nicht zwingend erforderlich, alle drei Rückruffunktionen an die subscribe () -Methode zu übergeben. Sie können on_next (), on_error () und on_completed () gemäß Ihren Anforderungen übergeben.

Die Lambda-Funktion wird für on_next, on_error und on_completed verwendet. Es nimmt die Argumente auf und führt den angegebenen Ausdruck aus.

Hier ist die Ausgabe des beobachtbaren erstellt -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

In diesem Kapitel werden die Operatoren in RxPY ausführlich erläutert. Diese Operatoren umfassen -

  • Arbeiten mit Operatoren
  • Mathematische Operatoren
  • Transformationsoperatoren
  • Filteroperatoren
  • Fehlerbehandlungsoperatoren
  • Versorgungsunternehmen
  • Bedingte Operatoren
  • Erstellungsoperatoren
  • Anschließbare Bediener
  • Operatoren kombinieren

Reactive (Rx) Python hat fast viele Operatoren, die das Leben mit Python-Codierung erleichtern. Sie können diese mehreren Operatoren zusammen verwenden. Wenn Sie beispielsweise mit Zeichenfolgen arbeiten, können Sie Zuordnungs-, Filter- und Zusammenführungsoperatoren verwenden.

Arbeiten mit Operatoren

Mit der Methode pipe () können Sie mit mehreren Operatoren zusammenarbeiten. Diese Methode ermöglicht die Verkettung mehrerer Operatoren.

Hier ist ein funktionierendes Beispiel für die Verwendung von Operatoren -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

Im obigen Beispiel haben wir eine beobachtbare Methode mit der Methode () erstellt, die die Werte 1, 2 und 3 annimmt. Auf dieser Beobachtungsmethode können Sie nun eine andere Operation ausführen, indem Sie eine beliebige Anzahl von Operatoren mit der Methode pipe () verwenden (siehe Abbildung) über. Die Ausführung der Operatoren erfolgt nacheinander auf der angegebenen beobachtbaren Stelle.

Um mit Operatoren zu arbeiten, importieren Sie es zuerst wie unten gezeigt -

from rx import of, operators as op

Hier ist ein Arbeitsbeispiel -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

Im obigen Beispiel gibt es eine Liste von Zahlen, aus der wir gerade Zahlen mit einem Filteroperator filtern und später mit einem Reduzierungsoperator hinzufügen.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Hier ist eine Liste der Operatoren, die wir diskutieren werden -

  • Observables erstellen
  • Mathematische Operatoren
  • Transformationsoperatoren
  • Filteroperatoren
  • Fehlerbehandlungsoperatoren
  • Versorgungsunternehmen
  • Conditional
  • Connectable
  • Operatoren kombinieren

Observables erstellen

Im Folgenden sind die Observablen aufgeführt, die wir in der Kategorie "Schöpfung" diskutieren werden

Beispiele anzeigen

Beobachtbar Beschreibung
erstellen Diese Methode wird verwendet, um ein Observable zu erstellen.
leer Dieses Observable gibt nichts aus und gibt direkt den vollständigen Zustand aus.
noch nie Diese Methode erstellt ein Observable, das niemals den vollständigen Zustand erreicht.
werfen Diese Methode erstellt eine Observable, die einen Fehler auslöst.
von_ Diese Methode konvertiert das angegebene Array oder Objekt in ein Observable.
Intervall Diese Methode liefert eine Reihe von Werten, die nach einer Zeitüberschreitung erzeugt werden.
gerade Diese Methode wandelt den angegebenen Wert in einen beobachtbaren Wert um.
Angebot Diese Methode gibt einen Bereich von Ganzzahlen basierend auf der angegebenen Eingabe an.
repeat_value Diese Methode erstellt eine Observable, die den angegebenen Wert gemäß der angegebenen Anzahl wiederholt.
Start Diese Methode nimmt eine Funktion als Eingabe auf und gibt eine Observable zurück, die einen Wert von der Eingabefunktion zurückgibt.
Timer Diese Methode gibt die Werte nach Ablauf des Timeouts nacheinander aus.

Mathematische Operatoren

Die Operatoren, die wir in der Kategorie Mathematische Operatoren diskutieren werden, sind wie folgt: -

Beispiele anzeigen

Operator Beschreibung
durchschnittlich Dieser Operator berechnet den Durchschnitt aus der angegebenen beobachtbaren Quelle und gibt eine beobachtbare Größe mit dem Durchschnittswert aus.
concat Dieser Operator nimmt zwei oder mehr Observablen auf und erhält eine einzige Observable mit allen Werten in der Sequenz.
Anzahl

Dieser Operator nimmt ein Observable mit Werten auf und konvertiert es in ein Observable mit einem einzelnen Wert. Die Zählfunktion nimmt die Prädikatfunktion als optionales Argument auf.

Die Funktion ist vom Typ Boolean und fügt der Ausgabe nur dann einen Wert hinzu, wenn sie die Bedingung erfüllt.

max Dieser Operator gibt ein Observable mit maximalem Wert aus der beobachtbaren Quelle an.
Mindest Dieser Operator gibt eine beobachtbare mit einem minimalen Wert von der beobachtbaren Quelle an.
reduzieren Dieser Operator übernimmt eine Funktion namens Akkumulatorfunktion, die für die Werte verwendet wird, die von der beobachtbaren Quelle stammen, und gibt die akkumulierten Werte in Form einer beobachtbaren zurück, wobei ein optionaler Startwert an die Akkumulatorfunktion übergeben wird.
Summe Dieser Operator gibt eine Observable mit der Summe aller Werte von Source Observables zurück.

Transformationsoperatoren

Die Operatoren, die wir in der Kategorie Transformationsoperatoren diskutieren werden, sind unten aufgeführt -

Beispiele anzeigen

Operator Kategorie
Puffer Dieser Operator sammelt alle Werte aus der beobachtbaren Quelle und gibt sie in regelmäßigen Abständen aus, sobald die angegebene Randbedingung erfüllt ist.
ground_by Dieser Operator gruppiert die Werte, die von der beobachtbaren Quelle stammen, basierend auf der angegebenen Funktion key_mapper.
Karte Dieser Operator ändert jeden Wert aus der beobachtbaren Quelle in einen neuen Wert, basierend auf der Ausgabe des angegebenen mapper_func.
Scan Dieser Operator wendet eine Akkumulatorfunktion auf die Werte an, die von der beobachtbaren Quelle stammen, und gibt eine beobachtbare mit neuen Werten zurück.

Filteroperatoren

Die Operatoren, die wir in der Kategorie Filteroperatoren diskutieren werden, sind unten angegeben -

Beispiele anzeigen

Operator Kategorie
entprellen Dieser Operator gibt die Werte aus der beobachtbaren Quelle bis zur angegebenen Zeitspanne an und ignoriert den Rest der Zeit.
deutlich Dieser Operator gibt alle Werte an, die sich von der beobachtbaren Quelle unterscheiden.
element_at Dieser Operator gibt ein Element aus der Quelle an, das für den angegebenen Index beobachtbar ist.
Filter Dieser Operator filtert Werte aus der beobachtbaren Quelle basierend auf der angegebenen Prädikatfunktion.
zuerst Dieser Operator gibt das erste Element aus der beobachtbaren Quelle an.
ignore_elements Dieser Operator ignoriert alle Werte aus der beobachtbaren Quelle und führt nur Aufrufe aus, um Rückruffunktionen abzuschließen oder Fehler zu machen.
zuletzt Dieser Operator gibt das letzte beobachtbare Element aus der Quelle an.
überspringen Dieser Operator gibt ein Observable zurück, das das erste Auftreten von Zählelementen überspringt, die als Eingabe verwendet werden.
skip_last Dieser Operator gibt ein Observable zurück, das das letzte Auftreten von Zählelementen überspringt, die als Eingabe verwendet wurden.
nehmen Dieser Operator gibt eine Liste der Quellwerte in fortlaufender Reihenfolge basierend auf der angegebenen Anzahl an.
take_last Dieser Operator gibt eine Liste der Quellwerte in fortlaufender Reihenfolge vom letzten basierend auf der angegebenen Anzahl an.

Fehlerbehandlungsoperatoren

Die Operatoren, die wir in der Kategorie Fehlerbehandlungsoperatoren diskutieren werden, sind: -

Beispiele anzeigen

Operator Beschreibung
Fang Dieser Operator beendet die beobachtbare Quelle, wenn eine Ausnahme vorliegt.
wiederholen Dieser Operator versucht es erneut mit der Quelle, die bei einem Fehler beobachtet werden kann, und wird beendet, sobald die Wiederholungszählung abgeschlossen ist.

Versorgungsunternehmen

Im Folgenden sind die Operatoren aufgeführt, die in der Kategorie Utility-Operatoren erläutert werden.

Beispiele anzeigen

Operator Beschreibung
verzögern Dieser Bediener verzögert die beobachtbare Emission der Quelle gemäß der angegebenen Uhrzeit oder dem angegebenen Datum.
materialisieren Dieser Operator konvertiert die Werte aus der beobachtbaren Quelle mit den ausgegebenen Werten in Form expliziter Benachrichtigungswerte.
Zeitintervall Dieser Operator gibt die zwischen den Werten der beobachtbaren Quelle verstrichene Zeit an.
Auszeit Dieser Operator gibt alle Werte aus der Quelle an, die nach Ablauf der Zeit beobachtbar sind, oder löst einen Fehler aus.
Zeitstempel Dieser Operator fügt allen beobachtbaren Werten der beobachtbaren Quelle einen Zeitstempel hinzu.

Bedingte und boolesche Operatoren

Die Operatoren, die wir in der Kategorie Bedingte und Boolesche Operatoren diskutieren werden, sind wie folgt:

Beispiele anzeigen

Operator Beschreibung
alle Dieser Operator prüft, ob alle Werte aus der beobachtbaren Quelle die angegebene Bedingung erfüllen.
enthält Dieser Operator gibt eine Observable mit dem Wert true oder false zurück, wenn der angegebene Wert vorhanden ist und wenn es sich um den Wert der Observable Source handelt.
default_if_empty Dieser Operator gibt einen Standardwert zurück, wenn die beobachtbare Quelle leer ist.
sequence_equal Dieser Operator vergleicht zwei Folgen von Observablen oder ein Array von Werten und gibt eine Observable mit dem Wert true oder false zurück.
skip_until Dieser Operator verwirft Werte aus der beobachtbaren Quelle, bis die zweite beobachtbare Quelle einen Wert ausgibt.
skip_while Dieser Operator gibt eine Observable mit Werten aus der Source Observable zurück, die die übergebene Bedingung erfüllen.
take_until Dieser Operator verwirft Werte aus der beobachtbaren Quelle, nachdem die zweite beobachtbare einen Wert ausgegeben oder beendet hat.
take_while Dieser Operator verwirft Werte aus der Quelle, die beobachtet werden können, wenn die Bedingung fehlschlägt.

Anschließbare Operatoren

Die Operatoren, die wir in der Kategorie Connectable Operator diskutieren werden, sind -

Beispiele anzeigen

Operator Beschreibung
veröffentlichen Diese Methode wandelt das Observable in ein verbindbares Observable um.
ref_count Dieser Operator macht das Observable zu einem normalen Observable.
Wiederholung Diese Methode funktioniert ähnlich wie das replaySubject. Diese Methode gibt dieselben Werte zurück, auch wenn das Observable bereits gesendet wurde und einige Abonnenten zu spät abonniert haben.

Operatoren kombinieren

Das Folgende sind die Operatoren, die wir in der Kategorie Kombinieren von Operatoren diskutieren werden.

Beispiele anzeigen

Operator Beschreibung
kombinieren_latest Dieser Operator erstellt ein Tupel für das als Eingabe angegebene Observable.
verschmelzen Dieser Operator führt bestimmte Observablen zusammen.
beginnen mit Dieser Operator nimmt die angegebenen Werte auf und fügt zu Beginn der beobachtbaren Quelle die vollständige Sequenz zurück.
Postleitzahl Dieser Operator gibt eine Observable mit Werten in Tupelform zurück, die gebildet wird, indem der erste Wert der gegebenen Observable usw. verwendet wird.

Ein Subjekt ist eine beobachtbare Sequenz sowie ein Beobachter, der Multicasting durchführen kann, dh mit vielen Beobachtern spricht, die sich angemeldet haben.

Wir werden die folgenden Themen zum Thema diskutieren -

  • Erstellen Sie ein Thema
  • Abonnieren Sie ein Thema
  • Übergabe von Daten an den Betreff
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Erstellen Sie ein Thema

Um mit einem Betreff zu arbeiten, müssen Sie den Betreff wie unten gezeigt importieren -

from rx.subject import Subject

Sie können ein Subjekt-Objekt wie folgt erstellen:

subject_test = Subject()

Das Objekt ist ein Beobachter, der drei Methoden hat -

  • on_next(value)
  • on_error (Fehler) und
  • on_completed()

Abonnieren Sie einen Betreff

Sie können mehrere Abonnements zu diesem Thema erstellen, wie unten gezeigt -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Übergabe von Daten an den Betreff

Sie können Daten an den Betreff übergeben, der mit der Methode on_next (value) erstellt wurde (siehe unten).

subject_test.on_next("A")
subject_test.on_next("B")

Die Daten werden an alle Abonnements weitergegeben, die zu diesem Thema hinzugefügt wurden.

Hier ist ein Arbeitsbeispiel für das Thema.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Das subject_test-Objekt wird durch Aufrufen eines Subject () erstellt. Das subject_test-Objekt verweist auf die Methoden on_next (Wert), on_error (Fehler) und on_completed (). Die Ausgabe des obigen Beispiels ist unten dargestellt -

Ausgabe

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Wir können die on_completed () -Methode verwenden, um die Betreffausführung wie unten gezeigt zu stoppen.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Sobald wir complete aufrufen, wird die nächste später aufgerufene Methode nicht mehr aufgerufen.

Ausgabe

E:\pyrx>python testrx.py
The value is A
The value is A

Lassen Sie uns nun sehen, wie die Methode on_error (error) aufgerufen wird.

Beispiel

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Ausgabe

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject gibt Ihnen beim Aufruf den neuesten Wert. Sie können ein Verhaltensthema wie unten gezeigt erstellen -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Hier ist ein Arbeitsbeispiel für die Verwendung des Verhaltensthemas

Beispiel

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Ausgabe

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Betreff wiedergeben

Ein Wiederholungsobjekt ähnelt dem Verhaltensthema, wobei es die Werte puffern und sie den neuen Abonnenten wiedergeben kann. Hier ist ein funktionierendes Beispiel für ein Wiedergabethema.

Beispiel

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Der verwendete Pufferwert ist 2 für das Wiedergabethema. Die letzten beiden Werte werden also gepuffert und für die neu angerufenen Teilnehmer verwendet.

Ausgabe

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Im Fall von AsyncSubject wird der zuletzt aufgerufene Wert an den Abonnenten übergeben und erst nach dem Aufruf der Methode complete () ausgeführt.

Beispiel

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Ausgabe

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Ein wichtiges Merkmal von RxPy ist die Parallelität, dh die parallele Ausführung der Aufgabe. Um dies zu erreichen, haben wir zwei Operatoren subscribe_on () und compare_on (), die mit einem Scheduler zusammenarbeiten und über die Ausführung der abonnierten Aufgabe entscheiden.

Hier ist ein Arbeitsbeispiel, das die Notwendigkeit von Subscibe_on (), Observ_on () und Scheduler zeigt.

Beispiel

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

Im obigen Beispiel habe ich zwei Aufgaben: Aufgabe 1 und Aufgabe 2. Die Ausführung der Aufgabe erfolgt nacheinander. Die zweite Aufgabe startet erst, wenn die erste Aufgabe erledigt ist.

Ausgabe

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy unterstützt viele Scheduler, und hier werden wir ThreadPoolScheduler verwenden. ThreadPoolScheduler versucht hauptsächlich, mit den verfügbaren CPU-Threads zu verwalten.

In dem Beispiel, das wir zuvor gesehen haben, werden wir ein Multiprozessor-Modul verwenden, das uns den cpu_count gibt. Die Anzahl wird an den ThreadPoolScheduler übergeben, der es schafft, die Aufgabe basierend auf den verfügbaren Threads parallel zum Laufen zu bringen.

Hier ist ein Arbeitsbeispiel -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

Im obigen Beispiel habe ich 2 Aufgaben und die cpu_count ist 4. Da die Aufgabe 2 ist und die bei uns verfügbaren Threads 4 sind, können beide Aufgaben parallel gestartet werden.

Ausgabe

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Wenn Sie die Ausgabe sehen, wurden beide Aufgaben parallel gestartet.

Stellen Sie sich nun ein Szenario vor, in dem die Aufgabe größer als die CPU-Anzahl ist, dh die CPU-Anzahl 4 und die Aufgaben 5 sind. In diesem Fall müssten wir prüfen, ob ein Thread nach Abschluss der Aufgabe frei geworden ist, damit dies möglich ist der neuen Aufgabe zugewiesen, die in der Warteschlange verfügbar ist.

Zu diesem Zweck können wir den Operator watch_on () verwenden, der den Scheduler beobachtet, wenn Threads frei sind. Hier ist ein Arbeitsbeispiel mit compare_on ()

Beispiel

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Ausgabe

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Wenn Sie die Ausgabe sehen, wird in dem Moment, in dem Aufgabe 4 abgeschlossen ist, der Thread an die nächste Aufgabe übergeben, dh Aufgabe 5, und dieselbe wird ausgeführt.

In diesem Kapitel werden wir die folgenden Themen im Detail diskutieren -

  • Grundlegendes Beispiel, das die Arbeitsweise von Observable, Operatoren und das Abonnieren des Beobachters zeigt.
  • Unterschied zwischen beobachtbar und Subjekt.
  • Kalte und heiße Observable verstehen.

Im Folgenden finden Sie ein grundlegendes Beispiel für die Arbeitsweise von Observable, Operatoren und Abonnenten des Beobachters.

Beispiel

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Hier ist ein sehr einfaches Beispiel, in dem ich Benutzerdaten von dieser URL erhalte -

https://jsonplaceholder.typicode.com/users.

Filtern Sie die Daten, um die Namen zu erhalten, die mit "C" beginnen, und verwenden Sie später die Karte, um nur die Namen zurückzugeben. Hier ist die Ausgabe für das gleiche -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Unterschied zwischen beobachtbar und Subjekt

In diesem Beispiel sehen wir den Unterschied zwischen einem beobachtbaren und einem Subjekt.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Ausgabe

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Im obigen Beispiel erhalten Sie jedes Mal, wenn Sie das Observable abonnieren, neue Werte.

Betreff Beispiel

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Ausgabe

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Wenn Sie sehen, dass die Werte gemeinsam genutzt werden, verwenden beide Teilnehmer den Betreff.

Grundlegendes zu kalten und heißen Observablen

Ein Observable wird klassifiziert als

  • Kalte Observable
  • Heiße Observablen

Der Unterschied bei den Observablen wird bemerkt, wenn mehrere Abonnenten abonnieren.

Kalte Observable

Kalte Observablen sind beobachtbar, die ausgeführt werden, und rendern Daten jedes Mal, wenn sie abonniert werden. Wenn es abonniert ist, wird das Observable ausgeführt und die neuen Werte werden angegeben.

Das folgende Beispiel vermittelt das Verständnis von beobachtbarer Kälte.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Ausgabe

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Im obigen Beispiel werden jedes Mal, wenn Sie das Observable abonnieren, das Observable ausgeführt und Werte ausgegeben. Die Werte können auch von Teilnehmer zu Teilnehmer unterschiedlich sein, wie im obigen Beispiel gezeigt.

Heiße Observablen

Im Falle von Hot Observable geben sie die Werte aus, wenn sie bereit sind, und warten nicht immer auf ein Abonnement. Wenn die Werte ausgegeben werden, erhalten alle Teilnehmer den gleichen Wert.

Sie können Hot Observable verwenden, wenn Sie möchten, dass Werte ausgegeben werden, wenn das Observable bereit ist, oder wenn Sie allen Ihren Abonnenten dieselben Werte mitteilen möchten.

Ein Beispiel für Hot Observable sind Subject- und Connectable-Operatoren.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Ausgabe

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Wenn Sie sehen, wird derselbe Wert zwischen den Abonnenten geteilt. Sie können dasselbe mit dem connectable-Observable-Operator Publish () erreichen.