
Commit

Merge branch 'master' into i6643-finalize-interval-tool-api
abhinav92003 authored Feb 22, 2024
2 parents 8bb4622 + 4bf1163 commit e443a7a
Showing 16 changed files with 296 additions and 109 deletions.
5 changes: 5 additions & 0 deletions api/docs/release.dox
@@ -207,6 +207,11 @@ Further non-compatibility-affecting changes include:
- Added #dynamorio::drmemtrace::TRACE_MARKER_TYPE_VECTOR_LENGTH marker to indicate the
current vector length for architectures with a hardware defined or runtime changeable
vector length (such as AArch64's SVE scalable vectors).
- Renamed a protected data member in #dynamorio::drmemtrace::analyzer_tmpl_t from
merged_interval_snapshots_ to whole_trace_interval_snapshots_ (may be relevant for
users sub-classing analyzer_tmpl_t).
- Converted #dynamorio::drmemtrace::analysis_tool_tmpl_t::interval_state_snapshot_t
into a class with all its data members marked private with public accessor functions.

**************************************************
<hr>
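For tools that read the snapshot's fields, this conversion means replacing direct member access with the new getters. A minimal sketch of the post-change access pattern (the print_snapshot() helper and the include path are illustrative, not part of this commit):

#include <cstdint>
#include <iostream>

#include "analysis_tool.h" // drmemtrace header; the install path may differ.

using dynamorio::drmemtrace::analysis_tool_t;

// Direct field access such as snap.interval_id no longer compiles; the data
// members are private and exposed through const accessors instead.
static void
print_snapshot(const analysis_tool_t::interval_state_snapshot_t &snap)
{
    const bool whole_trace =
        snap.get_shard_id() ==
        analysis_tool_t::interval_state_snapshot_t::WHOLE_TRACE_SHARD_ID;
    std::cout << (whole_trace ? "whole-trace" : "shard") << " interval "
              << snap.get_interval_id() << " ending at "
              << snap.get_interval_end_timestamp() << ": "
              << snap.get_instr_count_cumulative() << " instrs cumulative, "
              << snap.get_instr_count_delta() << " in this interval\n";
}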
70 changes: 53 additions & 17 deletions clients/drcachesim/analysis_tool.h
@@ -197,52 +197,88 @@ template <typename RecordType> class analysis_tool_tmpl_t {
* framework automatically, and must not be modified by the tool at any point.
* XXX: Perhaps this should be a class with private data members.
*/
struct interval_state_snapshot_t {
class interval_state_snapshot_t {
// Allow the analyzer framework access to private data members to set them
// during trace interval analysis. Tools have read-only access via the public
// accessor functions.
// Note that we expect X to be the same as RecordType. But friend declarations
// cannot refer to partial specializations so we go with the separate template
// parameter X.
template <typename X, typename Y> friend class analyzer_tmpl_t;

public:
// This constructor is only for convenience in unit tests. The tool does not
// need to provide these values, and can simply use the default constructor
// below.
interval_state_snapshot_t(int64_t shard_id, uint64_t interval_id,
uint64_t interval_end_timestamp,
uint64_t instr_count_cumulative,
uint64_t instr_count_delta)
: shard_id(shard_id)
, interval_id(interval_id)
, interval_end_timestamp(interval_end_timestamp)
, instr_count_cumulative(instr_count_cumulative)
, instr_count_delta(instr_count_delta)
: shard_id_(shard_id)
, interval_id_(interval_id)
, interval_end_timestamp_(interval_end_timestamp)
, instr_count_cumulative_(instr_count_cumulative)
, instr_count_delta_(instr_count_delta)
{
}
interval_state_snapshot_t()
{
}
virtual ~interval_state_snapshot_t() = default;

int64_t
get_shard_id() const
{
return shard_id_;
}
uint64_t
get_interval_id() const
{
return interval_id_;
}
uint64_t
get_interval_end_timestamp() const
{
return interval_end_timestamp_;
}
uint64_t
get_instr_count_cumulative() const
{
return instr_count_cumulative_;
}
uint64_t
get_instr_count_delta() const
{
return instr_count_delta_;
}

static constexpr int64_t WHOLE_TRACE_SHARD_ID = -1;

private:
// The following fields are set automatically by the analyzer framework after
// the tool returns the interval_state_snapshot_t* in the
// generate_*interval_snapshot APIs. So they'll be available to the tool in
// the finalize_interval_snapshots(), combine_interval_snapshots(), and
// print_interval_results() APIs.
// the finalize_interval_snapshots(), combine_interval_snapshots() (for the
// parameter snapshots), and print_interval_results() APIs via the above
// public accessor functions.

// Identifier for the shard to which this interval belongs. Currently, shards
// map only to threads, so this is the thread id. Set to WHOLE_TRACE_SHARD_ID
// for the whole trace interval snapshots.
int64_t shard_id = 0;
uint64_t interval_id = 0;
int64_t shard_id_ = 0;
uint64_t interval_id_ = 0;
// Stores the timestamp (exclusive) when the above interval ends. Note
// that this is not the last timestamp actually seen in the trace interval,
// but simply the abstract boundary of the interval. This will be aligned
// to the specified -interval_microseconds.
uint64_t interval_end_timestamp = 0;
uint64_t interval_end_timestamp_ = 0;

// Count of instructions: cumulative till this interval's end, and the
// incremental delta in this interval vs the previous one. May be useful for
// tools to compute PKI (per kilo instruction) metrics; obviates the need for
// each tool to duplicate this.
uint64_t instr_count_cumulative = 0;
uint64_t instr_count_delta = 0;

static constexpr int64_t WHOLE_TRACE_SHARD_ID = -1;

virtual ~interval_state_snapshot_t() = default;
uint64_t instr_count_cumulative_ = 0;
uint64_t instr_count_delta_ = 0;
};
/**
* Notifies the analysis tool that the given trace \p interval_id has ended so
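To make the snapshot lifecycle concrete: a tool derives its own snapshot type to carry tool-specific state, returns it from the generate_*interval_snapshot callbacks, and later reads the framework-populated base fields through the accessors above. A hedged sketch, assuming the override signatures below match the public drmemtrace API (the my_snapshot_t/my_tool_t names are invented for illustration):

#include <cstdint>

#include "analysis_tool.h" // drmemtrace header; the install path may differ.

using dynamorio::drmemtrace::analysis_tool_t;
using dynamorio::drmemtrace::memref_t;

// Tool-specific snapshot: the base-class fields (shard id, interval id, end
// timestamp, instruction counts) are set by the analyzer after the snapshot
// is returned from generate_interval_snapshot() below.
class my_snapshot_t : public analysis_tool_t::interval_state_snapshot_t {
public:
    uint64_t records_seen = 0;
};

class my_tool_t : public analysis_tool_t {
public:
    bool
    process_memref(const memref_t &memref) override
    {
        // For simplicity this sketch counts every record it sees.
        ++cur_records_;
        return true;
    }
    interval_state_snapshot_t *
    generate_interval_snapshot(uint64_t interval_id) override
    {
        auto *snap = new my_snapshot_t;
        snap->records_seen = cur_records_;
        // Do not set the shard/interval ids or instruction counts here; the
        // analyzer framework fills those in after this call returns.
        return snap;
    }
    bool
    release_interval_snapshot(interval_state_snapshot_t *snapshot) override
    {
        delete snapshot;
        return true;
    }
    bool
    print_results() override
    {
        return true;
    }

private:
    uint64_t cur_records_ = 0;
};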
89 changes: 57 additions & 32 deletions clients/drcachesim/analyzer.cpp
@@ -316,6 +316,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
worker_count_ = 1;
output_count = 1;
}
sched_mapping_ = options.mapping;
if (scheduler_.init(sched_inputs, output_count, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
ERRMSG("Failed to initialize scheduler: %s\n",
@@ -587,8 +588,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_shard_exit(
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
bool
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks_internal(
analyzer_worker_data_t *worker)
{
std::vector<void *> user_worker_data(num_tools_);

@@ -624,7 +626,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error =
"Failed to read from trace: " + worker->stream->get_stream_name();
}
return;
return false;
}
int shard_index = shard_type_ == SHARD_BY_CORE
? worker->index
@@ -657,7 +659,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
record_is_instr(record)) &&
!process_interval(prev_interval_index, prev_interval_init_instr_count, worker,
/*parallel=*/true, record_is_instr(record), shard_index)) {
return;
return false;
}
for (int i = 0; i < num_tools_; ++i) {
if (!tools_[i]->parallel_shard_memref(
Expand All @@ -667,24 +669,27 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
VPRINT(this, 1, "Worker %d hit shard memref error %s on trace shard %s\n",
worker->index, worker->error.c_str(),
worker->stream->get_stream_name().c_str());
return;
return false;
}
}
if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) {
if (!process_shard_exit(worker, shard_index))
return;
if (!process_shard_exit(worker, shard_index)) {
return false;
}
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
if (!process_shard_exit(worker, worker->index)) {
return false;
}
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
if (!process_shard_exit(worker, keyval.second.shard_index))
return;
if (!process_shard_exit(worker, keyval.second.shard_index)) {
return false;
}
}
}
for (int i = 0; i < num_tools_; ++i) {
Expand All @@ -693,7 +698,28 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
worker->error = error;
VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index,
error.c_str());
return;
return false;
}
}
return true;
}

template <typename RecordType, typename ReaderType>
void
analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker)
{
if (!process_tasks_internal(worker)) {
if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT) {
// Avoid a hang in the scheduler if we leave our current input stranded.
// XXX: Better to just do a global exit and not let the other threads
// keep running? That breaks the current model where errors are
// propagated to the user to decide what to do.
// We could perhaps add thread synch points to have other threads
// exit earlier: but maybe some use cases consider one shard error
// to not affect others and not be fatal?
if (worker->stream->set_active(false) != sched_type_t::STATUS_OK) {
ERRMSG("Failed to set failing worker to inactive; may hang");
}
}
}
}
Expand All @@ -714,17 +740,17 @@ analyzer_tmpl_t<RecordType, ReaderType>::combine_interval_snapshots(
tools_[tool_idx]->get_error_string();
return false;
}
result->instr_count_delta = 0;
result->instr_count_cumulative = 0;
result->instr_count_delta_ = 0;
result->instr_count_cumulative_ = 0;
for (auto snapshot : latest_shard_snapshots) {
if (snapshot == nullptr)
continue;
// As discussed in the doc for analysis_tool_t::combine_interval_snapshots,
// we combine all shard's latest snapshots for cumulative metrics, whereas
// we combine only the shards active in current interval for delta metrics.
result->instr_count_cumulative += snapshot->instr_count_cumulative;
if (snapshot->interval_end_timestamp == interval_end_timestamp)
result->instr_count_delta += snapshot->instr_count_delta;
result->instr_count_cumulative_ += snapshot->instr_count_cumulative_;
if (snapshot->interval_end_timestamp_ == interval_end_timestamp)
result->instr_count_delta_ += snapshot->instr_count_delta_;
}
return true;
}
@@ -762,7 +788,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::merge_shard_interval_results(
continue;
earliest_interval_end_timestamp =
std::min(earliest_interval_end_timestamp,
intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp);
intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp_);
}
// We're done if no shard has any interval left unprocessed.
if (earliest_interval_end_timestamp == std::numeric_limits<uint64_t>::max()) {
Expand All @@ -780,7 +806,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::merge_shard_interval_results(
if (at_idx[shard_idx] == intervals[shard_idx].size())
continue;
uint64_t cur_interval_end_timestamp =
intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp;
intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp_;
assert(cur_interval_end_timestamp >= earliest_interval_end_timestamp);
if (cur_interval_end_timestamp > earliest_interval_end_timestamp)
continue;
@@ -811,10 +837,10 @@ analyzer_tmpl_t<RecordType, ReaderType>::merge_shard_interval_results(
cur_merged_interval))
return false;
// Add the merged interval to the result list of whole trace intervals.
cur_merged_interval->shard_id = analysis_tool_tmpl_t<
cur_merged_interval->shard_id_ = analysis_tool_tmpl_t<
RecordType>::interval_state_snapshot_t::WHOLE_TRACE_SHARD_ID;
cur_merged_interval->interval_end_timestamp = earliest_interval_end_timestamp;
cur_merged_interval->interval_id = compute_timestamp_interval_id(
cur_merged_interval->interval_end_timestamp_ = earliest_interval_end_timestamp;
cur_merged_interval->interval_id_ = compute_timestamp_interval_id(
earliest_ever_interval_end_timestamp, earliest_interval_end_timestamp);
merged_intervals.push_back(cur_merged_interval);
}
@@ -1080,31 +1106,30 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_interval(
return false;
}
if (snapshot != nullptr) {
snapshot->shard_id = parallel
snapshot->shard_id_ = parallel
? worker->shard_data[shard_idx].shard_id
: analysis_tool_tmpl_t<
RecordType>::interval_state_snapshot_t::WHOLE_TRACE_SHARD_ID;
snapshot->interval_id = interval_id;
snapshot->interval_id_ = interval_id;
if (interval_microseconds_ > 0) {
// For timestamp intervals, the interval_end_timestamp is the abstract
// non-inclusive end timestamp for the interval_id. This is to make it
// easier to line up the corresponding shard interval snapshots so that
// we can merge them to form the whole-trace interval snapshots.
snapshot->interval_end_timestamp = compute_interval_end_timestamp(
snapshot->interval_end_timestamp_ = compute_interval_end_timestamp(
worker->stream->get_first_timestamp(), interval_id);
} else {
snapshot->interval_end_timestamp = worker->stream->get_last_timestamp();
snapshot->interval_end_timestamp_ = worker->stream->get_last_timestamp();
}
// instr_count_cumulative for the interval snapshot is supposed to be
// inclusive, so if the first record after the interval (that is, the record
// we're at right now) is an instr, it must be subtracted.
snapshot->instr_count_cumulative =
snapshot->instr_count_cumulative_ =
worker->stream->get_instruction_ordinal() - (at_instr_record ? 1 : 0);
snapshot->instr_count_delta =
snapshot->instr_count_cumulative - interval_init_instr_count;
worker->shard_data[shard_idx]
.tool_data[tool_idx]
.interval_snapshot_data.push_back(snapshot);
snapshot->instr_count_delta_ =
snapshot->instr_count_cumulative_ - interval_init_instr_count;
worker->shard_data[shard_idx].tool_data[tool_idx].interval_snapshot_data.push(
snapshot);
}
}
return true;
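The cumulative-versus-delta convention used here for the built-in instruction counts is the same one a tool would follow for its own metrics when producing whole-trace snapshots: sum cumulative values across every shard's latest snapshot, but sum deltas only for shards whose snapshot ends exactly at the interval being merged. A standalone sketch of that rule (the my_snapshot_t type and helper function are illustrative, not drmemtrace code):

#include <cstdint>
#include <vector>

#include "analysis_tool.h" // drmemtrace header; the install path may differ.

using dynamorio::drmemtrace::analysis_tool_t;

// Hypothetical tool-defined snapshot with one cumulative and one delta metric.
class my_snapshot_t : public analysis_tool_t::interval_state_snapshot_t {
public:
    uint64_t loads_cumulative = 0;
    uint64_t loads_delta = 0;
};

// Combines the latest per-shard snapshots into one whole-trace snapshot,
// mirroring how the analyzer above handles its instruction counts.
static my_snapshot_t *
combine_latest_shard_snapshots(
    const std::vector<const analysis_tool_t::interval_state_snapshot_t *>
        &latest_shard_snapshots,
    uint64_t interval_end_timestamp)
{
    auto *result = new my_snapshot_t;
    for (const auto *snap : latest_shard_snapshots) {
        if (snap == nullptr)
            continue;
        const auto *mine = dynamic_cast<const my_snapshot_t *>(snap);
        if (mine == nullptr)
            continue; // Not expected: this tool only produces my_snapshot_t.
        // Cumulative metric: every shard's latest snapshot contributes.
        result->loads_cumulative += mine->loads_cumulative;
        // Delta metric: only shards active in this interval, i.e. whose
        // snapshot ends at exactly this interval's end timestamp.
        if (snap->get_interval_end_timestamp() == interval_end_timestamp)
            result->loads_delta += mine->loads_delta;
    }
    return result;
}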
5 changes: 5 additions & 0 deletions clients/drcachesim/analyzer.h
@@ -239,6 +239,10 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
void
process_serial(analyzer_worker_data_t &worker);

// Helper for process_tasks().
bool
process_tasks_internal(analyzer_worker_data_t *worker);

// Helper for process_tasks() which calls parallel_shard_exit() in each tool.
// Returns false if there was an error and the caller should return early.
bool
@@ -421,6 +425,7 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
int verbosity_ = 0;
shard_type_t shard_type_ = SHARD_BY_THREAD;
bool sched_by_time_ = false;
typename sched_type_t::mapping_t sched_mapping_ = sched_type_t::MAP_TO_ANY_OUTPUT;

private:
bool
5 changes: 2 additions & 3 deletions clients/drcachesim/common/options.cpp
@@ -973,9 +973,8 @@ droption_t<uint64_t> op_trim_after_timestamp(
DROPTION_SCOPE_ALL, "trim_after_timestamp", (std::numeric_limits<uint64_t>::max)(), 0,
(std::numeric_limits<uint64_t>::max)(),
"Trim records after this timestamp (in us) in the trace.",
"Removes all records after the first TRACE_MARKER_TYPE_TIMESTAMP marker with "
"timestamp larger than the specified value (keeps a TRACE_MARKER_TYPE_CPU_ID "
"immediately following the transition timestamp).");
"Removes all records from the first TRACE_MARKER_TYPE_TIMESTAMP marker with "
"timestamp larger than the specified value.");

} // namespace drmemtrace
} // namespace dynamorio
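The revised help text changes where trimming starts: the first over-threshold timestamp marker is now dropped along with everything after it, instead of being kept together with the TRACE_MARKER_TYPE_CPU_ID that follows it. A small, self-contained illustration of the new cut-off rule (simplified record type; not the drmemtrace implementation):

#include <cstdint>
#include <vector>

struct record_t {           // Simplified stand-in for a trace record.
    bool is_timestamp_marker;
    uint64_t timestamp;     // Valid only when is_timestamp_marker is true.
};

static std::vector<record_t>
trim_after_timestamp(const std::vector<record_t> &records, uint64_t threshold)
{
    std::vector<record_t> kept;
    for (const auto &rec : records) {
        if (rec.is_timestamp_marker && rec.timestamp > threshold)
            break; // Drop this marker and every subsequent record.
        kept.push_back(rec);
    }
    return kept;
}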