diff --git a/examples/ros2/publisher/main.cc b/examples/ros2/publisher/main.cc index a281cf0..a529476 100644 --- a/examples/ros2/publisher/main.cc +++ b/examples/ros2/publisher/main.cc @@ -4,8 +4,10 @@ #include #include #include +#include #include #include +#include using cactus_rt::CyclicThread; using cactus_rt::ros2::App; @@ -18,12 +20,20 @@ struct RealtimeData { }; using RosData = std_msgs::msg::Int64; -namespace { -void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) { - // A bit of a silly example, but gets the point across. - ros_data.data = rt_data.data; -} -} // namespace +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeData; + using ros_message_type = RosData; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data = source.data; + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.data = source.data; + } +}; class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { int64_t loop_counter_ = 0; @@ -44,7 +54,7 @@ class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2: RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} void InitializeForRos2() override { - publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc); + publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10)); } int64_t GetLoopCounter() const { diff --git a/examples/ros2/subscriber/main.cc b/examples/ros2/subscriber/main.cc index 1b50d25..a1459c9 100644 --- a/examples/ros2/subscriber/main.cc +++ b/examples/ros2/subscriber/main.cc @@ -4,10 +4,10 @@ #include #include #include +#include #include #include - -#include "cactus_rt/ros2/subscription.h" +#include using cactus_rt::CyclicThread; using cactus_rt::ros2::App; @@ -21,13 +21,20 @@ struct RealtimeData { }; using RosData = std_msgs::msg::Int64; -namespace { -RealtimeData ROS2ToRealtimeConverterFunc(const RosData& ros_data) { - // A bit of a silly example, but gets the point across. - RealtimeData rt_data(ros_data.data); - return rt_data; -} -} // namespace +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeData; + using ros_message_type = RosData; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data = source.data; + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.data = source.data; + } +}; class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { int64_t loop_counter_ = 0; @@ -49,7 +56,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2 RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} void InitializeForRos2() override { - subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/hello", rclcpp::QoS(10), ROS2ToRealtimeConverterFunc); + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/hello", rclcpp::QoS(10)); } int64_t GetLoopCounter() const { @@ -67,7 +74,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2 data = subscription_->ReadLatest(); } - LOG_INFO(Logger(), "Loop {}: {}", loop_counter_, data.value.data); + LOG_INFO(Logger(), "Loop {}: {} {}", loop_counter_, data.id, data.value.data); } return false; } diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h index 2bc909b..8b56576 100644 --- a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h @@ -56,6 +56,7 @@ class RealtimeReadableValue { * and free a previous storage pointer. */ void Write(const T& new_value) { + // TODO: make this perfect forwarding. auto new_ptr = std::make_unique(new_value); T* expected; diff --git a/include/cactus_rt/ros2/publisher.h b/include/cactus_rt/ros2/publisher.h index ed6d2ed..2f2a682 100644 --- a/include/cactus_rt/ros2/publisher.h +++ b/include/cactus_rt/ros2/publisher.h @@ -3,10 +3,10 @@ #include -#include +#include +#include #include - -#include "types.h" +#include namespace cactus_rt::ros2 { class Ros2Adapter; @@ -23,35 +23,42 @@ class IPublisher { template class Publisher : public IPublisher { - typename rclcpp::Publisher::SharedPtr publisher_; - std::optional> converter_; - moodycamel::ReaderWriterQueue queue_; + friend class Ros2Adapter; + + using AdaptedRosType = rclcpp::TypeAdapter; + // TODO: static_assert is_pod RealtimeT? + + typename rclcpp::Publisher::SharedPtr publisher_; + moodycamel::ReaderWriterQueue queue_; bool DequeueAndPublishToRos() override { - RealtimeT rt_msg; + if constexpr (std::is_same_v) { + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); - // 1 copy - const bool has_data = queue_.try_dequeue(rt_msg); - if (!has_data) { - return false; - } + RealtimeT& rt_msg = loaned_msg.get(); + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; + } - if (converter_) { - auto loaned_msg = publisher_->borrow_loaned_message(); - // 1 copy - converter_.value()(rt_msg, loaned_msg.get()); publisher_->publish(std::move(loaned_msg)); + return true; } else { - if constexpr (std::is_same_v) { - const auto* ros_msg = static_cast(rt_msg); - // 1 copy - publisher_->publish(*ros_msg); - } else { - throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; + RealtimeT rt_msg; + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; } - } - return true; + // Possible allocation if middleware requires it. + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); + + // Second copy + AdaptedRosType::convert_to_ros_message(rt_msg, loaned_msg.get()); + + publisher_->publish(std::move(loaned_msg)); + return true; + } } void FullyDrainAndPublishToRos() override { @@ -63,6 +70,21 @@ class Publisher : public IPublisher { } } + static std::shared_ptr> Create( + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos, + const size_t rt_queue_size = 1000 + ) { + typename rclcpp::Publisher::SharedPtr publisher = node.create_publisher(topic_name, qos); + typename moodycamel::ReaderWriterQueue queue(rt_queue_size); + + return std::make_shared>( + std::move(publisher), + std::move(queue) + ); + } + public: /** * Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared. @@ -70,10 +92,9 @@ class Publisher : public IPublisher { * @private */ Publisher( - typename rclcpp::Publisher::SharedPtr publisher, - std::optional> converter, - moodycamel::ReaderWriterQueue&& queue - ) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {} + typename rclcpp::Publisher::SharedPtr publisher, + moodycamel::ReaderWriterQueue&& queue + ) : publisher_(publisher), queue_(std::move(queue)) {} template bool Publish(Args&&... args) noexcept { diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 532148a..90046b0 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -7,15 +7,10 @@ #include #include #include -#include #include -#include -#include -#include "./publisher.h" -#include "./subscription.h" -#include "cactus_rt/ros2/subscription.h" -#include "cactus_rt/ros2/types.h" +#include "publisher.h" +#include "subscription.h" namespace cactus_rt::ros2 { @@ -52,48 +47,27 @@ class Ros2Adapter { template std::shared_ptr> CreatePublisher( - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter, - size_t rt_queue_size = 1000 + const std::string& topic_name, + const rclcpp::QoS& qos, + size_t rt_queue_size = 1000 ) { - if (!converter) { - if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { - throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); - } - } - - typename rclcpp::Publisher::SharedPtr publisher = this->ros_node_->create_publisher(topic_name, qos); - typename moodycamel::ReaderWriterQueue queue{rt_queue_size}; - - auto publisher_bundle = std::make_shared>( - std::move(publisher), - converter, - std::move(queue) - ); + auto publisher = Publisher::Create(*ros_node_, topic_name, qos, rt_queue_size); const std::scoped_lock lock(mut_); - publishers_.push_back(publisher_bundle); - return publisher_bundle; + publishers_.push_back(publisher); + return publisher; } template std::shared_ptr> CreateSubscriptionForLatestValue( - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter + const std::string& topic_name, + const rclcpp::QoS& qos ) { - if (!converter) { - if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { - throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); - } - } - - auto subscription_bundle = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos, converter); + auto subscriber = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos); const std::scoped_lock lock(mut_); - subscriptions_.push_back(subscription_bundle); - return subscription_bundle; + subscriptions_.push_back(subscriber); + return subscriber; } private: diff --git a/include/cactus_rt/ros2/subscription.h b/include/cactus_rt/ros2/subscription.h index 2436947..9386fb1 100644 --- a/include/cactus_rt/ros2/subscription.h +++ b/include/cactus_rt/ros2/subscription.h @@ -2,15 +2,12 @@ #define CACTUS_RT_ROS2_SUBSCRIPTION_H_ #include -#include #include #include "../experimental/lockless/spsc/realtime_readable_value.h" -#include "cactus_rt/utils.h" -#include "types.h" // Note: ROS subscription dispatch is here: https://github.com/ros2/rclcpp/blob/e10728c/rclcpp/include/rclcpp/any_subscription_callback.hpp#L481 -// There are many methods that we can choose from. +// We are using the TypeAdapter method. namespace cactus_rt::ros2 { @@ -23,11 +20,11 @@ class ISubscription { template struct StampedValue { - int64_t received_time; // TODO: need to align this with elapsed time from the threads... + uint64_t id; RealtimeT value; StampedValue() = default; - StampedValue(int64_t t, const RealtimeT& v) : received_time(t), value(v) {} + StampedValue(const uint64_t i, const RealtimeT& v) : id(i), value(v) {} }; // TODO: Theoretically it is possible to not use the type converter if we don't @@ -36,63 +33,51 @@ template class SubscriptionLatest : public ISubscription { friend class Ros2Adapter; - using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; - - typename rclcpp::Subscription::SharedPtr ros_subscription_; - std::optional> converter_; - RealtimeReadableValue latest_value_; + using AdaptedRosType = rclcpp::TypeAdapter; - void SubscriptionCallback(const RosT& ros_msg) { - if (converter_) { - const RealtimeT& rt_msg = converter_.value()(ros_msg); + using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; - // 1 copy - const StampedValue stamped_value(NowNs(), rt_msg); + typename rclcpp::Subscription::SharedPtr ros_subscription_; + uint64_t current_msg_id_ = 0; + RealtimeReadableValue latest_value_; - // 1 allocation and 1 copy. - latest_value_.Write(stamped_value); - } else { - if constexpr (std::is_same_v) { - const auto& rt_msg = static_cast(ros_msg); + void SubscriptionCallback(const RealtimeT& rt_msg) { + current_msg_id_++; // Useful to detect message has changed by the real-time thread. - // 1 copy - const StampedValue stamped_value(NowNs(), rt_msg); + // First copy + const StampedValue stamped_value(current_msg_id_, rt_msg); - // 1 allocation and 1 copy. - latest_value_.Write(stamped_value); - } else { - throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; - } - } + // A second copy then an allocation + // TODO: directly write to the value (perfect forwarding required). + latest_value_.Write(stamped_value); } static std::shared_ptr Create( - rclcpp::Node& node, - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos ) { - auto subscription = std::make_shared>(converter); - - auto* subscription_ptr = subscription.get(); - - // TODO: is there a better way to do this under than to capture the shared_pointer like this? - subscription->ros_subscription_ = node.create_subscription(topic_name, qos, [subscription_ptr](const RosT& ros_msg) { - subscription_ptr->SubscriptionCallback(ros_msg); - }); + auto subscription = std::make_shared>(); + + subscription->ros_subscription_ = node.create_subscription( + topic_name, + qos, + [subscription](const RealtimeT& rt_msg) { + // TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak? + subscription->SubscriptionCallback(rt_msg); + } + ); return subscription; } public: /** - * Constructs a subscription. Shouldn't be called directly. Only public to enable make_shared. + * @brief Do not use this method. Defined to allow make_shared to work. * * @private */ - explicit SubscriptionLatest( - std::optional> converter - ) : converter_(converter) {} + SubscriptionLatest() = default; inline StampedValue ReadLatest() noexcept { return latest_value_.Read(); diff --git a/include/cactus_rt/ros2/types.h b/include/cactus_rt/ros2/types.h deleted file mode 100644 index da35668..0000000 --- a/include/cactus_rt/ros2/types.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef CACTUS_RT_ROS2_TYPES_H_ -#define CACTUS_RT_ROS2_TYPES_H_ - -#include - -namespace cactus_rt::ros2 { -template -using Ros2ToRealtimeConverter = std::function; - -template -using RealtimeToROS2Converter = std::function; -} // namespace cactus_rt::ros2 - -#endif