RxPY - Bekerja dengan Subjek

Subjek adalah urutan yang dapat diamati, serta, pengamat yang dapat multicast, yaitu berbicara dengan banyak pengamat yang telah berlangganan.

Kami akan membahas topik berikut tentang subjek -

  • Buat subjek
  • Berlangganan ke subjek
  • Meneruskan data ke subjek
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Buat subjek

Untuk bekerja dengan subjek, kita perlu mengimpor Subjek seperti yang ditunjukkan di bawah ini -

from rx.subject import Subject

Anda dapat membuat subjek-objek sebagai berikut -

subject_test = Subject()

Objek adalah pengamat yang memiliki tiga metode -

  • on_next(value)
  • on_error (kesalahan) dan
  • on_completed()

Berlangganan ke Subjek

Anda dapat membuat beberapa langganan pada subjek seperti yang ditunjukkan di bawah ini -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Meneruskan Data ke Subjek

Anda dapat mengirimkan data ke subjek yang dibuat menggunakan metode on_next (nilai) seperti yang ditunjukkan di bawah ini -

subject_test.on_next("A")
subject_test.on_next("B")

Data akan diteruskan ke semua langganan, ditambahkan pada subjek.

Di sini, adalah contoh kerja dari subjek tersebut.

Contoh

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

Objek subject_test dibuat dengan memanggil Subject (). Objek subject_test memiliki referensi ke metode on_next (nilai), on_error (error) dan on_completed (). Output dari contoh di atas ditunjukkan di bawah ini -

Keluaran

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Kita bisa menggunakan metode on_completed (), untuk menghentikan eksekusi subjek seperti yang ditunjukkan di bawah ini.

Contoh

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Setelah kami memanggil selesai, metode selanjutnya yang dipanggil nanti tidak dipanggil.

Keluaran

E:\pyrx>python testrx.py
The value is A
The value is A

Sekarang mari kita lihat, bagaimana memanggil metode on_error (error).

Contoh

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Keluaran

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject akan memberi Anda nilai terbaru saat dipanggil. Anda dapat membuat subjek perilaku seperti yang ditunjukkan di bawah ini -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Di sini, adalah contoh kerja untuk menggunakan Behavior Subject

Contoh

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Keluaran

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Putar Ulang Subjek

Subjek replay mirip dengan subjek perilaku, di mana subjek tersebut dapat menyangga nilai-nilai dan memutar ulang subjek yang sama ke pelanggan baru. Di sini, adalah contoh kerja subjek ulangan.

Contoh

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Nilai buffer yang digunakan adalah 2 pada subjek ulangan. Jadi, dua nilai terakhir akan di-buffer dan digunakan untuk pelanggan baru yang dipanggil.

Keluaran

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Dalam kasus AsyncSubject, nilai terakhir yang dipanggil diteruskan ke pelanggan, dan itu akan dilakukan hanya setelah metode complete () dipanggil.

Contoh

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Keluaran

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2