Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#6471 sched idle: Add idle time #6472

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ analyzer_t::create_wait_marker()
return record;
}

template <>
memref_t
analyzer_t::create_idle_marker()
{
memref_t record = {}; // Zero the other fields.
record.marker.type = TRACE_TYPE_MARKER;
record.marker.marker_type = TRACE_MARKER_TYPE_CORE_IDLE;
record.marker.tid = INVALID_THREAD_ID;
return record;
}

/******************************************************************************
* Specializations for analyzer_tmpl_t<record_reader_t>, aka record_analyzer_t.
*/
Expand Down Expand Up @@ -182,6 +193,17 @@ record_analyzer_t::create_wait_marker()
return record;
}

template <>
trace_entry_t
record_analyzer_t::create_idle_marker()
{
trace_entry_t record;
record.type = TRACE_TYPE_MARKER;
record.size = TRACE_MARKER_TYPE_CORE_IDLE;
record.addr = 0; // Marker value has no meaning so we zero it.
return record;
}

/********************************************************************
* Other analyzer_tmpl_t routines that do not need to be specialized.
*/
Expand Down Expand Up @@ -537,6 +559,12 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_wait_marker();
} else if (status == sched_type_t::STATUS_IDLE) {
assert(shard_type_ == SHARD_BY_CORE);
// We let tools know about idle time so they can analyze cpu usage.
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_idle_marker();
} else if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker->error =
Expand Down Expand Up @@ -596,8 +624,10 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (!process_shard_exit(worker, worker->index))
return;
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
Expand Down
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
RecordType
create_wait_marker();

RecordType
create_idle_marker();

// Invoked when the given interval finishes during serial or parallel
// analysis of the trace. For parallel analysis, the shard_id
// parameter should be set to the shard_id for which the interval
Expand Down
4 changes: 2 additions & 2 deletions clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,15 +809,15 @@ droption_t<bool> op_core_sharded(
"software threads. This option instead schedules those threads onto virtual cores "
"and analyzes each core in parallel. Thus, each shard consists of pieces from "
"many software threads. How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<bool> op_core_serial(
DROPTION_SCOPE_ALL, "core_serial", false, "Analyze per-core in serial.",
"In this mode, scheduling is performed just like for -core_sharded. "
"However, the resulting schedule is acted upon by a single analysis thread"
"which walks the N cores in lockstep in round robin fashion. "
"How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<int64_t>
op_sched_quantum(DROPTION_SCOPE_ALL, "sched_quantum", 1 * 1000 * 1000,
Expand Down
11 changes: 11 additions & 0 deletions clients/drcachesim/common/trace_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,17 @@ typedef enum {
*/
TRACE_MARKER_TYPE_CORE_WAIT,

/**
* This marker is used for core-sharded analyses to indicate that the current
* core has no available inputs to run (all inputs are on other cores or are
* blocked waiting for kernel resources). A new marker is emitted each
* time the tool analysis framework requests a new record from the scheduler and
* is given a wait status. There are no units of time here but each repetition
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
* is roughly the time where a regular record could have been read and passed
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
* along.
*/
TRACE_MARKER_TYPE_CORE_IDLE,

// ...
// These values are reserved for future built-in marker types.
// ...
Expand Down
3 changes: 2 additions & 1 deletion clients/drcachesim/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
is_record_synthetic() const override
{
if (cur_ref_.marker.type == TRACE_TYPE_MARKER &&
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
(cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT ||
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE)) {
// These are synthetic records not part of the input and not
// counting toward ordinals.
return true;
Expand Down
62 changes: 47 additions & 15 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
}
VPRINT(this, 1, "%zu inputs\n", inputs_.size());
live_input_count_.store(static_cast<int>(inputs_.size()), std::memory_order_release);
return set_initial_schedule(workload2inputs);
}

Expand Down Expand Up @@ -1313,7 +1315,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
input.cur_region);
if (input.cur_region >= static_cast<int>(input.regions_of_interest.size())) {
if (input.at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else {
// We let the user know we're done.
if (options_.schedule_record_ostream != nullptr) {
Expand All @@ -1329,7 +1331,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
return status;
}
input.queue.push_back(create_thread_exit(input.tid));
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_SKIPPED;
}
}
Expand Down Expand Up @@ -1408,7 +1410,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
if (*input.reader == *input.reader_end) {
// Raise error because the input region is out of bounds.
VPRINT(this, 2, "skip_instructions: input=%d skip out of bounds\n", input.index);
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_REGION_INVALID;
}
input.in_cur_region = true;
Expand Down Expand Up @@ -1645,7 +1647,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size()))
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
Expand Down Expand Up @@ -1719,7 +1721,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// queued candidate record, if any.
clear_input_queue(inputs_[index]);
inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid));
inputs_[index].at_eof = true;
mark_input_eof(inputs_[index]);
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
Expand Down Expand Up @@ -1773,7 +1775,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
int input = segment.key.input;
VPRINT(this, res == sched_type_t::STATUS_WAIT ? 3 : 2,
VPRINT(this,
(res == sched_type_t::STATUS_IDLE ||
res == sched_type_t::STATUS_WAIT)
? 3
: 2,
"next_record[%d]: replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, input,
Expand Down Expand Up @@ -1819,10 +1825,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
// We found a direct switch target above.
} else if (ready_queue_empty()) {
if (prev_index == INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
if (inputs_[prev_index].at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else
index = prev_index; // Go back to prior.
} else {
Expand All @@ -1836,7 +1842,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
input_info_t *queue_next = pop_from_ready_queue(output);
if (queue_next == nullptr)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
index = queue_next->index;
}
} else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
Expand All @@ -1850,7 +1856,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
}
if (index < 0)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64
" == input #%d\n",
Expand Down Expand Up @@ -1883,14 +1889,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: local index %d == input #%d at eof\n",
output, outputs_[output].input_indices_index, index);
VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
if (options_.schedule_record_ostream != nullptr &&
prev_index != INVALID_INPUT_ORDINAL)
close_schedule_segment(output, inputs_[prev_index]);
inputs_[index].at_eof = true;
if (!inputs_[index].at_eof)
mark_input_eof(inputs_[index]);
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
prev_index = INVALID_INPUT_ORDINAL;
continue;
}
break;
Expand All @@ -1911,7 +1918,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// check for quantum end.
outputs_[output].cur_time = cur_time; // Invalid values are checked below.
if (!outputs_[output].active)
return sched_type_t::STATUS_WAIT;
return sched_type_t::STATUS_IDLE;
if (outputs_[output].waiting) {
VPRINT(this, 5, "next_record[%d]: need new input (cur=waiting)\n", output);
sched_type_t::stream_status_t res = pick_next_input(output, true);
Expand All @@ -1922,7 +1929,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].cur_input < 0) {
// This happens with more outputs than inputs. For non-empty outputs we
// require cur_input to be set to >=0 during init().
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
}
input = &inputs_[outputs_[output].cur_input];
auto lock = std::unique_lock<std::mutex>(*input->lock);
Expand Down Expand Up @@ -1970,6 +1977,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
input->needs_advance = true;
}
if (input->at_eof || *input->reader == *input->reader_end) {
if (!input->at_eof)
mark_input_eof(*input);
lock.unlock();
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d eof)\n", output,
input->index);
Expand Down Expand Up @@ -1998,6 +2007,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].record_index >=
static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
} else if (outputs_[output].record[outputs_[output].record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
Expand Down Expand Up @@ -2257,6 +2267,28 @@ scheduler_tmpl_t<RecordType, ReaderType>::stop_speculation(output_ordinal_t outp
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
{
input.at_eof = true;
assert(live_input_count_.load(std::memory_order_acquire) > 0);
live_input_count_.fetch_add(-1, std::memory_order_release);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
{
if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
live_input_count_.load(std::memory_order_acquire) == 0) {
return sched_type_t::STATUS_EOF;
} else {
outputs_[output].waiting = true;
return sched_type_t::STATUS_IDLE;
}
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::set_output_active(output_ordinal_t output,
Expand Down
27 changes: 24 additions & 3 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <deque>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -112,14 +113,25 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* (#DEPENDENCY_TIMESTAMPS) to avoid one stream getting ahead of another. For
* replaying a schedule as it was traced with #MAP_TO_RECORDED_OUTPUT this can
* indicate an idle period on a core where the traced workload was not currently
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
* scheduled.
* scheduled, but generally #STATUS_WAIT should be treated as artificial.
* Simulators are suggested to not advance simulated time for #STATUS_WAIT while
* they should advance time for #STATUS_IDLE.
*/
STATUS_WAIT,
STATUS_INVALID, /**< Error condition. */
STATUS_REGION_INVALID, /**< Input region is out of bounds. */
STATUS_NOT_IMPLEMENTED, /**< Feature not implemented. */
STATUS_SKIPPED, /**< Used for internal scheduler purposes. */
STATUS_RECORD_FAILED, /**< Failed to record schedule for future replay. */
/**
* This code indicates that all inputs are blocked waiting for kernel resources
* (such as i/o). This is similar to #STATUS_WAIT, but #STATUS_WAIT indicates an
* artificial pause due to imposing the original ordering while #STATUS_IDLE
* indicates actual idle time in the application. Simulators are suggested
* to not advance simulated time for #STATUS_WAIT while they should advance
* time for #STATUS_IDLE.
*/
STATUS_IDLE,
};

/** Identifies an input stream by its index. */
Expand Down Expand Up @@ -629,7 +641,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
/**
* Disables or re-enables this output stream. If "active" is false, this
* stream becomes inactive and its currently assigned input is moved to the
* ready queue to be scheduled on other outputs. The #STATUS_WAIT code is
* ready queue to be scheduled on other outputs. The #STATUS_IDLE code is
* returned to next_record() for inactive streams. If "active" is true,
* this stream becomes active again.
* This is only supported for #MAP_TO_ANY_OUTPUT.
Expand Down Expand Up @@ -1076,7 +1088,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// sched_lock_.
std::vector<schedule_record_t> record;
int record_index = 0;
bool waiting = false;
bool waiting = false; // Waiting or idling.
bool active = true;
// Used for time-based quanta.
uint64_t cur_time = 0;
Expand Down Expand Up @@ -1259,6 +1271,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
stream_status_t
set_output_active(output_ordinal_t output, bool active);

// Caller must hold the input's lock.
void
mark_input_eof(input_info_t &input);

stream_status_t
eof_or_idle(output_ordinal_t output);

///////////////////////////////////////////////////////////////////////////
// Support for ready queues for who to schedule next:

Expand Down Expand Up @@ -1325,6 +1344,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t ready_counter_ = 0;
// Count of inputs not yet at eof.
std::atomic<int> live_input_count_;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
Expand Down
4 changes: 4 additions & 0 deletions clients/drcachesim/tests/analysis_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ test_queries()
parallel_shard_memref(void *shard_data, const memref_t &memref) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
if (memref.marker.type == TRACE_TYPE_MARKER &&
(memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT ||
memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE))
return true;
// These are our testing goals: these queries.
// We have one thread for each of our NUM_INPUTS workloads.
assert(shard->stream->get_output_cpuid() == shard->index);
Expand Down
Loading
Loading