Skip to content

Commit

Permalink
i#6471 sched idle: Add replay fallback and fix (#6481)
Browse files Browse the repository at this point in the history
Adds a fallback to return EOF when all outputs are past the last record
in replay mode, to handle a schedule where not every input reaches EOF
or has regions of interest. Adds a unit test for this case, which hangs
without the fix.

Fixes a record bug which seems to cause the final segment entry for an
input to have a stop instruction ordinal prior to the input's real
endpoint. This is what caused hangs in actual usage of PR #6472 and led
to adding the fallback just described (which is useful on its own). On
selecting a new input which turned out to be at EOF, the original code
called close_schedule_segment() on *prev_input* for some reason. That
replaces the default -1 sentinel (read to EOF) with the current ordinal.
The code then tries again for a new input, but if it ends up finding the
prior one it would keep executing that without a new schedule segment
entry. If that's the last input on that core, close_schedule_segment()
is not called (the default is relied upon). I observed missing sentinels
for one input in some runs and this is my theory as to what happened;
those cases are gone now with this fix. Unfortunately it is difficult to
create a unit test for this, but testing on the original hangs (without
the fallback) show it fixed.

Issue: #6471
  • Loading branch information
derekbruening authored Nov 29, 2023
1 parent cd5521c commit 0254f5b
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 6 deletions.
25 changes: 19 additions & 6 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
{
if (options_.mapping == MAP_AS_PREVIOUSLY) {
live_replay_output_count_.store(static_cast<int>(outputs_.size()),
std::memory_order_release);
if (options_.schedule_replay_istream == nullptr ||
options_.schedule_record_ostream != nullptr)
return STATUS_ERROR_INVALID_PARAMETER;
Expand Down Expand Up @@ -1646,8 +1648,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
output_ordinal_t output, input_ordinal_t &index)
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size()))
static_cast<int>(outputs_[output].record.size())) {
if (!outputs_[output].at_eof) {
outputs_[output].at_eof = true;
live_replay_output_count_.fetch_add(-1, std::memory_order_release);
}
return eof_or_idle(output);
}
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
Expand Down Expand Up @@ -1895,14 +1902,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
if (options_.schedule_record_ostream != nullptr &&
prev_index != INVALID_INPUT_ORDINAL)
close_schedule_segment(output, inputs_[prev_index]);
if (!inputs_[index].at_eof)
mark_input_eof(inputs_[index]);
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
prev_index = INVALID_INPUT_ORDINAL;
continue;
}
break;
Expand Down Expand Up @@ -2276,17 +2279,27 @@ template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
{
if (input.at_eof)
return;
input.at_eof = true;
assert(live_input_count_.load(std::memory_order_acquire) > 0);
live_input_count_.fetch_add(-1, std::memory_order_release);
VPRINT(this, 2, "input %d at eof; %d live inputs left\n", input.index,
live_input_count_.load(std::memory_order_acquire));
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
{
if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
live_input_count_.load(std::memory_order_acquire) == 0) {
live_input_count_.load(std::memory_order_acquire) == 0 ||
// While a full schedule recorded should have each input hit either its
// EOF or ROI end, we have a fallback to avoid hangs for possible recorded
// schedules that end an input early deliberately without an ROI.
(options_.mapping == MAP_AS_PREVIOUSLY &&
live_replay_output_count_.load(std::memory_order_acquire) == 0)) {
assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof);
return sched_type_t::STATUS_EOF;
} else {
outputs_[output].waiting = true;
Expand Down
4 changes: 4 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t cur_time = 0;
// Used for MAP_TO_RECORDED_OUTPUT get_output_cpuid().
int64_t as_traced_cpuid = -1;
// Used for MAP_AS_PREVIOUSLY with live_replay_output_count_.
bool at_eof = false;
};

// Called just once at initialization time to set the initial input-to-output
Expand Down Expand Up @@ -1346,6 +1348,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t ready_counter_ = 0;
// Count of inputs not yet at eof.
std::atomic<int> live_input_count_;
// In replay mode, count of outputs not yet at the end of the replay sequence.
std::atomic<int> live_replay_output_count_;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
Expand Down
113 changes: 113 additions & 0 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,118 @@ test_replay_timestamps()
#endif // HAS_ZIP
}

#ifdef HAS_ZIP
// We subclass scheduler_t to access its record struct and functions.
class test_noeof_scheduler_t : public scheduler_t {
public:
void
write_test_schedule(std::string record_fname)
{
// We duplicate test_scheduler_t but we have one input ending early before
// eof.
scheduler_t scheduler;
std::vector<schedule_record_t> sched0;
sched0.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 0, 4, 11);
// There is a huge time gap here.
// Max numeric value means continue until EOF.
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 7,
0xffffffffffffffffUL, 91);
sched0.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
std::vector<schedule_record_t> sched1;
sched1.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 0, 4, 10);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 0, 4, 20);
// Input 2 advances early so core 0 is no longer waiting on it but only
// the timestamp.
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 4, 7, 60);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 0, 4, 30);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 4, 7, 40);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 4, 7, 50);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 4, 7, 70);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 7,
0xffffffffffffffffUL, 80);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 7,
0xffffffffffffffffUL, 90);
// Input 3 never reaches EOF (end is exclusive: so it stops at 8 with the
// real end at 9).
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 7, 9, 110);
sched1.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
zipfile_ostream_t outfile(record_fname);
std::string err = outfile.open_new_component(recorded_schedule_component_name(0));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
sched0.size() * sizeof(sched0[0])))
assert(false);
err = outfile.open_new_component(recorded_schedule_component_name(1));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
sched1.size() * sizeof(sched1[0])))
assert(false);
}
};
#endif

static void
test_replay_noeof()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay with no eof\n";
static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
// We need a timestamp so the scheduler will find one for initial
// input processing. We do not try to duplicate the timestamp
// sequences in the stored file and just use a dummy timestamp here.
inputs[i].push_back(make_timestamp(10 + i));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}

// Create a record file with timestamps requiring waiting.
// We cooperate with the test_noeof_scheduler_t class which constructs this schedule:
static const char *const CORE0_SCHED_STRING = ".AAA-------------------------CCC.__";
static const char *const CORE1_SCHED_STRING = ".BBB.CCCCCC.DDDAAABBBDDDAAA.BBB.DD";
std::string record_fname = "tmp_test_replay_noeof_timestamp.zip";
test_noeof_scheduler_t test_scheduler;
test_scheduler.write_test_schedule(record_fname);

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
#endif // HAS_ZIP
}

static void
test_replay_skip()
{
Expand Down Expand Up @@ -3200,6 +3312,7 @@ test_main(int argc, const char *argv[])
test_replay();
test_replay_multi_threaded(argv[1]);
test_replay_timestamps();
test_replay_noeof();
test_replay_skip();
test_replay_limit();
test_replay_as_traced_from_file(argv[1]);
Expand Down

0 comments on commit 0254f5b

Please sign in to comment.