Skip to content

Commit

Permalink
Merge pull request #367 from JeffersonLab/nbrei_eventproc_beginrun_fix
Browse files Browse the repository at this point in the history
Bugfix: JEventProcessor::Process() called before BeginRun()
  • Loading branch information
nathanwbrei authored Sep 25, 2024
2 parents efad5ff + 0fdc68f commit 95e6b8d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
6 changes: 5 additions & 1 deletion src/libraries/JANA/Components/JHasOutputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct JHasOutputs {
template <typename T>
class Output : public OutputBase {
std::vector<T*> m_data;
bool is_not_owner = false;

public:
Output(JHasOutputs* owner, std::string default_tag_name="") {
Expand All @@ -43,9 +44,12 @@ struct JHasOutputs {

protected:
void InsertCollection(JEvent& event) override {
event.Insert(m_data, this->collection_names[0]);
auto fac = event.Insert(m_data, this->collection_names[0]);
fac->SetIsNotOwnerFlag(is_not_owner);
}
void Reset() override { }

void SetIsNotOwnerFlag(bool not_owner=true) { is_not_owner = not_owner; }
};


Expand Down
48 changes: 22 additions & 26 deletions src/libraries/JANA/JEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <JANA/Components/JHasInputs.h>
#include <JANA/Components/JHasRunCallbacks.h>
#include <JANA/JEvent.h>
#include <mutex>

class JApplication;

Expand All @@ -30,10 +31,9 @@ class JEventProcessor : public jana::components::JComponent,

uint64_t GetEventCount() const { return m_event_count; };

bool AreEventsOrdered() const { return m_receive_events_in_order; }


virtual void DoInitialize() {
std::lock_guard<std::mutex> lock(m_mutex);
for (auto* parameter : m_parameters) {
parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
}
Expand Down Expand Up @@ -109,8 +109,12 @@ class JEventProcessor : public jana::components::JComponent,

virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {

// DoLegacyProcess doesn't hold any locks, as it requires the user to hold a lock for it.
// Because of this,
// DoLegacyProcess holds a lock to make sure that {Begin,Change,End}Run() are always called before Process().
// Note that in LegacyMode, Process() requires the user to manage a _separate_ lock for its critical section.
// This arrangement means that {Begin,Change,End}Run() will definitely be called at least once before `Process`, but there
// may be races when there are multiple run numbers present in the stream. This isn't a problem in practice for now,
// but future work should use ExpertMode or DeclarativeMode for this reason (but also for the usability improvements!)

if (m_callback_style != CallbackStyle::LegacyMode) {
throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
}
Expand All @@ -123,15 +127,19 @@ class JEventProcessor : public jana::components::JComponent,
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
}
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
{
// Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
std::lock_guard<std::mutex> lock(m_mutex);
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
}
for (auto* resource : m_resources) {
resource->ChangeRun(event->GetRunNumber(), m_app);
}
m_last_run_number = run_number;
CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
}
for (auto* resource : m_resources) {
resource->ChangeRun(event->GetRunNumber(), m_app);
}
m_last_run_number = run_number;
CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
}
CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
m_event_count += 1;
Expand Down Expand Up @@ -170,33 +178,29 @@ class JEventProcessor : public jana::components::JComponent,
// LegacyMode-specific callbacks

virtual void Process(const std::shared_ptr<const JEvent>& /*event*/) {
throw JException("Not implemented yet!");
}

// ExpertMode-specific callbacks

virtual void ProcessParallel(const JEvent& /*event*/) {
}

virtual void Process(const JEvent& /*event*/) {
throw JException("Not implemented yet!");
}

// DeclarativeMode-specific callbacks

virtual void ProcessParallel(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
throw JException("Not implemented yet!");
}

virtual void Process(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
throw JException("Not implemented yet!");
}


virtual void Finish() {}


// TODO: Deprecate
[[deprecated]]
virtual std::string GetType() const {
return m_type_name;
}
Expand All @@ -217,18 +221,10 @@ class JEventProcessor : public jana::components::JComponent,

// void SetResourceName(std::string resource_name) { m_resource_name = std::move(resource_name); }

/// SetEventsOrdered allows the user to tell the parallelization engine that it needs to see
/// the event stream ordered by increasing event IDs. (Note that this requires all EventSources
/// emit event IDs which are consecutive.) Ordering by event ID makes for cleaner output, but comes
/// with a performance penalty, so it is best if this is enabled during debugging, and disabled otherwise.

// void SetEventsOrdered(bool receive_events_in_order) { m_receive_events_in_order = receive_events_in_order; }


private:
std::string m_resource_name;
std::atomic_ullong m_event_count {0};
bool m_receive_events_in_order = false;

};

Expand Down

0 comments on commit 95e6b8d

Please sign in to comment.