RxPY - Concurrency menggunakan Scheduler
Salah satu fitur penting dari RxPy adalah konkurensi, yaitu memungkinkan tugas dijalankan secara paralel. Untuk mewujudkannya, kami memiliki dua operator subscribe_on () dan observ_on () yang akan bekerja dengan penjadwal, yang akan memutuskan pelaksanaan tugas berlangganan.
Di sini, adalah contoh kerja, yang menunjukkan kebutuhan untuk subscibe_on (), observ_on () dan penjadwal.
Contoh
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
Dalam contoh di atas, saya memiliki 2 tugas: Tugas 1 dan Tugas 2. Pelaksanaan tugas secara berurutan. Tugas kedua hanya dimulai, saat tugas pertama selesai.
Keluaran
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPy mendukung banyak Scheduler, dan di sini, kita akan menggunakan ThreadPoolScheduler. ThreadPoolScheduler terutama akan mencoba mengelola dengan utas CPU yang tersedia.
Dalam contoh, kita telah melihat sebelumnya, kita akan menggunakan modul multiprosesing yang akan memberi kita cpu_count. Hitungannya akan diberikan ke ThreadPoolScheduler yang akan mengatur agar tugas bekerja secara paralel berdasarkan utas yang tersedia.
Di sini, adalah contoh yang berfungsi -
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
Dalam contoh di atas, saya memiliki 2 tugas dan cpu_count adalah 4. Karena, tugasnya adalah 2 dan utas yang tersedia bersama kami adalah 4, keduanya tugas dapat dimulai secara paralel.
Keluaran
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
Jika Anda melihat hasilnya, kedua tugas telah dimulai secara paralel.
Sekarang, pertimbangkan skenario, di mana tugas lebih dari jumlah CPU yaitu jumlah CPU 4 dan tugas 5. Dalam kasus ini, kita perlu memeriksa apakah ada utas yang bebas setelah tugas selesai, sehingga, itu bisa ditugaskan ke tugas baru yang tersedia dalam antrian.
Untuk tujuan ini, kita dapat menggunakan operator observ_on () yang akan mengamati penjadwal jika ada utas yang bebas. Di sini, adalah contoh kerja menggunakan observ_on ()
Contoh
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
Keluaran
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
Jika Anda melihat hasilnya, saat tugas 4 selesai, utas diberikan ke tugas berikutnya yaitu, tugas 5 dan tugas yang sama mulai dijalankan.