Skip to content

Commit

Permalink
Sch: Add sch uid to most logging events
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Aug 20, 2024
1 parent b898fcd commit 2b282d5
Showing 1 changed file with 34 additions and 32 deletions.
66 changes: 34 additions & 32 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ const WORKER_MONITOR_TASKS = Dict{Int,Task}()
const WORKER_MONITOR_CHANS = Dict{Int,Dict{UInt64,RemoteChannel}}()
function init_proc(state, p, log_sink)
ctx = Context(Int[]; log_sink)
timespan_start(ctx, :init_proc, (;worker=p.pid), nothing)
timespan_start(ctx, :init_proc, (;uid=state.uid, worker=p.pid), nothing)
# Initialize pressure and capacity
gproc = OSProc(p.pid)
lock(state.lock) do
Expand Down Expand Up @@ -383,7 +383,7 @@ function init_proc(state, p, log_sink)
# Setup dynamic listener
dynamic_listener!(ctx, state, p.pid)

timespan_finish(ctx, :init_proc, (;worker=p.pid), nothing)
timespan_finish(ctx, :init_proc, (;uid=state.uid, worker=p.pid), nothing)
end
function _cleanup_proc(uid, log_sink)
empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid!
Expand All @@ -399,7 +399,7 @@ end
function cleanup_proc(state, p, log_sink)
ctx = Context(Int[]; log_sink)
wid = p.pid
timespan_start(ctx, :cleanup_proc, (;worker=wid), nothing)
timespan_start(ctx, :cleanup_proc, (;uid=state.uid, worker=wid), nothing)
lock(WORKER_MONITOR_LOCK) do
if haskey(WORKER_MONITOR_CHANS, wid)
delete!(WORKER_MONITOR_CHANS[wid], state.uid)
Expand All @@ -419,7 +419,7 @@ function cleanup_proc(state, p, log_sink)
end
end

timespan_finish(ctx, :cleanup_proc, (;worker=wid), nothing)
timespan_finish(ctx, :cleanup_proc, (;uid=state.uid, worker=wid), nothing)
end

"Process-local condition variable (and lock) indicating task completion."
Expand Down Expand Up @@ -467,24 +467,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())

master = OSProc(myid())

timespan_start(ctx, :scheduler_init, nothing, master)
timespan_start(ctx, :scheduler_init, (;uid=state.uid), master)
try
scheduler_init(ctx, state, d, options, deps)
finally
timespan_finish(ctx, :scheduler_init, nothing, master)
timespan_finish(ctx, :scheduler_init, (;uid=state.uid), master)
end

value, errored = try
scheduler_run(ctx, state, d, options)
finally
# Always try to tear down the scheduler
timespan_start(ctx, :scheduler_exit, nothing, master)
timespan_start(ctx, :scheduler_exit, (;uid=state.uid), master)
try
scheduler_exit(ctx, state, options)
catch err
@error "Error when tearing down scheduler" exception=(err,catch_backtrace())
finally
timespan_finish(ctx, :scheduler_exit, nothing, master)
timespan_finish(ctx, :scheduler_exit, (;uid=state.uid), master)
end
end

Expand Down Expand Up @@ -545,10 +545,10 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
check_integrity(ctx)

isempty(state.running) && continue
timespan_start(ctx, :take, nothing, nothing)
timespan_start(ctx, :take, (;uid=state.uid), nothing)
@dagdebug nothing :take "Waiting for results"
chan_value = take!(state.chan) # get result of completed thunk
timespan_finish(ctx, :take, nothing, nothing)
timespan_finish(ctx, :take, (;uid=state.uid), nothing)
if chan_value isa RescheduleSignal
continue
end
Expand All @@ -563,13 +563,13 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
@warn "Worker $(pid) died, rescheduling work"

# Remove dead worker from procs list
timespan_start(ctx, :remove_procs, (;worker=pid), nothing)
timespan_start(ctx, :remove_procs, (;uid=state.uid, worker=pid), nothing)
remove_dead_proc!(ctx, state, gproc)
timespan_finish(ctx, :remove_procs, (;worker=pid), nothing)
timespan_finish(ctx, :remove_procs, (;uid=state.uid, worker=pid), nothing)

timespan_start(ctx, :handle_fault, (;worker=pid), nothing)
timespan_start(ctx, :handle_fault, (;uid=state.uid, worker=pid), nothing)
handle_fault(ctx, state, gproc)
timespan_finish(ctx, :handle_fault, (;worker=pid), nothing)
timespan_finish(ctx, :handle_fault, (;uid=state.uid, worker=pid), nothing)
return # effectively `continue`
else
if something(ctx.options.allow_errors, false) ||
Expand Down Expand Up @@ -604,9 +604,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
end
end

timespan_start(ctx, :finish, (;thunk_id), (;thunk_id, result=res))
timespan_start(ctx, :finish, (;uid=state.uid, thunk_id), (;thunk_id, result=res))
finish_task!(ctx, state, node, thunk_failed)
timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id, result=res))
timespan_finish(ctx, :finish, (;uid=state.uid, thunk_id), (;thunk_id, result=res))

delete_unused_tasks!(state)
end
Expand Down Expand Up @@ -691,13 +691,13 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
task = nothing
@label pop_task
if task !== nothing
timespan_finish(ctx, :schedule, (;thunk_id=task.id), (;thunk_id=task.id))
timespan_finish(ctx, :schedule, (;uid=state.uid, thunk_id=task.id), (;thunk_id=task.id))
end
if isempty(state.ready)
@goto fire_tasks
end
task = pop!(state.ready)
timespan_start(ctx, :schedule, (;thunk_id=task.id), (;thunk_id=task.id))
timespan_start(ctx, :schedule, (;uid=state.uid, thunk_id=task.id), (;thunk_id=task.id))
if haskey(state.cache, task)
if haskey(state.errored, task)
# An error was eagerly propagated to this task
Expand Down Expand Up @@ -887,7 +887,7 @@ function monitor_procs_changed!(ctx, state)
wait(ctx.proc_notify)
end

timespan_start(ctx, :assign_procs, nothing, nothing)
timespan_start(ctx, :assign_procs, (;uid=state.uid), nothing)

# Load new set of procs
new_ps = procs_to_use(ctx)
Expand Down Expand Up @@ -915,7 +915,7 @@ function monitor_procs_changed!(ctx, state)
end
end

timespan_finish(ctx, :assign_procs, nothing, nothing)
timespan_finish(ctx, :assign_procs, (;uid=state.uid), nothing)
old_ps = new_ps
end
end
Expand Down Expand Up @@ -1085,16 +1085,17 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
# know which task failed.
tasks = Task[]
for ts in to_send
# TODO: errormonitor
task = Threads.@spawn begin
timespan_start(ctx, :fire, (;worker=gproc.pid), nothing)
timespan_start(ctx, :fire, (;uid=state.uid, worker=gproc.pid), nothing)
try
remotecall_wait(do_tasks, gproc.pid, proc, state.chan, [ts]);
catch err
bt = catch_backtrace()
thunk_id = ts[1]
put!(state.chan, (gproc.pid, proc, thunk_id, (CapturedException(err, bt), nothing)))
finally
timespan_finish(ctx, :fire, (;worker=gproc.pid), nothing)
timespan_finish(ctx, :fire, (;uid=state.uid, worker=gproc.pid), nothing)
end
end
end
Expand Down Expand Up @@ -1212,6 +1213,7 @@ proc_has_occupancy(proc_occupancy, task_occupancy) =
function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, return_queue::RemoteChannel)
to_proc = istate.proc
proc_run_task = @task begin
# FIXME: Context changes aren't noticed over time
ctx = istate.ctx
tasks = istate.tasks
proc_occupancy = istate.proc_occupancy
Expand All @@ -1223,20 +1225,20 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
# Wait for new tasks
if !work_to_do
@dagdebug nothing :processor "Waiting for tasks"
timespan_start(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing)
timespan_start(ctx, :proc_run_wait, (;uid, worker=wid, processor=to_proc), nothing)
wait(istate.reschedule)
@static if VERSION >= v"1.9"
reset(istate.reschedule)
end
timespan_finish(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing)
timespan_finish(ctx, :proc_run_wait, (;uid, worker=wid, processor=to_proc), nothing)
if istate.done[]
return
end
end

# Fetch a new task to execute
@dagdebug nothing :processor "Trying to dequeue"
timespan_start(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing)
timespan_start(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), nothing)
work_to_do = false
task_and_occupancy = lock(istate.queue) do queue
# Only steal if there are multiple queued tasks, to prevent
Expand All @@ -1255,7 +1257,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
return queue_result
end
if task_and_occupancy === nothing
timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing)
timespan_finish(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), nothing)

@dagdebug nothing :processor "Failed to dequeue"

Expand All @@ -1270,7 +1272,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
@dagdebug nothing :processor "Trying to steal"

# Try to steal a task
timespan_start(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing)
timespan_start(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), nothing)

# Try to steal from local queues randomly
# TODO: Prioritize stealing from busiest processors
Expand Down Expand Up @@ -1305,12 +1307,12 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
from_proc = other_istate.proc
thunk_id = task[1]
@dagdebug thunk_id :processor "Stolen from $from_proc by $to_proc"
timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), (;from_proc, thunk_id))
timespan_finish(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), (;from_proc, thunk_id))
# TODO: Keep stealing until we hit full occupancy?
@goto execute
end
end
timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing)
timespan_finish(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), nothing)

# TODO: Try to steal from remote queues

Expand All @@ -1322,7 +1324,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
task = task_spec[]
thunk_id = task[1]
time_util = task[2]
timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy))
timespan_finish(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy))
@dagdebug thunk_id :processor "Dequeued task"

# Execute the task and return its result
Expand Down Expand Up @@ -1423,7 +1425,7 @@ function do_tasks(to_proc, return_queue, tasks)
for task in tasks
thunk_id = task[1]
occupancy = task[4]
timespan_start(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing)
timespan_start(ctx, :enqueue, (;uid, processor=to_proc, thunk_id), nothing)
should_launch = lock(TASK_SYNC) do
# Already running; don't try to re-launch
if !(thunk_id in TASKS_RUNNING)
Expand All @@ -1435,7 +1437,7 @@ function do_tasks(to_proc, return_queue, tasks)
end
should_launch || continue
enqueue!(queue, TaskSpecKey(task), occupancy)
timespan_finish(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing)
timespan_finish(ctx, :enqueue, (;uid, processor=to_proc, thunk_id), nothing)
@dagdebug thunk_id :processor "Enqueued task"
end
end
Expand Down

0 comments on commit 2b282d5

Please sign in to comment.