julia multi-threaded ne se met pas à l'échelle pour un travail parallèle embarrassant
Le code suivant calcule le nombre moyen de tirages pour obtenir 50 cartes uniques de plusieurs ensembles. Tout ce qui est important, c'est que ce problème ne nécessite pas beaucoup de RAM et ne partage aucune variable lorsqu'il est lancé en mode multi-threading. Lorsqu'il est lancé avec quatre threads de plus pour effectuer 400 000 simulations, il faut systématiquement environ une seconde de plus que deux processus lancés ensemble et exécutant 200 000 simulations. Cela m'a dérangé et je n'ai trouvé aucune explication.
Voici le code Julia dans epic_draw_multi_thread.jl:
using Random
using Printf
import Base.Threads.@spawn
function pickone(dist)
n = length(dist)
i = 1
r = rand()
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist)
item_type = pickone(type_dist)
item_number = pickone(unique_elements_dist[item_type])
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
C'est la commande émise au niveau du shell pour s'exécuter sur deux threads avec les résultats de sortie et de synchronisation:
time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real 0m14.347s
user 0m26.959s
sys 0m2.124s
Voici la commande émise au niveau du shell pour exécuter deux processus avec la moitié de la taille du travail chacun avec les résultats de sortie et de synchronisation:
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real 0m12.919s
user 0m12.688s
sys 0m0.300s
70.448695
real 0m12.996s
user 0m12.790s
sys 0m0.308s
Peu importe le nombre de fois que je répète l'expérience, j'obtiens toujours le mode multi-thread plus lent. Remarques:
- J'ai créé du code parallèle pour approximer la valeur de PI et je n'ai pas rencontré le même problème. Cependant, je ne vois rien dans ce code qui pourrait provoquer un conflit entre les threads entraînant de la lenteur.
- Lorsque j'ai commencé avec plus d'un thread, j'utilise le nombre de threads moins un pour effectuer les tirages. A défaut, le dernier fil semble s'accrocher. Cette instruction
t = max(Threads.nthreads() - 1, 1)
peut être modifiée pourt = Threads.nthreads()
utiliser le nombre exact de threads disponibles.
MODIFIER le 20/11/2020
Mise en œuvre des recommandations de Przemyslaw Szufel. Voici le nouveau code:
using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools
function pickone(dist, mt)
n = length(dist)
i = 1
r = rand(mt)
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist, mt)
item_type = pickone(type_dist, mt)
item_number = pickone(unique_elements_dist[item_type], mt)
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x, mt)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist, mt)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
mts = MersenneTwister.(1:t)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
Benchmarks mis à jour:
Threads @btime Linux Time
1 (2 processes) 9.927 s 0m44.871s
2 (1 process) 20.237 s 1m14.156s
3 (1 process) 14.302 s 1m2.114s
Réponses
Ici, nous avons deux problèmes:
- Vous ne mesurez pas correctement les performances
- Lors de la génération de nombres aléatoires dans les threads, vous devez avoir un
MersenneTwister
état aléatoire distinct pour chaque thread pour les meilleures performances (sinon votre état aléatoire est partagé entre tous les threads et la synchronisation doit se produire)
Actuellement, vous mesurez le temps de "Julia start time" + "code compile time" + "runtime". La compilation d'un code multi-thread prend évidemment plus de temps que la compilation d'un code mono-thread. Et démarrer Julia lui-même prend également une seconde ou deux.
Vous avez deux options ici. Le plus simple est d'utiliser une BenchmarkTools
@btime
macro pour mesurer les temps d'exécution à l'intérieur du code. Une autre option serait de transformer votre code en package et de le compiler dans une image Julia via PackageCompiler . Cependant, vous mesurerez toujours "Julia start time" + "Julia execution time"
L'état de nombre aléatoire peut être créé comme suit:
mts = MersenneTwister.(1:Threads.nthreads());
puis utilisé comme rand(mts[Threads.threadid()])