Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Karsten/serialized ipm touchups #2

Draft
wants to merge 9 commits into
base: dnae_adas/serialized_ipm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/publisher_base.cpp
src/rclcpp/qos.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
Expand Down Expand Up @@ -405,6 +407,15 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message test/test_serialized_message.cpp)
if(TARGET test_serialized_message)
ament_target_dependencies(test_serialized_message
test_msgs
)
target_link_libraries(test_serialized_message
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
ament_target_dependencies(test_service
Expand Down
3 changes: 1 addition & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ create_publisher(
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();

return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
node, topic_name, type_support,
qos, options);
node, topic_name, type_support, qos, options);
}

} // namespace rclcpp
Expand Down
16 changes: 6 additions & 10 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ create_subscription(
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
Expand Down Expand Up @@ -96,12 +94,10 @@ create_subscription(
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
Expand Down
8 changes: 2 additions & 6 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,11 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the subscription.
*
* \param subscription the SubscriptionIntraProcess to register.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_subscription(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription,
const bool is_serialized = false);
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);

/// Unregister a subscription using the subscription's unique id.
/**
Expand All @@ -137,12 +134,11 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the publisher.
*
* \param publisher publisher to be registered with the manager.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false);
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);

/// Unregister a publisher using the publisher's unique id.
/**
Expand Down
24 changes: 12 additions & 12 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type,
std::shared_ptr<SerializationBase> serializer)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
std::shared_ptr<SerializationBase> serializer,
bool is_serialized = false)
: SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized),
any_callback_(callback), serializer_(serializer)
{
if (!std::is_same<MessageT, CallbackMessageT>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value &&
!std::is_same<CallbackMessageT, rcl_serialized_message_t>::value)
!std::is_base_of<rcl_serialized_message_t, MessageT>::value)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't exactly sure why this check is happening. Also, I wouldn't compile if we check for the CallbackMessageT, which I believe is the right thing to check for though.

{
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}
Expand Down Expand Up @@ -170,15 +170,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
msg_info.from_intra_process = true;

ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg =
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()), serialized_msg.get());

if (nullptr == serialized_msg) {
if (0u == serialized_msg->buffer_length) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}

if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
any_callback_.dispatch_intra_process(std::static_pointer_cast<rcl_serialized_message_t>(serialized_msg), msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
Expand Down Expand Up @@ -243,8 +243,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}

ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
if (nullptr == serialized_container) {
ConstMessageSharedPtr serialized_message = buffer_->consume_shared();
if (nullptr == serialized_message) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}

Expand All @@ -254,13 +254,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
SubscriptionIntraProcessBase(
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
bool is_serialized)
: topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized)
{}

virtual ~SubscriptionIntraProcessBase() = default;
Expand Down Expand Up @@ -70,6 +73,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
rmw_qos_profile_t
get_actual_qos() const;

RCLCPP_PUBLIC
bool
is_serialized() const;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
Expand All @@ -80,6 +87,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

std::string topic_name_;
rmw_qos_profile_t qos_profile_;
bool is_serialized_;
};

} // namespace experimental
Expand Down
42 changes: 20 additions & 22 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ class Publisher : public PublisherBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
bool is_serialized = false)
: PublisherBase(
node_base,
topic,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
options.template to_rcl_publisher_options<MessageT>(qos)),
options.template to_rcl_publisher_options<MessageT>(qos),
is_serialized),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
Expand All @@ -87,12 +89,14 @@ class Publisher : public PublisherBase
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
const rosidl_message_type_support_t & type_support)
const rosidl_message_type_support_t & type_support,
bool is_serialized = false)
: PublisherBase(
node_base,
topic,
type_support,
options.template to_rcl_publisher_options<MessageT>(qos)),
options.template to_rcl_publisher_options<MessageT>(qos),
is_serialized),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
Expand Down Expand Up @@ -132,12 +136,7 @@ class Publisher : public PublisherBase
"intraprocess communication allowed only with volatile durability");
}
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(
this->shared_from_this(), true);
this->setup_intra_process(
intra_process_publisher_id,
intra_process_publisher_id_serialized,
ipm);
this->setup_intra_process(intra_process_publisher_id, ipm);
}
}

Expand Down Expand Up @@ -229,6 +228,13 @@ class Publisher : public PublisherBase
this->do_serialized_publish(*serialized_msg);
}

/// Publish a serialized message.
template<typename TDeleter>
void publish(std::unique_ptr<SerializedMessage, TDeleter> serialized_msg)
{
this->do_serialized_publish(*serialized_msg);
}

/// Publish an instance of a LoanedMessage.
/**
* When publishing a loaned message, the memory for this ROS message will be deallocated
Expand Down Expand Up @@ -309,7 +315,7 @@ class Publisher : public PublisherBase
}

template<class T = MessageT>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value>::type
typename std::enable_if<!std::is_base_of<rcl_serialized_message_t, T>::value>::type
do_publish_message(const T & msg)
{
// Avoid allocating when not using intra process.
Expand All @@ -327,7 +333,7 @@ class Publisher : public PublisherBase
}

template<class T = MessageT>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value>::type
typename std::enable_if<std::is_base_of<rcl_serialized_message_t, T>::value>::type
do_publish_message(const T & msg)
{
// Kept for backwards compatibility. Copies compelete memory!
Expand Down Expand Up @@ -411,12 +417,8 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

const uint64_t intra_process_publisher_id = std::is_same<T,
rclcpp::SerializedMessage>::value ?
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;

ipm->template do_intra_process_publish<T, AllocatorT>(
intra_process_publisher_id,
intra_process_publisher_id_,
std::move(msg),
message_allocator);
}
Expand All @@ -436,12 +438,8 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

const uint64_t intra_process_publisher_id = std::is_same<T,
rclcpp::SerializedMessage>::value ?
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;

return ipm->template do_intra_process_publish_and_return_shared<T, AllocatorT>(
intra_process_publisher_id,
intra_process_publisher_id_,
std::move(msg),
message_allocator);
}
Expand Down
12 changes: 9 additions & 3 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options);
const rcl_publisher_options_t & publisher_options,
bool is_serializd = false);

RCLCPP_PUBLIC
virtual ~PublisherBase();
Expand Down Expand Up @@ -191,9 +192,12 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
void
setup_intra_process(
uint64_t intra_process_publisher_id,
uint64_t intra_process_publisher_id_serialized,
IntraProcessManagerSharedPtr ipm);

RCLCPP_PUBLIC
bool
is_serialized() const;

protected:
template<typename EventCallbackT>
void
Expand Down Expand Up @@ -223,9 +227,11 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
uint64_t intra_process_publisher_id_serialized_;

rmw_gid_t rmw_gid_;

private:
bool is_serialized_;
};

} // namespace rclcpp
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/publisher_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ create_publisher_factory(
const rclcpp::QoS & qos
) -> std::shared_ptr<PublisherT>
{
auto is_serialized = std::is_base_of<rcl_serialized_message_t, MessageT>::value;
auto publisher = std::make_shared<PublisherT>(
node_base, topic_name, qos, options,
type_support);
node_base, topic_name, qos, options, type_support, is_serialized);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.
Expand Down
Loading