julia multi-threaded ne se met pas à l'échelle pour un travail parallèle embarrassant

Nov 20 2020

Le code suivant calcule le nombre moyen de tirages pour obtenir 50 cartes uniques de plusieurs ensembles. Tout ce qui est important, c'est que ce problème ne nécessite pas beaucoup de RAM et ne partage aucune variable lorsqu'il est lancé en mode multi-threading. Lorsqu'il est lancé avec quatre threads de plus pour effectuer 400 000 simulations, il faut systématiquement environ une seconde de plus que deux processus lancés ensemble et exécutant 200 000 simulations. Cela m'a dérangé et je n'ai trouvé aucune explication.

Voici le code Julia dans epic_draw_multi_thread.jl:

using Random
using Printf
import Base.Threads.@spawn

function pickone(dist)
    n = length(dist)
    i = 1
    r = rand()
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist)
    item_type = pickone(type_dist)
    item_number = pickone(unique_elements_dist[item_type])
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)

C'est la commande émise au niveau du shell pour s'exécuter sur deux threads avec les résultats de sortie et de synchronisation:

time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real    0m14.347s
user    0m26.959s
sys     0m2.124s

Voici la commande émise au niveau du shell pour exécuter deux processus avec la moitié de la taille du travail chacun avec les résultats de sortie et de synchronisation:

time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real    0m12.919s
user    0m12.688s
sys     0m0.300s
70.448695
real    0m12.996s
user    0m12.790s
sys     0m0.308s

Peu importe le nombre de fois que je répète l'expérience, j'obtiens toujours le mode multi-thread plus lent. Remarques:

  1. J'ai créé du code parallèle pour approximer la valeur de PI et je n'ai pas rencontré le même problème. Cependant, je ne vois rien dans ce code qui pourrait provoquer un conflit entre les threads entraînant de la lenteur.
  2. Lorsque j'ai commencé avec plus d'un thread, j'utilise le nombre de threads moins un pour effectuer les tirages. A défaut, le dernier fil semble s'accrocher. Cette instruction t = max(Threads.nthreads() - 1, 1)peut être modifiée pour t = Threads.nthreads()utiliser le nombre exact de threads disponibles.

MODIFIER le 20/11/2020

Mise en œuvre des recommandations de Przemyslaw Szufel. Voici le nouveau code:

using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools

function pickone(dist, mt)
    n = length(dist)
    i = 1
    r = rand(mt)
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist, mt)
    item_type = pickone(type_dist, mt)
    item_number = pickone(unique_elements_dist[item_type], mt)
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x, mt)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist, mt)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    mts = MersenneTwister.(1:t)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
    

Benchmarks mis à jour:

Threads          @btime     Linux Time       
1 (2 processes)  9.927 s    0m44.871s 
2 (1 process)   20.237 s    1m14.156s
3 (1 process)   14.302 s    1m2.114s

Réponses

5 PrzemyslawSzufel Nov 20 2020 at 02:35

Ici, nous avons deux problèmes:

  1. Vous ne mesurez pas correctement les performances
  2. Lors de la génération de nombres aléatoires dans les threads, vous devez avoir un MersenneTwisterétat aléatoire distinct pour chaque thread pour les meilleures performances (sinon votre état aléatoire est partagé entre tous les threads et la synchronisation doit se produire)

Actuellement, vous mesurez le temps de "Julia start time" + "code compile time" + "runtime". La compilation d'un code multi-thread prend évidemment plus de temps que la compilation d'un code mono-thread. Et démarrer Julia lui-même prend également une seconde ou deux.

Vous avez deux options ici. Le plus simple est d'utiliser une BenchmarkTools @btimemacro pour mesurer les temps d'exécution à l'intérieur du code. Une autre option serait de transformer votre code en package et de le compiler dans une image Julia via PackageCompiler . Cependant, vous mesurerez toujours "Julia start time" + "Julia execution time"

L'état de nombre aléatoire peut être créé comme suit:

mts = MersenneTwister.(1:Threads.nthreads());

puis utilisé comme rand(mts[Threads.threadid()])