RxPY-스케줄러를 사용한 동시성
RxPy의 중요한 기능 중 하나는 동시성입니다. 즉, 태스크를 병렬로 실행할 수 있습니다. 이를 수행하기 위해 구독 된 작업의 실행을 결정하는 스케줄러와 함께 작동하는 두 개의 연산자 subscribe_on () 및 observe_on ()이 있습니다.
다음은 subscibe_on (), observe_on () 및 스케줄러의 필요성을 보여주는 작업 예제입니다.
예
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
위의 예에는 태스크 1과 태스크 2의 두 가지 태스크가 있습니다. 태스크 실행은 순서대로 진행됩니다. 두 번째 작업은 첫 번째 작업이 완료 될 때만 시작됩니다.
산출
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPy는 많은 스케줄러를 지원하며 여기서는 ThreadPoolScheduler를 사용할 것입니다. ThreadPoolScheduler는 주로 사용 가능한 CPU 스레드로 관리하려고합니다.
이 예제에서 우리는 이전에 보았지만 cpu_count를 제공하는 다중 처리 모듈을 사용할 것입니다. 카운트는 사용 가능한 스레드를 기반으로 병렬로 작업을 수행하도록 관리 할 ThreadPoolScheduler에 제공됩니다.
여기에 작동 예가 있습니다-
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
위의 예에서 2 개의 작업이 있고 cpu_count는 4입니다. 작업은 2이고 사용 가능한 스레드는 4이므로 두 작업을 병렬로 시작할 수 있습니다.
산출
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
출력이 표시되면 두 작업이 병렬로 시작된 것입니다.
이제 작업이 CPU 개수보다 많은 경우 (예 : CPU 개수가 4 개이고 작업이 5 개) 시나리오를 생각해보십시오.이 경우 작업 완료 후 스레드가 비어 있는지 확인해야합니다. 대기열에서 사용할 수있는 새 작업에 할당됩니다.
이를 위해 스레드가 비어있는 경우 스케줄러를 관찰하는 observe_on () 연산자를 사용할 수 있습니다. 다음은 observe_on ()을 사용한 작업 예제입니다.
예
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
산출
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
출력을 보면 작업 4가 완료되는 순간 스레드가 다음 작업 즉, 작업 5에 주어지고 동일한 작업이 실행되기 시작합니다.