From 86bc1e253eedb2748937768ec4ed2c4923feeaaa Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 20 Aug 2024 22:41:13 -0400 Subject: [PATCH 1/2] Publisher dequeue is now more fair Fixes #106. --- src/cactus_rt/ros2/ros2_adapter.cc | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/cactus_rt/ros2/ros2_adapter.cc b/src/cactus_rt/ros2/ros2_adapter.cc index a1d0fbc..5159edf 100644 --- a/src/cactus_rt/ros2/ros2_adapter.cc +++ b/src/cactus_rt/ros2/ros2_adapter.cc @@ -16,13 +16,15 @@ void Ros2Adapter::TimerCallback() { void Ros2Adapter::DrainQueues() { const std::scoped_lock lock(mut_); - for (const auto& publisher : publishers_) { - // Hopefully the thread is not publishing so quickly that a single - // publisher monopolizes all resources. That said, if that happens the - // program is likely in bigger trouble anyway. - // - // TODO: make it so we dequeue once. - publisher->FullyDrainAndPublishToRos(); + bool has_data = true; + while (has_data) { + has_data = false; + + for (const auto& publisher : publishers_) { + if (publisher->DequeueAndPublishToRos()) { + has_data = true; + } + } } } From 1042897a9c615b8efe278b7b71c7c81a40d2c4f0 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 20 Aug 2024 23:09:08 -0400 Subject: [PATCH 2/2] Drain queue on exit It seems like publishing a ROS message is not passing the data to the executor so the executor sends the data during `spin_once`. That said, it might not be synchronous either as this is (middleware-)implementation defined. See [`rmw_publish_loaned_message`][1]. It seems like by default, FastDDS uses an async mode which uses an internal thread to send the data. That's fine with this code I think and we can basically treat that once we call publish, the message is basically sent. Thus, instead of `spin_once` after stop is requested (due to signals), we simply drain the queue directly from `Ros2ExecutorThread`. This requires making `Ros2ExecutorThread` to be a friend of `Ros2Adapter`. There's no issues with thread (as we are using a SPSC queue for the published message) because the `TimerCallback` of `Ros2Adapter` is executing as a part of `Ros2ExecutorThread` anyway. Moving it out of `TimerCallback` and into `Ros2ExecutorThread` is no different from a threading perspective. Fixes #104. [1]: https://docs.ros.org/en/jazzy/p/rmw/generated/function_rmw_8h_1ab01da69d8613952343abd5d65107399a.html --- include/cactus_rt/ros2/ros2_adapter.h | 4 ++++ src/cactus_rt/ros2/app.cc | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 1899872..dd1fee7 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -15,7 +15,11 @@ namespace cactus_rt::ros2 { +class Ros2ExecutorThread; + class Ros2Adapter { + friend class Ros2ExecutorThread; + public: struct Config { /** diff --git a/src/cactus_rt/ros2/app.cc b/src/cactus_rt/ros2/app.cc index a9e97fc..a824b51 100644 --- a/src/cactus_rt/ros2/app.cc +++ b/src/cactus_rt/ros2/app.cc @@ -25,8 +25,7 @@ void Ros2ExecutorThread::Run() { executor_->spin_once(); } - // Execute one more time to ensure everything is processed. - executor_->spin_once(); + ros2_adapter_->DrainQueues(); executor_->remove_node(node_ptr); }