RxPY - Guia Rápido

Este capítulo explica o que é programação reativa, o que é RxPY, seus operadores, recursos, vantagens e desvantagens.

O que é programação reativa?

A programação reativa é um paradigma de programação que lida com o fluxo de dados e a propagação da mudança. Isso significa que, quando um fluxo de dados é emitido por um componente, a mudança será propagada para outros componentes por uma biblioteca de programação reativa. A propagação da mudança continuará até atingir o receptor final.

Ao usar o RxPY, você tem um bom controle sobre os fluxos de dados assíncronos, por exemplo, uma solicitação feita para URL pode ser rastreada usando observável e usar o observador para ouvir quando a solicitação for concluída para resposta ou erro.

RxPY oferece a você lidar com fluxos de dados assíncronos usando Observables, consulte os fluxos de dados usando Operators ou seja, filtrar, somar, concatenar, mapear e também usar a simultaneidade para os fluxos de dados usando Schedulers. Criando um Observable, dá um objeto observador com os métodos on_next (v), on_error (e) e on_completed (), que precisa sersubscribed para que recebamos uma notificação quando um evento ocorrer.

O Observable pode ser consultado usando vários operadores em um formato de cadeia usando o operador de tubo.

RxPY oferece operadoras em várias categorias como: -

  • Operadores matemáticos

  • Operadores de transformação

  • Operadores de filtragem

  • Operadores de tratamento de erros

  • Operadores de serviços públicos

  • Operadores condicionais

  • Operadores de criação

  • Operadores conectáveis

Esses operadores são explicados em detalhes neste tutorial.

O que é RxPy?

RxPY é definido como a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python conforme o site oficial da RxPy, que é https://rxpy.readthedocs.io/en/latest/.

RxPY é uma biblioteca python para dar suporte à programação reativa. RxPy significaReactive Extensions for Python. É uma biblioteca que usa observáveis ​​para trabalhar com programação reativa que lida com chamadas de dados assíncronas, retornos de chamada e programas baseados em eventos.

Recursos do RxPy

No RxPy, os conceitos a seguir cuidam de lidar com a tarefa assíncrona -

Observável

Um observável é uma função que cria um observador e o anexa à fonte com fluxos de dados esperados de, por exemplo, Tweets, eventos relacionados ao computador, etc.

Observador

É um objeto com os métodos on_next (), on_error () e on_completed (), que será chamado quando houver interação com o observável, isto é, a fonte interage para um exemplo de Tweets recebidos, etc.

Inscrição

Quando o observável é criado, para executar o observável, precisamos assiná-lo.

Operadores

Um operador é uma função pura que recebe observáveis ​​como entrada e a saída também é observável. Você pode usar vários operadores em dados observáveis ​​usando o operador pipe.

Sujeito

Um assunto é uma sequência observável, bem como um observador que pode fazer multicast, ou seja, falar com muitos observadores que se inscreveram. O assunto é um observável frio, ou seja, os valores serão compartilhados entre os observadores que foram inscritos.

Schedulers

Uma característica importante do RxPy é a simultaneidade, ou seja, permitir que a tarefa seja executada em paralelo. Para que isso aconteça, o RxPy tem dois operadores subscribe_on () e observe_on () que trabalham com agendadores e decidem a execução da tarefa inscrita.

Vantagens de usar RxPY

A seguir estão as vantagens do RxPy -

  • RxPY é uma biblioteca incrível quando se trata de lidar com fluxos de dados assíncronos e eventos. RxPY usa observáveis ​​para trabalhar com programação reativa que lida com chamadas de dados assíncronas, retornos de chamada e programas baseados em eventos.

  • O RxPY oferece uma enorme coleção de operadores em categorias matemáticas, de transformação, de filtragem, de utilidade, condicionais, de tratamento de erros e de junção que tornam a vida mais fácil quando usados ​​com programação reativa.

  • A simultaneidade, isto é, trabalhar em várias tarefas em conjunto, é obtida usando agendadores no RxPY.

  • O desempenho é aprimorado usando RxPY, pois o manuseio de tarefas assíncronas e o processamento paralelo são facilitados.

Desvantagem de usar RxPY

  • Depurar o código com observáveis ​​é um pouco difícil.

Neste capítulo, trabalharemos na instalação do RxPy. Para começar a trabalhar com RxPY, precisamos primeiro instalar o Python. Então, vamos trabalhar no seguinte -

  • Instale Python
  • Instale RxPy

Instalando Python

Vá para o site oficial do Python: https://www.python.org/downloads/.conforme mostrado abaixo, e clique na versão mais recente disponível para Windows, Linux / Unix e mac os. Baixe Python de acordo com seu sistema operacional de 64 ou 32 bits disponível com você.

Depois de fazer o download, clique no .exe file e siga as etapas para instalar o python em seu sistema.

O gerenciador de pacotes python, ou seja, pip também será instalado por padrão com a instalação acima. Para fazer funcionar globalmente em seu sistema, adicione diretamente a localização do python à variável PATH, a mesma é mostrada no início da instalação, para lembrar de marcar a caixa de seleção, que diz ADD to PATH. Caso você se esqueça de verificá-lo, siga os passos abaixo indicados para adicionar ao PATH.

Para adicionar ao PATH, siga as etapas abaixo -

Clique com o botão direito no ícone do seu computador e clique em propriedades → Configurações avançadas do sistema.

Ele exibirá a tela conforme mostrado abaixo -

Clique em Variáveis ​​de ambiente como mostrado acima. Ele exibirá a tela conforme mostrado abaixo -

Selecione o caminho e clique no botão Editar, adicione o caminho da localização do seu python no final. Agora, vamos verificar a versão do python.

Verificando a versão do python

E:\pyrx>python --version
Python 3.7.3

Instale RxPY

Agora que temos o python instalado, vamos instalar o RxPy.

Assim que o python for instalado, o gerenciador de pacotes python, ou seja, o pip também será instalado. A seguir está o comando para verificar a versão do pip -

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Temos o pip instalado e a versão é 19.1.1. Agora, vamos usar o pip para instalar o RxPy

O comando é o seguinte -

pip install rx

Neste tutorial, estamos usando RxPY versão 3 e python versão 3.7.3. O funcionamento do RxPY versão 3 difere um pouco da versão anterior, ou seja, o RxPY versão 1.

Neste capítulo, vamos discutir as diferenças entre as 2 versões e as mudanças que precisam ser feitas no caso de você estar atualizando as versões Python e RxPY.

Observável em RxPY

Na versão 1 do RxPy, Observable era uma classe separada -

from rx import Observable

Para usar o Observable, você deve usá-lo da seguinte maneira -

Observable.of(1,2,3,4,5,6,7,8,9,10)

Na versão 3 do RxPy, o Observable faz parte diretamente do pacote rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Operadores em RxPy

Na versão 1, o operador era métodos na classe Observable. Por exemplo, para fazer uso de operadores, temos que importar Observable conforme mostrado abaixo -

from rx import Observable

Os operadores são usados ​​como Observable.operator, por exemplo, conforme mostrado abaixo -

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

No caso do RxPY versão 3, os operadores funcionam, são importados e usados ​​da seguinte forma -

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Operadores de encadeamento usando o método Pipe ()

Na versão 1 do RxPy, caso você tivesse que usar vários operadores em um observável, isso tinha que ser feito da seguinte maneira -

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Mas, no caso do RxPY versão 3, você pode usar o método pipe () e vários operadores como mostrado abaixo -

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Um observável é uma função que cria um observador e o anexa à fonte onde os valores são esperados, por exemplo, cliques, eventos de mouse de um elemento dom, etc.

Os tópicos mencionados abaixo serão estudados detalhadamente neste capítulo.

  • Criar observáveis

  • Assine e execute um observável

Crie observáveis

Para criar um observável, usaremos create() método e passar a função para ele que contém os seguintes itens.

  • on_next() - Esta função é chamada quando o Observable emite um item.

  • on_completed() - Esta função é chamada quando o Observable é concluído.

  • on_error() - Esta função é chamada quando ocorre um erro no Observable.

Para trabalhar com o método create (), primeiro importe o método conforme mostrado abaixo -

from rx import create

Aqui está um exemplo prático, para criar um observável -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Assine e execute um observável

Para assinar um observável, precisamos usar a função subscribe () e passar a função de retorno de chamada on_next, on_error e on_completed.

Aqui está um exemplo prático -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

O método subscribe () cuida da execução do observável. A função de retorno de chamadaon_next, on_error e on_completeddeve ser passado para o método de inscrição. A chamada para o método de inscrição, por sua vez, executa a função test_observable ().

Não é obrigatório passar todas as três funções de retorno de chamada para o método subscribe (). Você pode passar de acordo com seus requisitos para on_next (), on_error () e on_completed ().

A função lambda é usada para on_next, on_error e on_completed. Ele pegará os argumentos e executará a expressão fornecida.

Aqui está a saída do observável criado -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Este capítulo explica sobre os operadores em RxPY em detalhes. Esses operadores incluem -

  • Trabalhando com Operadores
  • Operadores matemáticos
  • Operadores de transformação
  • Operadores de filtragem
  • Operadores de tratamento de erros
  • Operadores de serviços públicos
  • Operadores condicionais
  • Operadores de criação
  • Operadores conectáveis
  • Combinando operadores

O python reativo (Rx) tem quase muitos operadores, que facilitam a vida com a codificação python. Você pode usar esses vários operadores juntos, por exemplo, ao trabalhar com strings, você pode usar os operadores de mapa, filtro e mesclagem.

Trabalhando com Operadores

Você pode trabalhar com vários operadores juntos usando o método pipe (). Este método permite encadear vários operadores juntos.

Aqui está um exemplo prático do uso de operadores -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

No exemplo acima, criamos um método observável of () que leva os valores 1, 2 e 3. Agora, neste observável, você pode realizar uma operação diferente, usando qualquer número de operadores usando o método pipe (), conforme mostrado acima. A execução dos operadores continuará sequencialmente no observável dado.

Para trabalhar com operadores, primeiro importe-o conforme mostrado abaixo -

from rx import of, operators as op

Aqui está um exemplo prático -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

No exemplo acima, há uma lista de números, da qual estamos filtrando números pares usando um operador de filtro e, posteriormente, adicionando-o usando um operador de redução.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Aqui está uma lista de operadores, que vamos discutir -

  • Criação de observáveis
  • Operadores matemáticos
  • Operadores de transformação
  • Operadores de filtragem
  • Operadores de tratamento de erros
  • Operadores de serviços públicos
  • Conditional
  • Connectable
  • Combinando operadores

Criação de observáveis

A seguir estão os observáveis, vamos discutir na categoria Criação

Mostrar exemplos

Observável Descrição
crio Este método é usado para criar um observável.
vazio Este observável não produzirá nada e emitirá diretamente o estado completo.
Nunca Este método cria um observável que nunca alcançará o estado completo.
lançar Este método criará um observável que gerará um erro.
de_ Este método converterá a matriz ou objeto fornecido em um observável.
intervalo Este método fornecerá uma série de valores produzidos após um tempo limite.
somente Este método converterá determinado valor em um observável.
alcance Este método fornecerá um intervalo de inteiros com base na entrada fornecida.
repeat_value Este método criará um observável que repetirá o valor fornecido de acordo com a contagem fornecida.
começar Este método recebe uma função como entrada e retorna um valor observável que retornará o valor da função de entrada.
cronômetro Este método emitirá os valores em sequência após o tempo limite terminar.

Operadores matemáticos

Os operadores que vamos discutir na categoria de operadores matemáticos são os seguintes: -

Mostrar exemplos

Operador Descrição
média Este operador irá calcular a média a partir da fonte observável fornecida e produzir um observável que terá o valor médio.
concat Esse operador pegará dois ou mais observáveis ​​e receberá um único observável com todos os valores na sequência.
contagem

Este operador obtém um Observable com valores e o converte em um Observable que terá um único valor. A função de contagem assume a função de predicado como um argumento opcional.

A função é do tipo booleano e adicionará valor à saída somente se atender à condição.

max Este operador fornecerá um observável com valor máximo da fonte observável.
min Este operador fornecerá um valor observável com valor mínimo da fonte observável.
reduzir Este operador assume uma função chamada função acumuladora que é usada nos valores vindos da fonte observável e retorna os valores acumulados na forma de um observável, com um valor semente opcional passado para a função acumuladora.
soma Este operador retornará um observável com a soma de todos os valores dos observáveis ​​de origem.

Operadores de transformação

Os operadores que vamos discutir na categoria Operador de transformação são mencionados abaixo -

Mostrar exemplos

Operador Categoria
amortecedor Esse operador coletará todos os valores da fonte observável e os emitirá em intervalos regulares assim que a condição de limite fornecida for satisfeita.
ground_by Este operador agrupará os valores provenientes da fonte observável com base na função key_mapper fornecida.
mapa Este operador irá alterar cada valor da fonte observável em um novo valor com base na saída do mapper_func fornecido.
Varredura Este operador aplicará uma função de acumulador aos valores provenientes da fonte observável e retornará um observável com novos valores.

Operadores de filtragem

Os operadores que discutiremos na categoria de operador de filtragem são fornecidos abaixo -

Mostrar exemplos

Operador Categoria
debounce Este operador fornecerá os valores da fonte observável, até que o intervalo de tempo fornecido e ignore o restante do tempo.
distinto Este operador fornecerá todos os valores que são distintos da fonte observável.
element_at Este operador fornecerá um elemento da fonte observável para o índice fornecido.
filtro Este operador filtrará os valores da fonte observável com base na função de predicado fornecida.
primeiro Este operador fornecerá o primeiro elemento da fonte observável.
ignore_elements Este operador irá ignorar todos os valores observáveis ​​de origem e apenas executará chamadas para funções de callback completas ou com erro.
último Este operador fornecerá o último elemento da fonte observável.
pular Este operador retornará um observável que ignorará a primeira ocorrência de itens de contagem tomados como entrada.
skip_last Este operador retornará um observável que irá ignorar a última ocorrência de itens de contagem tomados como entrada.
levar Este operador fornecerá uma lista de valores de origem em ordem contínua com base na contagem fornecida.
take_last Este operador fornecerá uma lista de valores de origem em ordem contínua a partir do último com base na contagem fornecida.

Operadores de tratamento de erros

Os operadores que vamos discutir na categoria Operador de tratamento de erros são: -

Mostrar exemplos

Operador Descrição
pegar Este operador encerrará a fonte observável quando houver uma exceção.
tentar novamente Este operador tentará novamente na fonte observável quando houver um erro e, assim que a contagem de novas tentativas for concluída, ela será encerrada.

Operadores de serviços públicos

A seguir estão os operadores que discutiremos na categoria Operador de serviços públicos.

Mostrar exemplos

Operador Descrição
demora Este operador irá atrasar a emissão observável da fonte de acordo com a hora ou data fornecida.
materializar Este operador converterá os valores da fonte observável com os valores emitidos na forma de valores de notificação explícitos.
intervalo de tempo Este operador dará o tempo decorrido entre os valores da fonte observável.
tempo esgotado Este operador fornecerá todos os valores da fonte observáveis ​​após o tempo decorrido ou irá disparar um erro.
timestamp Este operador anexará um carimbo de data / hora a todos os valores do observável de origem.

Operadores condicionais e booleanos

Os operadores que vamos discutir na categoria Operador condicional e booleano são os indicados abaixo -

Mostrar exemplos

Operador Descrição
todos Este operador verificará se todos os valores da fonte observável satisfazem a condição fornecida.
contém Este operador retornará um observável com o valor verdadeiro ou falso se o valor fornecido estiver presente e se for o valor da fonte observável.
default_if_empty Este operador retornará um valor padrão se a fonte observável estiver vazia.
sequência_equal Este operador irá comparar duas sequências de observáveis ​​ou uma matriz de valores e retornar um observável com o valor verdadeiro ou falso.
skip_until Este operador descartará os valores da fonte observável até que o segundo observável emita um valor.
skip_while Este operador retornará um observável com valores do observável de origem que satisfaça a condição passada.
pegue_até Este operador descartará os valores da fonte observável após o segundo observável emitir um valor ou ser encerrado.
take_while Este operador descartará os valores da fonte observáveis ​​quando a condição falhar.

Operadores conectáveis

Os operadores que vamos discutir na categoria Operador conectável são -

Mostrar exemplos

Operador Descrição
publicar Este método converterá o observável em um observável conectável.
ref_count Este operador tornará o observável um observável normal.
repetir Este método funciona de forma semelhante ao replaySubject. Este método retornará os mesmos valores, mesmo se o observável já tiver sido emitido e alguns dos assinantes estiverem atrasados ​​na assinatura.

Operadores de combinação

A seguir estão os operadores que discutiremos na categoria de operador de combinação.

Mostrar exemplos

Operador Descrição
combinar o último Este operador criará uma tupla para o observável dado como entrada.
fundir Este operador irá mesclar dados observáveis.
começar com Este operador pegará os valores fornecidos e adicionará no início do retorno observável de origem a sequência completa.
fecho eclair Este operador retorna um observável com valores em uma forma de tupla, que é formado tomando o primeiro valor do observável fornecido e assim por diante.

Um assunto é uma sequência observável, bem como um observador que pode fazer multicast, ou seja, falar com muitos observadores que se inscreveram.

Vamos discutir os seguintes tópicos sobre o assunto -

  • Crie um assunto
  • Inscreva-se em um assunto
  • Passando dados para o assunto
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Crie um assunto

Para trabalhar com um assunto, precisamos importar Assunto conforme mostrado abaixo -

from rx.subject import Subject

Você pode criar um objeto-sujeito da seguinte maneira -

subject_test = Subject()

O objeto é um observador que possui três métodos -

  • on_next(value)
  • on_error (erro) e
  • on_completed()

Inscrever-se em um assunto

Você pode criar várias assinaturas sobre o assunto, conforme mostrado abaixo -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passando Dados para o Assunto

Você pode passar dados para o assunto criado usando o método on_next (valor) como mostrado abaixo -

subject_test.on_next("A")
subject_test.on_next("B")

Os dados serão repassados ​​a todas as inscrições, agregadas no assunto.

Aqui está um exemplo prático do assunto.

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

O objeto subject_test é criado chamando um Subject (). O objeto subject_test faz referência aos métodos on_next (value), on_error (error) e on_completed (). O resultado do exemplo acima é mostrado abaixo -

Resultado

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Podemos usar o método on_completed (), para parar a execução do assunto como mostrado abaixo.

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Assim que chamarmos complete, o próximo método chamado posteriormente não será invocado.

Resultado

E:\pyrx>python testrx.py
The value is A
The value is A

Vamos agora ver como chamar o método on_error (error).

Exemplo

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Resultado

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject fornecerá o valor mais recente quando chamado. Você pode criar um assunto de comportamento conforme mostrado abaixo -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Aqui está um exemplo prático para usar o assunto de comportamento

Exemplo

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Resultado

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Assunto Repetir

Um sujeito de replay é semelhante ao sujeito de comportamento, em que ele pode armazenar os valores e reproduzir os mesmos para os novos assinantes. Aqui está um exemplo prático do assunto de repetição.

Exemplo

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

O valor do buffer usado é 2 no assunto de reprodução. Portanto, os dois últimos valores serão armazenados em buffer e usados ​​para os novos assinantes chamados.

Resultado

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

No caso de AsyncSubject, o último valor chamado é passado para o assinante, e isso será feito somente depois que o método complete () for chamado.

Exemplo

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Resultado

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Uma característica importante do RxPy é a simultaneidade, ou seja, permitir que a tarefa seja executada em paralelo. Para que isso aconteça, temos dois operadores subscribe_on () e observe_on () que trabalharão com um planejador, que decidirá a execução da tarefa inscrita.

Aqui, está um exemplo de trabalho, que mostra a necessidade de subscibe_on (), observe_on () e planejador.

Exemplo

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

No exemplo acima, tenho 2 tarefas: Tarefa 1 e Tarefa 2. A execução da tarefa está em sequência. A segunda tarefa começa apenas, quando a primeira tarefa é concluída.

Resultado

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy suporta muitos Scheduler, e aqui, vamos fazer uso de ThreadPoolScheduler. ThreadPoolScheduler tentará principalmente gerenciar com os threads de CPU disponíveis.

No exemplo que vimos anteriormente, vamos fazer uso de um módulo de multiprocessamento que nos dará o cpu_count. A contagem será dada ao ThreadPoolScheduler que conseguirá fazer a tarefa funcionar em paralelo com base nos threads disponíveis.

Aqui está um exemplo prático -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

No exemplo acima, eu tenho 2 tarefas e cpu_count é 4. Como a tarefa é 2 e os threads disponíveis conosco são 4, ambas as tarefas podem iniciar em paralelo.

Resultado

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Se você vir a saída, ambas as tarefas foram iniciadas em paralelo.

Agora, considere um cenário, onde a tarefa é maior do que a contagem de CPU, ou seja, a contagem de CPU é 4 e as tarefas são 5. Nesse caso, precisaríamos verificar se algum encadeamento ficou livre após a conclusão da tarefa, para que possa ser atribuído à nova tarefa disponível na fila.

Para este propósito, podemos usar o operador observe_on () que observará o escalonador se algum encadeamento estiver livre. Aqui está um exemplo de trabalho usando observe_on ()

Exemplo

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Resultado

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Se você ver a saída, no momento em que a tarefa 4 é concluída, o thread é passado para a próxima tarefa, ou seja, a tarefa 5 e a mesma começa a ser executada.

Neste capítulo, discutiremos os seguintes tópicos em detalhes -

  • Exemplo básico mostrando o funcionamento do observável, operadores e assinando o observador.
  • Diferença entre observável e sujeito.
  • Compreendendo os observáveis ​​quentes e frios.

A seguir, é fornecido um exemplo básico que mostra o funcionamento do observável, dos operadores e da assinatura do observador.

Exemplo

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Aqui está um exemplo muito simples, em que estou obtendo dados do usuário a partir deste URL -

https://jsonplaceholder.typicode.com/users.

Filtrando os dados, para dar os nomes começando com "C", e depois usando o mapa para retornar apenas os nomes. Aqui está a saída para o mesmo -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Diferença entre observável e sujeito

Neste exemplo, veremos a diferença entre um observável e um sujeito.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Resultado

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

No exemplo acima, cada vez que você assina o observável, ele fornecerá novos valores.

Exemplo de Assunto

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Resultado

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Se você ver que os valores são compartilhados, entre os dois assinantes usando o assunto.

Compreendendo os observáveis ​​quentes e frios

Um observável é classificado como

  • Observáveis ​​Frios
  • Observáveis ​​quentes

A diferença nos observáveis ​​será notada quando vários assinantes estiverem se inscrevendo.

Observáveis ​​Frios

Observáveis ​​frios, são observáveis ​​que são executados e renderizam dados cada vez que são inscritos. Quando é inscrito, o observável é executado e os novos valores são fornecidos.

O exemplo a seguir fornece a compreensão do frio observável.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Resultado

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

No exemplo acima, toda vez que você assinar o observável, ele executará o observável e emitirá valores. Os valores também podem diferir de assinante para assinante, conforme mostrado no exemplo acima.

Observáveis ​​quentes

No caso de hot observable, eles emitirão os valores quando estiverem prontos e nem sempre aguardarão por uma assinatura. Quando os valores são emitidos, todos os assinantes receberão o mesmo valor.

Você pode fazer uso do observável quente quando quiser que os valores sejam emitidos quando o observável estiver pronto, ou quiser compartilhar os mesmos valores com todos os seus assinantes.

Um exemplo de observável quente são os operadores sujeitos e conectáveis.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Resultado

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Se você vir, o mesmo valor é compartilhado entre os assinantes. Você pode obter o mesmo usando o operador observável conectável publish ().