Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#6666: Avoid hang when analyzer worker returns early #6667

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error =
"Failed to read from trace: " + worker->stream->get_stream_name();
}
worker->stream->set_active(false); // Avoid hang in scheduler.
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
return;
}
int shard_index = shard_type_ == SHARD_BY_CORE
Expand Down Expand Up @@ -655,6 +656,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
record_is_instr(record)) &&
!process_interval(prev_interval_index, prev_interval_init_instr_count, worker,
/*parallel=*/true, record_is_instr(record), shard_index)) {
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
for (int i = 0; i < num_tools_; ++i) {
Expand All @@ -665,24 +667,31 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
VPRINT(this, 1, "Worker %d hit shard memref error %s on trace shard %s\n",
worker->index, worker->error.c_str(),
worker->stream->get_stream_name().c_str());
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
}
if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) {
if (!process_shard_exit(worker, shard_index))
if (!process_shard_exit(worker, shard_index)) {
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
if (!process_shard_exit(worker, worker->index)) {
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
if (!process_shard_exit(worker, keyval.second.shard_index))
if (!process_shard_exit(worker, keyval.second.shard_index)) {
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
}
}
for (int i = 0; i < num_tools_; ++i) {
Expand All @@ -691,6 +700,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error = error;
VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index,
error.c_str());
worker->stream->set_active(false); // Avoid hang in scheduler.
return;
}
}
Expand Down
116 changes: 114 additions & 2 deletions clients/drcachesim/tests/analysis_unit_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* **********************************************************
* Copyright (c) 2023 Google, Inc. All rights reserved.
* Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
Expand Down Expand Up @@ -366,10 +366,122 @@ test_wait_records()
return true;
}

bool
test_tool_errors()
{
// Tool errors can hang the analyzer if it doesn't tell the scheduler
// it's giving up on its input. We test that here.
std::cerr << "\n----------------\nTesting tool errors\n";

static constexpr int NUM_INPUTS = 5;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
if (i == 4) {
// This one input will trigger an error in our error_tool_t.
inputs[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 4));
}
inputs[i].push_back(make_exit(tid));
}

std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/1);

static const char *const TOOL_ERROR_STRING = "cpuid not supported";

class error_tool_t : public analysis_tool_t {
public:
bool
process_memref(const memref_t &memref) override
{
assert(false); // Only expect parallel mode.
return false;
}
bool
print_results() override
{
return true;
}
bool
parallel_shard_supported() override
{
return true;
}
void *
parallel_shard_init_stream(int shard_index, void *worker_data,
memtrace_stream_t *stream) override
{
auto per_shard = new per_shard_t;
return reinterpret_cast<void *>(per_shard);
}
bool
parallel_shard_exit(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
delete shard;
return true;
}
std::string
parallel_shard_error(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
return shard->error;
}
bool
parallel_shard_memref(void *shard_data, const memref_t &memref) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
// Return an error in one of the inputs.
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
shard->error = TOOL_ERROR_STRING;
return false;
}
return true;
}

private:
struct per_shard_t {
std::string error;
};
};

std::vector<analysis_tool_t *> tools;
auto test_tool = std::unique_ptr<error_tool_t>(new error_tool_t);
tools.push_back(test_tool.get());
mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
/*parallel=*/true, NUM_OUTPUTS, &sched_ops);
assert(!!analyzer);
// If the analyzer doesn't give up the input in the output stream that
// encounters it, the scheduler will hang waiting for that input,
// so failure in this test would be a CTest timeout.
bool res = analyzer.run();
assert(!res);
assert(analyzer.get_error_string() == TOOL_ERROR_STRING);
return true;
}

int
test_main(int argc, const char *argv[])
{
if (!test_queries() || !test_wait_records())
if (!test_queries() || !test_wait_records() || !test_tool_errors())
return 1;
std::cerr << "All done!\n";
return 0;
Expand Down
Loading