i#6666: Avoid hang when analyzer worker returns early #6667

Merged: 3 commits, Feb 22, 2024
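
When a tool reports an error during parallel analysis, the worker stops reading records and returns early. Under the dynamic MAP_TO_ANY_OUTPUT scheduling mode this previously left the worker's current input stranded, and the scheduler could wait on it forever, hanging the whole run. This change moves the worker loop into a new process_tasks_internal() helper that returns false on every error path, and has the process_tasks() wrapper call set_active(false) on the failing worker's output stream so the scheduler does not keep waiting for that stranded input. A new sched_mapping_ field records the mapping mode so the deactivation is only done for dynamic scheduling, and a new test_tool_errors unit test covers the scenario; without the fix that test would hang and show up as a CTest timeout.
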
50 changes: 38 additions & 12 deletions clients/drcachesim/analyzer.cpp
@@ -317,6 +317,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
worker_count_ = 1;
output_count = 1;
}
sched_mapping_ = options.mapping;
if (scheduler_.init(sched_inputs, output_count, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
ERRMSG("Failed to initialize scheduler: %s\n",
@@ -585,8 +586,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_shard_exit(
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
bool
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks_internal(
analyzer_worker_data_t *worker)
{
std::vector<void *> user_worker_data(num_tools_);

@@ -622,7 +624,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error =
"Failed to read from trace: " + worker->stream->get_stream_name();
}
return;
return false;
}
int shard_index = shard_type_ == SHARD_BY_CORE
? worker->index
@@ -655,7 +657,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
record_is_instr(record)) &&
!process_interval(prev_interval_index, prev_interval_init_instr_count, worker,
/*parallel=*/true, record_is_instr(record), shard_index)) {
return;
return false;
}
for (int i = 0; i < num_tools_; ++i) {
if (!tools_[i]->parallel_shard_memref(
@@ -665,24 +667,27 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
VPRINT(this, 1, "Worker %d hit shard memref error %s on trace shard %s\n",
worker->index, worker->error.c_str(),
worker->stream->get_stream_name().c_str());
return;
return false;
}
}
if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) {
if (!process_shard_exit(worker, shard_index))
return;
if (!process_shard_exit(worker, shard_index)) {
return false;
}
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
if (!process_shard_exit(worker, worker->index)) {
return false;
}
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
if (!process_shard_exit(worker, keyval.second.shard_index))
return;
if (!process_shard_exit(worker, keyval.second.shard_index)) {
return false;
}
}
}
for (int i = 0; i < num_tools_; ++i) {
@@ -691,7 +696,28 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error = error;
VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index,
error.c_str());
return;
return false;
}
}
return true;
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
{
if (!process_tasks_internal(worker)) {
if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT) {
// Avoid a hang in the scheduler if we leave our current input stranded.
// XXX: Better to just do a global exit and not let the other threads
// keep running? That breaks the current model where errors are
// propagated to the user to decide what to do.
// We could perhaps add thread synch points to have other threads
// exit earlier: but maybe some use cases consider one shard error
// to not affect others and not be fatal?
if (worker->stream->set_active(false) != sched_type_t::STATUS_OK) {
ERRMSG("Failed to set failing worker to inactive; may hang");
}
}
}
}
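
For readers skimming the diff, the shape of the new worker flow condenses to the sketch below (identifiers are the ones used in analyzer.cpp above; tool iteration, interval handling, and the per-error messages are elided, and the two nested if checks from the diff are folded into one condition):

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
{
    // The former body of process_tasks() now lives in process_tasks_internal(),
    // which returns false wherever it used to bail out with a bare "return".
    if (!process_tasks_internal(worker)) {
        // This worker is abandoning the input it currently owns.  Under the
        // dynamic MAP_TO_ANY_OUTPUT mapping the scheduler would otherwise keep
        // waiting for this output stream to consume that input, hanging the run.
        if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT &&
            worker->stream->set_active(false) != sched_type_t::STATUS_OK) {
            ERRMSG("Failed to set failing worker to inactive; may hang");
        }
    }
}

As the XXX comment in the diff notes, an alternative would be a global exit that also stops the other workers, but that would break the current model where errors are propagated to the user to decide what to do.
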
7 changes: 6 additions & 1 deletion clients/drcachesim/analyzer.h
@@ -1,5 +1,5 @@
/* **********************************************************
* Copyright (c) 2016-2023 Google, Inc. All rights reserved.
* Copyright (c) 2016-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -239,6 +239,10 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
void
process_serial(analyzer_worker_data_t &worker);

// Helper for process_tasks().
bool
process_tasks_internal(analyzer_worker_data_t *worker);

// Helper for process_tasks() which calls parallel_shard_exit() in each tool.
// Returns false if there was an error and the caller should return early.
bool
@@ -421,6 +425,7 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
int verbosity_ = 0;
shard_type_t shard_type_ = SHARD_BY_THREAD;
bool sched_by_time_ = false;
typename sched_type_t::mapping_t sched_mapping_ = sched_type_t::MAP_TO_ANY_OUTPUT;

private:
bool
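
The new sched_mapping_ member defaults to MAP_TO_ANY_OUTPUT and is filled in from the scheduler options in init_scheduler_common(). A subclass that builds its own scheduler options must keep it in sync as well, which is what the mock_analyzer_t change in the test below does. A minimal sketch of that pattern follows; my_analyzer_t is a hypothetical name, and the tool registration and other setup are elided:

#include <cassert>
#include <utility>
#include <vector>
#include "analyzer.h"

class my_analyzer_t : public analyzer_t {
public:
    my_analyzer_t(std::vector<scheduler_t::input_workload_t> &sched_inputs, int worker_count)
    {
        worker_count_ = worker_count;
        scheduler_t::scheduler_options_t sched_ops =
            scheduler_t::make_scheduler_parallel_options(verbosity_);
        // New with this change: record the mapping so process_tasks() knows
        // whether a failing worker's stream needs to be deactivated.
        sched_mapping_ = sched_ops.mapping;
        if (scheduler_.init(sched_inputs, worker_count_, std::move(sched_ops)) !=
            sched_type_t::STATUS_SUCCESS)
            assert(false);
    }
};
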
117 changes: 115 additions & 2 deletions clients/drcachesim/tests/analysis_unit_tests.cpp
@@ -1,5 +1,5 @@
/* **********************************************************
* Copyright (c) 2023 Google, Inc. All rights reserved.
* Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -76,6 +76,7 @@ class mock_analyzer_t : public analyzer_t {
sched_ops = scheduler_t::make_scheduler_parallel_options(verbosity_);
else
sched_ops = scheduler_t::make_scheduler_serial_options(verbosity_);
sched_mapping_ = sched_ops.mapping;
if (scheduler_.init(sched_inputs, worker_count_, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
assert(false);
@@ -366,10 +367,122 @@ test_wait_records()
return true;
}

bool
test_tool_errors()
{
// Tool errors can hang the analyzer if it doesn't tell the scheduler
// it's giving up on its input. We test that here.
std::cerr << "\n----------------\nTesting tool errors\n";

static constexpr int NUM_INPUTS = 5;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
if (i == 4) {
// This one input will trigger an error in our error_tool_t.
inputs[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 4));
}
inputs[i].push_back(make_exit(tid));
}

std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/1);

static const char *const TOOL_ERROR_STRING = "cpuid not supported";

class error_tool_t : public analysis_tool_t {
public:
bool
process_memref(const memref_t &memref) override
{
assert(false); // Only expect parallel mode.
return false;
}
bool
print_results() override
{
return true;
}
bool
parallel_shard_supported() override
{
return true;
}
void *
parallel_shard_init_stream(int shard_index, void *worker_data,
memtrace_stream_t *stream) override
{
auto per_shard = new per_shard_t;
return reinterpret_cast<void *>(per_shard);
}
bool
parallel_shard_exit(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
delete shard;
return true;
}
std::string
parallel_shard_error(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
return shard->error;
}
bool
parallel_shard_memref(void *shard_data, const memref_t &memref) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
// Return an error in one of the inputs.
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
shard->error = TOOL_ERROR_STRING;
return false;
}
return true;
}

private:
struct per_shard_t {
std::string error;
};
};

std::vector<analysis_tool_t *> tools;
auto test_tool = std::unique_ptr<error_tool_t>(new error_tool_t);
tools.push_back(test_tool.get());
mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
/*parallel=*/true, NUM_OUTPUTS, &sched_ops);
assert(!!analyzer);
// If the analyzer doesn't give up the input in the output stream that
// encounters it, the scheduler will hang waiting for that input,
// so failure in this test would be a CTest timeout.
bool res = analyzer.run();
assert(!res);
assert(analyzer.get_error_string() == TOOL_ERROR_STRING);
return true;
}

int
test_main(int argc, const char *argv[])
{
if (!test_queries() || !test_wait_records())
if (!test_queries() || !test_wait_records() || !test_tool_errors())
return 1;
std::cerr << "All done!\n";
return 0;
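
From a caller's perspective, the behavior this test locks in is that a tool error in parallel mode now comes back through the normal error path instead of hanging the run. A minimal usage fragment, assuming an analyzer constructed as in the test above:

if (!analyzer.run()) {
    // With this change the failing worker deactivates its stream, so run()
    // returns and the tool's error message is available here.
    std::cerr << "analysis failed: " << analyzer.get_error_string() << "\n";
}
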