Skip to content

Commit

Permalink
Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)
Browse files Browse the repository at this point in the history
* Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder

- Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown

Signed-off-by: Michael Orlov <[email protected]>

* Use singleton for static executor in the rosbag2_py::Recorder

Signed-off-by: Michael Orlov <[email protected]>

* Rollback to the non-static executor and don't call executor->cancel()

- In case when signal will arrive we will trigger our internal exit_
variable and wait while current exec_->spin_all(10msec) will exit.

Signed-off-by: Michael Orlov <[email protected]>

* Spin recorder node in a separate thread for better handling exit

- Run exec->spin() in a separate thread, because we need to call
exec->cancel() after recorder->stop() to be able to send notifications
about bag split and close.
- Wait on conditional variable for exit_ flag

Signed-off-by: Michael Orlov <[email protected]>

* Address race condition in rosbag2_py.test_record_cancel

- Add `record_thread.join()` before trying to parse metadata.

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit 46a23e9)

# Conflicts:
#	rosbag2_py/src/rosbag2_py/_transport.cpp
#	rosbag2_py/test/test_transport.py
  • Loading branch information
MichaelOrlov authored and mergify[bot] committed Jun 13, 2023
1 parent 7351423 commit 7af6ef9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 11 deletions.
69 changes: 58 additions & 11 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,20 @@ class Player

class Recorder
{
private:
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;

public:
Recorder()
{
rclcpp::init(0, nullptr);
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);

std::signal(
SIGTERM, [](int /* signal */) {
rclcpp::shutdown();
rosbag2_py::Recorder::cancel();
});
std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
}

Expand All @@ -164,6 +167,8 @@ class Recorder
const rosbag2_storage::StorageOptions & storage_options,
RecordOptions & record_options)
{
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
}
Expand All @@ -173,20 +178,55 @@ class Recorder
std::move(writer), storage_options, record_options);
recorder->record();

exec_->add_node(recorder);
// Release the GIL for long-running record, so that calling Python code can use other threads
exec->add_node(recorder);
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
// recorder->stop() to be able to send notifications about bag split and close.
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
});
{
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
exec_->spin();
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();
}
exec->cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec->remove_node(recorder);
}

void cancel()
static void cancel()
{
exec_->cancel();
exit_ = true;
wait_for_exit_cv_.notify_all();
}

protected:
static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
std::mutex wait_for_exit_mutex_;
};

<<<<<<< HEAD
=======
std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

// Return a RecordOptions struct with defaults set for rewriting bags.
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
{
rosbag2_transport::RecordOptions options{};
// We never want to drop messages when converting bags, so set the compression queue size to 0
// (unbounded).
options.compression_queue_size = 0;
return options;
}

>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
// Simple wrapper to read the output config YAML into structs
void bag_rewrite(
const std::vector<rosbag2_storage::StorageOptions> & input_options,
Expand Down Expand Up @@ -283,8 +323,15 @@ PYBIND11_MODULE(_transport, m) {

py::class_<rosbag2_py::Recorder>(m, "Recorder")
.def(py::init())
<<<<<<< HEAD
.def("record", &rosbag2_py::Recorder::record)
.def("cancel", &rosbag2_py::Recorder::cancel)
=======
.def(
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
py::arg("node_name") = "rosbag2_recorder")
.def_static("cancel", &rosbag2_py::Recorder::cancel)
>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
;

m.def(
Expand Down
12 changes: 12 additions & 0 deletions rosbag2_py/test/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,19 @@ def test_record_cancel(tmp_path):

recorder.cancel()

<<<<<<< HEAD
metadata_path = Path(bag_path) / 'metadata.yaml'
db3_path = Path(bag_path) / 'test_record_cancel_0.db3'
assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(),
=======
metadata_io = rosbag2_py.MetadataIo()
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
timeout=rclpy.duration.Duration(seconds=3))
record_thread.join()

metadata = metadata_io.read_metadata(bag_path)
assert(len(metadata.relative_file_paths))
storage_path = Path(metadata.relative_file_paths[0])
assert wait_for(lambda: storage_path.is_file(),
>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301))
timeout=rclpy.duration.Duration(seconds=3))

0 comments on commit 7af6ef9

Please sign in to comment.