작업자가 Python 다중 처리로 통신하여 문제에 대한 최상의 솔루션을 찾는 방법은 무엇입니까?

Nov 22 2020

내 단순화 된 문제

숫자 목록을 3 개의 개별 목록으로 나눈 후 평균 제품을 반환하는 함수를 만들었습니다.

예를 들면 :

Input array 'arr' = [1,2,3,4,5,6,7,8,9]

Example partition: [1,5,6],[2,3,9],[4,7,8]

Example objective: mean([1 x 5 x 6],[2 x 3 x 9],[4 x 7 x 8]) = 102.67

내 목표-직원들이 최상의 솔루션을 위해 경쟁하고 소통하도록합니다.

이제이 함수를 병렬로 실행하려고합니다 (현재는 작업자 2 명만), 10 초마다 작업자 가 파티션 (가장 높은 목표)을 서로 공유하고 다음 10 초 동안 시작점으로 사용하도록합니다. , 최적의 결과가 시간이 지남에 따라 개선 될 때까지 계속됩니다. 이 최상의 결과는 update_partition으로 컴퓨팅 함수에 전달됩니다 .

작업자가 결과를 전달하도록하는 방법을 잘 모르겠습니다. 이에 대한 도움을 주시면 감사하겠습니다.

멀티 프로세싱을 처음 접하기 때문에 큐, 관리자, 풀 등을 사용하여 솔루션을 개선하기위한 조언을 주시면 감사하겠습니다.

내 시도-커뮤니케이션 제외

# Competing and communicating workers

from multiprocessing import Process
import random
import numpy as np
import sys

# Sub functions used in the compute function
def partition(arr, n):
    random.shuffle(arr)
    return [np.array(arr[i::n]) for i in range(n)]

def average(partitionList):
    return np.mean([np.prod(i) for i in partitionList]), partitionList

def swap(A,B,i,j):
    b_temp = B[j].copy()
    B[j] = A[i]
    A[i] = b_temp
    return A,B

# Main function - this just shuffles one element from each group of the array at a time to try and maximise the objective
def compute(message,arr,r,update_partition = 'Default'):

    if update_partition != 'Default':
        current_partition = update_partition
    else:    
        current_partition = partition(arr, r)
        
    current_partition = partition(arr, r)
    obj_prev = average(current_partition)[0]
    print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))

    while True:
        for i in range(3):
            randPosOne = np.random.randint(3)
            randPosTwo = np.random.randint(3)

            if i != 2:
                swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
            else:
                swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)

            obj = average(current_partition)[0]

            if obj > obj_prev:
                obj_prev = obj
                store = average(current_partition)[1]
                print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))

            else:
                obj = obj_prev
                if i != 2:
                    swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
                else:
                    swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
                    

if __name__ == '__main__':
    
    # This is just an arbitray array of random numbers used as an input
    arr = random.sample(range(10, 50), 12)
    
    # This represents how many groups we would like to make out of the arr list
    r = 3 #int(sys.argv[1])
    
    first = Process(target=compute, args=("Worker 1", arr,r))
    first.start()
    second = Process(target=compute, args=("Worker 2", arr,r))
    second.start()


답변

1 Booboo Nov 22 2020 at 21:08

이 솔루션은 문제를 해결하기 위해 서로 통신하는 여러 프로세스에 대한 것이 아니기 때문에 반드시 만족 스럽지는 않습니다. 하지만 문제를 해결하기위한 최선의 접근 방식은 그렇게해야한다고 생각하지 않습니다.

내 첫 번째 관찰은 파티션을 생성하기 위해 랜덤 셔플을 사용하는 것은 파티션 내의 요소 순서를 제외하고 본질적으로 동일한 파티션을 생성하여 동일한 제품과 평균을 생성하기 때문에 이상적이지 않다는 것입니다. 아래 코드는 어휘 적으로 정렬 된 별개의 파티션을 생성하고 임의 크기의 프로세스 풀을 사용하여 각 파티션의 평균을 계산합니다. 따라서 문제를 해결하기 위해 원하는만큼의 프로세스를 사용할 수 있습니다 (최대 프로세서 수까지). 9 개 요소의 배열 크기에 대해 요소를 각각 3 개 요소의 3 개 튜플로 분할하는 방법은 280 개뿐입니다. 그러나이 숫자는 요소 수가 증가함에 따라 빠르게 증가합니다. 배열 크기가 12 개 요소 (각각 4 개 요소의 3 개 튜플)의 경우 파티션 수는 5775가됩니다. 단점은 generate_tuples중복 파티션을 제거 하는 데있어 함수 가 더 많은 비용이 든다는 것입니다 (정렬로 인해).

다음 코드는 최대 평균을 생성하는 분할을 찾습니다.

from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod

def generate_tuples(arr):
    slice_size = len(arr) // 3
    s = set()
    cnt = 0
    for p in permutations(arr):
        t = tuple(sorted([tuple(sorted(p[0:slice_size])), tuple(sorted(p[slice_size:slice_size*2])), tuple(sorted(p[slice_size*2:slice_size*3]))]))
        if t not in s:
            yield t
            s.add(t)
            cnt += 1
    print('Total partitions = ', cnt)



def compute(t):
    return t, mean(prod(x) for x in t)


def main():
    with multiprocessing.Pool(6) as pool:
        arr = random.sample(range(10, 50), 12) # count should be divisible by 3
        print('arr =', arr)
        # chunksize should be approximately: size_of_iterable / (pool_size * 4):
        results = pool.imap(compute, generate_tuples(arr), chunksize=241)
        max_t = None
        max_mean = 0
        for t, m in results:
            if m > max_mean:
                max_mean = m
                max_t = t
        print(max_t, max_mean)


if __name__ == '__main__':
    main()

인쇄물:

arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions =  5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333

최신 정보

다음은 다중 처리를 사용하려고 할 때 유용한 정보가 될 수 있습니다.

첫 번째 방법은 관리되는 공유 목록을 사용 합니다. 이 관리 목록의 장점은 액세스가 자동으로 직렬화되므로 수행되는 작업의 복잡성에 따라 목록에 액세스하는 프로세스가 명시 적으로 잠금을 수행 할 필요가 없다는 것입니다. 그리고 공유 목록 인스턴스를 작업자 함수에 인수로 전달하는 것보다 프로세스 풀이 생성 될 때 공유 목록을 전역에 할당하여 각 프로세스를 한 번 초기화하는 것이 더 편리한 경우가 많습니다.

import multiprocessing

def pool_initializer(the_list):
    global arr

    arr = the_list


def reverse():
    arr = arr[::-1]


if __name__ == __main__: # required for Windows
    with multiprocessing.Manger() as manager:
        arr = manager.list(random.sample(range(10, 50), 12))
        with Pool(initializer=pool_initializer, initargs=(arr,) as pool:
            pool.apply(reverse)
        print(arr)

단점은 arr실제로 실제 공유 메모리에 대한 프록시이므로 두 번째 옵션을 사용하는 것보다 액세스 속도가 느릴 수 있습니다 muliprocessing.Array. 두 프로세스가 동일한 요소를 수정하지 않는 한 잠금에 대해 걱정할 필요가 없습니다. 그렇지 않으면 공유 가능한 Lock인스턴스 를 만들고 필요할 때 어레이에 대한 액세스를 직렬화해야합니다. [https://stackoverflow.com/questions/39122270/multiprocessing-shared-array]를 참조하세요.