RxPY - Hướng dẫn nhanh

Chương này giải thích lập trình phản ứng là gì, RxPY là gì, các toán tử, tính năng, ưu điểm và nhược điểm của nó.

Lập trình phản ứng là gì?

Lập trình phản ứng là một mô hình lập trình, xử lý luồng dữ liệu và sự lan truyền của sự thay đổi. Nó có nghĩa là, khi một luồng dữ liệu được phát ra bởi một thành phần, sự thay đổi sẽ được truyền đến các thành phần khác bởi một thư viện lập trình phản ứng. Sự lan truyền sự thay đổi sẽ tiếp tục cho đến khi nó đến người nhận cuối cùng.

Bằng cách sử dụng RxPY, bạn có quyền kiểm soát tốt các luồng dữ liệu không đồng bộ, ví dụ: yêu cầu được gửi đến URL có thể được theo dõi bằng cách sử dụng có thể quan sát và sử dụng trình quan sát để lắng nghe khi yêu cầu hoàn tất để phản hồi hoặc có lỗi.

RxPY cung cấp cho bạn khả năng xử lý các luồng dữ liệu không đồng bộ bằng cách sử dụng Observables, truy vấn các luồng dữ liệu bằng cách sử dụng Operators tức là lọc, tính tổng, kết hợp, ánh xạ và cũng sử dụng đồng thời cho các luồng dữ liệu bằng cách sử dụng Schedulers. Tạo một Observable, cung cấp một đối tượng quan sát với các phương thức on_next (v), on_error (e) và on_completed (), cần phảisubscribed để chúng tôi nhận được thông báo khi một sự kiện xảy ra.

Có thể truy vấn Observable bằng nhiều toán tử trong một định dạng chuỗi bằng cách sử dụng toán tử ống.

RxPY cung cấp các toán tử trong các danh mục khác nhau như: -

  • Toán tử toán học

  • Toán tử chuyển đổi

  • Lọc toán tử

  • Các toán tử xử lý lỗi

  • Các nhà khai thác tiện ích

  • Toán tử có điều kiện

  • Toán tử tạo

  • Các toán tử có thể kết nối

Các toán tử này được giải thích chi tiết trong hướng dẫn này.

RxPy là gì?

RxPY được định nghĩa là a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python theo trang web chính thức của RxPy, https://rxpy.readthedocs.io/en/latest/.

RxPY là một thư viện python để hỗ trợ Lập trình phản ứng. RxPy là viết tắt củaReactive Extensions for Python. Đó là một thư viện sử dụng các khả năng quan sát để làm việc với lập trình phản ứng xử lý các cuộc gọi dữ liệu không đồng bộ, lệnh gọi lại và các chương trình dựa trên sự kiện.

Đặc điểm của RxPy

Trong RxPy, các khái niệm sau sẽ xử lý tác vụ không đồng bộ:

Có thể quan sát được

Có thể quan sát là một hàm tạo một trình quan sát và gắn nó vào nguồn có các luồng dữ liệu được mong đợi từ đó, ví dụ: Tweet, các sự kiện liên quan đến máy tính, v.v.

Người quan sát

Nó là một đối tượng có các phương thức on_next (), on_error () và on_completed (), sẽ được gọi khi có tương tác với phương thức có thể quan sát được, tức là nguồn tương tác với một Tweet đến, v.v.

Đăng ký

Khi có thể quan sát được tạo, để thực thi có thể quan sát, chúng ta cần đăng ký với nó.

Các nhà khai thác

Toán tử là một hàm thuần túy lấy đầu vào có thể quan sát được và đầu ra cũng là một hàm có thể quan sát được. Bạn có thể sử dụng nhiều toán tử trên một dữ liệu quan sát được bằng cách sử dụng toán tử ống.

Môn học

Chủ thể là một chuỗi có thể quan sát được cũng như một người quan sát có thể phát đa hướng, tức là nói chuyện với nhiều người quan sát đã đăng ký. Đối tượng là một người quan sát lạnh, tức là các giá trị sẽ được chia sẻ giữa những người quan sát đã được đăng ký.

Người lên lịch

Một tính năng quan trọng của RxPy là đồng thời tức là cho phép nhiệm vụ thực thi song song. Để điều đó xảy ra, RxPy có hai toán tử subscribe_on () và Obser_on () hoạt động với các bộ lập lịch và sẽ quyết định việc thực thi tác vụ đã đăng ký.

Ưu điểm của việc sử dụng RxPY

Sau đây là những ưu điểm của RxPy:

  • RxPY là một thư viện tuyệt vời khi nói đến việc xử lý các sự kiện và luồng dữ liệu không đồng bộ. RxPY sử dụng khả năng quan sát để làm việc với lập trình phản ứng xử lý các cuộc gọi dữ liệu không đồng bộ, lệnh gọi lại và các chương trình dựa trên sự kiện.

  • RxPY cung cấp một bộ sưu tập khổng lồ các toán tử trong các danh mục toán học, biến đổi, lọc, tiện ích, điều kiện, xử lý lỗi, nối giúp dễ dàng sử dụng với lập trình phản ứng.

  • Đồng thời tức là làm việc nhiều tác vụ với nhau đạt được bằng cách sử dụng bộ lập lịch trong RxPY.

  • Hiệu suất được cải thiện bằng cách sử dụng RxPY vì việc xử lý tác vụ không đồng bộ và xử lý song song được thực hiện dễ dàng.

Bất lợi khi sử dụng RxPY

  • Gỡ lỗi mã bằng các vật có thể quan sát là một chút khó khăn.

Trong chương này, chúng ta sẽ tiến hành cài đặt RxPy. Để bắt đầu làm việc với RxPY, trước tiên chúng ta cần cài đặt Python. Vì vậy, chúng tôi sẽ làm việc sau:

  • Cài đặt Python
  • Cài đặt RxPy

Cài đặt Python

Truy cập trang web chính thức của Python: https://www.python.org/downloads/.như hình dưới đây và nhấp vào phiên bản mới nhất có sẵn cho Windows, Linux / Unix và mac os. Tải xuống Python theo hệ điều hành 64 hoặc 32-bit có sẵn bên mình.

Khi bạn đã tải xuống, hãy nhấp vào .exe file và làm theo các bước để cài đặt python trên hệ thống của bạn.

Trình quản lý gói python, tức là pip cũng sẽ được cài đặt theo mặc định với cài đặt trên. Để làm cho nó hoạt động trên toàn cầu trên hệ thống của bạn, hãy thêm trực tiếp vị trí của python vào biến PATH, biến này được hiển thị khi bắt đầu cài đặt, hãy nhớ đánh dấu vào hộp kiểm cho biết ADD to PATH. Trong trường hợp, bạn quên kiểm tra nó, vui lòng làm theo các bước dưới đây để thêm vào PATH.

Để thêm vào PATH, hãy làm theo các bước sau:

Nhấp chuột phải vào biểu tượng Máy tính của bạn và nhấp vào thuộc tính → Cài đặt Hệ thống Nâng cao.

Nó sẽ hiển thị màn hình như hình dưới đây -

Nhấp vào Biến môi trường như hình trên. Nó sẽ hiển thị màn hình như hình dưới đây -

Chọn Đường dẫn và nhấp vào nút Chỉnh sửa, thêm đường dẫn vị trí của con trăn của bạn vào cuối. Bây giờ, hãy kiểm tra phiên bản python.

Kiểm tra phiên bản python

E:\pyrx>python --version
Python 3.7.3

Cài đặt RxPY

Bây giờ, chúng ta đã cài đặt python, chúng ta sẽ cài đặt RxPy.

Sau khi python được cài đặt, trình quản lý gói python, tức là pip cũng sẽ được cài đặt. Sau đây là lệnh để kiểm tra phiên bản pip:

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Chúng tôi đã cài đặt pip và phiên bản là 19.1.1. Bây giờ, chúng ta sẽ sử dụng pip để cài đặt RxPy

Lệnh như sau:

pip install rx

Trong hướng dẫn này, chúng tôi đang sử dụng RxPY phiên bản 3 và phiên bản python 3.7.3. Hoạt động của RxPY phiên bản 3 khác một chút với phiên bản trước đó, tức là RxPY phiên bản 1.

Trong chương này, chúng ta sẽ thảo luận về sự khác biệt giữa 2 phiên bản và những thay đổi cần thực hiện trong trường hợp bạn đang cập nhật phiên bản Python và RxPY.

Có thể quan sát trong RxPY

Trong phiên bản RxPy 1, Observable là một lớp riêng biệt -

from rx import Observable

Để sử dụng Observable, bạn phải sử dụng nó như sau:

Observable.of(1,2,3,4,5,6,7,8,9,10)

Trong phiên bản RxPy 3, Observable trực tiếp là một phần của gói rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Các toán tử trong RxPy

Trong phiên bản 1, toán tử là các phương thức trong lớp Observable. Ví dụ, để sử dụng các toán tử, chúng ta phải nhập Observable như hình dưới đây:

from rx import Observable

Ví dụ: các toán tử được sử dụng như Observable.operator, như được hiển thị bên dưới:

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Trong trường hợp RxPY phiên bản 3, các toán tử là chức năng và được nhập và sử dụng như sau:

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Các toán tử chuỗi sử dụng phương thức Pipe ()

Trong phiên bản RxPy 1, trong trường hợp bạn phải sử dụng nhiều toán tử trên một thiết bị có thể quan sát được, nó phải được thực hiện như sau:

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Tuy nhiên, trong trường hợp RxPY phiên bản 3, bạn có thể sử dụng phương thức pipe () và nhiều toán tử như được hiển thị bên dưới:

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Có thể quan sát, là một hàm tạo trình quan sát và gắn nó vào nguồn nơi các giá trị được mong đợi, ví dụ: nhấp chuột, sự kiện chuột từ phần tử dom, v.v.

Các chủ đề được đề cập dưới đây sẽ được nghiên cứu chi tiết trong chương này.

  • Tạo có thể quan sát

  • Đăng ký và thực hiện một quan sát được

Tạo vật có thể quan sát

Để tạo ra một cái có thể quan sát được, chúng tôi sẽ sử dụng create() và truyền hàm cho nó có các mục sau.

  • on_next() - Hàm này được gọi khi Observable phát ra một mục.

  • on_completed() - Hàm này được gọi khi Observable hoàn tất.

  • on_error() - Hàm này được gọi khi có lỗi xảy ra trên Observable.

Để làm việc với phương thức create (), trước tiên hãy nhập phương thức như hình dưới đây:

from rx import create

Đây là một ví dụ hoạt động, để tạo ra một -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Đăng ký và thực hiện một quan sát được

Để đăng ký một hàm có thể quan sát được, chúng ta cần sử dụng hàm subscribe () và chuyển hàm gọi lại on_next, on_error và on_completed.

Đây là một ví dụ hoạt động -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Phương thức subscribe () đảm nhận việc thực thi phương thức có thể quan sát được. Chức năng gọi lạion_next, on_erroron_completedphải được chuyển đến phương thức đăng ký. Lần lượt gọi phương thức đăng ký thực thi hàm test_observable ().

Không bắt buộc phải chuyển cả ba hàm gọi lại cho phương thức subscribe (). Bạn có thể chuyển on_next (), on_error () và on_completed () theo yêu cầu của mình.

Hàm lambda được sử dụng cho on_next, on_error và on_completed. Nó sẽ nhận các đối số và thực thi biểu thức đã cho.

Đây là đầu ra, của kết quả quan sát được tạo ra -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Chương này giải thích chi tiết về các toán tử trong RxPY. Các toán tử này bao gồm -

  • Làm việc với các nhà điều hành
  • Toán tử toán học
  • Toán tử chuyển đổi
  • Lọc toán tử
  • Các toán tử xử lý lỗi
  • Các nhà khai thác tiện ích
  • Toán tử có điều kiện
  • Toán tử tạo
  • Các toán tử có thể kết nối
  • Kết hợp các toán tử

Reactive (Rx) python hầu như có rất nhiều toán tử, điều này làm cho cuộc sống trở nên dễ dàng với việc mã hóa python. Bạn có thể sử dụng nhiều toán tử này cùng nhau, ví dụ, trong khi làm việc với chuỗi, bạn có thể sử dụng các toán tử ánh xạ, lọc, hợp nhất.

Làm việc với các nhà điều hành

Bạn có thể làm việc với nhiều toán tử cùng nhau bằng cách sử dụng phương thức pipe (). Phương pháp này cho phép xâu chuỗi nhiều toán tử lại với nhau.

Đây là một ví dụ hoạt động của việc sử dụng các toán tử -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

Trong ví dụ trên, chúng tôi đã tạo một phương thức có thể quan sát bằng cách sử dụng () nhận các giá trị 1, 2 và 3. Bây giờ, trên có thể quan sát này, bạn có thể thực hiện một thao tác khác, sử dụng bất kỳ số toán tử nào sử dụng phương thức pipe () như hình minh họa ở trên. Việc thực thi các toán tử sẽ diễn ra tuần tự trên những gì có thể quan sát được.

Để làm việc với các toán tử, trước tiên hãy nhập nó như hình dưới đây:

from rx import of, operators as op

Đây là một ví dụ hoạt động -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

Trong ví dụ trên, có một danh sách các số, từ đó chúng ta đang lọc các số chẵn bằng toán tử bộ lọc và sau đó thêm nó bằng toán tử giảm.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Đây là danh sách các Nhà điều hành mà chúng ta sẽ thảo luận -

  • Tạo có thể quan sát
  • Toán tử toán học
  • Toán tử chuyển đổi
  • Lọc toán tử
  • Các toán tử xử lý lỗi
  • Các nhà khai thác tiện ích
  • Conditional
  • Connectable
  • Kết hợp các toán tử

Tạo có thể quan sát

Sau đây là những điều có thể quan sát, chúng ta sẽ thảo luận trong danh mục Sáng tạo

Hiển thị các ví dụ

Có thể quan sát được Sự miêu tả
tạo nên Phương pháp này được sử dụng để tạo ra một quan sát được.
trống Điều này có thể quan sát được sẽ không xuất ra bất cứ thứ gì và trực tiếp phát ra trạng thái hoàn chỉnh.
không bao giờ Phương pháp này tạo ra một quan sát được sẽ không bao giờ đạt đến trạng thái hoàn chỉnh.
phi Phương thức này sẽ tạo ra một lỗi có thể quan sát được.
từ_ Phương thức này sẽ chuyển đổi mảng hoặc đối tượng đã cho thành một đối tượng có thể quan sát được.
khoảng thời gian Phương pháp này sẽ cung cấp một loạt các giá trị được tạo ra sau một thời gian chờ.
chỉ Phương thức này sẽ chuyển đổi giá trị đã cho thành một giá trị có thể quan sát được.
phạm vi Phương thức này sẽ cung cấp một loạt các số nguyên dựa trên đầu vào đã cho.
giá trị lặp lại Phương thức này sẽ tạo ra một giá trị có thể quan sát được sẽ lặp lại giá trị đã cho theo số lượng được đưa ra.
khởi đầu Phương thức này nhận một hàm làm đầu vào và trả về một giá trị có thể quan sát được sẽ trả về giá trị từ hàm đầu vào.
hẹn giờ Phương thức này sẽ phát ra các giá trị theo thứ tự sau khi hết thời gian chờ.

Toán tử toán học

Các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử như sau: -

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
Trung bình cộng Toán tử này sẽ tính giá trị trung bình từ nguồn có thể quan sát được đã cho và xuất ra giá trị có thể quan sát được sẽ có giá trị trung bình.
kết hợp Toán tử này sẽ nhận vào hai hoặc nhiều khả năng quan sát và đưa ra một khả năng quan sát duy nhất với tất cả các giá trị trong chuỗi.
đếm

Toán tử này nhận một có thể quan sát với các giá trị và chuyển đổi nó thành một có thể quan sát sẽ có một giá trị duy nhất. Hàm count nhận trong hàm vị từ như một đối số tùy chọn.

Hàm có kiểu boolean và sẽ chỉ thêm giá trị vào đầu ra nếu nó thỏa mãn điều kiện.

tối đa Toán tử này sẽ đưa ra một giá trị có thể quan sát được với giá trị tối đa từ nguồn có thể quan sát được.
min Toán tử này sẽ đưa ra một giá trị có thể quan sát được với giá trị nhỏ nhất từ ​​nguồn có thể quan sát được.
giảm Toán tử này nhận vào một hàm được gọi là hàm tích lũy được sử dụng trên các giá trị đến từ nguồn có thể quan sát được và nó trả về các giá trị tích lũy ở dạng có thể quan sát được, với một giá trị gốc tùy chọn được chuyển đến hàm tích lũy.
Tổng Toán tử này sẽ trả về một giá trị có thể quan sát được với tổng của tất cả các giá trị từ nguồn có thể quan sát được.

Toán tử chuyển đổi

Các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử chuyển đổi được đề cập dưới đây:

Hiển thị các ví dụ

Nhà điều hành thể loại
đệm Toán tử này sẽ thu thập tất cả các giá trị từ nguồn có thể quan sát được và phát ra chúng theo các khoảng thời gian đều đặn khi điều kiện biên đã cho được thỏa mãn.
ground_by Toán tử này sẽ nhóm các giá trị đến từ nguồn có thể quan sát được dựa trên hàm key_mapper đã cho.
bản đồ Toán tử này sẽ thay đổi mỗi giá trị từ nguồn có thể quan sát thành một giá trị mới dựa trên kết quả của mapper_func đã cho.
quét Toán tử này sẽ áp dụng một hàm tích lũy cho các giá trị đến từ nguồn có thể quan sát được và trả về một giá trị có thể quan sát được với các giá trị mới.

Lọc toán tử

Các toán tử mà chúng ta sẽ thảo luận trong danh mục toán tử Lọc được đưa ra dưới đây:

Hiển thị các ví dụ

Nhà điều hành thể loại
sự suy đồi Toán tử này sẽ cung cấp các giá trị từ nguồn có thể quan sát được, cho đến khi khoảng thời gian đã cho và bỏ qua phần thời gian còn lại.
khác biệt Toán tử này sẽ cung cấp tất cả các giá trị khác biệt với nguồn có thể quan sát được.
element_at Toán tử này sẽ cung cấp một phần tử từ nguồn có thể quan sát được cho chỉ số đã cho.
bộ lọc Toán tử này sẽ lọc các giá trị từ nguồn có thể quan sát được dựa trên hàm vị từ đã cho.
Đầu tiên Toán tử này sẽ cho phần tử đầu tiên từ nguồn có thể quan sát được.
ignore_elements Toán tử này sẽ bỏ qua tất cả các giá trị từ nguồn có thể quan sát được và chỉ thực hiện các lệnh gọi để hoàn thành hoặc các hàm gọi lại lỗi.
Cuối cùng Toán tử này sẽ đưa ra phần tử cuối cùng từ nguồn có thể quan sát được.
nhảy Toán tử này sẽ trả lại một giá trị có thể quan sát được sẽ bỏ qua lần xuất hiện đầu tiên của các mục đếm được lấy làm đầu vào.
bỏ qua Toán tử này sẽ trả lại một giá trị có thể quan sát được sẽ bỏ qua lần xuất hiện cuối cùng của các mục đếm được lấy làm đầu vào.
lấy Toán tử này sẽ đưa ra danh sách các giá trị nguồn theo thứ tự liên tục dựa trên số lượng đã cho.
take_last Toán tử này sẽ đưa ra một danh sách các giá trị nguồn theo thứ tự liên tục từ cuối cùng dựa trên số lượng đã cho.

Các toán tử xử lý lỗi

Các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử xử lý lỗi là: -

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
nắm lấy Toán tử này sẽ kết thúc nguồn có thể quan sát được khi có một ngoại lệ.
thử lại Toán tử này sẽ thử lại trên nguồn có thể quan sát được khi có lỗi và khi quá trình thử lại được thực hiện, nó sẽ kết thúc.

Các nhà khai thác tiện ích

Sau đây là các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử tiện ích.

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
sự chậm trễ Nhà điều hành này sẽ trì hoãn phát xạ có thể quan sát được nguồn theo thời gian hoặc ngày được đưa ra.
cụ thể hoá Toán tử này sẽ chuyển đổi các giá trị từ nguồn có thể quan sát được với các giá trị được phát ra dưới dạng giá trị thông báo rõ ràng.
Khoảng thời gian Toán tử này sẽ cung cấp thời gian trôi qua giữa các giá trị từ nguồn có thể quan sát được.
hết giờ Toán tử này sẽ cung cấp tất cả các giá trị từ nguồn có thể quan sát được sau thời gian trôi qua, nếu không sẽ gây ra lỗi.
dấu thời gian Toán tử này sẽ đính kèm một dấu thời gian cho tất cả các giá trị từ nguồn có thể quan sát được.

Toán tử có điều kiện và Boolean

Các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử có điều kiện và Boolean như được đưa ra bên dưới:

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
tất cả Toán tử này sẽ kiểm tra xem tất cả các giá trị từ nguồn có thể quan sát được có thỏa mãn điều kiện đã cho hay không.
chứa đựng Toán tử này sẽ trả về một giá trị có thể quan sát được với giá trị đúng hoặc sai nếu giá trị đã cho có mặt và nếu nó là giá trị của nguồn có thể quan sát được.
default_if_empty Toán tử này sẽ trả về giá trị mặc định nếu nguồn có thể quan sát được trống.
chuỗi_equal Toán tử này sẽ so sánh hai chuỗi có thể quan sát hoặc một mảng giá trị và trả về một có thể quan sát với giá trị true hoặc false.
bỏ qua Toán tử này sẽ loại bỏ các giá trị từ nguồn có thể quan sát được cho đến khi thứ hai có thể quan sát được tạo ra một giá trị.
bỏ qua Toán tử này sẽ trả về một giá trị có thể quan sát được với các giá trị từ nguồn có thể quan sát được thỏa mãn điều kiện được truyền.
take_until Toán tử này sẽ loại bỏ các giá trị khỏi nguồn có thể quan sát được sau khi nguồn có thể quan sát được thứ hai phát ra một giá trị hoặc kết thúc.
mất_giá Toán tử này sẽ loại bỏ các giá trị khỏi nguồn có thể quan sát được khi điều kiện không thành công.

Các nhà khai thác có thể kết nối

Các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử có thể kết nối là:

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
công bố Phương pháp này sẽ chuyển đổi cái có thể quan sát được thành cái có thể quan sát được.
ref_count Toán tử này sẽ làm cho người có thể quan sát được trở thành có thể quan sát được bình thường.
phát lại Phương thức này hoạt động tương tự như replaySubject. Phương thức này sẽ trả về các giá trị tương tự, ngay cả khi giá trị có thể quan sát đã được phát ra và một số người đăng ký đăng ký muộn.

Kết hợp các toán tử

Sau đây là các toán tử mà chúng ta sẽ thảo luận trong danh mục Toán tử kết hợp.

Hiển thị các ví dụ

Nhà điều hành Sự miêu tả
kết hợp Toán tử này sẽ tạo một bộ giá trị cho đầu vào có thể quan sát được.
hợp nhất Toán tử này sẽ hợp nhất các khả năng quan sát đã cho.
bắt đầu với Toán tử này sẽ nhận các giá trị đã cho và thêm vào đầu nguồn có thể quan sát được, trả về chuỗi đầy đủ.
zip Toán tử này trả về một giá trị có thể quan sát được với các giá trị ở dạng tuple được hình thành bằng cách lấy giá trị đầu tiên của giá trị có thể quan sát đã cho, v.v.

Chủ thể là một chuỗi có thể quan sát được, cũng như, một người quan sát có thể phát đa hướng, tức là nói chuyện với nhiều người quan sát đã đăng ký.

Chúng ta sẽ thảo luận các chủ đề sau về chủ đề -

  • Tạo một chủ đề
  • Đăng ký một chủ đề
  • Truyền dữ liệu cho chủ đề
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Tạo một chủ đề

Để làm việc với một chủ đề, chúng ta cần nhập Chủ đề như hình dưới đây -

from rx.subject import Subject

Bạn có thể tạo một chủ thể-đối tượng như sau:

subject_test = Subject()

Đối tượng là một người quan sát có ba phương pháp:

  • on_next(value)
  • on_error (error) và
  • on_completed()

Đăng ký một Chủ đề

Bạn có thể tạo nhiều đăng ký theo chủ đề như hình dưới đây -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Truyền dữ liệu cho chủ đề

Bạn có thể chuyển dữ liệu cho chủ đề được tạo bằng phương thức on_next (value) như hình dưới đây -

subject_test.on_next("A")
subject_test.on_next("B")

Dữ liệu sẽ được chuyển cho tất cả các đăng ký, được thêm vào chủ đề.

Đây là một ví dụ hoạt động của chủ đề này.

Thí dụ

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Đối tượng subject_test được tạo bằng cách gọi một Subject (). Đối tượng subject_test có tham chiếu đến các phương thức on_next (value), on_error (error) và on_completed (). Kết quả của ví dụ trên được hiển thị bên dưới:

Đầu ra

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Chúng ta có thể sử dụng phương thức on_completed () để dừng việc thực thi chủ đề như hình dưới đây.

Thí dụ

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Khi chúng tôi gọi hoàn tất, phương thức tiếp theo được gọi sau sẽ không được gọi.

Đầu ra

E:\pyrx>python testrx.py
The value is A
The value is A

Bây giờ chúng ta hãy xem, cách gọi phương thức on_error (error).

Thí dụ

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Đầu ra

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject sẽ cung cấp cho bạn giá trị mới nhất khi được gọi. Bạn có thể tạo chủ đề hành vi như hình dưới đây -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Đây là một ví dụ hoạt động để sử dụng Chủ đề hành vi

Thí dụ

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Đầu ra

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Phát lại chủ đề

Một replayubject tương tự như chủ đề hành vi, trong đó, nó có thể đệm các giá trị và phát lại giống như vậy cho người đăng ký mới. Đây là một ví dụ hoạt động của chủ đề phát lại.

Thí dụ

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Giá trị đệm được sử dụng là 2 trên chủ đề phát lại. Vì vậy, hai giá trị cuối cùng sẽ được đệm và sử dụng cho các thuê bao mới được gọi.

Đầu ra

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Trong trường hợp của AsyncSubject, giá trị cuối cùng được gọi được chuyển cho thuê bao và nó sẽ chỉ được thực hiện sau khi phương thức complete () được gọi.

Thí dụ

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Đầu ra

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Một tính năng quan trọng của RxPy là đồng thời, tức là cho phép tác vụ thực thi song song. Để điều đó xảy ra, chúng tôi có hai toán tử subscribe_on () và Obser_on () sẽ hoạt động với một bộ lập lịch, sẽ quyết định việc thực thi tác vụ đã đăng ký.

Đây là một ví dụ làm việc cho thấy sự cần thiết của subscibe_on (), Obser_on () và bộ lập lịch.

Thí dụ

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

Trong ví dụ trên, tôi có 2 nhiệm vụ: Nhiệm vụ 1 và Nhiệm vụ 2. Việc thực hiện nhiệm vụ theo trình tự. Nhiệm vụ thứ hai chỉ bắt đầu khi nhiệm vụ đầu tiên được hoàn thành.

Đầu ra

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy hỗ trợ nhiều Scheduler và ở đây, chúng ta sẽ sử dụng ThreadPoolScheduler. ThreadPoolScheduler chủ yếu sẽ cố gắng quản lý với các luồng CPU có sẵn.

Trong ví dụ, chúng ta đã thấy trước đó, chúng ta sẽ sử dụng một mô-đun đa xử lý sẽ cung cấp cho chúng ta cpu_count. Số lượng sẽ được trao cho ThreadPoolScheduler sẽ quản lý để tác vụ hoạt động song song dựa trên các luồng có sẵn.

Đây là một ví dụ hoạt động -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

Trong ví dụ trên, tôi có 2 tác vụ và số cpu_count là 4. Vì, tác vụ là 2 và luồng có sẵn với chúng tôi là 4, cả hai tác vụ có thể bắt đầu song song.

Đầu ra

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Nếu bạn thấy đầu ra, cả hai tác vụ đã bắt đầu song song.

Bây giờ, hãy xem xét một tình huống, trong đó nhiệm vụ nhiều hơn số CPU, tức là số CPU là 4 và nhiệm vụ là 5. Trong trường hợp này, chúng ta sẽ cần kiểm tra xem có luồng nào còn trống sau khi hoàn thành tác vụ không, vì vậy, nó có thể được giao cho nhiệm vụ mới có sẵn trong hàng đợi.

Với mục đích này, chúng ta có thể sử dụng toán tử Obser_on () sẽ quan sát bộ lập lịch nếu bất kỳ luồng nào còn trống. Đây là một ví dụ làm việc bằng cách sử dụng Obser_on ()

Thí dụ

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Đầu ra

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Nếu bạn thấy kết quả đầu ra, thời điểm tác vụ 4 hoàn thành, luồng được trao cho tác vụ tiếp theo, tức là tác vụ 5 và cùng một bắt đầu thực hiện.

Trong chương này, chúng ta sẽ thảo luận chi tiết về các chủ đề sau:

  • Ví dụ cơ bản cho thấy hoạt động của người quan sát, người vận hành và đăng ký người quan sát.
  • Sự khác biệt giữa có thể quan sát và chủ thể.
  • Hiểu biết quan sát lạnh và nóng.

Dưới đây là một ví dụ cơ bản cho thấy hoạt động của người quan sát, toán tử và đăng ký trình quan sát.

Thí dụ

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Đây là một ví dụ rất đơn giản, trong đó, tôi đang lấy dữ liệu người dùng từ URL này -

https://jsonplaceholder.typicode.com/users.

Lọc dữ liệu, để cung cấp các tên bắt đầu bằng "C" và sau đó sử dụng bản đồ để chỉ trả về các tên. Đây là đầu ra cho cùng một -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Sự khác biệt giữa có thể quan sát và đối tượng

Trong ví dụ này, chúng ta sẽ thấy sự khác biệt giữa đối tượng có thể quan sát và đối tượng.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Đầu ra

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Trong ví dụ trên, mỗi khi bạn đăng ký vào trang có thể quan sát, nó sẽ cung cấp cho bạn các giá trị mới.

Ví dụ chủ đề

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Đầu ra

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Nếu bạn thấy các giá trị được chia sẻ, giữa cả hai người đăng ký sử dụng chủ đề.

Hiểu biết về vật quan sát lạnh và nóng

Một quan sát được phân loại là

  • Có thể quan sát lạnh
  • Có thể quan sát nóng

Sự khác biệt về khả năng quan sát sẽ được nhận thấy khi nhiều người đăng ký đang đăng ký.

Có thể quan sát lạnh

Các vật quan sát lạnh, có thể quan sát được thực thi và hiển thị dữ liệu mỗi khi nó được đăng ký. Khi nó được đăng ký, giá trị có thể quan sát được thực thi và các giá trị mới được đưa ra.

Ví dụ sau đây cung cấp sự hiểu biết về lạnh có thể quan sát được.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Đầu ra

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Trong ví dụ trên, mỗi khi bạn đăng ký giá trị có thể quan sát, nó sẽ thực thi các giá trị có thể quan sát và phát ra. Các giá trị cũng có thể khác nhau từ người đăng ký này sang người đăng ký khác như thể hiện trong ví dụ trên.

Có thể quan sát nóng

Trong trường hợp có thể quan sát được, chúng sẽ phát ra các giá trị khi chúng sẵn sàng và không phải lúc nào cũng chờ đăng ký. Khi các giá trị được phát ra, tất cả những người đăng ký sẽ nhận được cùng một giá trị.

Bạn có thể sử dụng các giá trị có thể quan sát được khi bạn muốn các giá trị được phát ra khi có thể quan sát đã sẵn sàng hoặc bạn muốn chia sẻ các giá trị giống nhau cho tất cả người đăng ký của mình.

Một ví dụ về các toán tử có thể quan sát được là Chủ đề và các toán tử có thể kết nối.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Đầu ra

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Nếu bạn thấy, giá trị giống nhau được chia sẻ giữa những người đăng ký. Bạn có thể đạt được điều tương tự bằng cách sử dụng toán tử quan sát có thể kết nối xuất bản ().