Skip to content

Commit

Permalink
Fix for rosbag2_transport::Recorder failures due to the unhandled exc…
Browse files Browse the repository at this point in the history
…eptions (#1382)

* Fix for rosbag2_transport::Recorder failures due to unhandled exceptions

Signed-off-by: Michael Orlov <[email protected]>

* Revert "Fix for rosbag2_transport::Recorder failures due to unhandled exceptions"

This reverts commit 767d440.

Signed-off-by: Michael Orlov <[email protected]>

* Handle exceptions in event publisher and discovery threads

- Exceptions potentially may come from the underlying node operations
when we are invalidating context via rclcpp::shutdown(). We need to have
possibility to correct finish recording in this case.

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov authored Jun 17, 2023
1 parent 46a23e9 commit bb8d2a5
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,12 @@ void RecorderImpl::record()
response->paused = is_paused();
});

split_event_pub_ =
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);

// Start the thread that will publish events
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this);

split_event_pub_ =
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[this](rosbag2_cpp::bag_events::BagSplitInfo & info) {
Expand Down Expand Up @@ -336,7 +337,17 @@ void RecorderImpl::event_publisher_thread_main()
auto message = rosbag2_interfaces::msg::WriteSplitEvent();
message.closed_file = bag_split_info_.closed_file;
message.opened_file = bag_split_info_.opened_file;
split_event_pub_->publish(message);
try {
split_event_pub_->publish(message);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
node->get_logger(),
"Failed to publish message on '/events/write_split' topic. \nError: " << e.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
node->get_logger(),
"Failed to publish message on '/events/write_split' topic.");
}
}
}
RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting");
Expand Down Expand Up @@ -395,19 +406,26 @@ void RecorderImpl::topics_discovery()
}
}
while (rclcpp::ok() && stop_discovery_ == false) {
auto topics_to_subscribe =
get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);
try {
auto topics_to_subscribe = get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) {
RCLCPP_INFO(
node->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
return;
if (!record_options_.topics.empty() &&
subscriptions_.size() == record_options_.topics.size())
{
RCLCPP_INFO(
node->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
return;
}
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.\nError: " << e.what());
} catch (...) {
RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.");
}
std::this_thread::sleep_for(record_options_.topic_polling_interval);
}
Expand Down

0 comments on commit bb8d2a5

Please sign in to comment.