RxPY - Работа с темой

Субъект - это наблюдаемая последовательность, а также наблюдатель, который может осуществлять многоадресную рассылку, то есть разговаривать со многими подписавшимися наблюдателями.

Мы собираемся обсудить следующие темы по теме -

  • Создать тему
  • Подпишитесь на тему
  • Передача данных субъекту
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Создать тему

Чтобы работать с темой, нам нужно импортировать тему, как показано ниже -

from rx.subject import Subject

Вы можете создать субъект-объект следующим образом -

subject_test = Subject()

Объект - это наблюдатель, у которого есть три метода:

  • on_next(value)
  • on_error (ошибка) и
  • on_completed()

Подпишитесь на тему

Вы можете создать несколько подписок по теме, как показано ниже -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Передача данных субъекту

Вы можете передать данные субъекту, созданному с помощью метода on_next (value), как показано ниже -

subject_test.on_next("A")
subject_test.on_next("B")

Данные будут переданы во все подписки, добавленные по теме.

Вот рабочий пример предмета.

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Объект subject_test создается путем вызова Subject (). Объект subject_test имеет ссылку на методы on_next (значение), on_error (ошибка) и on_completed (). Результат приведенного выше примера показан ниже -

Вывод

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Мы можем использовать метод on_completed (), чтобы остановить выполнение объекта, как показано ниже.

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

После вызова complete следующий метод, вызываемый позже, не вызывается.

Вывод

E:\pyrx>python testrx.py
The value is A
The value is A

Давайте теперь посмотрим, как вызвать метод on_error (error).

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Вывод

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject даст вам последнее значение при вызове. Вы можете создать тему поведения, как показано ниже -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Вот рабочий пример использования Behavior Subject

пример

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Вывод

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Повторить тему

Субъект повторного воспроизведения аналогичен субъекту поведения, при этом он может буферизовать значения и воспроизводить их для новых подписчиков. Вот рабочий пример темы воспроизведения.

пример

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Используемое значение буфера - 2 для темы воспроизведения. Таким образом, последние два значения будут помещены в буфер и будут использоваться для вызываемых новых подписчиков.

Вывод

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

В случае AsyncSubject последнее вызванное значение передается подписчику, и это будет сделано только после вызова метода complete ().

пример

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Вывод

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2