RxPY - शेड्यूलर का उपयोग कर संगामिति
RxPy की एक महत्वपूर्ण विशेषता समवर्ती है, अर्थात कार्य को समानांतर में निष्पादित करने की अनुमति देना। ऐसा करने के लिए, हमारे पास दो ऑपरेटर्स subscribe_on () और obs_on () हैं, जो किसी शेड्यूलर के साथ काम करेंगे, जो सब्स्क्राइब्ड टास्क के निष्पादन का निर्णय करेगा।
यहाँ, एक कार्यशील उदाहरण है, जो subscibe_on (), obs_on () और अनुसूचक की आवश्यकता को दर्शाता है।
उदाहरण
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
उपरोक्त उदाहरण में, मेरे पास 2 कार्य हैं: कार्य 1 और कार्य 2। कार्य का निष्पादन अनुक्रम में है। दूसरा कार्य तभी शुरू होता है, जब पहला कार्य किया जाता है।
उत्पादन
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPy कई शेड्यूलर का समर्थन करता है, और यहां, हम थ्रेडपूलसुशलर का उपयोग करने जा रहे हैं। ThreadPoolScheduler मुख्य रूप से उपलब्ध CPU थ्रेड्स के साथ प्रबंधन करने का प्रयास करेगा।
उदाहरण में, हमने पहले देखा है, हम एक मल्टीप्रोसेसिंग मॉड्यूल का उपयोग करने जा रहे हैं जो हमें cpu_count देगा। गणना थ्रेडपूलसकिलर को दी जाएगी जो उपलब्ध थ्रेड्स के आधार पर समानांतर में काम करने के लिए प्रबंधन करेगा।
यहाँ, एक कार्य उदाहरण है -
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
उपरोक्त उदाहरण में, मेरे पास 2 कार्य हैं और cpu_count 4 है। चूंकि, कार्य 2 है और हमारे साथ उपलब्ध धागे 4 हैं, दोनों कार्य समानांतर में शुरू हो सकते हैं।
उत्पादन
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
यदि आप आउटपुट देखते हैं, तो दोनों कार्य समानांतर में शुरू हो गए हैं।
अब, एक परिदृश्य पर विचार करें, जहां कार्य सीपीयू गणना से अधिक है अर्थात सीपीयू की संख्या 4 है और कार्य 5 हैं। इस मामले में, हमें यह जांचने की आवश्यकता है कि क्या कार्य पूरा होने के बाद कोई धागा मुक्त हो गया है, ताकि, यह हो सके कतार में उपलब्ध नए कार्य को सौंपा।
इस प्रयोजन के लिए, हम अवलोकन_ का उपयोग कर सकते हैं () ऑपरेटर जो अनुसूचक का निरीक्षण करेगा यदि कोई थ्रेड मुक्त है। यहां, अवलोकन_ () का उपयोग करके एक कार्यशील उदाहरण है
उदाहरण
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
उत्पादन
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
यदि आप आउटपुट को देखते हैं, तो पल 4 कार्य पूरा हो जाता है, थ्रेड अगले कार्य को दिया जाता है अर्थात, कार्य 5 और वही निष्पादित करना शुरू करता है।