RxPY - Краткое руководство

В этой главе объясняется, что такое реактивное программирование, что такое RxPY, его операторы, особенности, преимущества и недостатки.

Что такое реактивное программирование?

Реактивное программирование - это парадигма программирования, которая имеет дело с потоком данных и распространением изменений. Это означает, что, когда поток данных испускается одним компонентом, изменение будет распространено на другие компоненты библиотекой реактивного программирования. Распространение изменения будет продолжаться до тех пор, пока оно не достигнет последнего получателя.

Используя RxPY, у вас есть хороший контроль над асинхронными потоками данных, например, запрос, сделанный по URL-адресу, можно отследить с помощью наблюдаемого и использовать наблюдатель для прослушивания, когда запрос завершен для ответа или ошибки.

RxPY предлагает вам обрабатывать асинхронные потоки данных, используя Observables, запросить потоки данных, используя Operators т.е. фильтровать, суммировать, объединять, отображать, а также использовать параллелизм для потоков данных, используя Schedulers. Создание Observable дает объект-наблюдатель с методами on_next (v), on_error (e) и on_completed (), которые должны бытьsubscribed чтобы мы получали уведомление, когда происходит событие.

Observable можно запросить с помощью нескольких операторов в формате цепочки с помощью оператора конвейера.

RxPY предлагает операторов в различных категориях, таких как: -

  • Математические операторы

  • Операторы преобразования

  • Операторы фильтрации

  • Операторы обработки ошибок

  • Коммунальные операторы

  • Условные операторы

  • Операторы создания

  • Подключаемые операторы

Эти операторы подробно описаны в этом руководстве.

Что такое RxPy?

RxPY определяется как a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python согласно официальному сайту RxPy, который https://rxpy.readthedocs.io/en/latest/.

RxPY - это библиотека Python для поддержки реактивного программирования. RxPy означаетReactive Extensions for Python. Это библиотека, которая использует наблюдаемые объекты для работы с реактивным программированием, которое имеет дело с асинхронными вызовами данных, обратными вызовами и программами на основе событий.

Особенности RxPy

В RxPy следующие концепции заботятся об обработке асинхронной задачи:

Наблюдаемый

Наблюдаемый объект - это функция, которая создает наблюдателя и присоединяет его к источнику, имеющему потоки данных, которые ожидаются, например, от твитов, компьютерных событий и т. Д.

Наблюдатель

Это объект с методами on_next (), on_error () и on_completed (), который будет вызываться при взаимодействии с наблюдаемым, т.е. источник взаимодействует для примера входящих твитов и т. Д.

Подписка

Когда наблюдаемый объект создается, для его выполнения нам нужно подписаться на него.

Операторы

Оператор - это чистая функция, которая принимает наблюдаемое в качестве входных данных, а выход также является наблюдаемым. Вы можете использовать несколько операторов для наблюдаемых данных с помощью оператора конвейера.

Тема

Субъект - это наблюдаемая последовательность, а также наблюдатель, который может выполнять многоадресную передачу, то есть разговаривать со многими подписавшимися наблюдателями. Субъект является холодным наблюдаемым, т.е. значения будут разделены между подписавшимися наблюдателями.

Планировщики

Одной из важных особенностей RxPy является параллелизм, т.е. возможность параллельного выполнения задачи. Для этого в RxPy есть два оператора subscribe_on () и Наблюдать_on (), которые работают с планировщиками и определяют выполнение подписанной задачи.

Преимущества использования RxPY

Ниже приведены преимущества RxPy -

  • RxPY - отличная библиотека, когда дело доходит до обработки асинхронных потоков данных и событий. RxPY использует наблюдаемые объекты для работы с реактивным программированием, которое имеет дело с асинхронными вызовами данных, обратными вызовами и программами на основе событий.

  • RxPY предлагает огромный набор операторов в математических категориях, таких как преобразование, фильтрация, служебные, условные, обработки ошибок и объединения, что упрощает жизнь при использовании с реактивным программированием.

  • Параллелизм, то есть совместная работа нескольких задач, достигается с помощью планировщиков в RxPY.

  • Производительность повышается с использованием RxPY, поскольку обработка асинхронных задач и параллельная обработка упрощаются.

Недостаток использования RxPY

  • Отладка кода с помощью наблюдаемых немного сложна.

В этой главе мы будем работать над установкой RxPy. Чтобы начать работу с RxPY, нам нужно сначала установить Python. Итак, мы собираемся работать над следующим -

  • Установить Python
  • Установить RxPy

Установка Python

Перейдите на официальный сайт Python: https://www.python.org/downloads/.как показано ниже, и щелкните последнюю версию, доступную для Windows, Linux / Unix и mac os. Загрузите Python в соответствии с имеющейся у вас 64- или 32-разрядной ОС.

После того, как вы скачали, нажмите на .exe file и следуйте инструкциям по установке python в вашей системе.

Менеджер пакетов python, то есть pip, также будет установлен по умолчанию при указанной выше установке. Чтобы заставить его работать глобально в вашей системе, напрямую добавьте местоположение python в переменную PATH, то же самое будет показано в начале установки, не забудьте установить флажок, который говорит ДОБАВИТЬ в PATH. В случае, если вы забыли это проверить, выполните следующие действия, чтобы добавить в PATH.

Чтобы добавить в PATH, выполните следующие действия:

Щелкните правой кнопкой мыши значок «Компьютер» и выберите «Свойства» → «Дополнительные параметры системы».

Он отобразит экран, как показано ниже -

Щелкните Переменные среды, как показано выше. Он отобразит экран, как показано ниже -

Выберите Путь и нажмите кнопку «Изменить», в конце добавьте путь к местоположению вашего питона. Теперь давайте проверим версию Python.

Проверка версии Python

E:\pyrx>python --version
Python 3.7.3

Установить RxPY

Теперь, когда у нас установлен python, мы собираемся установить RxPy.

После установки python также будет установлен менеджер пакетов python, то есть pip. Ниже приведена команда для проверки версии pip -

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

У нас установлен pip, и версия 19.1.1. Теперь мы будем использовать pip для установки RxPy

Команда выглядит следующим образом -

pip install rx

В этом руководстве мы используем RxPY версии 3 и python версии 3.7.3. Работа RxPY версии 3 немного отличается от более ранней версии, то есть RxPY версии 1.

В этой главе мы собираемся обсудить различия между двумя версиями и изменения, которые необходимо сделать, если вы обновляете версии Python и RxPY.

Наблюдается в RxPY

В RxPy версии 1 Observable был отдельным классом -

from rx import Observable

Чтобы использовать Observable, вы должны использовать его следующим образом:

Observable.of(1,2,3,4,5,6,7,8,9,10)

В RxPy версии 3 Observable является непосредственно частью пакета rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Операторы в RxPy

В версии 1 оператором были методы в классе Observable. Например, чтобы использовать операторы, мы должны импортировать Observable, как показано ниже -

from rx import Observable

Операторы используются как Observable.operator, например, как показано ниже -

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

В случае RxPY версии 3 операторы являются функциями, импортируются и используются следующим образом:

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Операторы связывания с использованием метода Pipe ()

В RxPy версии 1, если вам нужно было использовать несколько операторов для наблюдаемого, это нужно было сделать следующим образом:

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Но в случае RxPY версии 3 вы можете использовать метод pipe () и несколько операторов, как показано ниже -

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Наблюдаемый - это функция, которая создает наблюдателя и присоединяет его к источнику, где ожидаются значения, например, щелчки, события мыши от элемента dom и т. Д.

Указанные ниже темы будут подробно изучены в этой главе.

  • Создать наблюдаемые

  • Подписка и выполнение наблюдаемого

Создать наблюдаемые

Для создания наблюдаемого мы будем использовать create() метод и передайте ему функцию, которая имеет следующие элементы.

  • on_next() - Эта функция вызывается, когда Observable испускает элемент.

  • on_completed() - Эта функция вызывается, когда Observable завершается.

  • on_error() - Эта функция вызывается при возникновении ошибки в Observable.

Для работы с методом create () сначала импортируйте метод, как показано ниже -

from rx import create

Вот рабочий пример для создания наблюдаемого -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Подписка и выполнение наблюдаемого

Чтобы подписаться на наблюдаемое, нам нужно использовать функцию subscribe () и передать функции обратного вызова on_next, on_error и on_completed.

Вот рабочий пример -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Метод subscribe () заботится о выполнении наблюдаемого. Функция обратного вызоваon_next, on_error и on_completedнеобходимо передать методу подписки. Вызов метода подписки, в свою очередь, выполняет функцию test_observable ().

Необязательно передавать все три функции обратного вызова методу subscribe (). Вы можете передать в соответствии с вашими требованиями on_next (), on_error () и on_completed ().

Лямбда-функция используется для on_next, on_error и on_completed. Он примет аргументы и выполнит данное выражение.

Вот результат созданного наблюдаемого -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

В этой главе подробно рассказывается об операторах в RxPY. Эти операторы включают в себя -

  • Работа с операторами
  • Математические операторы
  • Операторы преобразования
  • Операторы фильтрации
  • Операторы обработки ошибок
  • Коммунальные операторы
  • Условные операторы
  • Операторы создания
  • Подключаемые операторы
  • Объединение операторов

Реактивный (Rx) python имеет почти множество операторов, которые упрощают жизнь с кодированием на python. Вы можете использовать эти несколько операторов вместе, например, при работе со строками вы можете использовать операторы map, filter, merge.

Работа с операторами

Вы можете работать с несколькими операторами вместе, используя метод pipe (). Этот метод позволяет объединить несколько операторов в цепочку.

Вот рабочий пример использования операторов -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

В приведенном выше примере мы создали наблюдаемое, используя метод of (), который принимает значения 1, 2 и 3. Теперь на этом наблюдаемом вы можете выполнить другую операцию, используя любое количество операторов, используя метод pipe (), как показано. выше. Выполнение операторов будет продолжаться последовательно для данной наблюдаемой.

Чтобы работать с операторами, сначала импортируйте его, как показано ниже -

from rx import of, operators as op

Вот рабочий пример -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

В приведенном выше примере есть список чисел, из которого мы фильтруем четные числа с помощью оператора фильтра, а затем добавляем его с помощью оператора сокращения.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Вот список операторов, которые мы собираемся обсудить -

  • Создание наблюдаемых
  • Математические операторы
  • Операторы преобразования
  • Операторы фильтрации
  • Операторы обработки ошибок
  • Коммунальные операторы
  • Conditional
  • Connectable
  • Объединение операторов

Создание наблюдаемых

Ниже приведены наблюдаемые, которые мы собираемся обсудить в категории «Создание».

Показать примеры

Наблюдаемый Описание
Создайте Этот метод используется для создания наблюдаемого.
пустой Этот наблюдаемый ничего не выводит и напрямую испускает полное состояние.
никогда Этот метод создает наблюдаемую, которая никогда не достигнет полного состояния.
бросить Этот метод создаст наблюдаемое, которое выдаст ошибку.
из_ Этот метод преобразует данный массив или объект в наблюдаемый.
интервал Этот метод выдаст серию значений, созданных после тайм-аута.
просто Этот метод преобразует данное значение в наблюдаемое.
спектр Этот метод выдаст диапазон целых чисел на основе заданных входных данных.
repeat_value Этот метод создаст наблюдаемую, которая будет повторять заданное значение в соответствии с заданным счетчиком.
Начало Этот метод принимает функцию в качестве входных данных и возвращает наблюдаемое, которое будет возвращать значение из входной функции.
таймер Этот метод будет выдавать значения последовательно после истечения тайм-аута.

Математические операторы

Операторы, которые мы собираемся обсудить в категории математических операторов, следующие:

Показать примеры

Оператор Описание
в среднем Этот оператор вычислит среднее значение из исходной наблюдаемой и выведет наблюдаемую, которая будет иметь среднее значение.
concat Этот оператор принимает два или более наблюдаемых объекта и получает одну наблюдаемую со всеми значениями в последовательности.
считать

Этот оператор принимает Observable со значениями и преобразует его в Observable, который будет иметь одно значение. Функция count принимает функцию предиката в качестве необязательного аргумента.

Функция имеет логический тип и будет добавлять значение к выходным данным только в том случае, если она удовлетворяет условию.

Максимум Этот оператор даст наблюдаемое с максимальным значением из наблюдаемого источника.
мин Этот оператор даст наблюдаемое с минимальным значением из исходного наблюдаемого.
уменьшить Этот оператор принимает функцию, называемую функцией накопителя, которая используется для значений, поступающих из наблюдаемого источника, и возвращает накопленные значения в форме наблюдаемого объекта с необязательным начальным значением, переданным в функцию накопителя.
сумма Этот оператор вернет наблюдаемое с суммой всех значений из исходных наблюдаемых.

Операторы преобразования

Операторы, которые мы собираемся обсудить в категории операторов преобразования, упомянуты ниже -

Показать примеры

Оператор Категория
буфер Этот оператор будет собирать все значения из наблюдаемого источника и выдавать их через равные промежутки времени, как только заданное граничное условие будет выполнено.
Ground_by Этот оператор группирует значения, поступающие из наблюдаемого источника, на основе заданной функции key_mapper.
карта Этот оператор изменит каждое значение из наблюдаемого источника на новое значение на основе выходных данных mapper_func.
сканировать Этот оператор применит функцию аккумулятора к значениям, поступающим из исходного наблюдаемого объекта, и вернет наблюдаемый объект с новыми значениями.

Операторы фильтрации

Операторы, которые мы собираемся обсудить в категории операторов фильтрации, приведены ниже -

Показать примеры

Оператор Категория
дебонсировать Этот оператор будет давать значения из наблюдаемого источника до заданного промежутка времени и игнорировать остальное время.
отчетливый Этот оператор выдаст все значения, отличные от наблюдаемого источника.
element_at Этот оператор предоставит элемент из источника, наблюдаемый для данного индекса.
фильтр Этот оператор будет фильтровать значения из наблюдаемого источника на основе заданной функции предиката.
первый Этот оператор предоставит первый элемент наблюдаемого источника.
ignore_elements Этот оператор игнорирует все значения из наблюдаемого источника и выполняет только вызовы функций обратного вызова для завершения или ошибки.
последний Этот оператор предоставит последний элемент наблюдаемого источника.
пропускать Этот оператор вернет наблюдаемое, которое пропустит первое вхождение элементов count, взятых в качестве входных.
skip_last Этот оператор вернет наблюдаемое, которое пропустит последнее вхождение элементов count, взятых в качестве входных.
взять Этот оператор предоставит список исходных значений в непрерывном порядке на основе заданного количества.
take_last Этот оператор выдаст список исходных значений в непрерывном порядке, начиная с последнего, на основе заданного количества.

Операторы обработки ошибок

В категории «Операторы обработки ошибок» мы собираемся обсудить следующие операторы:

Показать примеры

Оператор Описание
поймать Этот оператор завершит наблюдаемый источник при возникновении исключения.
повторить попытку Этот оператор будет повторять попытку для наблюдаемого источника при возникновении ошибки, и после того, как счетчик повторных попыток завершится, он прекратится.

Коммунальные операторы

Ниже перечислены операторы, которые мы собираемся обсудить в категории операторов служебных программ.

Показать примеры

Оператор Описание
задержка Этот оператор будет задерживать наблюдаемый источник излучения в соответствии с указанным временем или датой.
материализоваться Этот оператор преобразует значения из наблюдаемого источника в значения, передаваемые в виде явных значений уведомления.
интервал времени Этот оператор даст время, прошедшее между значениями наблюдаемого источника.
тайм-аут Этот оператор выдаст все значения из наблюдаемого источника по истечении истекшего времени, иначе вызовет ошибку.
отметка времени Этот оператор прикрепит метку времени ко всем значениям наблюдаемого источника.

Условные и логические операторы

Операторы, которые мы собираемся обсудить в категории условных и логических операторов, приведены ниже:

Показать примеры

Оператор Описание
все Этот оператор проверяет, все ли значения из наблюдаемого источника удовлетворяют заданному условию.
содержит Этот оператор вернет наблюдаемое со значением true или false, если данное значение присутствует и является значением наблюдаемого источника.
default_if_empty Этот оператор вернет значение по умолчанию, если наблюдаемый источник пуст.
sequence_equal Этот оператор сравнивает две последовательности наблюдаемых или массив значений и возвращает наблюдаемое со значением true или false.
skip_until Этот оператор будет отбрасывать значения из исходного наблюдаемого до тех пор, пока второй наблюдаемый не выдаст значение.
skip_ while Этот оператор вернет наблюдаемое со значениями из исходного наблюдаемого, которое удовлетворяет переданному условию.
take_until Этот оператор отбрасывает значения из исходного наблюдаемого объекта после того, как второй наблюдаемый объект испускает значение или завершается.
взять_ пока Этот оператор отбрасывает значения из наблюдаемого источника, когда условие не выполняется.

Подключаемые операторы

Операторы, которые мы собираемся обсудить в категории подключаемых операторов:

Показать примеры

Оператор Описание
публиковать Этот метод преобразует наблюдаемое в подключаемое наблюдаемое.
ref_count Этот оператор сделает наблюдаемое обычным наблюдаемым.
переиграть Этот метод работает аналогично replaySubject. Этот метод вернет те же значения, даже если наблюдаемый объект уже сгенерирован, а некоторые подписчики опоздали с подпиской.

Объединение операторов

Ниже перечислены операторы, которые мы собираемся обсудить в категории «Комбинирующие операторы».

Показать примеры

Оператор Описание
Combine_latest Этот оператор создаст кортеж для наблюдаемого, заданного на входе.
слияние Этот оператор объединит данные наблюдаемые.
start_with Этот оператор принимает заданные значения и добавляет в начало наблюдаемого источника возврата полную последовательность.
застегивать Этот оператор возвращает наблюдаемое со значениями в виде кортежа, который формируется путем взятия первого значения данного наблюдаемого и так далее.

Субъект - это наблюдаемая последовательность, а также наблюдатель, который может выполнять многоадресную передачу, то есть разговаривать со многими подписавшимися наблюдателями.

Мы собираемся обсудить следующие темы по теме -

  • Создать тему
  • Подпишитесь на тему
  • Передача данных субъекту
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Создать тему

Чтобы работать с темой, нам нужно импортировать тему, как показано ниже -

from rx.subject import Subject

Вы можете создать субъект-объект следующим образом -

subject_test = Subject()

Объект - это наблюдатель, у которого есть три метода:

  • on_next(value)
  • on_error (ошибка) и
  • on_completed()

Подпишитесь на тему

Вы можете создать несколько подписок по теме, как показано ниже -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Передача данных субъекту

Вы можете передать данные субъекту, созданному с помощью метода on_next (value), как показано ниже -

subject_test.on_next("A")
subject_test.on_next("B")

Данные будут переданы во все подписки, добавленные по теме.

Вот рабочий пример предмета.

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Объект subject_test создается путем вызова Subject (). Объект subject_test имеет ссылку на методы on_next (значение), on_error (ошибка) и on_completed (). Результат приведенного выше примера показан ниже -

Вывод

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Мы можем использовать метод on_completed (), чтобы остановить выполнение объекта, как показано ниже.

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

После вызова complete следующий метод, вызываемый позже, не вызывается.

Вывод

E:\pyrx>python testrx.py
The value is A
The value is A

Давайте теперь посмотрим, как вызвать метод on_error (error).

пример

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Вывод

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject предоставит вам последнее значение при вызове. Вы можете создать тему поведения, как показано ниже -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Вот рабочий пример использования Behavior Subject

пример

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Вывод

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Повторить тему

Субъект воспроизведения аналогичен субъекту поведения, при этом он может буферизовать значения и воспроизводить их для новых подписчиков. Вот рабочий пример темы воспроизведения.

пример

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Используемое значение буфера - 2 для темы воспроизведения. Таким образом, последние два значения будут помещены в буфер и будут использоваться для вызываемых новых подписчиков.

Вывод

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

В случае AsyncSubject последнее вызванное значение передается подписчику, и это будет сделано только после вызова метода complete ().

пример

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Вывод

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Одной из важных особенностей RxPy является параллелизм, то есть возможность параллельного выполнения задачи. Для этого у нас есть два оператора subscribe_on () и Наблюдать_on (), которые будут работать с планировщиком, который будет определять выполнение подписанной задачи.

Вот рабочий пример, который показывает необходимость subscibe_on (), Observation_on () и планировщика.

пример

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

В приведенном выше примере у меня есть 2 задачи: задача 1 и задача 2. Выполнение задачи происходит последовательно. Вторая задача запускается только тогда, когда первая задача выполнена.

Вывод

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy поддерживает множество планировщиков, и здесь мы собираемся использовать ThreadPoolScheduler. ThreadPoolScheduler в основном пытается управлять доступными потоками ЦП.

В примере, который мы видели ранее, мы собираемся использовать модуль многопроцессорности, который даст нам cpu_count. Счетчик будет передан ThreadPoolScheduler, который сможет заставить задачу работать параллельно на основе доступных потоков.

Вот рабочий пример -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

В приведенном выше примере у меня 2 задачи, а cpu_count равно 4. Поскольку задача равна 2, а количество доступных потоков - 4, обе задачи могут запускаться параллельно.

Вывод

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Если вы видите результат, обе задачи запущены параллельно.

Теперь рассмотрим сценарий, в котором задача больше, чем количество ЦП, т.е. количество ЦП равно 4, а задач - 5. В этом случае нам нужно будет проверить, освободился ли какой-либо поток после завершения задачи, чтобы он мог быть назначена новой задаче, доступной в очереди.

Для этой цели мы можем использовать оператор Observer_on (), который будет наблюдать за планировщиком, если какие-либо потоки свободны. Вот рабочий пример с использованием Observer_on ()

пример

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Вывод

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Если вы видите результат, то в момент, когда задача 4 завершена, поток передается следующей задаче, то есть задаче 5, и она начинает выполняться.

В этой главе мы подробно обсудим следующие темы -

  • Базовый пример, показывающий работу наблюдаемого, операторов и подписку на наблюдателя.
  • Разница между наблюдаемым и субъектом.
  • Понимание холодных и горячих наблюдаемых.

Ниже приведен базовый пример, показывающий работу наблюдаемого, операторов и подписку на наблюдателя.

пример

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Вот очень простой пример, в котором я получаю пользовательские данные с этого URL-адреса -

https://jsonplaceholder.typicode.com/users.

Фильтрация данных для присвоения имен, начинающихся с «C», а затем с использованием карты для возврата только имен. Вот результат для того же -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Разница между наблюдаемым и предметом

В этом примере мы увидим разницу между наблюдаемым и субъектом.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Вывод

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

В приведенном выше примере каждый раз, когда вы подписываетесь на наблюдаемое, он будет давать вам новые значения.

Пример темы

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Вывод

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Если вы видите, что значения являются общими для обоих подписчиков, использующих тему.

Понимание холодных и горячих наблюдаемых

Наблюдаемая классифицируется как

  • Холодные наблюдаемые
  • Горячие наблюдаемые

Разница в наблюдаемых будет заметна при подписке нескольких подписчиков.

Холодные наблюдаемые

Холодные наблюдаемые - это наблюдаемые, которые выполняются и отображают данные при каждой подписке. Когда он подписан, выполняется наблюдаемое и выдаются свежие значения.

Следующий пример дает понимание наблюдаемого холода.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Вывод

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

В приведенном выше примере каждый раз, когда вы подписываетесь на наблюдаемый объект, он будет выполнять наблюдаемый объект и выдавать значения. Значения также могут отличаться от подписчика к подписчику, как показано в примере выше.

Горячие наблюдаемые

В случае горячего наблюдаемого они будут выдавать значения, когда они будут готовы, и не всегда будут ждать подписки. Когда значения будут отправлены, все подписчики получат одно и то же значение.

Вы можете использовать hot observable, если хотите, чтобы значения испускались, когда наблюдаемый готов, или вы хотите поделиться теми же значениями со всеми своими подписчиками.

Примером горячих наблюдаемых являются операторы Subject и connectable.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Вывод

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Если вы видите, одно и то же значение разделяется между подписчиками. Вы можете добиться того же, используя подключаемый наблюдаемый оператор publish ().