diff --git a/clients/drcachesim/analyzer.cpp b/clients/drcachesim/analyzer.cpp
index 7829a11ee9b..12916f1e3b0 100644
--- a/clients/drcachesim/analyzer.cpp
+++ b/clients/drcachesim/analyzer.cpp
@@ -317,6 +317,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
         worker_count_ = 1;
         output_count = 1;
     }
+    sched_mapping_ = options.mapping;
     if (scheduler_.init(sched_inputs, output_count, std::move(sched_ops)) !=
         sched_type_t::STATUS_SUCCESS) {
         ERRMSG("Failed to initialize scheduler: %s\n",
@@ -585,8 +586,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_shard_exit(
 }
 
 template <typename RecordType, typename ReaderType>
-void
-analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
+bool
+analyzer_tmpl_t<RecordType, ReaderType>::process_tasks_internal(
+    analyzer_worker_data_t *worker)
 {
     std::vector<void *> user_worker_data(num_tools_);
 
@@ -622,7 +624,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
                 worker->error =
                     "Failed to read from trace: " + worker->stream->get_stream_name();
             }
-            return;
+            return false;
         }
         int shard_index = shard_type_ == SHARD_BY_CORE
             ? worker->index
@@ -655,7 +657,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
              record_is_instr(record)) &&
            !process_interval(prev_interval_index, prev_interval_init_instr_count, worker,
                              /*parallel=*/true, record_is_instr(record), shard_index)) {
-            return;
+            return false;
         }
         for (int i = 0; i < num_tools_; ++i) {
             if (!tools_[i]->parallel_shard_memref(
@@ -665,24 +667,27 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
                 VPRINT(this, 1, "Worker %d hit shard memref error %s on trace shard %s\n",
                        worker->index, worker->error.c_str(),
                        worker->stream->get_stream_name().c_str());
-                return;
+                return false;
             }
         }
         if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) {
-            if (!process_shard_exit(worker, shard_index))
-                return;
+            if (!process_shard_exit(worker, shard_index)) {
+                return false;
+            }
         }
     }
     if (shard_type_ == SHARD_BY_CORE) {
         if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
-            if (!process_shard_exit(worker, worker->index))
-                return;
+            if (!process_shard_exit(worker, worker->index)) {
+                return false;
+            }
         }
     }
     for (const auto &keyval : worker->shard_data) {
         if (!keyval.second.exited) {
-            if (!process_shard_exit(worker, keyval.second.shard_index))
-                return;
+            if (!process_shard_exit(worker, keyval.second.shard_index)) {
+                return false;
+            }
         }
     }
     for (int i = 0; i < num_tools_; ++i) {
@@ -691,7 +696,28 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
             worker->error = error;
             VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index,
                    error.c_str());
-            return;
+            return false;
+        }
+    }
+    return true;
+}
+
+template <typename RecordType, typename ReaderType>
+void
+analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
+{
+    if (!process_tasks_internal(worker)) {
+        if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT) {
+            // Avoid a hang in the scheduler if we leave our current input stranded.
+            // XXX: Better to just do a global exit and not let the other threads
+            // keep running? That breaks the current model where errors are
+            // propagated to the user to decide what to do.
+            // We could perhaps add thread synch points to have other threads
+            // exit earlier: but maybe some use cases consider one shard error
+            // to not affect others and not be fatal?
+            if (worker->stream->set_active(false) != sched_type_t::STATUS_OK) {
+                ERRMSG("Failed to set failing worker to inactive; may hang");
+            }
         }
     }
 }
diff --git a/clients/drcachesim/analyzer.h b/clients/drcachesim/analyzer.h
index 87857de7b7b..964159c6aee 100644
--- a/clients/drcachesim/analyzer.h
+++ b/clients/drcachesim/analyzer.h
@@ -1,5 +1,5 @@
 /* **********************************************************
- * Copyright (c) 2016-2023 Google, Inc.  All rights reserved.
+ * Copyright (c) 2016-2024 Google, Inc.  All rights reserved.
  * **********************************************************/
 
 /*
@@ -239,6 +239,10 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
     void
     process_serial(analyzer_worker_data_t &worker);
 
+    // Helper for process_tasks().
+    bool
+    process_tasks_internal(analyzer_worker_data_t *worker);
+
     // Helper for process_tasks() which calls parallel_shard_exit() in each tool.
     // Returns false if there was an error and the caller should return early.
     bool
@@ -421,6 +425,7 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
     int verbosity_ = 0;
     shard_type_t shard_type_ = SHARD_BY_THREAD;
     bool sched_by_time_ = false;
+    typename sched_type_t::mapping_t sched_mapping_ = sched_type_t::MAP_TO_ANY_OUTPUT;
 
 private:
     bool
diff --git a/clients/drcachesim/tests/analysis_unit_tests.cpp b/clients/drcachesim/tests/analysis_unit_tests.cpp
index aefb6e78b5d..1329cd217c9 100644
--- a/clients/drcachesim/tests/analysis_unit_tests.cpp
+++ b/clients/drcachesim/tests/analysis_unit_tests.cpp
@@ -1,5 +1,5 @@
 /* **********************************************************
- * Copyright (c) 2023 Google, Inc.  All rights reserved.
+ * Copyright (c) 2023-2024 Google, Inc.  All rights reserved.
  * **********************************************************/
 
 /*
@@ -76,6 +76,7 @@ class mock_analyzer_t : public analyzer_t {
            sched_ops = scheduler_t::make_scheduler_parallel_options(verbosity_);
        else
            sched_ops = scheduler_t::make_scheduler_serial_options(verbosity_);
+       sched_mapping_ = sched_ops.mapping;
        if (scheduler_.init(sched_inputs, worker_count_, std::move(sched_ops)) !=
            sched_type_t::STATUS_SUCCESS) {
            assert(false);
@@ -366,10 +367,122 @@ test_wait_records()
     return true;
 }
 
+bool
+test_tool_errors()
+{
+    // Tool errors can hang the analyzer if it doesn't tell the scheduler
+    // it's giving up on its input. We test that here.
+    std::cerr << "\n----------------\nTesting tool errors\n";
+
+    static constexpr int NUM_INPUTS = 5;
+    static constexpr int NUM_OUTPUTS = 2;
+    static constexpr int NUM_INSTRS = 9;
+    static constexpr memref_tid_t TID_BASE = 100;
+    std::vector<trace_entry_t> inputs[NUM_INPUTS];
+    for (int i = 0; i < NUM_INPUTS; i++) {
+        memref_tid_t tid = TID_BASE + i;
+        inputs[i].push_back(make_thread(tid));
+        inputs[i].push_back(make_pid(1));
+        for (int j = 0; j < NUM_INSTRS; j++)
+            inputs[i].push_back(make_instr(42 + j * 4));
+        if (i == 4) {
+            // This one input will trigger an error in our error_tool_t.
+            inputs[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 4));
+        }
+        inputs[i].push_back(make_exit(tid));
+    }
+
+    std::vector<scheduler_t::input_workload_t> sched_inputs;
+    for (int i = 0; i < NUM_INPUTS; i++) {
+        memref_tid_t tid = TID_BASE + i;
+        std::vector<scheduler_t::input_reader_t> readers;
+        readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
+                             std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
+        sched_inputs.emplace_back(std::move(readers));
+    }
+    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
+                                               scheduler_t::DEPENDENCY_IGNORE,
+                                               scheduler_t::SCHEDULER_DEFAULTS,
+                                               /*verbosity=*/1);
+
+    static const char *const TOOL_ERROR_STRING = "cpuid not supported";
+
+    class error_tool_t : public analysis_tool_t {
+    public:
+        bool
+        process_memref(const memref_t &memref) override
+        {
+            assert(false); // Only expect parallel mode.
+            return false;
+        }
+        bool
+        print_results() override
+        {
+            return true;
+        }
+        bool
+        parallel_shard_supported() override
+        {
+            return true;
+        }
+        void *
+        parallel_shard_init_stream(int shard_index, void *worker_data,
+                                   memtrace_stream_t *stream) override
+        {
+            auto per_shard = new per_shard_t;
+            return reinterpret_cast<void *>(per_shard);
+        }
+        bool
+        parallel_shard_exit(void *shard_data) override
+        {
+            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
+            delete shard;
+            return true;
+        }
+        std::string
+        parallel_shard_error(void *shard_data) override
+        {
+            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
+            return shard->error;
+        }
+        bool
+        parallel_shard_memref(void *shard_data, const memref_t &memref) override
+        {
+            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
+            // Return an error in one of the inputs.
+            if (memref.marker.type == TRACE_TYPE_MARKER &&
+                memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
+                shard->error = TOOL_ERROR_STRING;
+                return false;
+            }
+            return true;
+        }
+
+    private:
+        struct per_shard_t {
+            std::string error;
+        };
+    };
+
+    std::vector<analysis_tool_t *> tools;
+    auto test_tool = std::unique_ptr<error_tool_t>(new error_tool_t);
+    tools.push_back(test_tool.get());
+    mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
+                             /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
+    assert(!!analyzer);
+    // If the analyzer doesn't give up the input in the output stream that
+    // encounters it, the scheduler will hang waiting for that input,
+    // so failure in this test would be a CTest timeout.
+    bool res = analyzer.run();
+    assert(!res);
+    assert(analyzer.get_error_string() == TOOL_ERROR_STRING);
+    return true;
+}
+
 int
 test_main(int argc, const char *argv[])
 {
-    if (!test_queries() || !test_wait_records())
+    if (!test_queries() || !test_wait_records() || !test_tool_errors())
         return 1;
     std::cerr << "All done!\n";
     return 0;