줄리아 다중 스레드가 당황스럽게 병렬 작업을 위해 확장되지 않음
다음 코드는 여러 세트에서 50 개의 고유 카드를 얻기 위해 평균 뽑기 수를 계산합니다. 중요한 것은이 문제가 많은 RAM을 필요로하지 않으며 멀티 스레딩 모드로 시작할 때 변수를 공유하지 않는다는 것입니다. 400,000 개의 시뮬레이션을 수행하기 위해 4 개 이상의 스레드로 시작하면 두 개의 프로세스가 함께 시작되고 200,000 개의 시뮬레이션을 수행하는 것보다 약 1 초가 더 걸립니다. 이것은 나를 괴롭 혔고 나는 어떤 설명도 찾을 수 없었다.
다음은 epic_draw_multi_thread.jl의 Julia 코드입니다.
using Random
using Printf
import Base.Threads.@spawn
function pickone(dist)
n = length(dist)
i = 1
r = rand()
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist)
item_type = pickone(type_dist)
item_number = pickone(unique_elements_dist[item_type])
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
이것은 출력 및 타이밍 결과와 함께 두 개의 스레드에서 실행하기 위해 쉘에서 실행되는 명령입니다.
time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real 0m14.347s
user 0m26.959s
sys 0m2.124s
다음은 출력 및 타이밍 결과와 함께 작업 크기가 절반 인 두 개의 프로세스를 실행하기 위해 쉘에서 실행되는 명령입니다.
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real 0m12.919s
user 0m12.688s
sys 0m0.300s
70.448695
real 0m12.996s
user 0m12.790s
sys 0m0.308s
실험을 몇 번 반복해도 항상 멀티 스레드 모드가 느려집니다. 메모:
- PI 값을 근사화하기 위해 병렬 코드를 만들었지 만 동일한 문제가 발생하지 않았습니다. 그러나이 코드에서 스레드 간의 충돌을 유발하여 속도를 늦출 수있는 것은 보이지 않습니다.
- 두 개 이상의 스레드로 시작하면 스레드 수에서 1을 뺀 값을 사용하여 그리기를 수행합니다. 실패하면 마지막 스레드가 중단되는 것 같습니다. 이 문
t = max(Threads.nthreads() - 1, 1)
은t = Threads.nthreads()
사용 가능한 정확한 스레드 수를 사용 하도록 변경 될 수 있습니다.
2020 년 11 월 20 일 수정
Przemyslaw Szufel 권장 사항을 구현했습니다. 다음은 새로운 코드입니다.
using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools
function pickone(dist, mt)
n = length(dist)
i = 1
r = rand(mt)
while r >= dist[i] && i<n
i+=1
end
return i
end
function init_items(type_dist, unique_elements)
return zeros(Int32, length(type_dist), maximum(unique_elements))
end
function draw(type_dist, unique_elements_dist, mt)
item_type = pickone(type_dist, mt)
item_number = pickone(unique_elements_dist[item_type], mt)
return item_type, item_number
end
function draw_unique(type_dist, unique_elements_dist, items, x, mt)
while sum(items .> 0) < x
item_type, item_number = draw(type_dist, unique_elements_dist, mt)
items[item_type, item_number] += 1
end
return sum(items)
end
function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
items = init_items(type_dist, unique_elements)
tot_draws = 0
for i in 1:n
tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
if reset
items .= 0
else
items[items.>1] -= 1
end
end
println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
return tot_draws / n
end
function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
println("Started computing...")
t = max(Threads.nthreads() - 1, 1)
mts = MersenneTwister.(1:t)
m = Int32(round(n / t))
tasks = Array{Task}(undef, t)
@sync for i in 1:t
task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
tasks[i] = task
end
sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
업데이트 된 벤치 마크 :
Threads @btime Linux Time
1 (2 processes) 9.927 s 0m44.871s
2 (1 process) 20.237 s 1m14.156s
3 (1 process) 14.302 s 1m2.114s
답변
여기에는 두 가지 문제가 있습니다.
- 성능을 올바르게 측정하고 있지 않습니다.
- 스레드에서 난수를 생성 할 때
MersenneTwister
최상의 성능을 위해 각 스레드에 대해 별도의 임의 상태를 가져야합니다 (그렇지 않으면 임의 상태가 모든 스레드에서 공유되고 동기화가 발생해야 함).
현재 "Julia 시작 시간"+ "코드 컴파일 시간"+ "런타임"의 시간을 측정하고 있습니다. 다중 스레드 코드를 컴파일하는 것은 단일 스레드 코드를 컴파일하는 것보다 분명히 더 오래 걸립니다. Julia 자체를 시작하는 것도 1 ~ 2 초가 걸립니다.
여기에 두 가지 옵션이 있습니다. 가장 쉬운 방법은 BenchmarkTools
@btime
매크로 를 사용 하여 코드 내에서 실행 시간을 측정하는 것입니다. 또 다른 옵션은 코드를 패키지로 만들고 PackageCompiler 를 통해 Julia 이미지로 컴파일하는 것 입니다. 그러나 "Julia 시작 시간"+ "Julia 실행 시간"은 계속 측정됩니다.
난수 상태는 다음과 같이 만들 수 있습니다.
mts = MersenneTwister.(1:Threads.nthreads());
그런 다음 rand(mts[Threads.threadid()])