From 00d855417d0b29d34fed66c5a4f2f7df8e64652f Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 12 Nov 2024 13:57:05 -0500 Subject: [PATCH 1/4] Revamp of rmw_context_impl_s (#307) * Make rmw_context_impl_s::Data private. That way we can completely hide the implementation in rmw_context_impl_s.cpp. Signed-off-by: Chris Lalancette * Remove subscribe_to_ros_graph method. We can just do this work right in the constructor, which simplifies it. Signed-off-by: Chris Lalancette * Remove unused is_initialized_ member. Signed-off-by: Chris Lalancette * Remove unused allocator member. Signed-off-by: Chris Lalancette * Stop storing the liveliness_str. We only need it during construction, so only use it there. Signed-off-by: Chris Lalancette * Make struct Data "final". Signed-off-by: Chris Lalancette * Keep the Data members around while they are being used. What we do here is to create a global map between a Data pointer, and its shared_ptr equivalent. When we create a new Data ptr via rmw_context_impl_s, we add it to the map. When we delete a Data ptr via rmw_context_impl_s, we remove it from the map. When we get a graph callback, we look to see if the pointer is in the map; if so, we get the shared_ptr. Because of this, we are guaranteed that the Data pointer will not be removed while we are using it. Signed-off-by: Chris Lalancette * Revamp Data and rmw_context_impl_s. In particular, since all of the data is owned by Data, move all of the functionality into there as well. Then it is clear who owns all of it, instead of it being split across rmw_context_impl_s and Data. Signed-off-by: Chris Lalancette * Feedback from review. Signed-off-by: Chris Lalancette * Feedback from review. Signed-off-by: Chris Lalancette * Style. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- .../src/detail/rmw_context_impl_s.cpp | 742 ++++++++++-------- .../src/detail/rmw_context_impl_s.hpp | 74 +- 2 files changed, 430 insertions(+), 386 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed7fa87b..374af78e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -14,391 +14,531 @@ #include "rmw_context_impl_s.hpp" +#include +#include +#include +#include #include #include +#include #include #include +#include #include +#include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" -#include "liveliness_utils.hpp" #include "logging_macros.hpp" +#include "rmw_node_data.hpp" #include "zenoh_config.hpp" #include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -///============================================================================= -void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler() +// to lookup the pointer, and gain a reference to a shared_ptr if it exists. +// This guarantees that the Data object will not be destroyed while we are using it. +static std::mutex data_to_data_shared_ptr_map_mutex; +static std::unordered_map> data_to_data_shared_ptr_map; + +static void graph_sub_data_handler(const z_sample_t * sample, void * data); + +// Bundle all class members into a data struct which can be passed as a +// weak ptr to various threads for thread-safe access without capturing +// "this" ptr by reference. +class rmw_context_impl_s::Data final { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); +public: + // Constructor. + Data( + std::size_t domain_id, + const std::string & enclave) + : domain_id_(std::move(domain_id)), + enclave_(std::move(enclave)), + is_shutdown_(false), + next_entity_id_(0), + nodes_({}) + { + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; - } + // Check if shm is enabled. + z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + session_ = z_open(z_move(config)); + if (!z_session_check(&session_)) { + throw std::runtime_error("Error setting up zenoh session."); + } + auto close_session = rcpputils::make_scope_exit( + [this]() { + z_close(z_move(session_)); + }); + + // TODO(Yadunund) Move this check into a separate thread. + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + ret = RMW_RET_ERROR; + uint64_t connection_attempts = 0; + // Retry until the connection is successful. + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + ++connection_attempts; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + if (ret != RMW_RET_OK) { + throw std::runtime_error( + "Unable to connect to a Zenoh router after " + + std::to_string(configured_connection_attempts.value()) + + " retries."); + } + } - // Update the graph cache. - std::lock_guard lock(data_ptr->mutex_); - if (data_ptr->is_shutdown_) { - return; - } - switch (sample->kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - data_ptr->graph_cache_->parse_put(keystr._cstr); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - data_ptr->graph_cache_->parse_del(keystr._cstr); - break; - default: - return; - } + // Initialize the graph cache. + const z_id_t zid = z_info_zid(z_loan(session_)); + graph_cache_ = std::make_shared(zid); + // Setup liveliness subscriptions for discovery. + std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); + + // Query router/liveliness participants to get graph information before the session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session_), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache_->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); - // Trigger the ROS graph guard condition. - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get()); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition." - ); + // Initialize the shm manager if shared_memory is enabled in the config. + shm_manager_ = std::nullopt; + if (shm_enabled._cstr != nullptr && + strcmp(shm_enabled._cstr, "true") == 0) + { + char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 + static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 + for (size_t i = 0; i < sizeof(zid.id); ++i) { + snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + } + idstr[sizeof(zid.id) * 2] = '\0'; + // TODO(yadunund): Can we get the size of the shm from the config even though it's not + // a standard parameter? + shm_manager_ = + zc_shm_manager_new( + z_loan(session_), + idstr, + SHM_BUFFER_SIZE_MB * 1024 * 1024); + if (!shm_manager_.has_value() || + !zc_shm_manager_check(&shm_manager_.value())) + { + throw std::runtime_error("Unable to create shm manager."); + } + } + auto free_shm_manager = rcpputils::make_scope_exit( + [this]() { + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + }); + + graph_guard_condition_ = std::make_unique(); + graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + graph_guard_condition_->data = &guard_condition_data_; + + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. + auto sub_options = zc_liveliness_subscriber_options_null(); + z_owned_closure_sample_t callback = z_closure( + graph_sub_data_handler, nullptr, this); + graph_subscriber_ = zc_liveliness_declare_subscriber( + z_loan(session_), + z_keyexpr(liveliness_str.c_str()), + z_move(callback), + &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (!z_check(graph_subscriber_)) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + throw std::runtime_error("Unable to subscribe to ROS graph updates."); + } + + close_session.cancel(); + free_shm_manager.cancel(); + undeclare_z_sub.cancel(); } -} -///============================================================================= -rmw_context_impl_s::Data::Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - const std::string & liveliness_str, - std::shared_ptr graph_cache) -: enclave_(std::move(enclave)), - domain_id_(std::move(domain_id)), - session_(std::move(session)), - shm_manager_(std::move(shm_manager)), - liveliness_str_(std::move(liveliness_str)), - graph_cache_(std::move(graph_cache)), - is_shutdown_(false), - next_entity_id_(0), - is_initialized_(false), - nodes_({}) -{ - graph_guard_condition_ = std::make_unique(); - graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - graph_guard_condition_->data = &guard_condition_data_; -} + // Shutdown the Zenoh session. + rmw_ret_t shutdown() + { + { + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() -{ - std::lock_guard lock(mutex_); - if (is_initialized_) { + // Shutdown all the nodes in this context. + for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { + ret = node_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", + node_it->second->id(), + ret + ); + } + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + is_shutdown_ = true; + + // We specifically do *not* hold the mutex_ while tearing down the session; this allows us + // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + } + + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } - // Setup the liveliness subscriber to receives updates from the ROS graph - // and update the graph cache. - // TODO(Yadunund): This closure is still not 100% thread safe as we are - // passing Data* as the type erased argument to z_closure. Thus during - // the execution of graph_sub_data_handler, the rawptr may be freed/reset - // by a different thread. When we switch to zenoh-cpp we can replace z_closure - // with a lambda that captures a weak_ptr by copy. The lambda and caputed - // weak_ptr will have the same lifetime as the subscriber. Then within - // graph_sub_data_handler, we would first lock to weak_ptr to check if the - // shared_ptr exits. If it does, then even if a different thread calls - // rmw_context_fini() to destroy rmw_context_impl_s, the locked - // shared_ptr would live on until the graph_sub_data_handler callback. - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure( - rmw_context_impl_s::graph_sub_data_handler, nullptr, this); - graph_subscriber_ = zc_liveliness_declare_subscriber( - z_loan(session_), - z_keyexpr(liveliness_str_.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (!z_check(graph_subscriber_)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; + + std::string enclave() const + { + std::lock_guard lock(mutex_); + return enclave_; } - undeclare_z_sub.cancel(); - is_initialized_ = true; - return RMW_RET_OK; -} + z_session_t session() const + { + std::lock_guard lock(mutex_); + return z_loan(session_); + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::shutdown() -{ - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; + std::optional & shm_manager() + { + std::lock_guard lock(mutex_); + return shm_manager_; } - // Shutdown all the nodes in this context. - for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - ret = node_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", - node_it->second->id(), - ret - ); - } + rmw_guard_condition_t * graph_guard_condition() + { + std::lock_guard lock(mutex_); + return graph_guard_condition_.get(); } - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_manager_.has_value()) { - z_drop(z_move(shm_manager_.value())); + std::size_t get_next_entity_id() + { + std::lock_guard lock(mutex_); + return next_entity_id_++; } - // Close the zenoh session - if (z_close(z_move(session_)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; + + bool is_shutdown() const + { + std::lock_guard lock(mutex_); + return is_shutdown_; } - is_shutdown_ = true; - return RMW_RET_OK; -} -///============================================================================= -rmw_context_impl_s::Data::~Data() -{ - auto ret = this->shutdown(); - nodes_.clear(); - static_cast(ret); -} + bool session_is_valid() const + { + std::lock_guard lock(mutex_); + return z_check(session_); + } -///============================================================================= -rmw_context_impl_s::rmw_context_impl_s( - const std::size_t domain_id, - const std::string & enclave) -{ - // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) + std::shared_ptr graph_cache() { - throw std::runtime_error("Error configuring Zenoh session."); + std::lock_guard lock(mutex_); + return graph_cache_; } - // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + bool create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name) + { + std::lock_guard lock(mutex_); + if (nodes_.count(node) > 0) { + // Node already exists. + return false; + } - // Initialize the zenoh session. - z_owned_session_t session = z_open(z_move(config)); - if (!z_session_check(&session)) { - throw std::runtime_error("Error setting up zenoh session."); - } - auto close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_move(session)); - }); + // Check that the Zenoh session is still valid. + if (!z_check(session_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create NodeData as Zenoh session is invalid."); + return false; + } - // TODO(Yadunund) Move this check into a separate thread. - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto node_data = rmw_zenoh_cpp::NodeData::make( + node, + this->get_next_entity_id(), + z_loan(session_), + domain_id_, + ns, + node_name, + enclave_); + if (node_data == nullptr) { + // Error already handled. + return false; } - if (ret != RMW_RET_OK) { - throw std::runtime_error( - "Unable to connect to a Zenoh router after " + - std::to_string(configured_connection_attempts.value()) + - " retries."); + + auto node_insertion = nodes_.insert(std::make_pair(node, std::move(node_data))); + if (!node_insertion.second) { + return false; } + + return true; } - // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session)); - auto graph_cache = std::make_shared(zid); - // Setup liveliness subscriptions for discovery. - std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) + std::shared_ptr get_node_data(const rmw_node_t * const node) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + std::lock_guard lock(mutex_); + auto node_it = nodes_.find(node); + if (node_it == nodes_.end()) { + return nullptr; } + return node_it->second; } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - // Initialize the shm manager if shared_memory is enabled in the config. - std::optional shm_manager = std::nullopt; - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) + void delete_node_data(const rmw_node_t * const node) { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + std::lock_guard lock(mutex_); + nodes_.erase(node); + } + + void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + { + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return; } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - shm_manager = - zc_shm_manager_new( - z_loan(session), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!shm_manager.has_value() || - !zc_shm_manager_check(&shm_manager.value())) - { - throw std::runtime_error("Unable to create shm manager."); + switch (sample_kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + graph_cache_->parse_put(keystr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + graph_cache_->parse_del(keystr); + break; + default: + return; + } + + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(graph_guard_condition_.get()); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); } } - auto free_shm_manager = rcpputils::make_scope_exit( - [&shm_manager]() { - if (shm_manager.has_value()) { - z_drop(z_move(shm_manager.value())); - } - }); - close_session.cancel(); - free_shm_manager.cancel(); + // Destructor. + ~Data() + { + auto ret = this->shutdown(); + nodes_.clear(); + static_cast(ret); + } + +private: + // Mutex to lock when accessing members. + mutable std::recursive_mutex mutex_; + // The ROS domain id of this context. + std::size_t domain_id_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Graph cache. + std::shared_ptr graph_cache_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition. + // Guard condition that should be triggered when the graph changes. + std::unique_ptr graph_guard_condition_; + // The GuardCondition data structure. + rmw_zenoh_cpp::GuardCondition guard_condition_data_; + // Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + std::size_t next_entity_id_; + // Nodes created from this context. + std::unordered_map> nodes_; +}; + +///============================================================================= +static void graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); - data_ = std::make_shared( - domain_id, - std::move(enclave), - std::move(session), - std::move(shm_manager), - std::move(liveliness_str), - std::move(graph_cache)); + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Invalid data_ptr." + ); + return; + } - ret = data_->subscribe_to_ros_graph(); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; } + + // Update the graph cache. + data_shared_ptr->update_graph_cache(sample->kind, keystr._cstr); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const std::size_t domain_id, + const std::string & enclave) +{ + data_ = std::make_shared(domain_id, std::move(enclave)); + + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.emplace(data_.get(), data_); +} + +///============================================================================= +rmw_context_impl_s::~rmw_context_impl_s() +{ + this->shutdown(); } ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); - return data_->enclave_; + return data_->enclave(); } ///============================================================================= z_session_t rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); - return z_loan(data_->session_); + return data_->session(); } ///============================================================================= std::optional & rmw_context_impl_s::shm_manager() { - std::lock_guard lock(data_->mutex_); - return data_->shm_manager_; + return data_->shm_manager(); } ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); - return data_->graph_guard_condition_.get(); + return data_->graph_guard_condition(); } ///============================================================================= std::size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); - return data_->next_entity_id_++; + return data_->get_next_entity_id(); } ///============================================================================= rmw_ret_t rmw_context_impl_s::shutdown() { + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.erase(data_.get()); + } + return data_->shutdown(); } ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); - return data_->is_shutdown_; + return data_->is_shutdown(); } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); - return z_check(data_->session_); + return data_->session_is_valid(); } ///============================================================================= std::shared_ptr rmw_context_impl_s::graph_cache() { - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_; + return data_->graph_cache(); } ///============================================================================= @@ -407,56 +547,18 @@ bool rmw_context_impl_s::create_node_data( const std::string & ns, const std::string & node_name) { - std::lock_guard lock(data_->mutex_); - if (data_->nodes_.count(node) > 0) { - // Node already exists. - return false; - } - - // Check that the Zenoh session is still valid. - if (!z_check(data_->session_)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create NodeData as Zenoh session is invalid."); - return false; - } - - auto node_data = rmw_zenoh_cpp::NodeData::make( - node, - this->get_next_entity_id(), - z_loan(data_->session_), - data_->domain_id_, - ns, - node_name, - data_->enclave_); - if (node_data == nullptr) { - // Error already handled. - return false; - } - - auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); - if (!node_insertion.second) { - return false; - } - - return true; + return data_->create_node_data(node, ns, node_name); } ///============================================================================= std::shared_ptr rmw_context_impl_s::get_node_data( const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - auto node_it = data_->nodes_.find(node); - if (node_it == data_->nodes_.end()) { - return nullptr; - } - return node_it->second; + return data_->get_node_data(node); } ///============================================================================= void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - data_->nodes_.erase(node); + data_->delete_node_data(node); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 884056e3..a35b9ffe 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -17,20 +17,16 @@ #include -# include +#include #include -#include #include #include -#include #include "graph_cache.hpp" -#include "guard_condition.hpp" -#include "liveliness_utils.hpp" #include "rmw_node_data.hpp" -#include "rcutils/types.h" -#include "rmw/rmw.h" +#include "rmw/ret_types.h" +#include "rmw/types.h" ///============================================================================= class rmw_context_impl_s final @@ -46,6 +42,8 @@ class rmw_context_impl_s final const std::size_t domain_id, const std::string & enclave); + ~rmw_context_impl_s(); + // Get a copy of the enclave. std::string enclave() const; @@ -93,67 +91,11 @@ class rmw_context_impl_s final /// Delete the NodeData for a given rmw_node_t if present. void delete_node_data(const rmw_node_t * const node); -private: - // Bundle all class members into a data struct which can be passed as a - // weak ptr to various threads for thread-safe access without capturing - // "this" ptr by reference. - struct Data : public std::enable_shared_from_this - { - // Constructor. - Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - const std::string & liveliness_str, - std::shared_ptr graph_cache); - - // Subscribe to the ROS graph. - rmw_ret_t subscribe_to_ros_graph(); - - // Shutdown the Zenoh session. - rmw_ret_t shutdown(); - - // Destructor. - ~Data(); - - // Mutex to lock when accessing members. - mutable std::recursive_mutex mutex_; - // RMW allocator. - const rcutils_allocator_t * allocator_; - // Enclave, name used to find security artifacts in a sros2 keystore. - std::string enclave_; - // The ROS domain id of this context. - std::size_t domain_id_; - // An owned session. - z_owned_session_t session_; - // An optional SHM manager that is initialized of SHM is enabled in the - // zenoh session config. - std::optional shm_manager_; - // Liveliness keyexpr string to subscribe to for ROS graph changes. - std::string liveliness_str_; - // Graph cache. - std::shared_ptr graph_cache_; - // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; - // Equivalent to rmw_dds_common::Context's guard condition. - // Guard condition that should be triggered when the graph changes. - std::unique_ptr graph_guard_condition_; - // The GuardCondition data structure. - rmw_zenoh_cpp::GuardCondition guard_condition_data_; - // Shutdown flag. - bool is_shutdown_; - // A counter to assign a local id for every entity created in this session. - std::size_t next_entity_id_; - // True once graph subscriber is initialized. - bool is_initialized_; - // Nodes created from this context. - std::unordered_map> nodes_; - }; + // Forward declaration + class Data; +private: std::shared_ptr data_{nullptr}; - - static void graph_sub_data_handler(const z_sample_t * sample, void * data); }; From 91edead67504f2a297a5819b5c625e3e3bd92b36 Mon Sep 17 00:00:00 2001 From: yadunund Date: Tue, 12 Nov 2024 11:30:24 -0800 Subject: [PATCH 2/4] Make ClientData thread-safe (#293) * Make ClientData thread-safe * Handle queries in flight * Address some feedback * Fix service client type check * fix tracking of num in flight without UB * Additional fixes for yadu/raii-client. (#304) * Additional fixes for yadu/raii-client. * Apply suggestions from code review * Fixups. * Fix -Wnon-pod-varargs * Make PublisherData::copy_gid have the same signatures as others. This just makes it more consistent with the rest of the APIs. * Add api to init client * Vastly simplify handling of in-flight ClientData. We basically use the pointer as a key to lookup the shared_ptr, and then always use the shared_ptr. That ensures that even if the pointer was dropped from the node while the client callback is operating, it will still be valid and won't be destroyed until that method returns. Signed-off-by: Yadunund Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/CMakeLists.txt | 2 +- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 9 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 558 ++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 149 +++++ .../src/detail/rmw_context_impl_s.hpp | 2 - rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 220 ------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 131 ---- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 86 +++ rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 18 + .../src/detail/rmw_publisher_data.cpp | 4 +- .../src/detail/rmw_publisher_data.hpp | 2 +- rmw_zenoh_cpp/src/rmw_event.cpp | 1 - rmw_zenoh_cpp/src/rmw_init.cpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 522 +++------------- 15 files changed, 901 insertions(+), 807 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.hpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.cpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d6080970..7c4a27eb 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,8 +38,8 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_client_data.cpp src/detail/rmw_context_impl_s.cpp - src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/rmw_service_data.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 22f1b153..776d343d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -36,7 +36,6 @@ #include "graph_cache.hpp" #include "logging_macros.hpp" -#include "rmw_data_types.hpp" namespace rmw_zenoh_cpp { @@ -1182,15 +1181,15 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const { *is_available = false; std::lock_guard lock(graph_mutex_); - GraphNode::TopicMap::const_iterator service_it = graph_services_.find(service_name); + GraphNode::TopicMap::const_iterator service_it = graph_services_.find(client_topic_info.name_); if (service_it != graph_services_.end()) { - GraphNode::TopicTypeMap::const_iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::const_iterator type_it = + service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { if (topic_data->subs_.size() > 0) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2302efd..8ac2172d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -172,8 +172,7 @@ class GraphCache final rmw_topic_endpoint_info_array_t * endpoints_info) const; rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const; /// Set a qos event callback for an entity from the current session. diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp new file mode 100644 index 00000000..8c851a38 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -0,0 +1,558 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_client_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "message_type_support.hpp" +#include "qos.hpp" +#include "rmw_context_impl_s.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace +{ + +std::mutex client_data_ptr_to_shared_ptr_map_mutex; +std::unordered_map> client_data_ptr_to_shared_ptr_map; + +///============================================================================= +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_handler." + ); + return; + } + + std::shared_ptr client_data_shared_ptr{nullptr}; + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { + return; + } + client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; + } + + if (client_data_shared_ptr->is_shutdown()) { + return; + } + + if (!z_reply_check(reply)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + z_value_t err = z_reply_err(reply); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", + client_data->topic_info().topic_keyexpr_.c_str(), + (int)err.payload.len, + err.payload.start); + + return; + } + + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + client_data_shared_ptr->add_new_reply( + std::make_unique(reply, received_timestamp)); + + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); +} + +} // namespace + +namespace rmw_zenoh_cpp +{ +///============================================================================= +std::shared_ptr ClientData::make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_shared(service_members); + auto response_type_support = std::make_shared(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Client, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client %s.", + service_name.c_str()); + return nullptr; + } + + std::shared_ptr client_data = std::shared_ptr( + new ClientData{ + node, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); + + if (!client_data->init(session)) { + // init() already set the error. + return nullptr; + } + + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); + + return client_data; +} + +///============================================================================= +ClientData::ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support) +: rmw_node_(rmw_node), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(request_type_support), + response_type_support_(response_type_support), + wait_set_data_(nullptr), + sequence_number_(1), + is_shutdown_(false) +{ + // Do nothing. +} + +///============================================================================= +bool ClientData::init(z_session_t session) +{ + this->keyexpr_ = + z_keyexpr_new(this->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->keyexpr_)); + }); + if (!z_check(z_loan(this->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return false; + } + + this->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(this->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->token_)); + }); + if (!z_check(this->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return false; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return true; +} + +///============================================================================= +liveliness::TopicInfo ClientData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool ClientData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +{ + std::lock_guard lock(mutex_); + entity_->copy_gid(out_gid); +} + +///============================================================================= +void ClientData::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(mutex_); + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for client for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + reply_queue_.pop_front(); + } + reply_queue_.emplace_back(std::move(reply)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ClientData::take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || reply_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + request_header->request_id.sequence_number = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + if (!rmw_zenoh_cpp::get_gid_from_attachment( + &sample->attachment, + request_header->request_id.writer_guid)) + { + RMW_SET_ERROR_MSG("Could not get client gid from attachment"); + return RMW_RET_ERROR; + } + + request_header->received_timestamp = latest_reply->get_received_timestamp(); + + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ClientData::send_request( + const void * ros_request, + int64_t * sequence_id) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->context->impl); + if (context_impl == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + + size_t max_data_length = ( + request_type_support_->get_estimated_serialized_size( + ros_request, request_type_support_impl_)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + return RMW_RET_ERROR; + } + auto always_free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + // Object that serializes the data + Cdr ser(fastbuffer); + if (!request_type_support_->serialize_ros_message( + ros_request, + ser.get_cdr(), + request_type_support_impl_)) + { + return RMW_RET_ERROR; + } + size_t data_length = ser.get_serialized_data_length(); + *sequence_id = sequence_number_++; + + // Send request + z_get_options_t opts = z_get_options_default(); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + *sequence_id, + [this](z_owned_bytes_map_t * map, const char * key) + { + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = local_gid; + z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); + }); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto always_free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + opts.attachment = z_bytes_map_as_attachment(&map); + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // The default timeout for a z_get query is 10 seconds and if a response is not received within + // this window, the queryable will return an invalid reply. However, it is common for actions, + // which are implemented using services, to take an extended duration to complete. Hence, we set + // the timeout_ms to the largest supported value to account for most realistic scenarios. + opts.timeout_ms = std::numeric_limits::max(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, + // capture shared_from_this() instead of this. + z_owned_closure_reply_t zn_closure_reply = + z_closure(client_data_handler, nullptr, this); + z_get( + context_impl->session(), + z_loan(keyexpr_), "", + z_move(zn_closure_reply), + &opts); + + return RMW_RET_OK; +} + +///============================================================================= +ClientData::~ClientData() +{ + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.erase(this); + } + + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing client /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ClientData::set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ClientData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!reply_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ClientData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return reply_queue_.empty(); +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock(mutex_); + if (is_shutdown_ || !initialized_) { + return ret; + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + if (z_check(z_loan(keyexpr_))) { + z_drop(z_move(keyexpr_)); + } + + is_shutdown_ = true; + initialized_ = false; + return RMW_RET_OK; +} + +///============================================================================= +bool ClientData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp new file mode 100644 index 00000000..b4c56775 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -0,0 +1,149 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CLIENT_DATA_HPP_ +#define DETAIL__RMW_CLIENT_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ClientData final : public std::enable_shared_from_this +{ +public: + // Make a shared_ptr of ClientData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ClientData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Copy the GID of this ClientData into an rmw_gid_t. + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + + // Add a new ZenohReply to the queue. + void add_new_reply(std::unique_ptr reply); + + // Take a ROS service response. + rmw_ret_t take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken); + + // Send a service request. + rmw_ret_t send_request( + const void * ros_request, + int64_t * sequence_id); + + void set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data); + + // rmw_wait helpers. + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + bool detach_condition_and_queue_is_empty(); + + // See the comment for "num_in_flight" below on the use of this method. + void decrement_queries_in_flight(); + + // Shutdown this ClientData. + rmw_ret_t shutdown(); + + // Check if this ClientData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ClientData(); + +private: + // Constructor. + ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support); + + // Initialize the Zenoh objects for this entity. + bool init(z_session_t session); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + // The Entity generated for the service. + std::shared_ptr entity_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::shared_ptr request_type_support_; + std::shared_ptr response_type_support_; + // Deque to store the replies in the order they arrive. + std::deque> reply_queue_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Sequence number for queries. + size_t sequence_number_; + // Shutdown flag. + bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; +}; +using ClientDataPtr = std::shared_ptr; +using ClientDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_CLIENT_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a35b9ffe..b9367e6f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -48,8 +48,6 @@ class rmw_context_impl_s final std::string enclave() const; // Loan the Zenoh session. - // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to - // create other Zenoh objects. z_session_t session() const; // Get a reference to the shm_manager. diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp deleted file mode 100644 index acf0bc70..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -#include "logging_macros.hpp" - -#include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" - -#include "attachment_helpers.hpp" -#include "rmw_data_types.hpp" - -///============================================================================= -namespace rmw_zenoh_cpp -{ -///============================================================================= -void rmw_client_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_client_data_t::add_new_reply(std::unique_ptr reply) -{ - std::lock_guard lock(reply_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - reply_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Reply queue depth of %ld reached, discarding oldest reply " - "for client for %s", - adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); - reply_queue_.pop_front(); - } - reply_queue_.emplace_back(std::move(reply)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!reply_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_client_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return reply_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_client_data_t::pop_next_reply() -{ - std::lock_guard lock(reply_queue_mutex_); - - if (reply_queue_.empty()) { - return nullptr; - } - - std::unique_ptr latest_reply = std::move(reply_queue_.front()); - reply_queue_.pop_front(); - - return latest_reply; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -void rmw_client_data_t::increment_in_flight_callbacks() -{ - std::lock_guard lock(in_flight_mutex_); - num_in_flight_++; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -bool rmw_client_data_t::shutdown_and_query_in_flight() -{ - std::lock_guard lock(in_flight_mutex_); - is_shutdown_ = true; - - return num_in_flight_ > 0; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure -// for the use of this method. -bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) -{ - std::lock_guard lock(in_flight_mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; -} - -bool rmw_client_data_t::is_shutdown() const -{ - std::lock_guard lock(in_flight_mutex_); - return is_shutdown_; -} - - -///============================================================================= -size_t rmw_client_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - -//============================================================================== -void client_data_handler(z_owned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (client_data->is_shutdown()) { - return; - } - - if (!z_reply_check(reply)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } - if (!z_reply_is_ok(reply)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(client_data->keyexpr)); - z_value_t err = z_reply_err(reply); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - z_loan(keystr), - (int)err.payload.len, - err.payload.start); - z_drop(z_move(keystr)); - - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - client_data->add_new_reply(std::make_unique(reply, received_timestamp)); - // Since we took ownership of the reply, null it out here - *reply = z_reply_null(); -} - -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } - } -} - -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp deleted file mode 100644 index 5605d7f0..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_DATA_TYPES_HPP_ -#define DETAIL__RMW_DATA_TYPES_HPP_ - -#include - -#include -#include -#include -#include -#include -#include - -#include "rcutils/allocator.h" - -#include "rmw/rmw.h" - -#include "rosidl_runtime_c/type_hash.h" - -#include "event.hpp" -#include "liveliness_utils.hpp" -#include "message_type_support.hpp" -#include "rmw_wait_set_data.hpp" -#include "service_type_support.hpp" -#include "zenoh_utils.hpp" - -/// Structs for various type erased data fields. -namespace rmw_zenoh_cpp -{ -///============================================================================= -void client_data_handler(z_owned_reply_t * reply, void * client_data); -void client_data_drop(void * data); - -///============================================================================= -class rmw_client_data_t final -{ -public: - // The Entity generated for the client. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - - // Store the actual QoS profile used to configure this client. - // The QoS is reused for sending requests and getting responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the client. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - size_t get_next_sequence_number(); - - void add_new_reply(std::unique_ptr reply); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_reply(); - - DataCallbackManager data_callback_mgr; - - // See the comment for "num_in_flight" below on the use of this method. - void increment_in_flight_callbacks(); - - // See the comment for "num_in_flight" below on the use of this method. - bool shutdown_and_query_in_flight(); - - // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); - - bool is_shutdown() const; - -private: - void notify(); - - size_t sequence_number_{1}; - std::mutex sequence_number_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; - - std::deque> reply_queue_; - mutable std::mutex reply_queue_mutex_; - - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // - // The next 3 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ - // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - mutable std::mutex in_flight_mutex_; - bool is_shutdown_{false}; - size_t num_in_flight_{0}; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 19357c3d..f3430280 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -319,6 +319,80 @@ void NodeData::delete_service_data(const rmw_service_t * const service) services_.erase(service); } + +///============================================================================= +bool NodeData::create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ClientData as the NodeData has been shutdown."); + return false; + } + + if (clients_.count(client) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ClientData already exists."); + return false; + } + + auto client_data = ClientData::make( + std::move(session), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ClientData."); + return false; + } + + auto insertion = clients_.insert(std::make_pair(client, std::move(client_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto it = clients_.find(client); + if (it == clients_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto client_it = clients_.find(client); + if (client_it == clients_.end()) { + return; + } + // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping + // it alive as long as necessary. + client_it->second->shutdown(); + clients_.erase(client); +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -365,6 +439,18 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) { + ret = cli_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.", + cli_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index c6c74e0d..f85b1366 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -25,6 +25,7 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_client_data.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" #include "rmw_service_data.hpp" @@ -97,6 +98,21 @@ class NodeData final // Delete the ServiceData for a given rmw_service_t if present. void delete_service_data(const rmw_service_t * const service); + // Create a new ClientData for a given rmw_client_t. + bool create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ClientData for a given rmw_client_t if present. + ClientDataPtr get_client_data(const rmw_client_t * const client); + + // Delete the ClientData for a given rmw_client_t if present. + void delete_client_data(const rmw_client_t * const client); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -132,6 +148,8 @@ class NodeData final std::unordered_map subs_; // Map of services. std::unordered_map services_; + // Map of clients. + std::unordered_map clients_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 9b2777c8..b2ea8235 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -421,10 +421,10 @@ liveliness::TopicInfo PublisherData::topic_info() const } ///============================================================================= -void PublisherData::copy_gid(rmw_gid_t * gid) const +void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { std::lock_guard lock(mutex_); - entity_->copy_gid(gid->data); + entity_->copy_gid(out_gid); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 2a5f9240..7b124cc3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -68,7 +68,7 @@ class PublisherData final liveliness::TopicInfo topic_info() const; // Copy the GID of this PublisherData into an rmw_gid_t. - void copy_gid(rmw_gid_t * gid) const; + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 4fe298a0..63de9bcb 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -21,7 +21,6 @@ #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" #include "detail/rmw_subscription_data.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 29a81bd3..c61a44fa 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -22,7 +22,6 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "rcutils/env.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 761fbdcf..eebe1c4b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,7 +36,6 @@ #include "detail/message_type_support.hpp" #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" #include "detail/zenoh_utils.hpp" @@ -1363,21 +1362,28 @@ rmw_create_client( const rmw_qos_profile_t * qos_profile) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return nullptr); - + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); if (strlen(service_name) == 0) { RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - + // Validate service name + int validation_result; + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -1397,23 +1403,22 @@ rmw_create_client( return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - // Validate service name - int validation_result; - - if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + // Get the service type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support return nullptr; } - if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); return nullptr; } - // client data - rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + // Create the rmw_client. + rmw_client_t * rmw_client = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_client_t), allocator->state)); @@ -1421,226 +1426,46 @@ rmw_create_client( rmw_client, "failed to allocate memory for the client", return nullptr); - auto free_rmw_client = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(rmw_client, allocator->state); }); - auto client_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_client_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "failed to allocate memory for client data", + node_data, + "NodeData not found.", return nullptr); - auto free_client_data = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data, - client_data, - return nullptr, - rmw_zenoh_cpp::rmw_client_data_t, - ); - auto destruct_client_data = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->~rmw_client_data_t(), - rmw_zenoh_cpp::rmw_client_data_t); - }); - - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - // Obtain the type support - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_client_data( + rmw_client, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - client_data->context = node->context; - client_data->typesupport_identifier = type_support->typesupport_identifier; - client_data->type_hash = type_support->get_type_hash_func(type_support); - client_data->request_type_support_impl = request_members; - client_data->response_type_support_impl = response_members; - - // Request type support - client_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->request_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->request_type_support, - client_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - client_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->response_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->response_type_support, - client_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_client. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Client handle will be returned in the rmw_clients_t in rmw_wait + // from which we cannot obtain ClientData. + rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate client name", + "failed to allocate service client name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), rmw_client->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - client_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Client, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_client->service_name, - std::move(service_type), - type_hash_c_str, - client_data->adapted_qos_profile} - ); - if (client_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client %s.", - rmw_client->service_name); - return nullptr; - } - - client_data->keyexpr = z_keyexpr_new(client_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - client_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (!z_check(client_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); - return nullptr; - } - - rmw_client->data = client_data; - - free_token.cancel(); free_rmw_client.cancel(); - free_client_data.cancel(); - destruct_request_type_support.cancel(); - free_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_response_type_support.cancel(); - destruct_client_data.cancel(); free_service_name.cancel(); - free_ros_keyexpr.cancel(); return rmw_client; } @@ -1650,10 +1475,12 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1664,36 +1491,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "client implementation pointer is null.", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP =================================================================== - z_drop(z_move(client_data->keyexpr)); - zc_liveliness_undeclare_token(z_move(client_data->token)); - - RMW_TRY_DESTRUCTOR( - client_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(client_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(client_data->response_type_support, allocator->state); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (!client_data->shutdown_and_query_in_flight()) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } + // Remove the ClientData from NodeData. + node_data->delete_client_data(client); + + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -1710,115 +1516,23 @@ rmw_send_request( { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - if (client_data->is_shutdown()) { - return RMW_RET_ERROR; - } - - rmw_context_impl_s * context_impl = static_cast( - client_data->context->impl); - - // Serialize data - - rcutils_allocator_t * allocator = &(client_data->context->options.allocator); - - size_t max_data_length = ( - client_data->request_type_support->get_estimated_serialized_size( - ros_request, client_data->request_type_support_impl)); - - // Init serialized message byte array - char * request_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!request_bytes) { - RMW_SET_ERROR_MSG("failed allocate request message bytes"); - return RMW_RET_ERROR; - } - auto free_request_bytes = rcpputils::make_scope_exit( - [request_bytes, allocator]() { - allocator->deallocate(request_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!client_data->request_type_support->serialize_ros_message( - ros_request, - ser.get_cdr(), - client_data->request_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - *sequence_id = client_data->get_next_sequence_number(); - - // Send request - z_get_options_t opts = z_get_options_default(); - - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - *sequence_id, - [client_data](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - client_data->entity->copy_gid(local_gid); - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - client_data->increment_in_flight_callbacks(); - - opts.attachment = z_bytes_map_as_attachment(&map); - - opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // The default timeout for a z_get query is 10 seconds and if a response is not received within - // this window, the queryable will return an invalid reply. However, it is common for actions, - // which are implemented using services, to take an extended duration to complete. Hence, we set - // the timeout_ms to the largest supported value to account for most realistic scenarios. - opts.timeout_ms = std::numeric_limits::max(); - // Latest consolidation guarantees unicity of replies for the same key expression, - // which optimizes bandwidth. The default is "None", which imples replies may come in any order - // and any number. - opts.consolidation = z_query_consolidation_latest(); - opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - z_owned_closure_reply_t zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); - z_get( - context_impl->session(), - z_loan(client_data->keyexpr), "", - z_move(zn_closure_reply), - &opts); - - return RMW_RET_OK; + return client_data->send_request(ros_request, sequence_id); } //============================================================================== @@ -1830,12 +1544,10 @@ rmw_take_response( void * ros_response, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, @@ -1843,69 +1555,13 @@ rmw_take_response( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = client_data->pop_next_reply(); - if (latest_reply == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - std::optional sample = latest_reply->get_sample(); - if (!sample) { - RMW_SET_ERROR_MSG("invalid reply sample"); - return RMW_RET_ERROR; - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample->payload.start)), - sample->payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!client_data->response_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_response, - client_data->response_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &sample->attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - *taken = true; - - return RMW_RET_OK; + return client_data->take_response(request_header, ros_response, taken); } //============================================================================== @@ -1922,12 +1578,11 @@ rmw_client_request_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - *qos = client_data->adapted_qos_profile; + *qos = client_data->topic_info().qos_; return RMW_RET_OK; } @@ -1951,7 +1606,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profile) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -2004,7 +1658,6 @@ rmw_create_service( return nullptr; } - // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { RMW_SET_ERROR_MSG("allocator is invalid."); @@ -2045,7 +1698,7 @@ rmw_create_service( } // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased - // Service handle will be returned in the rmw_service_t in rmw_wait + // Service handle will be returned in the rmw_services_t in rmw_wait // from which we cannot obtain ServiceData. rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -2436,8 +2089,8 @@ check_and_attach_condition( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2597,8 +2250,8 @@ rmw_wait( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2798,7 +2451,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid); + pub_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2810,12 +2463,12 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->entity->copy_gid(gid->data); + client_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2861,29 +2514,16 @@ rmw_service_server_is_available( RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); if (client_data == nullptr) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; - } - return node->context->impl->graph_cache()->service_server_is_available( - client->service_name, service_type.c_str(), is_available); + client_data->topic_info(), is_available); } //============================================================================== @@ -2956,11 +2596,11 @@ rmw_client_set_on_new_response_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->data_callback_mgr.set_callback( - user_data, std::move(callback)); + client_data->set_on_new_response_callback( + std::move(callback), user_data); return RMW_RET_OK; } } // extern "C" From 2976b9cda94686d2c2f495ad5c4d698b9a281a15 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 13 Nov 2024 11:32:04 -0500 Subject: [PATCH 3/4] Remove the unneeded initialized function from rmw_client_data. (#309) Not only is this unneeded, it is also improperly initialized so was causing some problems in tests. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 3 +-- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 8c851a38..84ae0328 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -532,7 +532,7 @@ rmw_ret_t ClientData::shutdown() { rmw_ret_t ret = RMW_RET_OK; std::lock_guard lock(mutex_); - if (is_shutdown_ || !initialized_) { + if (is_shutdown_) { return ret; } @@ -545,7 +545,6 @@ rmw_ret_t ClientData::shutdown() } is_shutdown_ = true; - initialized_ = false; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index b4c56775..cee312b1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -139,8 +139,6 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; - // Whether the object has ever successfully been initialized. - bool initialized_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; From e29ac6de6f22852741d777318577906e5e5378a1 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 14 Nov 2024 00:44:36 -0500 Subject: [PATCH 4/4] Redo the per-client-data in-flight monitoring. (#310) * Redo the per-client-data in-flight monitoring. The previous approach with using a global map of pointers to shared_ptr does not work for one simple reason; on shutdown, C++ does not define the order in which statics are destroyed. Thus it is possible (and even likely in some cases) that on Ctrl-C, the global map would be deleted, and then we would try to access it, leading to crashes. I was seeing some of these while running the rcl tests. Instead, go back to the previous solution where we were storing num_in_flight completely within the ClientData structure. This requires doing a bit of gymnastics with the data types, and reintrodues the possible UB during shutdown if the structure was destructed but there is still queries in flight. But it otherwise fixes all of the tests for me locally. Signed-off-by: Chris Lalancette * Fixes from review. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 109 ++++++++++++------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 22 +++- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 8 +- 3 files changed, 91 insertions(+), 48 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 84ae0328..cf0fcb4e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -43,10 +43,6 @@ namespace { -std::mutex client_data_ptr_to_shared_ptr_map_mutex; -std::unordered_map> client_data_ptr_to_shared_ptr_map; - ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -59,16 +55,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - std::shared_ptr client_data_shared_ptr{nullptr}; - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { - return; - } - client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; - } - - if (client_data_shared_ptr->is_shutdown()) { + if (client_data->is_shutdown()) { return; } @@ -94,13 +81,28 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data_shared_ptr->add_new_reply( + client_data->add_new_reply( std::make_unique(reply, received_timestamp)); // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_drop." + ); + return; + } + + client_data->decrement_in_flight_and_conditionally_remove(); +} + } // namespace namespace rmw_zenoh_cpp @@ -109,6 +111,7 @@ namespace rmw_zenoh_cpp std::shared_ptr ClientData::make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -191,6 +194,7 @@ std::shared_ptr ClientData::make( std::shared_ptr client_data = std::shared_ptr( new ClientData{ node, + client, entity, request_members, response_members, @@ -203,21 +207,20 @@ std::shared_ptr ClientData::make( return nullptr; } - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); - return client_data; } ///============================================================================= ClientData::ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * rmw_client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support) : rmw_node_(rmw_node), + rmw_client_(rmw_client), entity_(std::move(entity)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), @@ -225,7 +228,8 @@ ClientData::ClientData( response_type_support_(response_type_support), wait_set_data_(nullptr), sequence_number_(1), - is_shutdown_(false) + is_shutdown_(false), + num_in_flight_(0) { // Do nothing. } @@ -269,28 +273,28 @@ bool ClientData::init(z_session_t session) ///============================================================================= liveliness::TopicInfo ClientData::topic_info() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return entity_->topic_info().value(); } ///============================================================================= bool ClientData::liveliness_is_valid() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return zc_liveliness_token_check(&token_); } ///============================================================================= void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); entity_->copy_gid(out_gid); } ///============================================================================= void ClientData::add_new_reply(std::unique_ptr reply) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); const rmw_qos_profile_t adapted_qos_profile = entity_->topic_info().value().qos_; if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && @@ -324,7 +328,7 @@ rmw_ret_t ClientData::take_response( void * ros_response, bool * taken) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); *taken = false; if (is_shutdown_ || reply_queue_.empty()) { @@ -389,7 +393,7 @@ rmw_ret_t ClientData::send_request( const void * ros_request, int64_t * sequence_id) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_shutdown_) { return RMW_RET_OK; } @@ -468,7 +472,7 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, nullptr, this); + z_closure(client_data_handler, client_data_drop, this); z_get( context_impl->session(), z_loan(keyexpr_), "", @@ -481,11 +485,6 @@ rmw_ret_t ClientData::send_request( ///============================================================================= ClientData::~ClientData() { - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.erase(this); - } - const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -501,7 +500,7 @@ void ClientData::set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, std::move(callback)); } @@ -509,7 +508,7 @@ void ClientData::set_on_new_response_callback( bool ClientData::queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (!reply_queue_.empty()) { return true; } @@ -521,19 +520,17 @@ bool ClientData::queue_has_data_and_attach_condition_if_not( ///============================================================================= bool ClientData::detach_condition_and_queue_is_empty() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); wait_set_data_ = nullptr; return reply_queue_.empty(); } ///============================================================================= -rmw_ret_t ClientData::shutdown() +void ClientData::_shutdown() { - rmw_ret_t ret = RMW_RET_OK; - std::lock_guard lock(mutex_); if (is_shutdown_) { - return ret; + return; } // Unregister this node from the ROS graph. @@ -545,13 +542,47 @@ rmw_ret_t ClientData::shutdown() } is_shutdown_ = true; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + std::lock_guard lock(mutex_); + _shutdown(); return RMW_RET_OK; } +///============================================================================= +bool ClientData::shutdown_and_query_in_flight() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return num_in_flight_ > 0; +} + +///============================================================================= +void ClientData::decrement_in_flight_and_conditionally_remove() +{ + std::lock_guard lock(mutex_); + --num_in_flight_; + + if (is_shutdown_ && num_in_flight_ == 0) { + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + if (context_impl == nullptr) { + return; + } + std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); + if (node_data == nullptr) { + return; + } + node_data->delete_client_data(rmw_client_); + } +} + ///============================================================================= bool ClientData::is_shutdown() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return is_shutdown_; } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index cee312b1..eda4bf9c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -49,6 +49,7 @@ class ClientData final : public std::enable_shared_from_this static std::shared_ptr make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -79,22 +80,27 @@ class ClientData final : public std::enable_shared_from_this const void * ros_request, int64_t * sequence_id); + // Set a callback to be called when events happen. void set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data); - // rmw_wait helpers. + // Check if there is data in the queue, and if not attach the wait set condition variable. bool queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data); + // Detach any attached wait set condition variable, and return whether there is data in the queue. bool detach_condition_and_queue_is_empty(); - // See the comment for "num_in_flight" below on the use of this method. - void decrement_queries_in_flight(); - // Shutdown this ClientData. rmw_ret_t shutdown(); + // Shutdown this ClientData, and return whether there are any requests currently in flight. + bool shutdown_and_query_in_flight(); + + // Decrement the in flight requests, and if that drops to 0 remove the client from the node. + void decrement_in_flight_and_conditionally_remove(); + // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -105,6 +111,7 @@ class ClientData final : public std::enable_shared_from_this // Constructor. ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, @@ -114,10 +121,14 @@ class ClientData final : public std::enable_shared_from_this // Initialize the Zenoh objects for this entity. bool init(z_session_t session); + // Shutdown this client (the mutex is expected to be held by the caller). + void _shutdown(); + // Internal mutex. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; + const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; // An owned keyexpression. @@ -139,6 +150,7 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index f3430280..bd3f3f6e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -347,6 +347,7 @@ bool NodeData::create_client_data( auto client_data = ClientData::make( std::move(session), node_, + client, entity_->node_info(), id_, std::move(id), @@ -387,10 +388,9 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping - // it alive as long as necessary. - client_it->second->shutdown(); - clients_.erase(client); + if (!client_it->second->shutdown_and_query_in_flight()) { + clients_.erase(client); + } } ///=============================================================================