Skip to content

Commit

Permalink
Reworked subscriber examples
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Aug 5, 2024
1 parent f45dd63 commit d32b107
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 134 deletions.
8 changes: 2 additions & 6 deletions examples/ros2/publisher/complex_data.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#include <cactus_rt/config.h>
#include <cactus_rt/cyclic_thread.h>
#include <cactus_rt/ros2/app.h>
#include <quill/detail/LogMacros.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <rclcpp/qos.hpp>
#include <memory>
#include <std_msgs/msg/float32_multi_array.hpp>

#include "cactus_rt/ros2/ros2_adapter.h"

struct MyCoefficientData {
float a;
float b;
Expand Down
7 changes: 1 addition & 6 deletions examples/ros2/publisher/simple_data.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
#include <cactus_rt/config.h>
#include <cactus_rt/cyclic_thread.h>
#include <cactus_rt/ros2/app.h>
#include <quill/detail/LogMacros.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <rclcpp/qos.hpp>
#include <std_msgs/msg/int64.hpp>

#include "cactus_rt/ros2/ros2_adapter.h"

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

Expand Down
32 changes: 26 additions & 6 deletions examples/ros2/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,23 +1,43 @@
find_package(std_msgs REQUIRED)

add_executable(rt_ros2_subscriber
main.cc
add_executable(rt_ros2_subscriber_simple_data
simple_data.cc
)

target_link_libraries(rt_ros2_subscriber
target_link_libraries(rt_ros2_subscriber_simple_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber)
setup_cactus_rt_target_options(rt_ros2_subscriber_simple_data)

ament_target_dependencies(rt_ros2_subscriber
ament_target_dependencies(rt_ros2_subscriber_simple_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber
TARGETS rt_ros2_subscriber_simple_data
DESTINATION lib/${PROJECT_NAME}
)

add_executable(rt_ros2_subscriber_complex_data
complex_data.cc
)

target_link_libraries(rt_ros2_subscriber_complex_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber_complex_data)

ament_target_dependencies(rt_ros2_subscriber_complex_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber_complex_data
DESTINATION lib/${PROJECT_NAME}
)
102 changes: 102 additions & 0 deletions examples/ros2/subscriber/complex_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/float32_multi_array.hpp>

struct MyCoefficientData {
float a;
float b;
float c;
float d;
};

using RealtimeType = MyCoefficientData;
using RosType = std_msgs::msg::Float32MultiArray;

template <>
struct rclcpp::TypeAdapter<RealtimeType, RosType> {
using is_specialized = std::true_type;
using custom_type = RealtimeType;
using ros_message_type = RosType;

static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) {
destination.data.reserve(4);
destination.data.push_back(source.a);
destination.data.push_back(source.b);
destination.data.push_back(source.c);
destination.data.push_back(source.d);
}

static void convert_to_custom(const ros_message_type& source, custom_type& destination) {
destination.a = source.data.at(0);
destination.b = source.data.at(1);
destination.c = source.data.at(2);
destination.d = source.data.at(3);
}
};

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t last_msg_id_ = 0;
int64_t run_duration_;

std::shared_ptr<cactus_rt::ros2::SubscriptionLatest<RealtimeType, RosType>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeType, RosType>("/cactus_rt/complex", rclcpp::QoS(100));
}

protected:
bool Loop(int64_t elapsed_ns) noexcept override {
cactus_rt::ros2::StampedValue<MyCoefficientData> msg;
{
const auto span = Tracer().WithSpan("Subscription::ReadLatest");
msg = subscription_->ReadLatest();
}

if (msg.id != last_msg_id_) {
LOG_INFO(Logger(), "Got new data at {}: {} [{}, {}, {}, {}]", elapsed_ns, msg.id, msg.value.a, msg.value.b, msg.value.c, msg.value.d);
last_msg_id_ = msg.id;
}

return elapsed_ns > run_duration_;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

cactus_rt::ros2::App app("SimpleDataROS2Subscriber");
app.StartTraceSession("build/subscriber.perfetto");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

std::this_thread::sleep_for(time);

app.RequestStop();
app.Join();

std::cout << "Done\n";
return 0;
}
104 changes: 0 additions & 104 deletions examples/ros2/subscriber/main.cc

This file was deleted.

77 changes: 77 additions & 0 deletions examples/ros2/subscriber/simple_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/int64.hpp>

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t last_msg_id_ = 0;
int64_t run_duration_;

// We force turn off the trivial data check, because the ROS message data type
// has a defined constructor with code in it. That said, the code really is
// pretty trivial so it is safe to use in real-time. We thus disable the trivial
// type check manually.
std::shared_ptr<cactus_rt::ros2::SubscriptionLatest<RealtimeType, RosType, false>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue<RealtimeType, RosType, false>("/cactus_rt/simple", rclcpp::QoS(100));
}

protected:
bool Loop(int64_t elapsed_ns) noexcept override {
cactus_rt::ros2::StampedValue<std_msgs::msg::Int64> msg;
{
const auto span = Tracer().WithSpan("Subscription::ReadLatest");
msg = subscription_->ReadLatest();
}

if (msg.id != last_msg_id_) {
LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg.id, msg.value.data);
last_msg_id_ = msg.id;
}

return elapsed_ns > run_duration_;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

cactus_rt::ros2::App app("SimpleDataROS2Subscriber");
app.StartTraceSession("build/subscriber.perfetto");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

std::this_thread::sleep_for(time);

app.RequestStop();
app.Join();

std::cout << "Done\n";
return 0;
}
2 changes: 1 addition & 1 deletion include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Publisher : public IPublisher {
static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");

using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>::type;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;
Expand Down
6 changes: 3 additions & 3 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class Ros2Adapter {
return publisher;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT>> CreateSubscriptionForLatestValue(
template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
std::shared_ptr<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>> CreateSubscriptionForLatestValue(
const std::string& topic_name,
const rclcpp::QoS& qos
) {
auto subscriber = SubscriptionLatest<RealtimeT, RosT>::Create(*this->ros_node_, topic_name, qos);
auto subscriber = SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(*this->ros_node_, topic_name, qos);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscriber);
Expand Down
Loading

0 comments on commit d32b107

Please sign in to comment.