Реактивное программирование

Реактивное программирование - это парадигма программирования, которая имеет дело с потоками данных и распространением изменений. Это означает, что когда поток данных испускается одним компонентом, изменение будет распространено на другие компоненты библиотекой реактивного программирования. Распространение изменения будет продолжаться до тех пор, пока оно не достигнет последнего получателя. Разница между программированием, управляемым событиями, и реактивным программированием заключается в том, что программирование, управляемое событиями, вращается вокруг событий, а реактивное программирование - вокруг данных.

ReactiveX или RX для реактивного программирования

ReactiveX или Raective Extension - самая известная реализация реактивного программирования. Работа ReactiveX зависит от следующих двух классов -

Наблюдаемый класс

Этот класс является источником потока данных или событий и упаковывает входящие данные, чтобы данные можно было передавать из одного потока в другой. Он не выдаст данные, пока на него не подпишется какой-нибудь наблюдатель.

Класс наблюдателя

Этот класс потребляет поток данных, излучаемый observable. Может быть несколько наблюдателей с наблюдаемым, и каждый наблюдатель будет получать каждый элемент данных, который испускается. Наблюдатель может получать события трех типов, подписавшись на наблюдаемое -

  • on_next() event - Это означает, что в потоке данных есть элемент.

  • on_completed() event - Это означает конец эмиссии и больше никаких предметов не будет.

  • on_error() event - Также подразумевается конец эмиссии, но в случае, если ошибка выдается observable.

RxPY - модуль Python для реактивного программирования

RxPY - это модуль Python, который можно использовать для реактивного программирования. Нам нужно убедиться, что модуль установлен. Следующая команда может использоваться для установки модуля RxPY -

pip install RxPY

пример

Ниже приведен сценарий Python, который использует RxPY модуль и его классы Observable а также Observe forреактивное программирование. В основном есть два класса -

  • get_strings() - для получения строк от наблюдателя.

  • PrintObserver()- для печати строк от наблюдателя. Он использует все три события класса наблюдателя. Он также использует класс subscribe ().

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

Выход

Received Ram
Received Mohan
Received Shyam
Finished

PyFunctional библиотека для реактивного программирования

PyFunctional- еще одна библиотека Python, которую можно использовать для реактивного программирования. Это позволяет нам создавать функциональные программы с использованием языка программирования Python. Это полезно, потому что позволяет нам создавать конвейеры данных с помощью связанных функциональных операторов.

Разница между RxPY и PyFunctional

Обе библиотеки используются для реактивного программирования и обрабатывают поток аналогичным образом, но основное различие между ними зависит от обработки данных. RxPY обрабатывает данные и события в системе, пока PyFunctional ориентирована на преобразование данных с использованием парадигм функционального программирования.

Установка PyFunctional модуля

Нам необходимо установить этот модуль перед его использованием. Его можно установить с помощью команды pip следующим образом:

pip install pyfunctional

пример

В следующем примере используется the PyFunctional модуль и его seqкласс, который действует как объект потока, с которым мы можем выполнять итерацию и манипулировать. В этой программе он отображает последовательность, используя функцию lamda, которая удваивает каждое значение, затем фильтрует значение, где x больше 4, и, наконец, сокращает последовательность до суммы всех оставшихся значений.

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

Выход

Result: 6