Skip to content

Commit

Permalink
i#6666: Avoid hang when analyzer worker returns early (#6667)
Browse files Browse the repository at this point in the history
Adds a call to set_active(false) on the output stream when an analyzer
worker thread returns early due to an error. This frees up the current
input and avoids a hang in the scheduler after the error is printed.

Adds a unit test which hangs without this fix.

Fixes #6666
  • Loading branch information
derekbruening authored Feb 22, 2024
1 parent 6a06ccb commit afdc470
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 15 deletions.
50 changes: 38 additions & 12 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
worker_count_ = 1;
output_count = 1;
}
sched_mapping_ = options.mapping;
if (scheduler_.init(sched_inputs, output_count, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
ERRMSG("Failed to initialize scheduler: %s\n",
Expand Down Expand Up @@ -585,8 +586,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_shard_exit(
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
bool
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks_internal(
analyzer_worker_data_t *worker)
{
std::vector<void *> user_worker_data(num_tools_);

Expand Down Expand Up @@ -622,7 +624,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error =
"Failed to read from trace: " + worker->stream->get_stream_name();
}
return;
return false;
}
int shard_index = shard_type_ == SHARD_BY_CORE
? worker->index
Expand Down Expand Up @@ -655,7 +657,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
record_is_instr(record)) &&
!process_interval(prev_interval_index, prev_interval_init_instr_count, worker,
/*parallel=*/true, record_is_instr(record), shard_index)) {
return;
return false;
}
for (int i = 0; i < num_tools_; ++i) {
if (!tools_[i]->parallel_shard_memref(
Expand All @@ -665,24 +667,27 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
VPRINT(this, 1, "Worker %d hit shard memref error %s on trace shard %s\n",
worker->index, worker->error.c_str(),
worker->stream->get_stream_name().c_str());
return;
return false;
}
}
if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) {
if (!process_shard_exit(worker, shard_index))
return;
if (!process_shard_exit(worker, shard_index)) {
return false;
}
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
if (!process_shard_exit(worker, worker->index)) {
return false;
}
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
if (!process_shard_exit(worker, keyval.second.shard_index))
return;
if (!process_shard_exit(worker, keyval.second.shard_index)) {
return false;
}
}
}
for (int i = 0; i < num_tools_; ++i) {
Expand All @@ -691,7 +696,28 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error = error;
VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index,
error.c_str());
return;
return false;
}
}
return true;
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
{
    // Entry point for an analyzer worker thread: runs the main processing loop
    // in process_tasks_internal() and, on failure, releases this worker's
    // current input so the scheduler does not wait on it forever (i#6666).
    if (!process_tasks_internal(worker)) {
        if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT) {
            // Avoid a hang in the scheduler if we leave our current input stranded.
            // XXX: Better to just do a global exit and not let the other threads
            // keep running? That breaks the current model where errors are
            // propagated to the user to decide what to do.
            // We could perhaps add thread synch points to have other threads
            // exit earlier: but maybe some use cases consider one shard error
            // to not affect others and not be fatal?
            if (worker->stream->set_active(false) != sched_type_t::STATUS_OK) {
                // Append the newline ERRMSG does not add, matching the other
                // ERRMSG call sites in this file.
                ERRMSG("Failed to set failing worker to inactive; may hang\n");
            }
        }
    }
}
Expand Down
7 changes: 6 additions & 1 deletion clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* **********************************************************
* Copyright (c) 2016-2023 Google, Inc. All rights reserved.
* Copyright (c) 2016-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
Expand Down Expand Up @@ -239,6 +239,10 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
void
process_serial(analyzer_worker_data_t &worker);

// Helper for process_tasks().
bool
process_tasks_internal(analyzer_worker_data_t *worker);

// Helper for process_tasks() which calls parallel_shard_exit() in each tool.
// Returns false if there was an error and the caller should return early.
bool
Expand Down Expand Up @@ -421,6 +425,7 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
int verbosity_ = 0;
shard_type_t shard_type_ = SHARD_BY_THREAD;
bool sched_by_time_ = false;
typename sched_type_t::mapping_t sched_mapping_ = sched_type_t::MAP_TO_ANY_OUTPUT;

private:
bool
Expand Down
117 changes: 115 additions & 2 deletions clients/drcachesim/tests/analysis_unit_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* **********************************************************
* Copyright (c) 2023 Google, Inc. All rights reserved.
* Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
Expand Down Expand Up @@ -76,6 +76,7 @@ class mock_analyzer_t : public analyzer_t {
sched_ops = scheduler_t::make_scheduler_parallel_options(verbosity_);
else
sched_ops = scheduler_t::make_scheduler_serial_options(verbosity_);
sched_mapping_ = sched_ops.mapping;
if (scheduler_.init(sched_inputs, worker_count_, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
assert(false);
Expand Down Expand Up @@ -366,10 +367,122 @@ test_wait_records()
return true;
}

bool
test_tool_errors()
{
// Tool errors can hang the analyzer if it doesn't tell the scheduler
// it's giving up on its input. We test that here.
std::cerr << "\n----------------\nTesting tool errors\n";

static constexpr int NUM_INPUTS = 5;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
if (i == 4) {
// This one input will trigger an error in our error_tool_t.
inputs[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 4));
}
inputs[i].push_back(make_exit(tid));
}

std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/1);

static const char *const TOOL_ERROR_STRING = "cpuid not supported";

class error_tool_t : public analysis_tool_t {
public:
bool
process_memref(const memref_t &memref) override
{
assert(false); // Only expect parallel mode.
return false;
}
bool
print_results() override
{
return true;
}
bool
parallel_shard_supported() override
{
return true;
}
void *
parallel_shard_init_stream(int shard_index, void *worker_data,
memtrace_stream_t *stream) override
{
auto per_shard = new per_shard_t;
return reinterpret_cast<void *>(per_shard);
}
bool
parallel_shard_exit(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
delete shard;
return true;
}
std::string
parallel_shard_error(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
return shard->error;
}
bool
parallel_shard_memref(void *shard_data, const memref_t &memref) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
// Return an error in one of the inputs.
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
shard->error = TOOL_ERROR_STRING;
return false;
}
return true;
}

private:
struct per_shard_t {
std::string error;
};
};

std::vector<analysis_tool_t *> tools;
auto test_tool = std::unique_ptr<error_tool_t>(new error_tool_t);
tools.push_back(test_tool.get());
mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
/*parallel=*/true, NUM_OUTPUTS, &sched_ops);
assert(!!analyzer);
// If the analyzer doesn't give up the input in the output stream that
// encounters it, the scheduler will hang waiting for that input,
// so failure in this test would be a CTest timeout.
bool res = analyzer.run();
assert(!res);
assert(analyzer.get_error_string() == TOOL_ERROR_STRING);
return true;
}

int
test_main(int argc, const char *argv[])
{
if (!test_queries() || !test_wait_records())
if (!test_queries() || !test_wait_records() || !test_tool_errors())
return 1;
std::cerr << "All done!\n";
return 0;
Expand Down

0 comments on commit afdc470

Please sign in to comment.