RxPY - การทำงานกับ Observables
สังเกตได้คือฟังก์ชันที่สร้างผู้สังเกตการณ์และเชื่อมต่อกับแหล่งที่มาซึ่งคาดว่าจะมีค่าตัวอย่างเช่นการคลิกเหตุการณ์ของเมาส์จากองค์ประกอบ dom เป็นต้น
หัวข้อที่กล่าวถึงด้านล่างจะได้รับการศึกษาโดยละเอียดในบทนี้
สร้าง Observables
สมัครสมาชิกและดำเนินการตามที่สังเกตได้
สร้างสิ่งที่สังเกตได้
ในการสร้างสิ่งที่สังเกตได้เราจะใช้ create() วิธีการและส่งผ่านฟังก์ชันไปที่มีรายการต่อไปนี้
on_next() - ฟังก์ชั่นนี้จะถูกเรียกใช้เมื่อ Observable ปล่อยไอเท็ม
on_completed() - ฟังก์ชันนี้จะถูกเรียกใช้เมื่อ Observable เสร็จสมบูรณ์
on_error() - ฟังก์ชันนี้จะถูกเรียกใช้เมื่อเกิดข้อผิดพลาดบน Observable
ในการทำงานกับ create () วิธีการแรกนำเข้าวิธีการดังที่แสดงด้านล่าง -
from rx import create
นี่คือตัวอย่างการทำงานเพื่อสร้างสิ่งที่สังเกตได้ -
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()
source = create(test_observable).
สมัครสมาชิกและดำเนินการตามที่สังเกตได้
ในการสมัครรับข้อมูลที่สังเกตได้เราจำเป็นต้องใช้ฟังก์ชัน subscribe () และส่งผ่านฟังก์ชันเรียกกลับ on_next, on_error และ on_completed
นี่คือตัวอย่างการทำงาน -
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
วิธี subscribe () ดูแลการเรียกใช้งานที่สังเกตได้ ฟังก์ชันการโทรกลับon_next, on_error และ on_completedจะต้องส่งต่อไปยังวิธีการสมัครสมาชิก เรียกวิธีสมัครสมาชิกในทางกลับกันเรียกใช้ฟังก์ชัน test_observable ()
ไม่บังคับให้ส่งผ่านฟังก์ชันเรียกกลับทั้งสามไปยังเมธอด subscribe () คุณสามารถส่งผ่านตามความต้องการของคุณใน on_next (), on_error () และ on_completed ()
ฟังก์ชัน lambda ใช้สำหรับ on_next, on_error และ on_completed จะใช้เวลาในการโต้แย้งและดำเนินการตามนิพจน์ที่กำหนด
นี่คือผลลัพธ์ของสิ่งที่สังเกตได้ที่สร้างขึ้น -
E:\pyrx>python testrx.py
Got - Hello
Job Done!