Skip to content

Commit

Permalink
Using TypeAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Aug 4, 2024
1 parent 989c7d1 commit eb23f65
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 145 deletions.
24 changes: 17 additions & 7 deletions examples/ros2/publisher/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int64.hpp>
#include <thread>
#include <type_traits>

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;
Expand All @@ -18,12 +20,20 @@ struct RealtimeData {
};
using RosData = std_msgs::msg::Int64;

namespace {
void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) {
// A bit of a silly example, but gets the point across.
ros_data.data = rt_data.data;
}
} // namespace
template <>
struct rclcpp::TypeAdapter<RealtimeData, RosData> {
using is_specialized = std::true_type;
using custom_type = RealtimeData;
using ros_message_type = RosData;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data = source.data;
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.data = source.data;
}
};

class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;
Expand All @@ -44,7 +54,7 @@ class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2:
RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
publisher_ = ros2_adapter_->CreatePublisher<RealtimeData, RosData>("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc);
publisher_ = ros2_adapter_->CreatePublisher<RealtimeData, RosData>("/hello", rclcpp::QoS(10));
}

int64_t GetLoopCounter() const {
Expand Down
29 changes: 18 additions & 11 deletions examples/ros2/subscriber/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int64.hpp>
#include <thread>

#include "cactus_rt/ros2/subscription.h"
#include <type_traits>

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;
Expand All @@ -21,13 +21,20 @@ struct RealtimeData {
};
using RosData = std_msgs::msg::Int64;

namespace {
RealtimeData ROS2ToRealtimeConverterFunc(const RosData& ros_data) {
// A bit of a silly example, but gets the point across.
RealtimeData rt_data(ros_data.data);
return rt_data;
}
} // namespace
template <>
struct rclcpp::TypeAdapter<RealtimeData, RosData> {
using is_specialized = std::true_type;
using custom_type = RealtimeData;
using ros_message_type = RosData;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data = source.data;
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.data = source.data;
}
};

class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;
Expand All @@ -49,7 +56,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2
RealtimeROS2SubscriberThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeData, RosData>("/hello", rclcpp::QoS(10), ROS2ToRealtimeConverterFunc);
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeData, RosData>("/hello", rclcpp::QoS(10));
}

int64_t GetLoopCounter() const {
Expand All @@ -67,7 +74,7 @@ class RealtimeROS2SubscriberThread : public CyclicThread, public cactus_rt::ros2
data = subscription_->ReadLatest();
}

LOG_INFO(Logger(), "Loop {}: {}", loop_counter_, data.value.data);
LOG_INFO(Logger(), "Loop {}: {} {}", loop_counter_, data.id, data.value.data);
}
return false;
}
Expand Down
77 changes: 49 additions & 28 deletions include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

#include <readerwriterqueue.h>

#include <optional>
#include <rclcpp/loaned_message.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/rclcpp.hpp>

#include "types.h"
#include <rclcpp/type_adapter.hpp>

namespace cactus_rt::ros2 {
class Ros2Adapter;
Expand All @@ -23,35 +23,42 @@ class IPublisher {

template <typename RealtimeT, typename RosT>
class Publisher : public IPublisher {
typename rclcpp::Publisher<RosT>::SharedPtr publisher_;
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;
friend class Ros2Adapter;

using AdaptedRosType = rclcpp::TypeAdapter<RealtimeT, RosT>;
// TODO: static_assert is_pod RealtimeT?

typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;

bool DequeueAndPublishToRos() override {
RealtimeT rt_msg;
if constexpr (std::is_same_v<RealtimeT, RosT>) {
rclcpp::LoanedMessage<RealtimeT> loaned_msg = publisher_->borrow_loaned_message();

// 1 copy
const bool has_data = queue_.try_dequeue(rt_msg);
if (!has_data) {
return false;
}
RealtimeT& rt_msg = loaned_msg.get();
// First copy
if (!queue_.try_dequeue(rt_msg)) {
return false;
}

if (converter_) {
auto loaned_msg = publisher_->borrow_loaned_message();
// 1 copy
converter_.value()(rt_msg, loaned_msg.get());
publisher_->publish(std::move(loaned_msg));
return true;
} else {
if constexpr (std::is_same_v<RealtimeT, RosT>) {
const auto* ros_msg = static_cast<const RosT*>(rt_msg);
// 1 copy
publisher_->publish(*ros_msg);
} else {
throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"};
RealtimeT rt_msg;
// First copy
if (!queue_.try_dequeue(rt_msg)) {
return false;
}
}

return true;
// Possible allocation if middleware requires it.
rclcpp::LoanedMessage<RosT> loaned_msg = publisher_->borrow_loaned_message();

// Second copy
AdaptedRosType::convert_to_ros_message(rt_msg, loaned_msg.get());

publisher_->publish(std::move(loaned_msg));
return true;
}
}

void FullyDrainAndPublishToRos() override {
Expand All @@ -63,17 +70,31 @@ class Publisher : public IPublisher {
}
}

static std::shared_ptr<Publisher<RealtimeT, RosT>> Create(
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher = node.create_publisher<AdaptedRosType>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue(rt_queue_size);

return std::make_shared<Publisher<RealtimeT, RosT>>(
std::move(publisher),
std::move(queue)
);
}

public:
/**
* Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared.
*
* @private
*/
Publisher(
typename rclcpp::Publisher<RosT>::SharedPtr publisher,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {}
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), queue_(std::move(queue)) {}

template <typename... Args>
bool Publish(Args&&... args) noexcept {
Expand Down
52 changes: 13 additions & 39 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@
#include <memory>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

#include "./publisher.h"
#include "./subscription.h"
#include "cactus_rt/ros2/subscription.h"
#include "cactus_rt/ros2/types.h"
#include "publisher.h"
#include "subscription.h"

namespace cactus_rt::ros2 {

Expand Down Expand Up @@ -52,48 +47,27 @@ class Ros2Adapter {

template <typename RealtimeT, typename RosT>
std::shared_ptr<Publisher<RealtimeT, RosT>> CreatePublisher(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
size_t rt_queue_size = 1000
const std::string& topic_name,
const rclcpp::QoS& qos,
size_t rt_queue_size = 1000
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

typename rclcpp::Publisher<RosT>::SharedPtr publisher = this->ros_node_->create_publisher<RosT>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue{rt_queue_size};

auto publisher_bundle = std::make_shared<Publisher<RealtimeT, RosT>>(
std::move(publisher),
converter,
std::move(queue)
);
auto publisher = Publisher<RealtimeT, RosT>::Create(*ros_node_, topic_name, qos, rt_queue_size);

const std::scoped_lock lock(mut_);
publishers_.push_back(publisher_bundle);
return publisher_bundle;
publishers_.push_back(publisher);
return publisher;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT>> CreateSubscriptionForLatestValue(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<Ros2ToRealtimeConverter<RealtimeT, RosT>> converter
const std::string& topic_name,
const rclcpp::QoS& qos
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

auto subscription_bundle = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos, converter);
auto subscriber = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscription_bundle);
return subscription_bundle;
subscriptions_.push_back(subscriber);
return subscriber;
}

private:
Expand Down
Loading

0 comments on commit eb23f65

Please sign in to comment.