Skip to content

Commit

Permalink
Use rclcpp::TypeAdapter instead of custom convert
Browse files Browse the repository at this point in the history
This uses standard facility to support type conversion, which
significantly simplifies the code, is more performant, and is safer (as
compile time we can check for more errors).

Fixes #109
  • Loading branch information
shuhaowu committed Aug 4, 2024
1 parent 989c7d1 commit 2bbb998
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 144 deletions.
24 changes: 17 additions & 7 deletions examples/ros2/publisher/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int64.hpp>
#include <thread>
#include <type_traits>

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;
Expand All @@ -18,12 +20,20 @@ struct RealtimeData {
};
using RosData = std_msgs::msg::Int64;

namespace {
void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) {
// A bit of a silly example, but gets the point across.
ros_data.data = rt_data.data;
}
} // namespace
template <>
struct rclcpp::TypeAdapter<RealtimeData, RosData> {
using is_specialized = std::true_type;
using custom_type = RealtimeData;
using ros_message_type = RosData;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data = source.data;
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.data = source.data;
}
};

class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;
Expand All @@ -44,7 +54,7 @@ class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2:
RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
publisher_ = ros2_adapter_->CreatePublisher<RealtimeData, RosData>("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc);
publisher_ = ros2_adapter_->CreatePublisher<RealtimeData, RosData>("/hello", rclcpp::QoS(10));
}

int64_t GetLoopCounter() const {
Expand Down
29 changes: 18 additions & 11 deletions examples/ros2/subscriber/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int64.hpp>
#include <thread>

#include "cactus_rt/ros2/subscription.h"
#include <type_traits>

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;
Expand All @@ -21,13 +21,20 @@ struct RealtimeData {
};
using RosData = std_msgs::msg::Int64;

namespace {
RealtimeData ROS2ToRealtimeConverterFunc(const RosData& ros_data) {
// A bit of a silly example, but gets the point across.
RealtimeData rt_data(ros_data.data);
return rt_data;
}
} // namespace
template <>
struct rclcpp::TypeAdapter<RealtimeData, RosData> {
using is_specialized = std::true_type;
using custom_type = RealtimeData;
using ros_message_type = RosData;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data = source.data;
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.data = source.data;
}
};

class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;
Expand All @@ -49,7 +56,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2
RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeData, RosData>("/hello", rclcpp::QoS(10), ROS2ToRealtimeConverterFunc);
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeData, RosData>("/hello", rclcpp::QoS(10));
}

int64_t GetLoopCounter() const {
Expand All @@ -67,7 +74,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2
data = subscription_->ReadLatest();
}

LOG_INFO(Logger(), "Loop {}: {}", loop_counter_, data.value.data);
LOG_INFO(Logger(), "Loop {}: {} {}", loop_counter_, data.id, data.value.data);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class RealtimeReadableValue {
* and free a previous storage pointer.
*/
void Write(const T& new_value) {
// TODO: make this perfect forwarding.
auto new_ptr = std::make_unique<T>(new_value);
T* expected;

Expand Down
77 changes: 49 additions & 28 deletions include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

#include <readerwriterqueue.h>

#include <optional>
#include <rclcpp/loaned_message.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/rclcpp.hpp>

#include "types.h"
#include <rclcpp/type_adapter.hpp>

namespace cactus_rt::ros2 {
class Ros2Adapter;
Expand All @@ -23,35 +23,42 @@ class IPublisher {

template <typename RealtimeT, typename RosT>
class Publisher : public IPublisher {
typename rclcpp::Publisher<RosT>::SharedPtr publisher_;
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;
friend class Ros2Adapter;

using AdaptedRosType = rclcpp::TypeAdapter<RealtimeT, RosT>;
// TODO: static_assert is_pod RealtimeT?

typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;

bool DequeueAndPublishToRos() override {
RealtimeT rt_msg;
if constexpr (std::is_same_v<RealtimeT, RosT>) {
rclcpp::LoanedMessage<RealtimeT> loaned_msg = publisher_->borrow_loaned_message();

// 1 copy
const bool has_data = queue_.try_dequeue(rt_msg);
if (!has_data) {
return false;
}
RealtimeT& rt_msg = loaned_msg.get();
// First copy
if (!queue_.try_dequeue(rt_msg)) {
return false;
}

if (converter_) {
auto loaned_msg = publisher_->borrow_loaned_message();
// 1 copy
converter_.value()(rt_msg, loaned_msg.get());
publisher_->publish(std::move(loaned_msg));
return true;
} else {
if constexpr (std::is_same_v<RealtimeT, RosT>) {
const auto* ros_msg = static_cast<const RosT*>(rt_msg);
// 1 copy
publisher_->publish(*ros_msg);
} else {
throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"};
RealtimeT rt_msg;
// First copy
if (!queue_.try_dequeue(rt_msg)) {
return false;
}
}

return true;
// Possible allocation if middleware requires it.
rclcpp::LoanedMessage<RosT> loaned_msg = publisher_->borrow_loaned_message();

// Second copy
AdaptedRosType::convert_to_ros_message(rt_msg, loaned_msg.get());

publisher_->publish(std::move(loaned_msg));
return true;
}
}

void FullyDrainAndPublishToRos() override {
Expand All @@ -63,17 +70,31 @@ class Publisher : public IPublisher {
}
}

static std::shared_ptr<Publisher<RealtimeT, RosT>> Create(
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher = node.create_publisher<AdaptedRosType>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue(rt_queue_size);

return std::make_shared<Publisher<RealtimeT, RosT>>(
std::move(publisher),
std::move(queue)
);
}

public:
/**
* Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared.
*
* @private
*/
Publisher(
typename rclcpp::Publisher<RosT>::SharedPtr publisher,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {}
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), queue_(std::move(queue)) {}

template <typename... Args>
bool Publish(Args&&... args) noexcept {
Expand Down
52 changes: 13 additions & 39 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@
#include <memory>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

#include "./publisher.h"
#include "./subscription.h"
#include "cactus_rt/ros2/subscription.h"
#include "cactus_rt/ros2/types.h"
#include "publisher.h"
#include "subscription.h"

namespace cactus_rt::ros2 {

Expand Down Expand Up @@ -52,48 +47,27 @@ class Ros2Adapter {

template <typename RealtimeT, typename RosT>
std::shared_ptr<Publisher<RealtimeT, RosT>> CreatePublisher(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
size_t rt_queue_size = 1000
const std::string& topic_name,
const rclcpp::QoS& qos,
size_t rt_queue_size = 1000
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

typename rclcpp::Publisher<RosT>::SharedPtr publisher = this->ros_node_->create_publisher<RosT>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue{rt_queue_size};

auto publisher_bundle = std::make_shared<Publisher<RealtimeT, RosT>>(
std::move(publisher),
converter,
std::move(queue)
);
auto publisher = Publisher<RealtimeT, RosT>::Create(*ros_node_, topic_name, qos, rt_queue_size);

const std::scoped_lock lock(mut_);
publishers_.push_back(publisher_bundle);
return publisher_bundle;
publishers_.push_back(publisher);
return publisher;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT>> CreateSubscriptionForLatestValue(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter
const std::string& topic_name,
const rclcpp::QoS& qos
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

auto subscription_bundle = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos, converter);
auto subscriber = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscription_bundle);
return subscription_bundle;
subscriptions_.push_back(subscriber);
return subscriber;
}

private:
Expand Down
Loading

0 comments on commit 2bbb998

Please sign in to comment.