From 2bbb99827215242fd06b3734b6b6b752fbac6f46 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Sun, 4 Aug 2024 00:22:20 -0400 Subject: [PATCH] Use rclcpp::TypeAdapter instead of custom convert This uses standard facility to support type conversion, which significantly simplifies the code, is more performant, and is safer (as compile time we can check for more errors). Fixes #109 --- examples/ros2/publisher/main.cc | 24 ++++-- examples/ros2/subscriber/main.cc | 29 ++++--- .../lockless/spsc/realtime_readable_value.h | 1 + include/cactus_rt/ros2/publisher.h | 77 ++++++++++++------- include/cactus_rt/ros2/ros2_adapter.h | 52 ++++--------- include/cactus_rt/ros2/subscription.h | 75 ++++++++---------- include/cactus_rt/ros2/types.h | 14 ---- 7 files changed, 128 insertions(+), 144 deletions(-) delete mode 100644 include/cactus_rt/ros2/types.h diff --git a/examples/ros2/publisher/main.cc b/examples/ros2/publisher/main.cc index a281cf0..a529476 100644 --- a/examples/ros2/publisher/main.cc +++ b/examples/ros2/publisher/main.cc @@ -4,8 +4,10 @@ #include #include #include +#include #include #include +#include using cactus_rt::CyclicThread; using cactus_rt::ros2::App; @@ -18,12 +20,20 @@ struct RealtimeData { }; using RosData = std_msgs::msg::Int64; -namespace { -void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) { - // A bit of a silly example, but gets the point across. - ros_data.data = rt_data.data; -} -} // namespace +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeData; + using ros_message_type = RosData; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data = source.data; + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.data = source.data; + } +}; class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { int64_t loop_counter_ = 0; @@ -44,7 +54,7 @@ class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2: RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} void InitializeForRos2() override { - publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc); + publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10)); } int64_t GetLoopCounter() const { diff --git a/examples/ros2/subscriber/main.cc b/examples/ros2/subscriber/main.cc index 1b50d25..a1459c9 100644 --- a/examples/ros2/subscriber/main.cc +++ b/examples/ros2/subscriber/main.cc @@ -4,10 +4,10 @@ #include #include #include +#include #include #include - -#include "cactus_rt/ros2/subscription.h" +#include using cactus_rt::CyclicThread; using cactus_rt::ros2::App; @@ -21,13 +21,20 @@ struct RealtimeData { }; using RosData = std_msgs::msg::Int64; -namespace { -RealtimeData ROS2ToRealtimeConverterFunc(const RosData& ros_data) { - // A bit of a silly example, but gets the point across. - RealtimeData rt_data(ros_data.data); - return rt_data; -} -} // namespace +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeData; + using ros_message_type = RosData; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data = source.data; + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.data = source.data; + } +}; class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { int64_t loop_counter_ = 0; @@ -49,7 +56,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2 RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} void InitializeForRos2() override { - subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/hello", rclcpp::QoS(10), ROS2ToRealtimeConverterFunc); + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/hello", rclcpp::QoS(10)); } int64_t GetLoopCounter() const { @@ -67,7 +74,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2 data = subscription_->ReadLatest(); } - LOG_INFO(Logger(), "Loop {}: {}", loop_counter_, data.value.data); + LOG_INFO(Logger(), "Loop {}: {} {}", loop_counter_, data.id, data.value.data); } return false; } diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h index 2bc909b..8b56576 100644 --- a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h @@ -56,6 +56,7 @@ class RealtimeReadableValue { * and free a previous storage pointer. */ void Write(const T& new_value) { + // TODO: make this perfect forwarding. auto new_ptr = std::make_unique(new_value); T* expected; diff --git a/include/cactus_rt/ros2/publisher.h b/include/cactus_rt/ros2/publisher.h index ed6d2ed..2f2a682 100644 --- a/include/cactus_rt/ros2/publisher.h +++ b/include/cactus_rt/ros2/publisher.h @@ -3,10 +3,10 @@ #include -#include +#include +#include #include - -#include "types.h" +#include namespace cactus_rt::ros2 { class Ros2Adapter; @@ -23,35 +23,42 @@ class IPublisher { template class Publisher : public IPublisher { - typename rclcpp::Publisher::SharedPtr publisher_; - std::optional> converter_; - moodycamel::ReaderWriterQueue queue_; + friend class Ros2Adapter; + + using AdaptedRosType = rclcpp::TypeAdapter; + // TODO: static_assert is_pod RealtimeT? + + typename rclcpp::Publisher::SharedPtr publisher_; + moodycamel::ReaderWriterQueue queue_; bool DequeueAndPublishToRos() override { - RealtimeT rt_msg; + if constexpr (std::is_same_v) { + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); - // 1 copy - const bool has_data = queue_.try_dequeue(rt_msg); - if (!has_data) { - return false; - } + RealtimeT& rt_msg = loaned_msg.get(); + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; + } - if (converter_) { - auto loaned_msg = publisher_->borrow_loaned_message(); - // 1 copy - converter_.value()(rt_msg, loaned_msg.get()); publisher_->publish(std::move(loaned_msg)); + return true; } else { - if constexpr (std::is_same_v) { - const auto* ros_msg = static_cast(rt_msg); - // 1 copy - publisher_->publish(*ros_msg); - } else { - throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; + RealtimeT rt_msg; + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; } - } - return true; + // Possible allocation if middleware requires it. + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); + + // Second copy + AdaptedRosType::convert_to_ros_message(rt_msg, loaned_msg.get()); + + publisher_->publish(std::move(loaned_msg)); + return true; + } } void FullyDrainAndPublishToRos() override { @@ -63,6 +70,21 @@ class Publisher : public IPublisher { } } + static std::shared_ptr> Create( + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos, + const size_t rt_queue_size = 1000 + ) { + typename rclcpp::Publisher::SharedPtr publisher = node.create_publisher(topic_name, qos); + typename moodycamel::ReaderWriterQueue queue(rt_queue_size); + + return std::make_shared>( + std::move(publisher), + std::move(queue) + ); + } + public: /** * Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared. @@ -70,10 +92,9 @@ class Publisher : public IPublisher { * @private */ Publisher( - typename rclcpp::Publisher::SharedPtr publisher, - std::optional> converter, - moodycamel::ReaderWriterQueue&& queue - ) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {} + typename rclcpp::Publisher::SharedPtr publisher, + moodycamel::ReaderWriterQueue&& queue + ) : publisher_(publisher), queue_(std::move(queue)) {} template bool Publish(Args&&... args) noexcept { diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 532148a..90046b0 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -7,15 +7,10 @@ #include #include #include -#include #include -#include -#include -#include "./publisher.h" -#include "./subscription.h" -#include "cactus_rt/ros2/subscription.h" -#include "cactus_rt/ros2/types.h" +#include "publisher.h" +#include "subscription.h" namespace cactus_rt::ros2 { @@ -52,48 +47,27 @@ class Ros2Adapter { template std::shared_ptr> CreatePublisher( - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter, - size_t rt_queue_size = 1000 + const std::string& topic_name, + const rclcpp::QoS& qos, + size_t rt_queue_size = 1000 ) { - if (!converter) { - if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { - throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); - } - } - - typename rclcpp::Publisher::SharedPtr publisher = this->ros_node_->create_publisher(topic_name, qos); - typename moodycamel::ReaderWriterQueue queue{rt_queue_size}; - - auto publisher_bundle = std::make_shared>( - std::move(publisher), - converter, - std::move(queue) - ); + auto publisher = Publisher::Create(*ros_node_, topic_name, qos, rt_queue_size); const std::scoped_lock lock(mut_); - publishers_.push_back(publisher_bundle); - return publisher_bundle; + publishers_.push_back(publisher); + return publisher; } template std::shared_ptr> CreateSubscriptionForLatestValue( - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter + const std::string& topic_name, + const rclcpp::QoS& qos ) { - if (!converter) { - if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { - throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); - } - } - - auto subscription_bundle = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos, converter); + auto subscriber = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos); const std::scoped_lock lock(mut_); - subscriptions_.push_back(subscription_bundle); - return subscription_bundle; + subscriptions_.push_back(subscriber); + return subscriber; } private: diff --git a/include/cactus_rt/ros2/subscription.h b/include/cactus_rt/ros2/subscription.h index 2436947..9386fb1 100644 --- a/include/cactus_rt/ros2/subscription.h +++ b/include/cactus_rt/ros2/subscription.h @@ -2,15 +2,12 @@ #define CACTUS_RT_ROS2_SUBSCRIPTION_H_ #include -#include #include #include "../experimental/lockless/spsc/realtime_readable_value.h" -#include "cactus_rt/utils.h" -#include "types.h" // Note: ROS subscription dispatch is here: https://github.com/ros2/rclcpp/blob/e10728c/rclcpp/include/rclcpp/any_subscription_callback.hpp#L481 -// There are many methods that we can choose from. +// We are using the TypeAdapter method. namespace cactus_rt::ros2 { @@ -23,11 +20,11 @@ class ISubscription { template struct StampedValue { - int64_t received_time; // TODO: need to align this with elapsed time from the threads... + uint64_t id; RealtimeT value; StampedValue() = default; - StampedValue(int64_t t, const RealtimeT& v) : received_time(t), value(v) {} + StampedValue(const uint64_t i, const RealtimeT& v) : id(i), value(v) {} }; // TODO: Theoretically it is possible to not use the type converter if we don't @@ -36,63 +33,51 @@ template class SubscriptionLatest : public ISubscription { friend class Ros2Adapter; - using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; - - typename rclcpp::Subscription::SharedPtr ros_subscription_; - std::optional> converter_; - RealtimeReadableValue latest_value_; + using AdaptedRosType = rclcpp::TypeAdapter; - void SubscriptionCallback(const RosT& ros_msg) { - if (converter_) { - const RealtimeT& rt_msg = converter_.value()(ros_msg); + using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; - // 1 copy - const StampedValue stamped_value(NowNs(), rt_msg); + typename rclcpp::Subscription::SharedPtr ros_subscription_; + uint64_t current_msg_id_ = 0; + RealtimeReadableValue latest_value_; - // 1 allocation and 1 copy. - latest_value_.Write(stamped_value); - } else { - if constexpr (std::is_same_v) { - const auto& rt_msg = static_cast(ros_msg); + void SubscriptionCallback(const RealtimeT& rt_msg) { + current_msg_id_++; // Useful to detect message has changed by the real-time thread. - // 1 copy - const StampedValue stamped_value(NowNs(), rt_msg); + // First copy + const StampedValue stamped_value(current_msg_id_, rt_msg); - // 1 allocation and 1 copy. - latest_value_.Write(stamped_value); - } else { - throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; - } - } + // A second copy then an allocation + // TODO: directly write to the value (perfect forwarding required). + latest_value_.Write(stamped_value); } static std::shared_ptr Create( - rclcpp::Node& node, - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos ) { - auto subscription = std::make_shared>(converter); - - auto* subscription_ptr = subscription.get(); - - // TODO: is there a better way to do this under than to capture the shared_pointer like this? - subscription->ros_subscription_ = node.create_subscription(topic_name, qos, [subscription_ptr](const RosT& ros_msg) { - subscription_ptr->SubscriptionCallback(ros_msg); - }); + auto subscription = std::make_shared>(); + + subscription->ros_subscription_ = node.create_subscription( + topic_name, + qos, + [subscription](const RealtimeT& rt_msg) { + // TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak? + subscription->SubscriptionCallback(rt_msg); + } + ); return subscription; } public: /** - * Constructs a subscription. Shouldn't be called directly. Only public to enable make_shared. + * @brief Do not use this method. Defined to allow make_shared to work. * * @private */ - explicit SubscriptionLatest( - std::optional> converter - ) : converter_(converter) {} + SubscriptionLatest() = default; inline StampedValue ReadLatest() noexcept { return latest_value_.Read(); diff --git a/include/cactus_rt/ros2/types.h b/include/cactus_rt/ros2/types.h deleted file mode 100644 index da35668..0000000 --- a/include/cactus_rt/ros2/types.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef CACTUS_RT_ROS2_TYPES_H_ -#define CACTUS_RT_ROS2_TYPES_H_ - -#include - -namespace cactus_rt::ros2 { -template -using Ros2ToRealtimeConverter = std::function; - -template -using RealtimeToROS2Converter = std::function; -} // namespace cactus_rt::ros2 - -#endif