RxPY - การทำงานกับ Observables

สังเกตได้คือฟังก์ชันที่สร้างผู้สังเกตการณ์และเชื่อมต่อกับแหล่งที่มาซึ่งคาดว่าจะมีค่าตัวอย่างเช่นการคลิกเหตุการณ์ของเมาส์จากองค์ประกอบ dom เป็นต้น

หัวข้อที่กล่าวถึงด้านล่างจะได้รับการศึกษาโดยละเอียดในบทนี้

  • สร้าง Observables

  • สมัครสมาชิกและดำเนินการตามที่สังเกตได้

สร้างสิ่งที่สังเกตได้

ในการสร้างสิ่งที่สังเกตได้เราจะใช้ create() วิธีการและส่งผ่านฟังก์ชันไปที่มีรายการต่อไปนี้

  • on_next() - ฟังก์ชั่นนี้จะถูกเรียกใช้เมื่อ Observable ปล่อยไอเท็ม

  • on_completed() - ฟังก์ชันนี้จะถูกเรียกใช้เมื่อ Observable เสร็จสมบูรณ์

  • on_error() - ฟังก์ชันนี้จะถูกเรียกใช้เมื่อเกิดข้อผิดพลาดบน Observable

ในการทำงานกับ create () วิธีการแรกนำเข้าวิธีการดังที่แสดงด้านล่าง -

from rx import create

นี่คือตัวอย่างการทำงานเพื่อสร้างสิ่งที่สังเกตได้ -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

สมัครสมาชิกและดำเนินการตามที่สังเกตได้

ในการสมัครรับข้อมูลที่สังเกตได้เราจำเป็นต้องใช้ฟังก์ชัน subscribe () และส่งผ่านฟังก์ชันเรียกกลับ on_next, on_error และ on_completed

นี่คือตัวอย่างการทำงาน -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

วิธี subscribe () ดูแลการเรียกใช้งานที่สังเกตได้ ฟังก์ชันการโทรกลับon_next, on_error และ on_completedจะต้องส่งต่อไปยังวิธีการสมัครสมาชิก เรียกวิธีสมัครสมาชิกในทางกลับกันเรียกใช้ฟังก์ชัน test_observable ()

ไม่บังคับให้ส่งผ่านฟังก์ชันเรียกกลับทั้งสามไปยังเมธอด subscribe () คุณสามารถส่งผ่านตามความต้องการของคุณใน on_next (), on_error () และ on_completed ()

ฟังก์ชัน lambda ใช้สำหรับ on_next, on_error และ on_completed จะใช้เวลาในการโต้แย้งและดำเนินการตามนิพจน์ที่กำหนด

นี่คือผลลัพธ์ของสิ่งที่สังเกตได้ที่สร้างขึ้น -

E:\pyrx>python testrx.py
Got - Hello
Job Done!