Skip to content

Commit

Permalink
First step towards modernizing the rt publisher (backport #210) (#233)
Browse files Browse the repository at this point in the history
Co-authored-by: Lennart Nachtigall <[email protected]>
  • Loading branch information
mergify[bot] and firesurfer authored Dec 19, 2024
1 parent e9140b8 commit 48e9d15
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 17 deletions.
63 changes: 46 additions & 17 deletions include/realtime_tools/realtime_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,39 @@

namespace realtime_tools
{
template <class Msg>
template <class MessageT>
class RealtimePublisher
{
private:
using PublisherSharedPtr = typename rclcpp::Publisher<Msg>::SharedPtr;

public:
/// Provide various typedefs to resemble the rclcpp::Publisher type
using PublisherType = rclcpp::Publisher<MessageT>;
using PublisherSharedPtr = typename rclcpp::Publisher<MessageT>::SharedPtr;

using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)

/// The msg_ variable contains the data that will get published on the ROS topic.
Msg msg_;
MessageT msg_;

/** \brief Constructor for the realtime publisher
*
* \param publisher the publisher to wrap
*/
explicit RealtimePublisher(PublisherSharedPtr publisher)
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED)
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
{
thread_ = std::thread(&RealtimePublisher::publishingLoop, this);
}

RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {}
[[deprecated(
"Use constructor with rclcpp::Publisher<T>::SharedPtr instead - this class does not make sense "
"without a real publisher")]]
RealtimePublisher()
: is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED)
{
}

/// Destructor
~RealtimePublisher()
Expand Down Expand Up @@ -101,7 +113,7 @@ class RealtimePublisher
bool trylock()
{
if (msg_mutex_.try_lock()) {
if (turn_ == REALTIME) {
if (turn_ == State::REALTIME) {
return true;
} else {
msg_mutex_.unlock();
Expand All @@ -112,6 +124,23 @@ class RealtimePublisher
}
}

/** \brief Try to get the data lock from realtime and publish the given message
*
* Tries to gain unique access to msg_ variable. If this succeeds
* update the msg_ variable and call unlockAndPublish
* @return false in case no lock for the realtime variable could be acquired
*/
bool tryPublish(const MessageT & msg)
{
if (!trylock()) {
return false;
}

msg_ = msg;
unlockAndPublish();
return true;
}

/** \brief Unlock the msg_ variable
*
* After a successful trylock and after the data is written to the mgs_
Expand All @@ -120,7 +149,7 @@ class RealtimePublisher
*/
void unlockAndPublish()
{
turn_ = NON_REALTIME;
turn_ = State::NON_REALTIME;
unlock();
}

Expand Down Expand Up @@ -163,10 +192,10 @@ class RealtimePublisher
void publishingLoop()
{
is_running_ = true;
turn_ = REALTIME;
turn_ = State::REALTIME;

while (keep_running_) {
Msg outgoing;
MessageT outgoing;

// Locks msg_ and copies it

Expand All @@ -176,7 +205,7 @@ class RealtimePublisher
lock();
#endif

while (turn_ != NON_REALTIME && keep_running_) {
while (turn_ != State::NON_REALTIME && keep_running_) {
#ifdef NON_POLLING
updated_cond_.wait(lock_);
#else
Expand All @@ -186,7 +215,7 @@ class RealtimePublisher
#endif
}
outgoing = msg_;
turn_ = REALTIME;
turn_ = State::REALTIME;

unlock();

Expand All @@ -210,12 +239,12 @@ class RealtimePublisher
std::condition_variable updated_cond_;
#endif

enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
std::atomic<int> turn_; // Who's turn is it to use msg_?
enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
std::atomic<State> turn_; // Who's turn is it to use msg_?
};

template <class Msg>
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<Msg>>;
template <class MessageT>
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;

} // namespace realtime_tools
#endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
41 changes: 41 additions & 0 deletions test/realtime_publisher_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish)
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}

TEST(RealtimePublisher, rt_try_publish)
{
rclcpp::init(0, nullptr);
const size_t ATTEMPTS = 10;
const std::chrono::milliseconds DELAY(250);

const char * expected_msg = "Hello World";
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
rclcpp::QoS qos(10);
qos.reliable().transient_local();
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
RealtimePublisher<StringMsg> rt_pub(pub);

// try publish a latched message
bool publish_success = false;
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
StringMsg msg;
msg.string_value = expected_msg;

if (rt_pub.tryPublish(msg)) {
publish_success = true;
break;
}
std::this_thread::sleep_for(DELAY);
}
ASSERT_TRUE(publish_success);

// make sure subscriber gets it
StringCallback str_callback;

auto sub = node->create_subscription<StringMsg>(
"~/rt_publish", qos,
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
rclcpp::spin_some(node);
std::this_thread::sleep_for(DELAY);
}
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}
41 changes: 41 additions & 0 deletions test/realtime_publisher_tests_non_polling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish)
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}

TEST(RealtimePublisher, rt_try_publish)
{
rclcpp::init(0, nullptr);
const size_t ATTEMPTS = 10;
const std::chrono::milliseconds DELAY(250);

const char * expected_msg = "Hello World";
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
rclcpp::QoS qos(10);
qos.reliable().transient_local();
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
RealtimePublisher<StringMsg> rt_pub(pub);

// try publish a latched message
bool publish_success = false;
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
StringMsg msg;
msg.string_value = expected_msg;

if (rt_pub.tryPublish(msg)) {
publish_success = true;
break;
}
std::this_thread::sleep_for(DELAY);
}
ASSERT_TRUE(publish_success);

// make sure subscriber gets it
StringCallback str_callback;

auto sub = node->create_subscription<StringMsg>(
"~/rt_publish", qos,
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
rclcpp::spin_some(node);
std::this_thread::sleep_for(DELAY);
}
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}

0 comments on commit 48e9d15

Please sign in to comment.