Commit

Temp commit for visualization
jpsamaroo committed May 1, 2021
1 parent 8c79341 commit faca41c
Showing 7 changed files with 179 additions and 69 deletions.
89 changes: 61 additions & 28 deletions benchmarks/benchmark.jl
@@ -1,3 +1,5 @@
#ccall(:jl_exit_on_sigint, Cvoid, (Cint,), 0)

using Distributed
if haskey(ENV, "BENCHMARK_PROCS")
const np, nt = parse.(Ref(Int), split(ENV["BENCHMARK_PROCS"], ":"))
@@ -40,6 +42,10 @@ elseif render == "offline"
using FFMPEG, FileIO, ImageMagick
end
const RENDERS = Dict{Int,Dict}()
const live_port = parse(Int, get(ENV, "BENCHMARK_LIVE_PORT", "8000"))

const graph = parse(Bool, get(ENV, "BENCHMARK_GRAPH", "0"))
const profile = parse(Bool, get(ENV, "BENCHMARK_PROFILE", "0"))

_benches = get(ENV, "BENCHMARK", "cpu,cpu+dagger")
const benches = []
@@ -124,7 +130,7 @@ end

theory_flops(nrow, ncol, nfeatures) = 11 * ncol * nrow * nfeatures + 2 * (ncol + nrow) * nfeatures

function nmf_suite(; dagger, accel, network, kwargs...)
function nmf_suite(ctx; dagger, accel, network)
suite = BenchmarkGroup()

#= TODO: Re-enable
@@ -194,59 +200,67 @@ function nmf_suite(; dagger, accel, network, kwargs...)
else
error("Unknown network $network")
end
rr = true
opts = if accel == "cuda"
Dagger.Sch.SchedulerOptions(;proctypes=[
DaggerGPU.CuArrayDeviceProc
], network=net)
], network=net,round_robin=rr)
elseif accel == "amdgpu"
Dagger.Sch.SchedulerOptions(;proctypes=[
DaggerGPU.ROCArrayProc
], network=net)
], network=net,round_robin=rr)
elseif accel == "cpu"
Dagger.Sch.SchedulerOptions(;network=net)
Dagger.Sch.SchedulerOptions(;network=net,round_robin=rr)
else
error("Unknown accelerator $accel")
end
ctx = Context(collect((1:nw) .+ 1); kwargs...)
p = sum([length(Dagger.get_processors(OSProc(id))) for id in 2:(nw+1)])
#bsz = ncol ÷ length(workers())
bsz = ncol ÷ 64
nsuite["Workers: $nw"] = @benchmarkable begin
compute($ctx, nnmf($X[], $W[], $H[]); options=$opts)
_ctx = Context($ctx, workers()[1:$nw])
compute(_ctx, nnmf($X[], $W[], $H[]); options=$opts)
end setup=begin
_nw, _scale = $nw, $scale
@info "Starting $_nw worker Dagger NNMF (scale by $_scale)"
if render != ""
Dagger.show_gantt($ctx; width=1800, window_length=20, delay=2, port=4040, live=live)
end
if $accel == "cuda"
# FIXME: Allocate with CUDA.rand if possible
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts))
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts))
elseif $accel == "amdgpu"
$X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts))
elseif $accel == "cpu"
$X[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts)
$W[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts)
$H[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts)
$X[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts)
$W[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts)
$H[] = compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts)
end
end teardown=begin
if render != ""
if render != "" && !live
Dagger.continue_rendering[] = false
video_paths = take!(Dagger.render_results)
try
video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths))
push!(get!(()->[], RENDERS[$scale], $nw), video_data)
catch
for i in 1:5
isready(Dagger.render_results) && break
sleep(1)
end
if isready(Dagger.render_results)
video_paths = take!(Dagger.render_results)
try
video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths))
push!(get!(()->[], RENDERS[$scale], $nw), video_data)
catch err
@error "Failed to process render results" exception=(err,catch_backtrace())
end
else
@warn "Failed to fetch render results"
end
end
$X[] = nothing
$W[] = nothing
$H[] = nothing
@everywhere GC.gc()
end
break
nw ÷= 2
end
suite["NNMF scaled by: $scale"] = nsuite
@@ -261,28 +275,42 @@ function main()
output_prefix = "result-$(np)workers-$(nt)threads-$(Dates.now())"

suites = Dict()
graph_opts = if graph && render != ""
(log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot")
elseif render != ""
(log_sink=Dagger.LocalEventLog(),)
else
NamedTuple()
end
ctx = Context(collect((1:nw) .+ 1); profile=profile, graph_opts...)
for bench in benches
name = bench.name
println("creating $name benchmarks")
suites[name] = if bench.dagger
nmf_suite(; dagger=true, accel=bench.accel, network=bench.network, log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot", profile=false)
else
nmf_suite(; dagger=false, accel=bench.accel, network=bench.network)
suites[name] = nmf_suite(ctx; dagger=bench.dagger, accel=bench.accel, network=bench.network)
end
if render != ""
Dagger.show_gantt(ctx; width=1800, window_length=5, delay=2, port=live_port, live=live)
if live
# Make sure server code is compiled
sleep(1)
run(pipeline(`curl -s localhost:$live_port/`; stdout=devnull))
run(pipeline(`curl -s localhost:$live_port/profile`; stdout=devnull))
@info "Rendering started on port $live_port"
end
end
res = Dict()
for bench in benches
name = bench.name
println("running $name benchmarks")
res[name] = try
run(suites[name]; samples=5, seconds=10*60, gcsample=true)
run(suites[name]; samples=3, seconds=10*60, gcsample=true)
catch err
@error "Error running $name benchmarks" exception=(err,catch_backtrace())
nothing
end
end
for bench in benches
println("benchmark results for $(bench.name): $(res[bench.name])")
println("benchmark results for $(bench.name): $(minimum(res[bench.name]))")
end

println("saving results in $output_prefix.$output_format")
@@ -294,6 +322,11 @@ function main()
serialize(io, outdict)
end
end

if parse(Bool, get(ENV, "BENCHMARK_VISUALIZE", "0"))
run(`$(Base.julia_cmd()) $(joinpath(pwd(), "visualize.jl")) -- $(output_prefix*"."*output_format)`)
end

println("Done.")

# TODO: Compare with multiple results
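
Note: the revised benchmark creates its inputs as Dagger blocked arrays with a fixed column block size (ncol ÷ 64 instead of dividing by the total processor count) and enables round-robin chunk placement through the scheduler options. A minimal sketch of that setup, using only the API names visible in this diff (array sizes are illustrative, and the SchedulerOptions keyword set is abbreviated):

    using Dagger

    nrow, ncol, nfeatures = 400, 400, 12
    bsz  = max(ncol ÷ 64, 1)                        # fixed column block size, as above
    opts = Dagger.Sch.SchedulerOptions(; round_robin=true)

    # Lazy blocked random arrays, materialized under the chosen scheduler options.
    X = compute(rand(Blocks(nrow, bsz), Float32, nrow, ncol);      options=opts)
    W = compute(rand(Blocks(nrow, bsz), Float32, nrow, nfeatures); options=opts)
    H = compute(rand(Blocks(nrow, bsz), Float32, nfeatures, ncol); options=opts)
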
94 changes: 68 additions & 26 deletions benchmarks/visualize.jl
@@ -1,27 +1,61 @@
using JLD
using JLD, Serialization
using BenchmarkTools
using TypedTables

res = JLD.load(ARGS[1])
res = if endswith(ARGS[1], ".jld")
JLD.load(ARGS[1])
elseif endswith(ARGS[1], ".jls")
deserialize(ARGS[1])
else
error("Unknown file type")
end

serial_results = res["results"]["Serial"]
dagger_results = res["results"]["Dagger"]
serial_results = filter(x->!occursin("dagger", x[1]), res["results"])
@assert length(keys(serial_results)) > 0 "No serial results found"
dagger_results = filter(x->occursin("dagger", x[1]), res["results"])
@assert length(keys(dagger_results)) > 0 "No Dagger results found"

scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(serial_results)[2])]; by=x->x[2])
nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(dagger_results)[2][first(first(scale_set))])]; by=x->x[2])
raw_table = NamedTuple[]
for bset_key in keys(res["results"])
bset = res["results"][bset_key]
if typeof(bset[first(first(scale_set))]) <: BenchmarkGroup
procs = parse(Int, lstrip(last(split(first(first(bset[first(first(scale_set))])), ':')), ' '))
for nw in nw_set
for i in 1:length(scale_set)
set_times = [minimum(bset[scale][nw[1]]).time/(10^9) for scale in first.(scale_set)]
push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=nw[2]))
end
end
else
set_times = [minimum(bset[scale]).time/(10^9) for scale in first.(scale_set)]
procs = 8 # default for OpenBLAS
for i in 1:length(set_times)
push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=procs))
end
end
end
table = Table(raw_table)

scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(serial_results)]; by=x->x[2])
serial_times = [minimum(serial_results[scale]).time/(10^9) for scale in first.(scale_set)]
nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(dagger_results[first(first(scale_set))])]; by=x->x[2])
@show table
btable = copy(table[map(x->!x, occursin.(Ref("dagger"), table.name))])
dtable = copy(table[occursin.(Ref("dagger"), table.name)])
@show btable dtable

table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3])
#table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3])

btable = copy(table)
#btable = copy(table)

#=
for (nw,nw_val) in nw_set
dagger_times = [minimum(dagger_results[scale][nw]).time/(10^9) for scale in first.(scale_set)]
t = Table(name=[:Dagger for _ in 1:3], time=dagger_times, scale=last.(scale_set), procs=[parse(Int,split(nw, ":")[2]) for _ in 1:3])
append!(table, t)
end
=#

dtable = table[table.name .== :Dagger]
#dtable = table[table.name .== :Dagger]

# Plotting

@@ -45,11 +79,11 @@ legend_names = String[]

scales = unique(dtable.scale)

colors = distinguishable_colors(lenght(scales), ColorSchemes.seaborn_deep.colors)
colors = distinguishable_colors(length(scales), ColorSchemes.seaborn_deep.colors)

for (i, scale) in enumerate(scales)
stable = dtable[dtable.scale .== scale]
t1 = first(stable[stable.procs .== 1].time)
t1 = first(stable[stable.procs .== minimum(dtable.procs)].time)
ss_efficiency = strong_scaling.(t1, stable.time, stable.procs)
push!(line_plots, lines!(ssp, stable.procs, ss_efficiency, linewidth=3.0, color = colors[i]))
push!(legend_names, "scale = $scale")
@@ -65,25 +99,33 @@ save("strong_scaling.png", fig)
# too little data

fig = Figure(resolution = (1200, 800))
weak_scaling(t1, tn) = t1/tn
weak_scaling(t1, tn, p_prime, p) = t1/((p_prime/p)*tn)

dtable = table[table.name .== :Dagger]
wstable = filter(row->row.scale == row.procs, dtable)
wstable = sort(wstable, by=r->r.scale)
t1 = first(wstable).time
t1 = first(dtable[map(row->(row.scale == 10) && (row.procs == 1), dtable)]).time

fig = Figure(resolution = (1200, 800))
perf = fig[1, 1] = Axis(fig, title = "Weak scaling")
perf.xlabel = "nprocs"
perf.ylabel = "Efficiency"
perf = fig[1, 1] = Axis(fig, title = "Weak Scaling")
perf.xlabel = "Number of processes"
perf.ylabel = "Scaling efficiency"

line_plots = Any[]
legend_names = String[]

wstable = similar(dtable, 0)
for pair in [(10,1),(35,4),(85,8)]
append!(wstable, dtable[map(row->(row.scale == pair[1]) && (row.procs == pair[2]), rows(dtable))])
end
@show wstable
push!(line_plots, lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time, wstable.procs .* 10, wstable.scale), linewidth=3.0))
push!(legend_names, "cpu+dagger")

lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time), linewidth=3.0)
legend = fig[1, 2] = Legend(fig, line_plots, legend_names)
save("weak_scaling.png", fig)

# 3. Comparision against Base

fig = Figure(resolution = (1200, 800))
perf = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base")
perf = fig[1, 1] = Axis(fig, title = "Dagger vs Base")
perf.xlabel = "Scaling factor"
perf.ylabel = "time (s)"

@@ -92,7 +134,7 @@ legend_names = String[]

procs = unique(dtable.procs)

colors = distinguishable_colors(lenght(procs) + 1, ColorSchemes.seaborn_deep.colors)
colors = distinguishable_colors(length(procs) + 1, ColorSchemes.seaborn_deep.colors)

for (i, nproc) in enumerate(procs)
stable = dtable[dtable.procs .== nproc]
@@ -109,9 +151,9 @@ save("raw_timings.png", fig)

# 4. Speedup
fig = Figure(resolution = (1200, 800))
speedup = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base (8 threads)")
speedup = fig[1, 1] = Axis(fig, title = "Dagger vs Base (8 threads)")
speedup.xlabel = "Scaling factor"
speedup.ylabel = "Speedup Base/Dagger"
speedup.ylabel = "Runtime Dagger/Base"

line_plots = Any[]
legend_names = String[]
@@ -121,7 +163,7 @@ colors = distinguishable_colors(length(procs), ColorSchemes.seaborn_deep.colors)
sort!(btable, by=r->r.scale)

for (i, nproc) in enumerate(unique(dtable.procs))
nproc < 8 && continue
nproc == 8 || continue
stable = dtable[dtable.procs .== nproc]
sort!(stable, by=r->r.scale)
push!(line_plots, lines!(speedup, stable.scale, btable.time ./ stable.time, linewidth=3.0, color = colors[i]))
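
Note: the new four-argument weak_scaling handles runs where the problem scale does not grow exactly in proportion to the process count (here the (scale, procs) pairs are (10,1), (35,4), (85,8), while perfect scaling from scale 10 would give 40 and 80). It rescales the measured time by p_prime/p, the ratio of the ideal scale to the actual one, before comparing against the single-process baseline. A worked example with made-up timings:

    weak_scaling(t1, tn, p_prime, p) = t1 / ((p_prime / p) * tn)

    t1 = 100.0                          # seconds at scale 10 on 1 process (invented)
    tn = 110.0                          # seconds at scale 35 on 4 processes (invented)
    weak_scaling(t1, tn, 4 * 10, 35)    # ≈ 0.80, i.e. roughly 80% weak-scaling efficiency
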
17 changes: 12 additions & 5 deletions src/lib/logging.jl
@@ -116,11 +118,13 @@ function raise_event(ctx, phase, category, id,tl, t, gc_num, prof, async)
end
end

empty_prof() = ProfilerResult(UInt[], Profile.getdict(UInt[]))
empty_prof() = ProfilerResult(UInt[], Dict{UInt64, Vector{Base.StackTraces.StackFrame}}())

const prof_refcount = Ref{Threads.Atomic{Int}}(Threads.Atomic{Int}(0))

function timespan_start(ctx, category, id, tl, async=isasync(ctx.log_sink))
isa(ctx.log_sink, NoOpLog) && return # don't go till raise
if ctx.profile && category == :compute
if ctx.profile && category == :compute && Threads.atomic_add!(prof_refcount[], 1) == 0
Profile.start_timer()
end
raise_event(ctx, :start, category, id, tl, time_ns(), gc_num(), empty_prof(), async)
@@ -129,13 +131,18 @@ end

function timespan_end(ctx, category, id, tl, async=isasync(ctx.log_sink))
isa(ctx.log_sink, NoOpLog) && return
time = time_ns()
gcn = gc_num()
prof = UInt[]
lidict = Dict{UInt64, Vector{Base.StackTraces.StackFrame}}()
if ctx.profile && category == :compute
Profile.stop_timer()
prof = Profile.fetch()
if Threads.atomic_sub!(prof_refcount[], 1) == 1
Profile.stop_timer()
end
prof, lidict = Profile.retrieve()
Profile.clear()
end
raise_event(ctx, :finish, category, id, tl,time_ns(), gc_num(), ProfilerResult(prof, Profile.getdict(prof)), async)
raise_event(ctx, :finish, category, id, tl, time, gcn, ProfilerResult(prof, lidict), async)
nothing
end

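
Note: the logging change wraps Profile.start_timer/Profile.stop_timer in an atomic reference count so that overlapping :compute spans share one profiler session: only the 0→1 transition starts the timer and only the 1→0 transition stops it, with each span retrieving and clearing the collected samples on exit. A generic sketch of that guard pattern (helper names are illustrative, not Dagger's):

    using Profile

    const profile_refcount = Threads.Atomic{Int}(0)

    function profiled_span_start()
        # atomic_add! returns the previous value, so only the first entrant starts the timer
        if Threads.atomic_add!(profile_refcount, 1) == 0
            Profile.start_timer()
        end
    end

    function profiled_span_end()
        # likewise, only the last entrant to leave stops it
        if Threads.atomic_sub!(profile_refcount, 1) == 1
            Profile.stop_timer()
        end
    end
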
2 changes: 2 additions & 0 deletions src/processor.jl
@@ -264,6 +268,8 @@ Context(procs::Vector{P}=Processor[OSProc(w) for w in workers()];
profile=false, options=nothing) where {P<:Processor} =
Context(procs, proc_lock, log_sink, log_file, profile, options)
Context(xs::Vector{Int}; kwargs...) = Context(map(OSProc, xs); kwargs...)
Context(ctx::Context, xs::Vector) =
Context(xs; log_sink=ctx.log_sink, log_file=ctx.log_file, profile=ctx.profile)
procs(ctx::Context) = lock(ctx) do
copy(ctx.procs)
end
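
Note: the added Context(ctx, xs) method derives a new Context over the workers in xs while inheriting the parent's log_sink, log_file, and profile settings; this is what lets benchmark.jl shrink the worker set per run without reconfiguring logging. A small usage sketch (worker counts are illustrative):

    using Distributed, Dagger

    ctx  = Context(collect((1:nworkers()) .+ 1); log_sink=Dagger.LocalEventLog())
    half = Context(ctx, workers()[1:max(nworkers() ÷ 2, 1)])   # same logging settings, fewer workers
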
2 changes: 1 addition & 1 deletion src/sch/unix.jl
@@ -13,7 +13,7 @@ struct TimeSpec
tv_nsec :: UInt64
end

maketime(ts) = ts.tv_sec * 1e9 + ts.tv_nsec
maketime(ts) = ts.tv_sec * UInt(1e9) + ts.tv_nsec

# From bits/times.h on a Linux system
# Check if those are the same on BSD
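
Note: the maketime fix keeps the nanosecond arithmetic in UInt64. Multiplying tv_sec by the Float64 literal 1e9 promotes the whole expression to Float64, and at epoch-scale magnitudes (~1.6e18 ns in 2021) a Float64's 53-bit mantissa can only resolve steps of a few hundred nanoseconds, so the added tv_nsec gets silently rounded. A small illustration (values are arbitrary):

    sec, nsec = UInt64(1_619_827_200), UInt64(123_456_789)   # 2021-05-01 00:00:00 UTC as a Unix timestamp

    float_ns = sec * 1e9 + nsec        # Float64; rounded to a multiple of 256 ns at this magnitude
    int_ns   = sec * UInt(1e9) + nsec  # UInt64; exactly 1_619_827_200_123_456_789
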
2 more changed files not shown.
