RxPY - Zamanlayıcı kullanarak Eşzamanlılık

RxPy'nin önemli bir özelliği eşzamanlılıktır, yani görevin paralel olarak yürütülmesine izin vermek. Bunun olmasını sağlamak için, abone olunan görevin yürütülmesine karar verecek bir zamanlayıcı ile birlikte çalışacak olan subscribe_on () ve observe_on () iki operatörümüz var.

Burada, subscibe_on (), observe_on () ve scheduler'a olan ihtiyacı gösteren çalışan bir örnek var.

Misal

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

Yukarıdaki örnekte 2 görevim var: Görev 1 ve Görev 2. Görevin yürütülmesi sırayla gerçekleşir. İkinci görev yalnızca ilk görev tamamlandığında başlar.

Çıktı

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy birçok Zamanlayıcıyı destekler ve burada, ThreadPoolScheduler'ı kullanacağız. ThreadPoolScheduler, esas olarak mevcut CPU iş parçacıklarıyla yönetmeye çalışacaktır.

Örnekte, daha önce gördüğümüz gibi, bize cpu_count verecek bir çoklu işlem modülünden yararlanacağız. Sayım, görevin mevcut iş parçacıklarına göre paralel olarak çalışmasını sağlayacak olan ThreadPoolScheduler'a verilecektir.

İşte çalışan bir örnek -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

Yukarıdaki örnekte, 2 görevim var ve cpu_count 4'tür. Görev 2 ve bizim için mevcut olan evreler 4 olduğundan, her iki görev de paralel olarak başlayabilir.

Çıktı

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Çıkışı görürseniz, her iki görev de paralel olarak başlamıştır.

Şimdi, görevin CPU sayısından fazla olduğu, yani CPU sayısının 4 olduğu ve görevlerin 5 olduğu bir senaryo düşünün. Bu durumda, görev tamamlandıktan sonra herhangi bir iş parçacığının boş olup olmadığını kontrol etmemiz gerekir, böylece kuyrukta bulunan yeni göreve atanır.

Bu amaçla, herhangi bir evre boşsa zamanlayıcıyı gözlemleyecek observe_on () operatörünü kullanabiliriz. İşte observe_on () kullanan çalışan bir örnek.

Misal

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Çıktı

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Çıkışı görürseniz, görev 4 tamamlanır, iş parçacığı bir sonraki göreve verilir, yani görev 5 ve aynısı yürütülmeye başlar.