Comment faire en sorte que les travailleurs communiquent en multitraitement Python pour trouver la meilleure solution au problème?
Mon problème simplifié
J'ai créé une fonction qui renvoie le produit moyen après avoir divisé une liste de nombres en 3 listes distinctes.
Par example:
Input array 'arr' = [1,2,3,4,5,6,7,8,9]
Example partition: [1,5,6],[2,3,9],[4,7,8]
Example objective: mean([1 x 5 x 6],[2 x 3 x 9],[4 x 7 x 8]) = 102.67
Mon objectif - faire en sorte que les travailleurs se disputent la meilleure solution et communiquent
J'essaye maintenant d'exécuter cette fonction en parallèle (juste 2 ouvriers pour l'instant), de sorte qu'après toutes les 10 secondes, les ouvriers partagent leur partition (avec l'objectif le plus élevé) entre eux et l'utilisent comme point de départ pour les 10 prochaines secondes , et ainsi de suite jusqu'à ce que le résultat optimal s'améliore avec le temps. Ce meilleur résultat sera transmis à la fonction de calcul en tant que update_partition .
Je ne sais pas comment faire pour que les travailleurs communiquent leurs résultats, j'apprécierais donc de l'aide à ce sujet.
Comme je suis nouveau dans le multitraitement, j'apprécierais également tout conseil pour améliorer ma solution - par exemple en utilisant une file d'attente, un gestionnaire, un pool, etc.
Ma tentative - hors communication
# Competing and communicating workers
from multiprocessing import Process
import random
import numpy as np
import sys
# Sub functions used in the compute function
def partition(arr, n):
random.shuffle(arr)
return [np.array(arr[i::n]) for i in range(n)]
def average(partitionList):
return np.mean([np.prod(i) for i in partitionList]), partitionList
def swap(A,B,i,j):
b_temp = B[j].copy()
B[j] = A[i]
A[i] = b_temp
return A,B
# Main function - this just shuffles one element from each group of the array at a time to try and maximise the objective
def compute(message,arr,r,update_partition = 'Default'):
if update_partition != 'Default':
current_partition = update_partition
else:
current_partition = partition(arr, r)
current_partition = partition(arr, r)
obj_prev = average(current_partition)[0]
print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))
while True:
for i in range(3):
randPosOne = np.random.randint(3)
randPosTwo = np.random.randint(3)
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
obj = average(current_partition)[0]
if obj > obj_prev:
obj_prev = obj
store = average(current_partition)[1]
print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))
else:
obj = obj_prev
if i != 2:
swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
else:
swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
if __name__ == '__main__':
# This is just an arbitray array of random numbers used as an input
arr = random.sample(range(10, 50), 12)
# This represents how many groups we would like to make out of the arr list
r = 3 #int(sys.argv[1])
first = Process(target=compute, args=("Worker 1", arr,r))
first.start()
second = Process(target=compute, args=("Worker 2", arr,r))
second.start()
Réponses
Cela ne vous satisfera pas forcément car cette solution ne concerne pas les multiples processus communiquant entre eux pour résoudre le problème. Mais je ne crois pas que la meilleure approche pour résoudre le problème exige qu'ils le fassent.
Ma première observation est que l'utilisation d'un mélange aléatoire pour générer les partitions est loin d'être idéale car elle générera des partitions qui sont essentiellement identiques à l'exception de l'ordre des éléments dans la partition et donc donnant lieu aux mêmes produits et à la même moyenne. Le code ci-dessous génère des partitions distinctes, ordonnées lexicalement et utilise un pool de processus de taille arbitraire pour calculer la moyenne de chaque partition. Ainsi, vous pouvez utiliser autant de processus que vous le souhaitez (jusqu'à concurrence du nombre de processeurs dont vous disposez) pour résoudre le problème). Pour une taille de tableau de 9 éléments, il n'y a que 280 façons possibles de partitionner les éléments en 3 tuples de 3 éléments chacun. Mais ce nombre augmente rapidement à mesure que le nombre d'éléments augmente. Pour une taille de tableau de 12 éléments (3 tuples de 4 éléments chacun), le nombre de partitions devient 5775. Le compromis est que la fonction generate_tuples
est plus coûteuse (en raison du tri qu'elle effectue) dans son effort pour éliminer les partitions redondantes.
Le code suivant recherche le partitionnement qui produit la moyenne maximale:
from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod
def generate_tuples(arr):
slice_size = len(arr) // 3
s = set()
cnt = 0
for p in permutations(arr):
t = tuple(sorted([tuple(sorted(p[0:slice_size])), tuple(sorted(p[slice_size:slice_size*2])), tuple(sorted(p[slice_size*2:slice_size*3]))]))
if t not in s:
yield t
s.add(t)
cnt += 1
print('Total partitions = ', cnt)
def compute(t):
return t, mean(prod(x) for x in t)
def main():
with multiprocessing.Pool(6) as pool:
arr = random.sample(range(10, 50), 12) # count should be divisible by 3
print('arr =', arr)
# chunksize should be approximately: size_of_iterable / (pool_size * 4):
results = pool.imap(compute, generate_tuples(arr), chunksize=241)
max_t = None
max_mean = 0
for t, m in results:
if m > max_mean:
max_mean = m
max_t = t
print(max_t, max_mean)
if __name__ == '__main__':
main()
Impressions:
arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions = 5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333
Mettre à jour
Les informations suivantes peuvent être utiles lorsque vous essayez d'utiliser le multitraitement.
La première approche utilise une liste partagée gérée . L'avantage de cette liste gérée est que l'accès est automatiquement sérialisé afin que les processus accédant à la liste, en fonction de la complexité des opérations exécutées, n'aient pas à effectuer explicitement le verrouillage. Et plutôt que de transmettre l'instance de liste partagée en tant qu'argument à vos fonctions de travail, il est souvent plus pratique d'initialiser chaque processus une fois en affectant la liste partagée à un global lors de la création du pool de processus:
import multiprocessing
def pool_initializer(the_list):
global arr
arr = the_list
def reverse():
arr = arr[::-1]
if __name__ == __main__: # required for Windows
with multiprocessing.Manger() as manager:
arr = manager.list(random.sample(range(10, 50), 12))
with Pool(initializer=pool_initializer, initargs=(arr,) as pool:
pool.apply(reverse)
print(arr)
L'inconvénient est qu'il arr
s'agit en fait d'un proxy de la mémoire partagée réelle et que l'accès peut donc être plus lent que d'utiliser la deuxième option, qui utilise un muliprocessing.Array. Tant qu'aucun processus ne tente de modifier le même élément, vous n'avez pas à vous soucier du verrouillage. Sinon, vous devrez créer une Lock
instance partageable et sérialiser l'accès à la baie si nécessaire. Voir [https://stackoverflow.com/questions/39122270/multiprocessing-shared-array].