問題の最善の解決策を見つけるために、Pythonマルチプロセッシングでワーカーを通信させる方法は?

Nov 22 2020

私の単純化された問題

数値のリストを3つの異なるリストに分割した後、平均的な積を返す関数を作成しました。

例えば:

Input array 'arr' = [1,2,3,4,5,6,7,8,9]

Example partition: [1,5,6],[2,3,9],[4,7,8]

Example objective: mean([1 x 5 x 6],[2 x 3 x 9],[4 x 7 x 8]) = 102.67

私の目標-労働者に最良の解決策を求めて競争させ、コミュニケーションをとらせる

私は現在、この関数を並行して実行しようとしています(今のところ2人のワーカーのみ)。これにより、10秒ごとに、ワーカーは(最高の目的で)パーティションを互いに共有し、次の10秒間の開始点として使用します。 、など、最適な結果が時間の経過とともに改善されるまで続きます。この最良の結果は、update_partitionとして計算関数に渡されます。

労働者に彼らの結果を伝える方法がわからないので、これについていくらかの助けをいただければ幸いです。

私はマルチプロセッシングに慣れていないので、ソリューションを改善するためのアドバイスもいただければ幸いです。たとえば、キュ​​ー、マネージャー、プールなどを使用します。

私の試み-コミュニケーションを除く

# Competing and communicating workers

from multiprocessing import Process
import random
import numpy as np
import sys

# Sub functions used in the compute function
def partition(arr, n):
    random.shuffle(arr)
    return [np.array(arr[i::n]) for i in range(n)]

def average(partitionList):
    return np.mean([np.prod(i) for i in partitionList]), partitionList

def swap(A,B,i,j):
    b_temp = B[j].copy()
    B[j] = A[i]
    A[i] = b_temp
    return A,B

# Main function - this just shuffles one element from each group of the array at a time to try and maximise the objective
def compute(message,arr,r,update_partition = 'Default'):

    if update_partition != 'Default':
        current_partition = update_partition
    else:    
        current_partition = partition(arr, r)
        
    current_partition = partition(arr, r)
    obj_prev = average(current_partition)[0]
    print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))

    while True:
        for i in range(3):
            randPosOne = np.random.randint(3)
            randPosTwo = np.random.randint(3)

            if i != 2:
                swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
            else:
                swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)

            obj = average(current_partition)[0]

            if obj > obj_prev:
                obj_prev = obj
                store = average(current_partition)[1]
                print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))

            else:
                obj = obj_prev
                if i != 2:
                    swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
                else:
                    swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
                    

if __name__ == '__main__':
    
    # This is just an arbitray array of random numbers used as an input
    arr = random.sample(range(10, 50), 12)
    
    # This represents how many groups we would like to make out of the arr list
    r = 3 #int(sys.argv[1])
    
    first = Process(target=compute, args=("Worker 1", arr,r))
    first.start()
    second = Process(target=compute, args=("Worker 2", arr,r))
    second.start()


回答

1 Booboo Nov 22 2020 at 21:08

このソリューションは、問題を解決するために相互に通信する複数のプロセスに関するものではないため、これは必ずしも満足できるものではありません。しかし、問題を解決するための最善のアプローチでは、そうする必要があるとは思いません。

私の最初の観察では、ランダムシャッフルを使用してパーティションを生成すると、パーティション内の要素の順序を除いて本質的に同一のパーティションが生成され、同じ積と平均が生成されるため、理想的とは言えません。以下のコードは、字句的に順序付けられた個別のパーティションを生成し、任意のサイズのプロセスプールを使用して、各パーティションの平均を計算します。したがって、問題を解決するために、必要な数のプロセスを使用できます(プロセッサの数まで)。9要素の配列サイズの場合、要素をそれぞれ3要素の3タプルに分割する方法は280しかありません。しかし、この数は要素の数が増えるにつれて急速に増加します。配列サイズが12要素(それぞれ4要素の3タプル)の場合、パーティションの数は5775になります。トレードオフは、generate_tuples冗長なパーティションを排除するために、関数のコストが高くなることです(ソートが行われるため)。

次のコードは、最大平均を生成するパーティショニングを見つけます。

from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod

def generate_tuples(arr):
    slice_size = len(arr) // 3
    s = set()
    cnt = 0
    for p in permutations(arr):
        t = tuple(sorted([tuple(sorted(p[0:slice_size])), tuple(sorted(p[slice_size:slice_size*2])), tuple(sorted(p[slice_size*2:slice_size*3]))]))
        if t not in s:
            yield t
            s.add(t)
            cnt += 1
    print('Total partitions = ', cnt)



def compute(t):
    return t, mean(prod(x) for x in t)


def main():
    with multiprocessing.Pool(6) as pool:
        arr = random.sample(range(10, 50), 12) # count should be divisible by 3
        print('arr =', arr)
        # chunksize should be approximately: size_of_iterable / (pool_size * 4):
        results = pool.imap(compute, generate_tuples(arr), chunksize=241)
        max_t = None
        max_mean = 0
        for t, m in results:
            if m > max_mean:
                max_mean = m
                max_t = t
        print(max_t, max_mean)


if __name__ == '__main__':
    main()

プリント:

arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions =  5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333

更新

以下は、マルチプロセッシングを使用しようとするときに役立つ情報です。

最初のアプローチでは、管理された共有リストを使用します。この管理対象リストの利点は、アクセスが自動的にシリアル化されるため、実行される操作の複雑さに応じて、リストにアクセスするプロセスが明示的にロックを実行する必要がないことです。また、共有リストインスタンスを引数としてワーカー関数に渡すよりも、プロセスプールの作成時に共有リストをグローバルに割り当てることで、各プロセスを1回初期化する方が便利な場合がよくあります。

import multiprocessing

def pool_initializer(the_list):
    global arr

    arr = the_list


def reverse():
    arr = arr[::-1]


if __name__ == __main__: # required for Windows
    with multiprocessing.Manger() as manager:
        arr = manager.list(random.sample(range(10, 50), 12))
        with Pool(initializer=pool_initializer, initargs=(arr,) as pool:
            pool.apply(reverse)
        print(arr)

欠点はarr、実際には実際の共有メモリへのプロキシであるため、を使用している2番目のオプションを使用するよりもアクセスが遅くなる可能性があることmuliprocessing.Arrayです。2つのプロセスが同じ要素を変更しようとしない限り、ロックについて心配する必要はありません。それ以外の場合は、共有可能なLockインスタンスを作成し、必要に応じてアレイへのアクセスをシリアル化する必要があります。[https://stackoverflow.com/questions/39122270/multiprocessing-shared-array]を参照してください。