Julia Multithreading nicht skaliert für peinlich parallele Arbeit
Der folgende Code berechnet die durchschnittliche Anzahl der Ziehungen, um 50 eindeutige Karten aus mehreren Sätzen zu erhalten. Wichtig ist nur, dass dieses Problem nicht viel RAM erfordert und beim Starten im Multithreading-Modus keine Variablen gemeinsam nutzt. Beim Start mit vier mehr als einem Thread zur Durchführung von 400.000 Simulationen dauert es ungefähr eine Sekunde länger als bei zwei Prozessen, die zusammen gestartet und 200.000 Simulationen durchgeführt werden. Das hat mich gestört und ich konnte keine Erklärung finden.
Dies ist der Julia-Code in epic_draw_multi_thread.jl:
using Random
using Printf
import Base.Threads.@spawn
function pickone(dist)
n = length(dist)
i = 1
r = rand()
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist)
item_type = pickone(type_dist)
item_number = pickone(unique_elements_dist[item_type])
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
Dies ist der Befehl, der an der Shell ausgegeben wird, um auf zwei Threads zusammen mit den Ausgabe- und Timing-Ergebnissen ausgeführt zu werden:
time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real 0m14.347s
user 0m26.959s
sys 0m2.124s
Dies ist der Befehl, der in der Shell ausgegeben wird, um zwei Prozesse mit jeweils der halben Jobgröße zusammen mit den Ausgabe- und Timing-Ergebnissen auszuführen:
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real 0m12.919s
user 0m12.688s
sys 0m0.300s
70.448695
real 0m12.996s
user 0m12.790s
sys 0m0.308s
Egal wie oft ich das Experiment wiederhole, der Multithread-Modus wird immer langsamer. Anmerkungen:
- Ich habe parallelen Code erstellt, um den Wert von PI zu approximieren, und hatte nicht das gleiche Problem. In diesem Code wird jedoch nichts angezeigt, was zu Konflikten zwischen Threads führen könnte, die zu Langsamkeit führen.
- Wenn ich mit mehr als einem Thread angefangen habe, verwende ich die Anzahl der Threads minus eins, um die Ziehungen durchzuführen. Andernfalls scheint der letzte Thread zu hängen. Diese Anweisung
t = max(Threads.nthreads() - 1, 1)
kann geändert werden,t = Threads.nthreads()
um die genaue Anzahl der verfügbaren Threads zu verwenden.
BEARBEITEN am 20.11.2020
Umsetzung der Empfehlungen von Przemyslaw Szufel. Dies ist der neue Code:
using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools
function pickone(dist, mt)
n = length(dist)
i = 1
r = rand(mt)
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist, mt)
item_type = pickone(type_dist, mt)
item_number = pickone(unique_elements_dist[item_type], mt)
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x, mt)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist, mt)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
mts = MersenneTwister.(1:t)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
Aktualisierte Benchmarks:
Threads @btime Linux Time
1 (2 processes) 9.927 s 0m44.871s
2 (1 process) 20.237 s 1m14.156s
3 (1 process) 14.302 s 1m2.114s
Antworten
Hier gibt es zwei Probleme:
- Sie messen die Leistung nicht richtig
- Wenn Sie Zufallszahlen in Threads generieren, sollten Sie
MersenneTwister
für jeden Thread einen eigenen Zufallsstatus haben, um die beste Leistung zu erzielen (andernfalls wird Ihr Zufallsstatus für alle Threads freigegeben und es muss eine Synchronisierung stattfinden).
Derzeit messen Sie die Zeit von "Julia Startzeit" + "Code-Kompilierungszeit" + "Laufzeit". Das Kompilieren eines Multithread-Codes dauert offensichtlich länger als das Kompilieren eines Single-Thread-Codes. Und Julia selbst zu starten dauert auch ein oder zwei Sekunden.
Sie haben hier zwei Möglichkeiten. Am einfachsten ist es, ein BenchmarkTools
@btime
Makro zu verwenden, um die Ausführungszeiten im Code zu messen. Eine andere Möglichkeit wäre, Ihren Code in ein Paket zu verwandeln und ihn über PackageCompiler in ein Julia-Image zu kompilieren . Sie messen jedoch weiterhin "Julia-Startzeit" + "Julia-Ausführungszeit"
Der Zufallszahlenstatus kann wie folgt erstellt werden:
mts = MersenneTwister.(1:Threads.nthreads());
und dann verwendet wie rand(mts[Threads.threadid()])