Merge branch 'ros2:rolling' into rolling
imstevenpmwork authored Nov 28, 2024
2 parents c8f6827 + 8306a63 commit efba411
Showing 7 changed files with 38 additions and 86 deletions.
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
@@ -471,6 +471,7 @@ rmw_ret_t ClientData::send_request(
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
+ num_in_flight_++;
z_owned_closure_reply_t zn_closure_reply =
z_closure(client_data_handler, client_data_drop, this);
z_get(
@@ -563,7 +564,7 @@ bool ClientData::shutdown_and_query_in_flight()
///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
@@ -575,6 +576,8 @@ void ClientData::decrement_in_flight_and_conditionally_remove()
if (node_data == nullptr) {
return;
}
+ // We have to unlock here since we are about to delete ourselves, and unlocking after that would be UB.
+ lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}
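Taken together, these hunks make ClientData track queries in flight: send_request() now increments num_in_flight_ before handing the reply closure to z_get(), and decrement_in_flight_and_conditionally_remove() switches from std::lock_guard to std::unique_lock so the mutex can be released before delete_client_data() destroys the object, since a guard that unlocks the member mutex_ after *this has been destroyed is undefined behavior. A minimal sketch of that unlock-before-self-delete pattern, using hypothetical Session and g_sessions names rather than the rmw_zenoh_cpp types:

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_map>

class Session;

// Hypothetical owner map standing in for the node's client map; erasing an
// entry drops the last shared_ptr and destroys the Session.
std::unordered_map<int, std::shared_ptr<Session>> g_sessions;

class Session
{
public:
  explicit Session(int id)
  : id_(id) {}

  void decrement_in_flight_and_conditionally_remove()
  {
    std::unique_lock<std::recursive_mutex> lock(mutex_);
    --num_in_flight_;
    if (is_shutdown_ && num_in_flight_ == 0) {
      // Release the lock before triggering our own destruction: a
      // std::lock_guard would unlock mutex_ in its destructor after
      // *this (and mutex_) no longer exists, which is undefined behavior.
      lock.unlock();
      g_sessions.erase(id_);  // may run ~Session()
    }
  }

private:
  mutable std::recursive_mutex mutex_;
  std::size_t num_in_flight_ = 1;
  bool is_shutdown_ = true;
  int id_;
};

int main()
{
  g_sessions.emplace(1, std::make_shared<Session>(1));
  // Mirrors the reply closure calling back with a raw pointer: the object
  // erases itself from the owning map during this call.
  Session * raw = g_sessions.at(1).get();
  raw->decrement_in_flight_and_conditionally_remove();
  return 0;
}
```

Using std::unique_lock instead of std::lock_guard is what makes the early unlock possible; std::lock_guard has no unlock() member.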
13 changes: 0 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
@@ -232,19 +232,6 @@ class rmw_context_impl_s::Data final
return ret;
}

- // Shutdown all the nodes in this context.
- for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
- ret = node_it->second->shutdown();
- if (ret != RMW_RET_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
- node_it->second->id(),
- ret
- );
- }
- }

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_manager_.has_value()) {
z_drop(z_move(shm_manager_.value()));
80 changes: 15 additions & 65 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
@@ -115,7 +115,7 @@ NodeData::~NodeData()
///=============================================================================
std::size_t NodeData::id() const
{
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
return id_;
}

@@ -128,7 +128,7 @@ bool NodeData::create_pub_data(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -169,7 +169,7 @@ bool NodeData::create_pub_data(
///=============================================================================
PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = pubs_.find(publisher);
if (it == pubs_.end()) {
return nullptr;
@@ -181,7 +181,7 @@ PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
pubs_.erase(publisher);
}

@@ -195,7 +195,7 @@ bool NodeData::create_sub_data(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -237,7 +237,7 @@ bool NodeData::create_sub_data(
///=============================================================================
SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = subs_.find(subscription);
if (it == subs_.end()) {
return nullptr;
@@ -249,7 +249,7 @@ SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subs
///=============================================================================
void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
subs_.erase(subscription);
}

@@ -262,7 +262,7 @@ bool NodeData::create_service_data(
const rosidl_service_type_support_t * type_supports,
const rmw_qos_profile_t * qos_profile)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -303,7 +303,7 @@ bool NodeData::create_service_data(
///=============================================================================
ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = services_.find(service);
if (it == services_.end()) {
return nullptr;
@@ -315,7 +315,7 @@ ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service)
///=============================================================================
void NodeData::delete_service_data(const rmw_service_t * const service)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
services_.erase(service);
}

@@ -329,7 +329,7 @@ bool NodeData::create_client_data(
const rosidl_service_type_support_t * type_supports,
const rmw_qos_profile_t * qos_profile)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -371,7 +371,7 @@ bool NodeData::create_client_data(
///=============================================================================
ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = clients_.find(client);
if (it == clients_.end()) {
return nullptr;
@@ -383,7 +383,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
///=============================================================================
void NodeData::delete_client_data(const rmw_client_t * const client)
{
- std::lock_guard<std::mutex> lock_guard(mutex_);
+ std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto client_it = clients_.find(client);
if (client_it == clients_.end()) {
return;
@@ -396,62 +396,12 @@ void NodeData::delete_client_data(const rmw_client_t * const client)
///=============================================================================
rmw_ret_t NodeData::shutdown()
{
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

- // Shutdown all the entities within this node.
- for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
- ret = pub_it->second->shutdown();
- if (ret != RMW_RET_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.",
- pub_it->second->topic_info().name_.c_str(),
- id_,
- ret
- );
- }
- }
- for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) {
- ret = sub_it->second->shutdown();
- if (ret != RMW_RET_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.",
- sub_it->second->topic_info().name_.c_str(),
- id_,
- ret
- );
- }
- }
- for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) {
- ret = srv_it->second->shutdown();
- if (ret != RMW_RET_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.",
- srv_it->second->topic_info().name_.c_str(),
- id_,
- ret
- );
- }
- }
- for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) {
- ret = cli_it->second->shutdown();
- if (ret != RMW_RET_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.",
- cli_it->second->topic_info().name_.c_str(),
- id_,
- ret
- );
- }
- }

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));

@@ -463,7 +413,7 @@ rmw_ret_t NodeData::shutdown()
// Check if the Node is shutdown.
bool NodeData::is_shutdown() const
{
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
return is_shutdown_;
}

2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
@@ -130,7 +130,7 @@ class NodeData final
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token);
// Internal mutex.
- mutable std::mutex mutex_;
+ mutable std::recursive_mutex mutex_;
// The rmw_node_t associated with this NodeData.
const rmw_node_t * node_;
// The entity id of this node as generated by get_next_entity_id().
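NodeData's internal mutex becomes a std::recursive_mutex, presumably so that a thread already holding the lock inside one NodeData method can re-enter another locking method on the same object, for example when cleanup performed under the lock ends up calling back into delete_client_data(), without deadlocking. A minimal illustration of why a recursive mutex is needed for that kind of re-entrancy, with hypothetical Node and remove_entity names:

```cpp
#include <iostream>
#include <mutex>

// With std::mutex the nested lock below would be undefined behavior
// (in practice, a deadlock); std::recursive_mutex lets the same thread
// lock again, as long as every lock is matched by an unlock.
class Node
{
public:
  void remove_entity(int id)
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    std::cout << "removing entity " << id << "\n";
  }

  void shutdown()
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    // Cleanup performed while the lock is held calls back into another
    // locking method on the same thread.
    remove_entity(42);
  }

private:
  mutable std::recursive_mutex mutex_;
};

int main()
{
  Node node;
  node.shutdown();  // completes without deadlocking
  return 0;
}
```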
10 changes: 6 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
@@ -59,7 +59,10 @@ void service_data_handler(const z_query_t * query, void * data)
return;
}

- service_data->add_new_query(std::make_unique<ZenohQuery>(query));
+ std::chrono::nanoseconds::rep received_timestamp =
+ std::chrono::system_clock::now().time_since_epoch().count();
+
+ service_data->add_new_query(std::make_unique<ZenohQuery>(query, received_timestamp));
}

///=============================================================================
@@ -339,9 +342,8 @@ rmw_ret_t ServiceData::take_request(
RMW_SET_ERROR_MSG("Could not get client GID from attachment");
return RMW_RET_ERROR;
}
- auto now = std::chrono::system_clock::now().time_since_epoch();
- auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
- request_header->received_timestamp = now_ns.count();
+
+ request_header->received_timestamp = query->get_received_timestamp();

// Add this query to the map, so that rmw_send_response can quickly look it up later.
const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid);
9 changes: 8 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
@@ -64,9 +64,16 @@ create_map_and_set_sequence_num(
}

///=============================================================================
- ZenohQuery::ZenohQuery(const z_query_t * query)
+ ZenohQuery::ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp)
{
query_ = z_query_clone(query);
+ received_timestamp_ = received_timestamp;
}

+ ///=============================================================================
+ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const
+ {
+ return received_timestamp_;
+ }

///=============================================================================
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
@@ -56,14 +56,17 @@ class ZenohReply final
class ZenohQuery final
{
public:
- ZenohQuery(const z_query_t * query);
+ ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp);

~ZenohQuery();

const z_query_t get_query() const;

+ std::chrono::nanoseconds::rep get_received_timestamp() const;

private:
z_owned_query_t query_;
+ std::chrono::nanoseconds::rep received_timestamp_;
};
} // namespace rmw_zenoh_cpp

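The service-side change captures the receive time once, at arrival: service_data_handler() stamps the query with the system_clock time when the Zenoh callback fires, ZenohQuery stores that value, and take_request() later reports it through get_received_timestamp() instead of sampling the clock at take time, so received_timestamp reflects when the request arrived rather than when it was taken. A small self-contained sketch of the same capture-on-arrival pattern, with a hypothetical TimedQuery class standing in for ZenohQuery:

```cpp
#include <chrono>
#include <iostream>
#include <thread>

// Hypothetical query wrapper: capture the arrival time once, report it later.
class TimedQuery
{
public:
  explicit TimedQuery(std::chrono::nanoseconds::rep received_timestamp)
  : received_timestamp_(received_timestamp) {}

  std::chrono::nanoseconds::rep get_received_timestamp() const
  {
    return received_timestamp_;
  }

private:
  std::chrono::nanoseconds::rep received_timestamp_;
};

int main()
{
  // In the callback: stamp the query as soon as it arrives.
  const auto received = std::chrono::system_clock::now().time_since_epoch().count();
  TimedQuery query(received);

  // Time passes before the request is actually taken.
  std::this_thread::sleep_for(std::chrono::milliseconds(10));

  // At take time: report the arrival timestamp, not the current time.
  std::cout << "received_timestamp = " << query.get_received_timestamp() << "\n";
  return 0;
}
```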
