Julia Multithreading nicht skaliert für peinlich parallele Arbeit

Nov 20 2020

Der folgende Code berechnet die durchschnittliche Anzahl der Ziehungen, um 50 eindeutige Karten aus mehreren Sätzen zu erhalten. Wichtig ist nur, dass dieses Problem nicht viel RAM erfordert und beim Starten im Multithreading-Modus keine Variablen gemeinsam nutzt. Beim Start mit vier mehr als einem Thread zur Durchführung von 400.000 Simulationen dauert es ungefähr eine Sekunde länger als bei zwei Prozessen, die zusammen gestartet und 200.000 Simulationen durchgeführt werden. Das hat mich gestört und ich konnte keine Erklärung finden.

Dies ist der Julia-Code in epic_draw_multi_thread.jl:

using Random
using Printf
import Base.Threads.@spawn

function pickone(dist)
    n = length(dist)
    i = 1
    r = rand()
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist)
    item_type = pickone(type_dist)
    item_number = pickone(unique_elements_dist[item_type])
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)

Dies ist der Befehl, der an der Shell ausgegeben wird, um auf zwei Threads zusammen mit den Ausgabe- und Timing-Ergebnissen ausgeführt zu werden:

time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real    0m14.347s
user    0m26.959s
sys     0m2.124s

Dies ist der Befehl, der in der Shell ausgegeben wird, um zwei Prozesse mit jeweils der halben Jobgröße zusammen mit den Ausgabe- und Timing-Ergebnissen auszuführen:

time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real    0m12.919s
user    0m12.688s
sys     0m0.300s
70.448695
real    0m12.996s
user    0m12.790s
sys     0m0.308s

Egal wie oft ich das Experiment wiederhole, der Multithread-Modus wird immer langsamer. Anmerkungen:

  1. Ich habe parallelen Code erstellt, um den Wert von PI zu approximieren, und hatte nicht das gleiche Problem. In diesem Code wird jedoch nichts angezeigt, was zu Konflikten zwischen Threads führen könnte, die zu Langsamkeit führen.
  2. Wenn ich mit mehr als einem Thread angefangen habe, verwende ich die Anzahl der Threads minus eins, um die Ziehungen durchzuführen. Andernfalls scheint der letzte Thread zu hängen. Diese Anweisung t = max(Threads.nthreads() - 1, 1)kann geändert werden, t = Threads.nthreads()um die genaue Anzahl der verfügbaren Threads zu verwenden.

BEARBEITEN am 20.11.2020

Umsetzung der Empfehlungen von Przemyslaw Szufel. Dies ist der neue Code:

using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools

function pickone(dist, mt)
    n = length(dist)
    i = 1
    r = rand(mt)
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist, mt)
    item_type = pickone(type_dist, mt)
    item_number = pickone(unique_elements_dist[item_type], mt)
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x, mt)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist, mt)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    mts = MersenneTwister.(1:t)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
    

Aktualisierte Benchmarks:

Threads          @btime     Linux Time       
1 (2 processes)  9.927 s    0m44.871s 
2 (1 process)   20.237 s    1m14.156s
3 (1 process)   14.302 s    1m2.114s

Antworten

5 PrzemyslawSzufel Nov 20 2020 at 02:35

Hier gibt es zwei Probleme:

  1. Sie messen die Leistung nicht richtig
  2. Wenn Sie Zufallszahlen in Threads generieren, sollten Sie MersenneTwisterfür jeden Thread einen eigenen Zufallsstatus haben, um die beste Leistung zu erzielen (andernfalls wird Ihr Zufallsstatus für alle Threads freigegeben und es muss eine Synchronisierung stattfinden).

Derzeit messen Sie die Zeit von "Julia Startzeit" + "Code-Kompilierungszeit" + "Laufzeit". Das Kompilieren eines Multithread-Codes dauert offensichtlich länger als das Kompilieren eines Single-Thread-Codes. Und Julia selbst zu starten dauert auch ein oder zwei Sekunden.

Sie haben hier zwei Möglichkeiten. Am einfachsten ist es, ein BenchmarkTools @btimeMakro zu verwenden, um die Ausführungszeiten im Code zu messen. Eine andere Möglichkeit wäre, Ihren Code in ein Paket zu verwandeln und ihn über PackageCompiler in ein Julia-Image zu kompilieren . Sie messen jedoch weiterhin "Julia-Startzeit" + "Julia-Ausführungszeit"

Der Zufallszahlenstatus kann wie folgt erstellt werden:

mts = MersenneTwister.(1:Threads.nthreads());

und dann verwendet wie rand(mts[Threads.threadid()])