Wie können Mitarbeiter in Python Multiprocessing kommunizieren, um die beste Lösung für das Problem zu finden?
Mein vereinfachtes Problem
Ich habe eine Funktion erstellt, die das durchschnittliche Produkt zurückgibt, nachdem eine Liste von Zahlen in drei verschiedene Listen aufgeteilt wurde.
Beispielsweise:
Input array 'arr' = [1,2,3,4,5,6,7,8,9]
Example partition: [1,5,6],[2,3,9],[4,7,8]
Example objective: mean([1 x 5 x 6],[2 x 3 x 9],[4 x 7 x 8]) = 102.67
Mein Ziel - Mitarbeiter dazu bringen, um die beste Lösung zu konkurrieren und zu kommunizieren
Ich versuche jetzt, diese Funktion parallel auszuführen (derzeit nur 2 Worker), sodass die Worker alle 10 Sekunden ihre Partition (mit dem höchsten Ziel) miteinander teilen und sie als Ausgangspunkt für die nächsten 10 Sekunden verwenden und so weiter, bis sich das optimale Ergebnis mit der Zeit verbessert. Dieses beste Ergebnis wird als update_partition an die Rechenfunktion übergeben .
Ich bin mir nicht sicher, wie ich die Mitarbeiter dazu bringen soll, ihre Ergebnisse zu kommunizieren.
Da ich neu in der Mehrfachverarbeitung bin, würde ich mich auch über Ratschläge zur Verbesserung meiner Lösung freuen - z. B. über die Verwendung einer Warteschlange, eines Managers, eines Pools usw.
Mein Versuch - ohne Kommunikation
# Competing and communicating workers
from multiprocessing import Process
import random
import numpy as np
import sys
# Sub functions used in the compute function
def partition(arr, n):
random.shuffle(arr)
return [np.array(arr[i::n]) for i in range(n)]
def average(partitionList):
return np.mean([np.prod(i) for i in partitionList]), partitionList
def swap(A,B,i,j):
b_temp = B[j].copy()
B[j] = A[i]
A[i] = b_temp
return A,B
# Main function - this just shuffles one element from each group of the array at a time to try and maximise the objective
def compute(message,arr,r,update_partition = 'Default'):
if update_partition != 'Default':
current_partition = update_partition
else:
current_partition = partition(arr, r)
current_partition = partition(arr, r)
obj_prev = average(current_partition)[0]
print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))
while True:
for i in range(3):
randPosOne = np.random.randint(3)
randPosTwo = np.random.randint(3)
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
obj = average(current_partition)[0]
if obj > obj_prev:
obj_prev = obj
store = average(current_partition)[1]
print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))
else:
obj = obj_prev
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
if __name__ == '__main__':
# This is just an arbitray array of random numbers used as an input
arr = random.sample(range(10, 50), 12)
# This represents how many groups we would like to make out of the arr list
r = 3 #int(sys.argv[1])
first = Process(target=compute, args=("Worker 1", arr,r))
first.start()
second = Process(target=compute, args=("Worker 2", arr,r))
second.start()
Antworten
Dies wird Sie nicht unbedingt zufriedenstellen, da es bei dieser Lösung nicht um mehrere Prozesse geht, die miteinander kommunizieren, um das Problem zu lösen. Aber dann glaube ich nicht, dass der beste Ansatz zur Lösung des Problems dies erfordert.
Meine erste Beobachtung ist, dass die Verwendung eines zufälligen Shuffle zum Generieren der Partitionen nicht ideal ist, da Partitionen generiert werden, die bis auf die Reihenfolge der Elemente innerhalb der Partition im Wesentlichen identisch sind und somit zu denselben Produkten und Mittelwerten führen. Der folgende Code generiert unterschiedliche, lexikalisch geordnete Partitionen und verwendet einen Prozesspool beliebiger Größe, um den Mittelwert für jede Partition zu berechnen. Sie können also beliebig viele Prozesse verwenden (bis zur Anzahl der Prozessoren), um das Problem zu lösen. Für eine Arraygröße von 9 Elementen gibt es nur 280 Möglichkeiten, die Elemente in 3 Tupel mit jeweils 3 Elementen zu unterteilen. Diese Anzahl wächst jedoch schnell, wenn die Anzahl der Elemente zunimmt. Bei einer Arraygröße von 12 Elementen (3 Tupel mit jeweils 4 Elementen) beträgt die Anzahl der Partitionen 5775. Der Nachteil ist, dass die Funktion generate_tuples
(aufgrund der Sortierung) teurer ist, um redundante Partitionen zu eliminieren.
Der folgende Code findet die Partitionierung, die den maximalen Mittelwert ergibt:
from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod
def generate_tuples(arr):
slice_size = len(arr) // 3
s = set()
cnt = 0
for p in permutations(arr):
t = tuple(sorted([tuple(sorted(p[0:slice_size])), tuple(sorted(p[slice_size:slice_size*2])), tuple(sorted(p[slice_size*2:slice_size*3]))]))
if t not in s:
yield t
s.add(t)
cnt += 1
print('Total partitions = ', cnt)
def compute(t):
return t, mean(prod(x) for x in t)
def main():
with multiprocessing.Pool(6) as pool:
arr = random.sample(range(10, 50), 12) # count should be divisible by 3
print('arr =', arr)
# chunksize should be approximately: size_of_iterable / (pool_size * 4):
results = pool.imap(compute, generate_tuples(arr), chunksize=241)
max_t = None
max_mean = 0
for t, m in results:
if m > max_mean:
max_mean = m
max_t = t
print(max_t, max_mean)
if __name__ == '__main__':
main()
Drucke:
arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions = 5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333
Aktualisieren
Die folgenden Informationen sind möglicherweise nützlich, wenn Sie versuchen, Multiprocessing zu verwenden.
Der erste Ansatz verwendet eine verwaltete freigegebene Liste. Der Vorteil dieser verwalteten Liste besteht darin, dass der Zugriff automatisch serialisiert wird, sodass Prozesse, die auf die Liste zugreifen, abhängig von der Komplexität der ausgeführten Vorgänge keine explizite Sperrung durchführen müssen. Anstatt die Instanz der gemeinsam genutzten Liste als Argument an Ihre Worker-Funktion (en) zu übergeben, ist es häufig bequemer, jeden Prozess einmal zu initialisieren, indem Sie die gemeinsam genutzte Liste beim Erstellen des Prozesspools einer globalen Liste zuweisen:
import multiprocessing
def pool_initializer(the_list):
global arr
arr = the_list
def reverse():
arr = arr[::-1]
if __name__ == __main__: # required for Windows
with multiprocessing.Manger() as manager:
arr = manager.list(random.sample(range(10, 50), 12))
with Pool(initializer=pool_initializer, initargs=(arr,) as pool:
pool.apply(reverse)
print(arr)
Der Nachteil ist, dass arr
es sich tatsächlich um einen Proxy für den tatsächlichen gemeinsam genutzten Speicher handelt und der Zugriff daher langsamer sein kann als bei Verwendung der zweiten Option, bei der a verwendet wird muliprocessing.Array. Solange keine zwei Prozesse versuchen, dasselbe Element zu ändern, müssen Sie sich keine Gedanken über das Sperren machen. Andernfalls müssen Sie eine gemeinsam nutzbare Lock
Instanz erstellen und bei Bedarf den Zugriff auf das Array serialisieren. Siehe [https://stackoverflow.com/questions/39122270/multiprocessing-shared-array].