RxPY - การทำงานกับหัวเรื่อง

เรื่องเป็นลำดับที่สังเกตได้เช่นเดียวกับผู้สังเกตการณ์ที่สามารถหลายผู้รับกล่าวคือพูดคุยกับผู้สังเกตการณ์หลายคนที่สมัครเป็นสมาชิก

เราจะพูดถึงหัวข้อต่อไปนี้ในหัวข้อ -

  • สร้างหัวเรื่อง
  • สมัครรับเรื่อง
  • การส่งผ่านข้อมูลไปยังหัวเรื่อง
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

สร้างหัวเรื่อง

ในการทำงานกับหัวเรื่องเราจำเป็นต้องนำเข้าหัวเรื่องตามที่แสดงด้านล่าง -

from rx.subject import Subject

คุณสามารถสร้าง subject-object ได้ดังนี้ -

subject_test = Subject()

วัตถุคือผู้สังเกตการณ์ซึ่งมีสามวิธี -

  • on_next(value)
  • on_error (ข้อผิดพลาด) และ
  • on_completed()

สมัครรับเรื่อง

คุณสามารถสร้างการสมัครสมาชิกได้หลายหัวข้อตามที่แสดงด้านล่าง -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

การส่งผ่านข้อมูลไปยังหัวเรื่อง

คุณสามารถส่งข้อมูลไปยังเรื่องที่สร้างขึ้นโดยใช้วิธี on_next (ค่า) ดังที่แสดงด้านล่าง -

subject_test.on_next("A")
subject_test.on_next("B")

ข้อมูลจะถูกส่งต่อไปยังการสมัครสมาชิกทั้งหมดเพิ่มในหัวข้อ

นี่คือตัวอย่างการทำงานของหัวเรื่อง

ตัวอย่าง

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

วัตถุ subject_test ถูกสร้างขึ้นโดยการเรียก Subject () ออบเจ็กต์ subject_test มีการอ้างอิงถึง on_next (value), on_error (error) และ on_completed () เมธอด ผลลัพธ์ของตัวอย่างข้างต้นแสดงไว้ด้านล่าง -

เอาต์พุต

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

เราสามารถใช้เมธอด on_completed () เพื่อหยุดการดำเนินเรื่องดังที่แสดงด้านล่าง

ตัวอย่าง

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

เมื่อเราเรียกเสร็จสิ้นจะไม่มีการเรียกใช้วิธีการถัดไปที่เรียกในภายหลัง

เอาต์พุต

E:\pyrx>python testrx.py
The value is A
The value is A

ตอนนี้ให้เราดูวิธีการเรียก on_error (error) method

ตัวอย่าง

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

เอาต์พุต

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

พฤติกรรม

BehaviorSubject จะให้ค่าล่าสุดเมื่อเรียก คุณสามารถสร้างเรื่องพฤติกรรมได้ดังที่แสดงด้านล่าง -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

นี่คือตัวอย่างการทำงานในการใช้ Behavior Subject

ตัวอย่าง

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

เอาต์พุต

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

เล่นซ้ำเรื่อง

การเล่นซ้ำจะคล้ายกับเรื่องพฤติกรรมโดยที่มันสามารถบัฟเฟอร์ค่าและเล่นซ้ำกับสมาชิกใหม่ได้ นี่คือตัวอย่างการทำงานของหัวข้อการเล่นซ้ำ

ตัวอย่าง

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

ค่าบัฟเฟอร์ที่ใช้คือ 2 ในหัวข้อการเล่นซ้ำ ดังนั้นสองค่าสุดท้ายจะถูกบัฟเฟอร์และใช้สำหรับสมาชิกใหม่ที่เรียกว่า

เอาต์พุต

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

ในกรณีของ AsyncSubject ค่าสุดท้ายที่เรียกจะถูกส่งผ่านไปยังผู้สมัครสมาชิกและจะกระทำหลังจากที่เรียกใช้เมธอด complete () เท่านั้น

ตัวอย่าง

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

เอาต์พุต

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2