작업자가 Python 다중 처리로 통신하여 문제에 대한 최상의 솔루션을 찾는 방법은 무엇입니까?
내 단순화 된 문제
숫자 목록을 3 개의 개별 목록으로 나눈 후 평균 제품을 반환하는 함수를 만들었습니다.
예를 들면 :
Input array 'arr' = [1,2,3,4,5,6,7,8,9]
Example partition: [1,5,6],[2,3,9],[4,7,8]
Example objective: mean([1 x 5 x 6],[2 x 3 x 9],[4 x 7 x 8]) = 102.67
내 목표-직원들이 최상의 솔루션을 위해 경쟁하고 소통하도록합니다.
이제이 함수를 병렬로 실행하려고합니다 (현재는 작업자 2 명만), 10 초마다 작업자 가 파티션 (가장 높은 목표)을 서로 공유하고 다음 10 초 동안 시작점으로 사용하도록합니다. , 최적의 결과가 시간이 지남에 따라 개선 될 때까지 계속됩니다. 이 최상의 결과는 update_partition으로 컴퓨팅 함수에 전달됩니다 .
작업자가 결과를 전달하도록하는 방법을 잘 모르겠습니다. 이에 대한 도움을 주시면 감사하겠습니다.
멀티 프로세싱을 처음 접하기 때문에 큐, 관리자, 풀 등을 사용하여 솔루션을 개선하기위한 조언을 주시면 감사하겠습니다.
내 시도-커뮤니케이션 제외
# Competing and communicating workers
from multiprocessing import Process
import random
import numpy as np
import sys
# Sub functions used in the compute function
def partition(arr, n):
random.shuffle(arr)
return [np.array(arr[i::n]) for i in range(n)]
def average(partitionList):
return np.mean([np.prod(i) for i in partitionList]), partitionList
def swap(A,B,i,j):
b_temp = B[j].copy()
B[j] = A[i]
A[i] = b_temp
return A,B
# Main function - this just shuffles one element from each group of the array at a time to try and maximise the objective
def compute(message,arr,r,update_partition = 'Default'):
if update_partition != 'Default':
current_partition = update_partition
else:
current_partition = partition(arr, r)
current_partition = partition(arr, r)
obj_prev = average(current_partition)[0]
print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))
while True:
for i in range(3):
randPosOne = np.random.randint(3)
randPosTwo = np.random.randint(3)
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
obj = average(current_partition)[0]
if obj > obj_prev:
obj_prev = obj
store = average(current_partition)[1]
print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))
else:
obj = obj_prev
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
if __name__ == '__main__':
# This is just an arbitray array of random numbers used as an input
arr = random.sample(range(10, 50), 12)
# This represents how many groups we would like to make out of the arr list
r = 3 #int(sys.argv[1])
first = Process(target=compute, args=("Worker 1", arr,r))
first.start()
second = Process(target=compute, args=("Worker 2", arr,r))
second.start()
답변
이 솔루션은 문제를 해결하기 위해 서로 통신하는 여러 프로세스에 대한 것이 아니기 때문에 반드시 만족 스럽지는 않습니다. 하지만 문제를 해결하기위한 최선의 접근 방식은 그렇게해야한다고 생각하지 않습니다.
내 첫 번째 관찰은 파티션을 생성하기 위해 랜덤 셔플을 사용하는 것은 파티션 내의 요소 순서를 제외하고 본질적으로 동일한 파티션을 생성하여 동일한 제품과 평균을 생성하기 때문에 이상적이지 않다는 것입니다. 아래 코드는 어휘 적으로 정렬 된 별개의 파티션을 생성하고 임의 크기의 프로세스 풀을 사용하여 각 파티션의 평균을 계산합니다. 따라서 문제를 해결하기 위해 원하는만큼의 프로세스를 사용할 수 있습니다 (최대 프로세서 수까지). 9 개 요소의 배열 크기에 대해 요소를 각각 3 개 요소의 3 개 튜플로 분할하는 방법은 280 개뿐입니다. 그러나이 숫자는 요소 수가 증가함에 따라 빠르게 증가합니다. 배열 크기가 12 개 요소 (각각 4 개 요소의 3 개 튜플)의 경우 파티션 수는 5775가됩니다. 단점은 generate_tuples
중복 파티션을 제거 하는 데있어 함수 가 더 많은 비용이 든다는 것입니다 (정렬로 인해).
다음 코드는 최대 평균을 생성하는 분할을 찾습니다.
from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod
def generate_tuples(arr):
slice_size = len(arr) // 3
s = set()
cnt = 0
for p in permutations(arr):
t = tuple(sorted([tuple(sorted(p[0:slice_size])), tuple(sorted(p[slice_size:slice_size*2])), tuple(sorted(p[slice_size*2:slice_size*3]))]))
if t not in s:
yield t
s.add(t)
cnt += 1
print('Total partitions = ', cnt)
def compute(t):
return t, mean(prod(x) for x in t)
def main():
with multiprocessing.Pool(6) as pool:
arr = random.sample(range(10, 50), 12) # count should be divisible by 3
print('arr =', arr)
# chunksize should be approximately: size_of_iterable / (pool_size * 4):
results = pool.imap(compute, generate_tuples(arr), chunksize=241)
max_t = None
max_mean = 0
for t, m in results:
if m > max_mean:
max_mean = m
max_t = t
print(max_t, max_mean)
if __name__ == '__main__':
main()
인쇄물:
arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions = 5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333
최신 정보
다음은 다중 처리를 사용하려고 할 때 유용한 정보가 될 수 있습니다.
첫 번째 방법은 관리되는 공유 목록을 사용 합니다. 이 관리 목록의 장점은 액세스가 자동으로 직렬화되므로 수행되는 작업의 복잡성에 따라 목록에 액세스하는 프로세스가 명시 적으로 잠금을 수행 할 필요가 없다는 것입니다. 그리고 공유 목록 인스턴스를 작업자 함수에 인수로 전달하는 것보다 프로세스 풀이 생성 될 때 공유 목록을 전역에 할당하여 각 프로세스를 한 번 초기화하는 것이 더 편리한 경우가 많습니다.
import multiprocessing
def pool_initializer(the_list):
global arr
arr = the_list
def reverse():
arr = arr[::-1]
if __name__ == __main__: # required for Windows
with multiprocessing.Manger() as manager:
arr = manager.list(random.sample(range(10, 50), 12))
with Pool(initializer=pool_initializer, initargs=(arr,) as pool:
pool.apply(reverse)
print(arr)
단점은 arr
실제로 실제 공유 메모리에 대한 프록시이므로 두 번째 옵션을 사용하는 것보다 액세스 속도가 느릴 수 있습니다 muliprocessing.Array. 두 프로세스가 동일한 요소를 수정하지 않는 한 잠금에 대해 걱정할 필요가 없습니다. 그렇지 않으면 공유 가능한 Lock
인스턴스 를 만들고 필요할 때 어레이에 대한 액세스를 직렬화해야합니다. [https://stackoverflow.com/questions/39122270/multiprocessing-shared-array]를 참조하세요.