RxPY - Guide rapide

Ce chapitre explique ce qu'est la programmation réactive, ce qu'est RxPY, ses opérateurs, ses fonctionnalités, ses avantages et ses inconvénients.

Qu'est-ce que la programmation réactive?

La programmation réactive est un paradigme de programmation qui traite du flux de données et de la propagation du changement. Cela signifie que, lorsqu'un flux de données est émis par un composant, le changement sera propagé à d'autres composants par une bibliothèque de programmation réactive. La propagation du changement continuera jusqu'à ce qu'il atteigne le récepteur final.

En utilisant RxPY, vous avez un bon contrôle sur les flux de données asynchrones, par exemple, une demande faite à l'URL peut être tracée à l'aide d'observable et utilisez l'observateur pour écouter lorsque la demande est terminée pour une réponse ou une erreur.

RxPY vous propose de gérer les flux de données asynchrones en utilisant Observables, interrogez les flux de données à l'aide de Operators ie filtre, somme, concat, mappe et utilise également la concurrence pour les flux de données en utilisant Schedulers. Créer un Observable, donne un objet observateur avec les méthodes on_next (v), on_error (e) et on_completed (), qui doit êtresubscribed afin que nous recevions une notification lorsqu'un événement se produit.

L'Observable peut être interrogé à l'aide de plusieurs opérateurs dans un format de chaîne à l'aide de l'opérateur de canal.

RxPY propose des opérateurs dans différentes catégories comme: -

  • Opérateurs mathématiques

  • Opérateurs de transformation

  • Opérateurs de filtrage

  • Opérateurs de gestion des erreurs

  • Opérateurs de services publics

  • Opérateurs conditionnels

  • Opérateurs de création

  • Opérateurs connectables

Ces opérateurs sont expliqués en détail dans ce tutoriel.

Qu'est-ce que RxPy?

RxPY est défini comme a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python selon le site officiel de RxPy, qui est https://rxpy.readthedocs.io/en/latest/.

RxPY est une bibliothèque python prenant en charge la programmation réactive. RxPy signifieReactive Extensions for Python. C'est une bibliothèque qui utilise des observables pour travailler avec une programmation réactive qui traite des appels de données asynchrones, des rappels et des programmes basés sur des événements.

Caractéristiques de RxPy

Dans RxPy, les concepts suivants prennent en charge la gestion de la tâche asynchrone -

Observable

Un observable est une fonction qui crée un observateur et l'attache à la source ayant des flux de données attendus, par exemple, de Tweets, d'événements liés à l'ordinateur, etc.

Observateur

C'est un objet avec les méthodes on_next (), on_error () et on_completed (), qui sera appelé lorsqu'il y a interaction avec l'observable c'est-à-dire que la source interagit pour un exemple de Tweets entrants, etc.

Abonnement

Lorsque l'observable est créée, pour exécuter l'observable, nous devons y souscrire.

Les opérateurs

Un opérateur est une fonction pure qui prend observable comme entrée et la sortie est également une observable. Vous pouvez utiliser plusieurs opérateurs sur des données observables à l'aide de l'opérateur de canal.

Matière

Un sujet est une séquence observable ainsi qu'un observateur qui peut diffuser en multidiffusion, c'est-à-dire parler à de nombreux observateurs qui se sont abonnés. Le sujet est une observable froide, c'est-à-dire que les valeurs seront partagées entre les observateurs abonnés.

Planificateurs

Une caractéristique importante de RxPy est la concurrence, c'est-à-dire permettre à la tâche de s'exécuter en parallèle. Pour ce faire, RxPy a deux opérateurs subscribe_on () et observe_on () qui fonctionnent avec les planificateurs et décideront de l'exécution de la tâche souscrite.

Avantages de l'utilisation de RxPY

Voici les avantages de RxPy -

  • RxPY est une bibliothèque géniale pour la gestion des flux de données et des événements asynchrones. RxPY utilise des observables pour travailler avec une programmation réactive qui traite des appels de données asynchrones, des rappels et des programmes basés sur des événements.

  • RxPY offre une énorme collection d'opérateurs dans les catégories mathématiques, transformation, filtrage, utilitaire, conditionnelle, gestion des erreurs, jointure qui facilite la vie lorsqu'il est utilisé avec la programmation réactive.

  • La concurrence, c'est-à-dire que le travail de plusieurs tâches ensemble est réalisé en utilisant des planificateurs dans RxPY.

  • Les performances sont améliorées en utilisant RxPY car la gestion des tâches asynchrones et le traitement parallèle sont simplifiés.

Inconvénient de l'utilisation de RxPY

  • Déboguer le code avec des observables est un peu difficile.

Dans ce chapitre, nous travaillerons sur l'installation de RxPy. Pour commencer à travailler avec RxPY, nous devons d'abord installer Python. Donc, nous allons travailler sur ce qui suit -

  • Installez Python
  • Installez RxPy

Installer Python

Accédez au site officiel de Python: https://www.python.org/downloads/.comme indiqué ci-dessous, et cliquez sur la dernière version disponible pour Windows, Linux / Unix et mac os. Téléchargez Python selon votre système d'exploitation 64 ou 32 bits disponible avec vous.

Une fois que vous avez téléchargé, cliquez sur le .exe file et suivez les étapes pour installer python sur votre système.

Le gestionnaire de paquets python, c'est-à-dire pip, sera également installé par défaut avec l'installation ci-dessus. Pour le faire fonctionner globalement sur votre système, ajoutez directement l'emplacement de python à la variable PATH, la même chose est affichée au début de l'installation, pour ne pas oublier de cocher la case, qui dit AJOUTER à PATH. Si vous oubliez de le vérifier, veuillez suivre les étapes ci-dessous pour l'ajouter à PATH.

Pour ajouter à PATH, suivez les étapes ci-dessous -

Faites un clic droit sur l'icône de votre ordinateur et cliquez sur Propriétés → Paramètres système avancés.

Il affichera l'écran comme indiqué ci-dessous -

Cliquez sur Variables d'environnement comme indiqué ci-dessus. Il affichera l'écran comme indiqué ci-dessous -

Sélectionnez Chemin et cliquez sur le bouton Modifier, ajoutez le chemin de localisation de votre python à la fin. Maintenant, vérifions la version de python.

Vérification de la version de python

E:\pyrx>python --version
Python 3.7.3

Installez RxPY

Maintenant que nous avons installé python, nous allons installer RxPy.

Une fois python installé, le gestionnaire de paquets python, c'est-à-dire pip, sera également installé. Voici la commande pour vérifier la version de pip -

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Nous avons installé pip et la version est 19.1.1. Maintenant, nous allons utiliser pip pour installer RxPy

La commande est la suivante -

pip install rx

Dans ce tutoriel, nous utilisons RxPY version 3 et python version 3.7.3. Le fonctionnement de la version 3 de RxPY diffère un peu de la version précédente, c'est-à-dire de la version 1 de RxPY.

Dans ce chapitre, nous allons discuter des différences entre les 2 versions et des changements qui doivent être effectués au cas où vous mettriez à jour les versions Python et RxPY.

Observable dans RxPY

Dans RxPy version 1, Observable était une classe distincte -

from rx import Observable

Pour utiliser l'Observable, vous devez l'utiliser comme suit -

Observable.of(1,2,3,4,5,6,7,8,9,10)

Dans RxPy version 3, Observable fait directement partie du package rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Opérateurs dans RxPy

Dans la version 1, l'opérateur était des méthodes de la classe Observable. Par exemple, pour utiliser les opérateurs, nous devons importer Observable comme indiqué ci-dessous -

from rx import Observable

Les opérateurs sont utilisés comme Observable.operator, par exemple, comme indiqué ci-dessous -

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Dans le cas de la version 3 de RxPY, les opérateurs sont fonctionnels et sont importés et utilisés comme suit -

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Chaînage d'opérateurs à l'aide de la méthode Pipe ()

Dans RxPy version 1, au cas où vous deviez utiliser plusieurs opérateurs sur une observable, il fallait le faire comme suit -

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Mais, dans le cas de la version 3 de RxPY, vous pouvez utiliser la méthode pipe () et plusieurs opérateurs comme indiqué ci-dessous -

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Un observable est une fonction qui crée un observateur et l'attache à la source où des valeurs sont attendues, par exemple, des clics, des événements de souris à partir d'un élément dom, etc.

Les sujets mentionnés ci-dessous seront étudiés en détail dans ce chapitre.

  • Créer des observables

  • Abonnez-vous et exécutez un observable

Créer des observables

Pour créer un observable, nous utiliserons create() et transmettez-lui la fonction contenant les éléments suivants.

  • on_next() - Cette fonction est appelée lorsque l'Observable émet un élément.

  • on_completed() - Cette fonction est appelée lorsque l'observable est terminé.

  • on_error() - Cette fonction est appelée lorsqu'une erreur se produit sur l'Observable.

Pour travailler avec la méthode create (), importez d'abord la méthode comme indiqué ci-dessous -

from rx import create

Voici un exemple de travail, pour créer un observable -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Abonnez-vous et exécutez un observable

Pour souscrire à une observable, nous devons utiliser la fonction subscribe () et passer la fonction de rappel on_next, on_error et on_completed.

Voici un exemple de travail -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

La méthode subscribe () se charge de l'exécution de l'observable. La fonction de rappelon_next, on_error et on_completeddoit être passé à la méthode subscribe. L'appel à la méthode d'abonnement, à son tour, exécute la fonction test_observable ().

Il n'est pas obligatoire de passer les trois fonctions de rappel à la méthode subscribe (). Vous pouvez passer selon vos besoins le on_next (), on_error () et on_completed ().

La fonction lambda est utilisée pour on_next, on_error et on_completed. Il prendra les arguments et exécutera l'expression donnée.

Voici la sortie, de l'observable créée -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Ce chapitre explique en détail les opérateurs dans RxPY. Ces opérateurs comprennent -

  • Travailler avec les opérateurs
  • Opérateurs mathématiques
  • Opérateurs de transformation
  • Opérateurs de filtrage
  • Opérateurs de gestion des erreurs
  • Opérateurs de services publics
  • Opérateurs conditionnels
  • Opérateurs de création
  • Opérateurs connectables
  • Combinaison d'opérateurs

Le python réactif (Rx) a presque beaucoup d'opérateurs, qui facilitent la vie avec le codage python. Vous pouvez utiliser ces multiples opérateurs ensemble, par exemple, lorsque vous travaillez avec des chaînes, vous pouvez utiliser des opérateurs de mappage, de filtre et de fusion.

Travailler avec les opérateurs

Vous pouvez travailler avec plusieurs opérateurs ensemble en utilisant la méthode pipe (). Cette méthode permet de chaîner plusieurs opérateurs ensemble.

Voici un exemple pratique d'utilisation d'opérateurs -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

Dans l'exemple ci-dessus, nous avons créé une méthode observable en utilisant of () qui prend les valeurs 1, 2 et 3. Maintenant, sur cette observable, vous pouvez effectuer une opération différente, en utilisant n'importe quel nombre d'opérateurs en utilisant la méthode pipe () comme indiqué au dessus de. L'exécution des opérateurs se poursuivra séquentiellement sur l'observable donnée.

Pour travailler avec des opérateurs, importez-le d'abord comme indiqué ci-dessous -

from rx import of, operators as op

Voici un exemple de travail -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

Dans l'exemple ci-dessus, il existe une liste de nombres, à partir de laquelle nous filtrons les nombres pairs à l'aide d'un opérateur de filtre et l'ajoutons plus tard à l'aide d'un opérateur de réduction.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Voici une liste d'opérateurs, dont nous allons discuter -

  • Créer des observables
  • Opérateurs mathématiques
  • Opérateurs de transformation
  • Opérateurs de filtrage
  • Opérateurs de gestion des erreurs
  • Opérateurs de services publics
  • Conditional
  • Connectable
  • Combinaison d'opérateurs

Créer des observables

Voici les observables, nous allons discuter dans la catégorie Création

Afficher des exemples

Observable La description
créer Cette méthode est utilisée pour créer un observable.
vide Cette observable ne sortira rien et émettra directement l'état complet.
jamais Cette méthode crée un observable qui n'atteindra jamais l'état complet.
jeter Cette méthode créera un observable qui générera une erreur.
de_ Cette méthode convertira le tableau ou l'objet donné en observable.
intervalle Cette méthode donnera une série de valeurs produites après un timeout.
juste Cette méthode convertira une valeur donnée en observable.
intervalle Cette méthode donnera une plage d'entiers en fonction de l'entrée donnée.
repeat_value Cette méthode créera une observable qui répétera la valeur donnée selon le nombre donné.
début Cette méthode prend une fonction en tant qu'entrée et renvoie une observable qui retournera une valeur à partir de la fonction d'entrée.
minuteur Cette méthode émettra les valeurs dans l'ordre après l'expiration du délai.

Opérateurs mathématiques

Les opérateurs que nous allons discuter dans la catégorie Opérateurs mathématiques sont les suivants: -

Afficher des exemples

Opérateur La description
moyenne Cet opérateur calculera la moyenne à partir de la source observable donnée et produira une observable qui aura la valeur moyenne.
concat Cet opérateur prendra deux ou plusieurs observables et donne une seule observable avec toutes les valeurs de la séquence.
compter

Cet opérateur prend un Observable avec des valeurs et le convertit en un Observable qui aura une valeur unique. La fonction de comptage prend la fonction de prédicat comme argument facultatif.

La fonction est de type booléen et ajoutera de la valeur à la sortie uniquement si elle remplit la condition.

max Cet opérateur donnera une observable avec une valeur maximale à partir de l'observable source.
min Cet opérateur donnera une observable avec une valeur min à partir de l'observable source.
réduire Cet opérateur prend dans une fonction appelée fonction d'accumulateur qui est utilisée sur les valeurs provenant de l'observable source, et il retourne les valeurs accumulées sous la forme d'une observable, avec une valeur de départ optionnelle passée à la fonction d'accumulation.
somme Cet opérateur renverra une observable avec la somme de toutes les valeurs des observables source.

Opérateurs de transformation

Les opérateurs dont nous allons parler dans la catégorie Opérateurs de transformation sont mentionnés ci-dessous -

Afficher des exemples

Opérateur Catégorie
tampon Cet opérateur collectera toutes les valeurs de l'observable source, et les émettra à intervalles réguliers une fois la condition aux limites donnée satisfaite.
ground_by Cet opérateur regroupera les valeurs provenant de l'observable source en fonction de la fonction key_mapper donnée.
carte Cet opérateur changera chaque valeur de la source observable en une nouvelle valeur basée sur la sortie du mapper_func donné.
analyse Cet opérateur appliquera une fonction d'accumulation aux valeurs provenant de l'observable source et retournera une observable avec de nouvelles valeurs.

Opérateurs de filtrage

Les opérateurs que nous allons discuter dans la catégorie des opérateurs de filtrage sont donnés ci-dessous -

Afficher des exemples

Opérateur Catégorie
rebondir Cet opérateur donnera les valeurs de la source observable, jusqu'à ce que la période donnée et ignore le reste du temps.
distinct Cet opérateur donnera toutes les valeurs distinctes de l'observable source.
element_at Cet opérateur donnera un élément de la source observable pour l'index donné.
filtre Cet opérateur filtrera les valeurs de l'observable source en fonction de la fonction de prédicat donnée.
première Cet opérateur donnera le premier élément de la source observable.
ignore_elements Cet opérateur ignorera toutes les valeurs de l'observable source et n'exécutera que les appels aux fonctions de rappel complètes ou d'erreur.
dernier Cet opérateur donnera le dernier élément de la source observable.
sauter Cet opérateur rendra une observable qui sautera la première occurrence des éléments de comptage pris en entrée.
skip_last Cet opérateur rendra une observable qui sautera la dernière occurrence des éléments de comptage pris en entrée.
prendre Cet opérateur donnera une liste de valeurs source dans un ordre continu basé sur le nombre donné.
take_last Cet opérateur donnera une liste de valeurs source dans l'ordre continu du dernier en fonction du nombre donné.

Opérateurs de gestion des erreurs

Les opérateurs dont nous allons parler dans la catégorie Opérateur de gestion des erreurs sont: -

Afficher des exemples

Opérateur La description
capture Cet opérateur mettra fin à l'observable source lorsqu'il y a une exception.
retenter Cet opérateur réessayera sur l'observable source en cas d'erreur et une fois le nombre de tentatives terminé, il se terminera.

Opérateurs de services publics

Voici les opérateurs dont nous allons parler dans la catégorie Opérateur de services publics.

Afficher des exemples

Opérateur La description
retard Cet opérateur retardera l'émission observable de la source selon l'heure ou la date indiquée.
se concrétiser Cet opérateur convertira les valeurs de la source observable avec les valeurs émises sous forme de valeurs de notification explicites.
intervalle de temps Cet opérateur donnera le temps écoulé entre les valeurs de la source observable.
temps libre Cet opérateur donnera toutes les valeurs de la source observables après le temps écoulé ou bien déclenchera une erreur.
horodatage Cet opérateur attachera un horodatage à toutes les valeurs de l'observable source.

Opérateurs conditionnels et booléens

Les opérateurs que nous allons discuter dans la catégorie Opérateur conditionnel et booléen sont indiqués ci-dessous -

Afficher des exemples

Opérateur La description
tout Cet opérateur vérifiera si toutes les valeurs de l'observable source satisfont à la condition donnée.
contient Cet opérateur renverra une observable avec la valeur true ou false si la valeur donnée est présente et si c'est la valeur de l'observable source.
default_if_empty Cet opérateur renverra une valeur par défaut si l'observable source est vide.
sequence_equal Cet opérateur comparera deux séquences d'observables ou un tableau de valeurs et retournera une observable avec la valeur true ou false.
skip_until Cet opérateur rejettera les valeurs de l'observable source jusqu'à ce que la seconde observable émette une valeur.
skip_ while Cet opérateur renverra une observable avec des valeurs de l'observable source qui satisfait la condition passée.
take_until Cet opérateur rejettera les valeurs de l'observable source après que la seconde observable ait émis une valeur ou soit terminée.
prendre_maintenant Cet opérateur rejettera les valeurs de la source observable lorsque la condition échoue.

Opérateurs connectables

Les opérateurs dont nous allons parler dans la catégorie des opérateurs connectables sont -

Afficher des exemples

Opérateur La description
publier Cette méthode convertira l'observable en observable connectable.
ref_count Cet opérateur fera de l'observable un observable normal.
rejouer Cette méthode fonctionne de manière similaire à replaySubject. Cette méthode retournera les mêmes valeurs, même si l'observable a déjà émis et que certains des abonnés sont en retard dans l'abonnement.

Combinaison d'opérateurs

Voici les opérateurs dont nous allons parler dans la catégorie Opérateur de combinaison.

Afficher des exemples

Opérateur La description
combine_latest Cet opérateur créera un tuple pour l'observable donnée en entrée.
fusionner Cet opérateur fusionnera des observables données.
Commencer avec Cet opérateur prendra les valeurs données et ajoutera au début de l'observable source renvoyer la séquence complète.
Zip *: français Cet opérateur renvoie une observable avec des valeurs sous forme de tuple qui est formée en prenant la première valeur de l'observable donnée et ainsi de suite.

Un sujet est une séquence observable, ainsi qu'un observateur qui peut multidiffuser, c'est-à-dire parler à de nombreux observateurs qui se sont abonnés.

Nous allons discuter des sujets suivants sur le sujet -

  • Créer un sujet
  • Abonnez-vous à un sujet
  • Passer des données au sujet
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Créer un sujet

Pour travailler avec un sujet, nous devons importer le sujet comme indiqué ci-dessous -

from rx.subject import Subject

Vous pouvez créer un sujet-objet comme suit -

subject_test = Subject()

L'objet est un observateur qui a trois méthodes -

  • on_next(value)
  • on_error (erreur) et
  • on_completed()

S'abonner à un sujet

Vous pouvez créer plusieurs abonnements sur le sujet comme indiqué ci-dessous -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Transmission des données au sujet

Vous pouvez transmettre des données au sujet créé à l'aide de la méthode on_next (value) comme indiqué ci-dessous -

subject_test.on_next("A")
subject_test.on_next("B")

Les données seront transmises à l'ensemble de l'abonnement, ajoutées sur le sujet.

Voici un exemple de travail du sujet.

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

L'objet subject_test est créé en appelant un Subject (). L'objet subject_test fait référence aux méthodes on_next (valeur), on_error (erreur) et on_completed (). La sortie de l'exemple ci-dessus est indiquée ci-dessous -

Production

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Nous pouvons utiliser la méthode on_completed (), pour arrêter l'exécution du sujet comme indiqué ci-dessous.

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Une fois que nous appelons complete, la méthode suivante appelée plus tard n'est pas appelée.

Production

E:\pyrx>python testrx.py
The value is A
The value is A

Voyons maintenant comment appeler la méthode on_error (error).

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Production

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject vous donnera la dernière valeur une fois appelé. Vous pouvez créer un sujet de comportement comme indiqué ci-dessous -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Voici un exemple pratique d'utilisation du sujet de comportement

Exemple

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Production

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Rejouer le sujet

Un sujet de relecture est similaire au sujet de comportement, dans lequel il peut mettre en mémoire tampon les valeurs et les rejouer aux nouveaux abonnés. Voici un exemple fonctionnel de sujet de relecture.

Exemple

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

La valeur de tampon utilisée est 2 sur le sujet de la relecture. Ainsi, les deux dernières valeurs seront mises en mémoire tampon et utilisées pour les nouveaux abonnés appelés.

Production

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Dans le cas d'AsyncSubject, la dernière valeur appelée est transmise à l'abonné, et elle ne sera effectuée qu'après l'appel de la méthode complete ().

Exemple

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Production

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Une caractéristique importante de RxPy est la concurrence, c'est-à-dire permettre à la tâche de s'exécuter en parallèle. Pour ce faire, nous avons deux opérateurs subscribe_on () et observe_on () qui fonctionneront avec un ordonnanceur, qui décidera de l'exécution de la tâche souscrite.

Voici un exemple de travail qui montre la nécessité de subscibe_on (), observe_on () et scheduler.

Exemple

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

Dans l'exemple ci-dessus, j'ai 2 tâches: Tâche 1 et Tâche 2. L'exécution de la tâche est en séquence. La deuxième tâche ne démarre que lorsque la première tâche est terminée.

Production

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy prend en charge de nombreux Scheduler, et ici, nous allons utiliser ThreadPoolScheduler. ThreadPoolScheduler essaiera principalement de gérer avec les threads CPU disponibles.

Dans l'exemple que nous avons vu précédemment, nous allons utiliser un module multiprocesseur qui nous donnera le cpu_count. Le décompte sera donné au ThreadPoolScheduler qui parviendra à faire fonctionner la tâche en parallèle en fonction des threads disponibles.

Voici un exemple de travail -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

Dans l'exemple ci-dessus, j'ai 2 tâches et le cpu_count est 4. Depuis, la tâche est de 2 et les threads disponibles avec nous sont de 4, les deux tâches peuvent démarrer en parallèle.

Production

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Si vous voyez la sortie, la tâche a démarré en parallèle.

Maintenant, considérons un scénario où la tâche est supérieure au nombre de CPU, c'est-à-dire que le nombre de CPU est de 4 et les tâches de 5. Dans ce cas, nous aurions besoin de vérifier si un thread est devenu libre après la fin de la tâche, de sorte qu'il puisse être affecté à la nouvelle tâche disponible dans la file d'attente.

Pour cela, nous pouvons utiliser l'opérateur observe_on () qui observera le planificateur si des threads sont libres. Voici un exemple de travail utilisant observer_on ()

Exemple

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Production

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Si vous voyez la sortie, la tâche de moment 4 est terminée, le fil est donné à la tâche suivante, c'est-à-dire, la tâche 5 et la même tâche commence à s'exécuter.

Dans ce chapitre, nous aborderons en détail les sujets suivants -

  • Exemple de base montrant le fonctionnement d'observable, d'opérateurs et d'abonnement à l'observateur.
  • Différence entre observable et sujet.
  • Comprendre les observables froids et chauds.

Vous trouverez ci-dessous un exemple de base montrant le fonctionnement de l'observable, des opérateurs et l'abonnement à l'observateur.

Exemple

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Voici un exemple très simple, dans lequel j'obtiens des données utilisateur à partir de cette URL -

https://jsonplaceholder.typicode.com/users.

Filtrer les données, pour donner les noms commençant par "C", et plus tard utiliser la carte pour renvoyer les noms uniquement. Voici la sortie pour le même -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Différence entre observable et sujet

Dans cet exemple, nous verrons la différence entre un observable et un sujet.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Production

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Dans l'exemple ci-dessus, chaque fois que vous vous abonnez à l'observable, il vous donnera de nouvelles valeurs.

Exemple de sujet

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Production

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Si vous voyez que les valeurs sont partagées, entre les deux abonnés utilisant le sujet.

Comprendre les observables froids et chauds

Un observable est classé comme

  • Observables froids
  • Observables chauds

La différence d'observables sera remarquée lorsque plusieurs abonnés s'abonnent.

Observables froids

Les observables froids sont des observables qui sont exécutés et restituent des données à chaque fois qu'il est abonné. Lorsqu'elle est souscrite, l'observable est exécutée et les nouvelles valeurs sont données.

L'exemple suivant donne la compréhension du froid observable.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Production

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Dans l'exemple ci-dessus, chaque fois que vous vous abonnez à l'observable, il exécutera l'observable et émettra des valeurs. Les valeurs peuvent également différer d'un abonné à un autre, comme illustré dans l'exemple ci-dessus.

Observables chauds

Dans le cas des observables à chaud, ils émettront les valeurs quand ils seront prêts et n'attendront pas toujours un abonnement. Lorsque les valeurs sont émises, tous les abonnés recevront la même valeur.

Vous pouvez utiliser l'observable à chaud lorsque vous souhaitez que des valeurs soient émises lorsque l'observable est prête ou que vous souhaitez partager les mêmes valeurs avec tous vos abonnés.

Un exemple d'observable à chaud est les opérateurs Subject et connectable.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Production

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Si vous voyez, la même valeur est partagée entre les abonnés. Vous pouvez obtenir la même chose en utilisant l'opérateur observable connectable publish ().