Skip to content

Commit

Permalink
Avoid copying liveliness::Entity
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Mar 21, 2024
1 parent 47973be commit 1f507f7
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 142 deletions.
165 changes: 80 additions & 85 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp

Large diffs are not rendered by default.

29 changes: 16 additions & 13 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ class TopicData
public:
liveliness::TopicInfo info_;

using EntitySet = std::unordered_set<
liveliness::ConstEntityPtr>;

// The publishers or clients entities.
std::unordered_set<liveliness::Entity> pubs_;
EntitySet pubs_;

// The subscriptions or services entities
std::unordered_set<liveliness::Entity> subs_;
EntitySet subs_;

// Returns nullptr if the entity does not contain topic_info.
static TopicDataPtr make(liveliness::Entity entity);
static TopicDataPtr make(liveliness::ConstEntityPtr entity);

private:
// Private constructor to force users to rely on make.
explicit TopicData(liveliness::Entity entity);
explicit TopicData(liveliness::ConstEntityPtr entity);
};

///=============================================================================
Expand Down Expand Up @@ -168,7 +171,7 @@ class GraphCache final
/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
const liveliness::Entity & entity,
liveliness::ConstEntityPtr entity,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

Expand All @@ -183,20 +186,20 @@ class GraphCache final
// Helper function to update TopicMap within the node the cache for the entire graph.
void update_topic_maps_for_put(
GraphNodePtr graph_node,
const liveliness::Entity & entity);
liveliness::ConstEntityPtr entity);

void update_topic_map_for_put(
GraphNode::TopicMap & topic_map,
const liveliness::Entity & entity,
liveliness::ConstEntityPtr entity,
bool report_events = false);

void update_topic_maps_for_del(
GraphNodePtr graph_node,
const liveliness::Entity & entity);
liveliness::ConstEntityPtr entity);

void update_topic_map_for_del(
GraphNode::TopicMap & topic_map,
const liveliness::Entity & entity,
liveliness::ConstEntityPtr entity,
bool report_events = false);

void remove_topic_map_from_cache(
Expand All @@ -217,15 +220,15 @@ class GraphCache final
const rmw_zenoh_event_type_t event_id);

void handle_matched_events_for_put(
const liveliness::Entity & entity,
liveliness::ConstEntityPtr entity,
const GraphNode::TopicQoSMap & topic_qos_map);

void handle_matched_events_for_del(
const liveliness::Entity & entity,
liveliness::ConstEntityPtr entity,
const GraphNode::TopicQoSMap & topic_qos_map);

using EntityEventMap =
std::unordered_map<liveliness::Entity, std::unordered_set<rmw_zenoh_event_type_t>>;
std::unordered_map<liveliness::ConstEntityPtr, std::unordered_set<rmw_zenoh_event_type_t>>;
void take_entities_with_events(EntityEventMap & entities_with_events);

std::string zid_str_;
Expand Down Expand Up @@ -270,7 +273,7 @@ class GraphCache final
// pub/sub with the exact same topic, type & QoS but registers a different callback
// for the same event type. We could switch to a multimap here but removing the callback
// will be impossible right now since entities do not have unique IDs.
using GraphEventCallbackMap = std::unordered_map<liveliness::Entity, GraphEventCallbacks>;
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Counters to track changes to event statues for each topic.
Expand Down
55 changes: 28 additions & 27 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ Entity::Entity(
}

///=============================================================================
std::optional<Entity> Entity::make(
std::shared_ptr<Entity> Entity::make(
z_id_t zid,
const std::string & nid,
const std::string & id,
Expand All @@ -301,33 +301,33 @@ std::optional<Entity> Entity::make(
{
if (id.empty()) {
RCUTILS_SET_ERROR_MSG("Invalid id.");
return std::nullopt;
return nullptr;
}
if (entity_to_str.find(type) == entity_to_str.end()) {
RCUTILS_SET_ERROR_MSG("Invalid entity type.");
return std::nullopt;
return nullptr;
}
if (node_info.ns_.empty() || node_info.name_.empty()) {
RCUTILS_SET_ERROR_MSG("Invalid node_info for entity.");
return std::nullopt;
return nullptr;
}
if (type != EntityType::Node && !topic_info.has_value()) {
RCUTILS_SET_ERROR_MSG("Invalid topic_info for entity.");
return std::nullopt;
return nullptr;
}

Entity entity{
zid_to_str(zid),
std::move(nid),
std::move(id),
std::move(type),
std::move(node_info),
std::move(topic_info)};
return entity;
return std::make_shared<Entity>(
Entity{
zid_to_str(zid),
std::move(nid),
std::move(id),
std::move(type),
std::move(node_info),
std::move(topic_info)});
}

///=============================================================================
std::optional<Entity> Entity::make(const std::string & keyexpr)
std::shared_ptr<Entity> Entity::make(const std::string & keyexpr)
{
std::vector<std::string> parts = split_keyexpr(keyexpr);
// Every token will contain at least 7 parts:
Expand All @@ -337,22 +337,22 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
return std::nullopt;
return nullptr;
}
for (const std::string & p : parts) {
if (p.empty()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received invalid liveliness token");
return std::nullopt;
return nullptr;
}
}

if (parts[KeyexprIndex::AdminSpace] != ADMIN_SPACE) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid admin space.");
return std::nullopt;
return nullptr;
}

// Get the entity, ie NN, MP, MS, SS, SC.
Expand All @@ -363,7 +363,7 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid entity %s.", entity_str.c_str());
return std::nullopt;
return nullptr;
}

EntityType entity_type = entity_it->second;
Expand All @@ -381,14 +381,14 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token for non-node entity without required parameters.");
return std::nullopt;
return nullptr;
}
std::optional<rmw_qos_profile_t> qos = keyexpr_to_qos(parts[KeyexprIndex::TopicQoS]);
if (!qos.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid qos keyexpr");
return std::nullopt;
return nullptr;
}
topic_info = TopicInfo{
demangle_name(std::move(parts[KeyexprIndex::TopicName])),
Expand All @@ -397,13 +397,14 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
};
}

return Entity{
std::move(zid),
std::move(nid),
std::move(id),
std::move(entity_type),
NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""},
std::move(topic_info)};
return std::make_shared<Entity>(
Entity{
std::move(zid),
std::move(nid),
std::move(id),
std::move(entity_type),
NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""},
std::move(topic_info)});
}

///=============================================================================
Expand Down
29 changes: 27 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <zenoh.h>

#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <vector>
Expand Down Expand Up @@ -102,6 +103,9 @@ enum class EntityType : uint8_t
* _dds:: prefix in the topic_type is an artifact of the type support implementation and is
* removed when reporting the topic_type in graph_cache.cpp (see _demangle_if_ros_type()).
*/
class Entity;
using EntityPtr = std::shared_ptr<Entity>;
using ConstEntityPtr = std::shared_ptr<const Entity>;
class Entity
{
public:
Expand All @@ -114,7 +118,7 @@ class Entity
/// @param node_info The node information that is required for all entities.
/// @param topic_info An optional topic information for relevant entities.
/// @return An entity if all inputs are valid. This way no invalid entities can be created.
static std::optional<Entity> make(
static EntityPtr make(
z_id_t zid,
const std::string & nid,
const std::string & id,
Expand All @@ -123,7 +127,7 @@ class Entity
std::optional<TopicInfo> topic_info = std::nullopt);

/// Make an Entity from a liveliness keyexpr.
static std::optional<Entity> make(const std::string & keyexpr);
static EntityPtr make(const std::string & keyexpr);

// Get the zenoh session id as a string. This is not unique as entities
// created within the same session, will have the same ids.
Expand Down Expand Up @@ -224,6 +228,27 @@ struct hash<liveliness::Entity>
return entity.guid();
}
};

template<>
struct hash<liveliness::ConstEntityPtr>
{
auto operator()(const liveliness::ConstEntityPtr & entity) const -> size_t
{
return entity->guid();
}
};

template<>
struct equal_to<liveliness::ConstEntityPtr>
{
auto operator()(
const liveliness::ConstEntityPtr & lhs,
const liveliness::ConstEntityPtr & rhs) const -> bool
{
return lhs->guid() == rhs->guid();
}
};

} // namespace std

#endif // DETAIL__LIVELINESS_UTILS_HPP_
13 changes: 7 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class rmw_context_impl_s final
///=============================================================================
struct rmw_node_data_t
{
// TODO(Yadunund): Do we need a token at the node level? Right now I have one
// for cases where a node may spin up but does not have any publishers or subscriptions.
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity;

// Liveliness token for the node.
zc_owned_liveliness_token_t token;

Expand All @@ -87,7 +88,7 @@ class rmw_publisher_data_t final
{
public:
// The Entity generated for the publisher.
std::optional<liveliness::Entity> entity;
std::shared_ptr<liveliness::Entity> entity;

// An owned publisher.
z_owned_publisher_t pub;
Expand Down Expand Up @@ -156,7 +157,7 @@ class rmw_subscription_data_t final
{
public:
// The Entity generated for the subscription.
std::optional<liveliness::Entity> entity;
std::shared_ptr<liveliness::Entity> entity;

// An owned subscriber or querying_subscriber depending on the QoS settings.
std::variant<z_owned_subscriber_t, ze_owned_querying_subscriber_t> sub;
Expand Down Expand Up @@ -221,7 +222,7 @@ class rmw_service_data_t final
{
public:
// The Entity generated for the service.
std::optional<liveliness::Entity> entity;
std::shared_ptr<liveliness::Entity> entity;

z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;
Expand Down Expand Up @@ -291,7 +292,7 @@ class rmw_client_data_t final
{
public:
// The Entity generated for the client.
std::optional<liveliness::Entity> entity;
std::shared_ptr<liveliness::Entity> entity;

z_owned_keyexpr_t keyexpr;
z_owned_closure_reply_t zn_closure_reply;
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rmw_publisher_event_init(
rmw_publisher_data_t * pub_data = static_cast<rmw_publisher_data_t *>(publisher->data);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->entity, RMW_RET_INVALID_ARGUMENT);

if (publisher->implementation_identifier != rmw_zenoh_identifier) {
RMW_SET_ERROR_MSG("Publisher implementation identifier not from this implementation");
Expand All @@ -59,7 +60,7 @@ rmw_publisher_event_init(

// Register the event with graph cache.
pub_data->context->impl->graph_cache->set_qos_event_callback(
pub_data->entity.value(),
pub_data->entity,
rmw_event_it->second,
[pub_data,
event_id = rmw_event_it->second](std::unique_ptr<rmw_zenoh_event_status_t> zenoh_event)
Expand Down Expand Up @@ -91,6 +92,7 @@ rmw_subscription_event_init(
rmw_subscription_data_t * sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->entity, RMW_RET_INVALID_ARGUMENT);

if (subscription->implementation_identifier != rmw_zenoh_identifier) {
RMW_SET_ERROR_MSG(
Expand All @@ -116,7 +118,7 @@ rmw_subscription_event_init(
}

sub_data->context->impl->graph_cache->set_qos_event_callback(
sub_data->entity.value(),
sub_data->entity,
rmw_event_it->second,
[sub_data,
event_id = rmw_event_it->second](std::unique_ptr<rmw_zenoh_event_status_t> zenoh_event)
Expand Down
Loading

0 comments on commit 1f507f7

Please sign in to comment.