From 67ed661cb9af460b0c4e0aa0b5f6e2e163b6a82c Mon Sep 17 00:00:00 2001 From: Yadu Date: Fri, 27 Sep 2024 10:50:50 -0700 Subject: [PATCH] Thread-safe access to graph cache (#258) * Make rmw_context_impl_s an RAII class Signed-off-by: Yadunund * fix regression with graph_guard_condition not triggering when entity is removed Signed-off-by: Yadunund * Have the context create the zenoh artifacts Signed-off-by: Yadunund * Add comment for session() api Signed-off-by: Yadunund * Style Signed-off-by: Yadunund * Add api to register querying_sub cb in rmw_context_impl_s Signed-off-by: Yadunund * Have rmw_context_impl_s return a shared_ptr to GraphCache Signed-off-by: Yadunund * Add todo on thread safety Signed-off-by: Yadunund * Update rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu * Address feedback Signed-off-by: Yadunund * Do not use allocator for creating graph_guard_condition Signed-off-by: Yadunund * Address feedback Signed-off-by: Yadunund --------- Signed-off-by: Yadunund Signed-off-by: Yadu Co-authored-by: Chris Lalancette --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/rmw_context_impl_s.cpp | 383 ++++++++++++++++++ .../src/detail/rmw_context_impl_s.hpp | 138 +++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 8 - rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 32 -- rmw_zenoh_cpp/src/rmw_event.cpp | 9 +- .../src/rmw_get_node_info_and_types.cpp | 18 +- .../src/rmw_get_service_names_and_types.cpp | 6 +- .../src/rmw_get_topic_endpoint_info.cpp | 10 +- .../src/rmw_get_topic_names_and_types.cpp | 6 +- rmw_zenoh_cpp/src/rmw_init.cpp | 324 +-------------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 122 +++--- 12 files changed, 630 insertions(+), 427 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 4ecfe360..c548a6a7 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp new file mode 100644 index 00000000..d60bece8 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -0,0 +1,383 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_context_impl_s.hpp" + +#include +#include +#include +#include +#include + +#include "guard_condition.hpp" +#include "identifier.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "zenoh_config.hpp" +#include "zenoh_router_check.hpp" + +#include "rcpputils/scope_exit.hpp" +#include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" + +// Megabytes of SHM to reserve. +// TODO(clalancette): Make this configurable, or get it from the configuration +#define SHM_BUFFER_SIZE_MB 10 + +///============================================================================= +void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Invalid data_ptr." + ); + return; + } + + // Update the graph cache. + std::lock_guard lock(data_ptr->mutex_); + if (data_ptr->is_shutdown_) { + return; + } + switch (sample->kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + data_ptr->graph_cache_->parse_put(keystr._cstr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + data_ptr->graph_cache_->parse_del(keystr._cstr); + break; + default: + return; + } + + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get()); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); + } +} + +///============================================================================= +rmw_context_impl_s::Data::Data( + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + const std::string & liveliness_str, + std::shared_ptr graph_cache) +: enclave_(std::move(enclave)), + session_(std::move(session)), + shm_manager_(std::move(shm_manager)), + liveliness_str_(std::move(liveliness_str)), + graph_cache_(std::move(graph_cache)), + is_shutdown_(false), + next_entity_id_(0), + is_initialized_(false) +{ + graph_guard_condition_ = std::make_unique(); + graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + graph_guard_condition_->data = &guard_condition_data_; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() +{ + std::lock_guard lock(mutex_); + if (is_initialized_) { + return RMW_RET_OK; + } + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. + // TODO(Yadunund): This closure is still not 100% thread safe as we are + // passing Data* as the type erased argument to z_closure. Thus during + // the execution of graph_sub_data_handler, the rawptr may be freed/reset + // by a different thread. When we switch to zenoh-cpp we can replace z_closure + // with a lambda that captures a weak_ptr by copy. The lambda and caputed + // weak_ptr will have the same lifetime as the subscriber. Then within + // graph_sub_data_handler, we would first lock to weak_ptr to check if the + // shared_ptr exits. If it does, then even if a different thread calls + // rmw_context_fini() to destroy rmw_context_impl_s, the locked + // shared_ptr would live on until the graph_sub_data_handler callback. + auto sub_options = zc_liveliness_subscriber_options_null(); + z_owned_closure_sample_t callback = z_closure( + rmw_context_impl_s::graph_sub_data_handler, nullptr, this); + graph_subscriber_ = zc_liveliness_declare_subscriber( + z_loan(session_), + z_keyexpr(liveliness_str_.c_str()), + z_move(callback), + &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (!z_check(graph_subscriber_)) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return RMW_RET_ERROR; + } + + undeclare_z_sub.cancel(); + is_initialized_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::shutdown() +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_context_impl_s::Data::~Data() +{ + auto ret = this->shutdown(); + static_cast(ret); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const std::size_t domain_id, + const std::string & enclave) +{ + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } + + // Check if shm is enabled. + z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + z_owned_session_t session = z_open(z_move(config)); + if (!z_session_check(&session)) { + throw std::runtime_error("Error setting up zenoh session."); + } + auto close_session = rcpputils::make_scope_exit( + [&session]() { + z_close(z_move(session)); + }); + + // TODO(Yadunund) Move this check into a separate thread. + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + ret = RMW_RET_ERROR; + uint64_t connection_attempts = 0; + // Retry until the connection is successful. + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { + ++connection_attempts; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + if (ret != RMW_RET_OK) { + throw std::runtime_error( + "Unable to connect to a Zenoh router after " + + std::to_string(configured_connection_attempts.value()) + + " retries."); + } + } + + // Initialize the graph cache. + const z_id_t zid = z_info_zid(z_loan(session)); + auto graph_cache = std::make_shared(zid); + // Setup liveliness subscriptions for discovery. + std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( + domain_id); + + // Query router/liveliness participants to get graph information before this session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); + + // Initialize the shm manager if shared_memory is enabled in the config. + std::optional shm_manager = std::nullopt; + if (shm_enabled._cstr != nullptr && + strcmp(shm_enabled._cstr, "true") == 0) + { + char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 + static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 + for (size_t i = 0; i < sizeof(zid.id); ++i) { + snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + } + idstr[sizeof(zid.id) * 2] = '\0'; + // TODO(yadunund): Can we get the size of the shm from the config even though it's not + // a standard parameter? + shm_manager = + zc_shm_manager_new( + z_loan(session), + idstr, + SHM_BUFFER_SIZE_MB * 1024 * 1024); + if (!shm_manager.has_value() || + !zc_shm_manager_check(&shm_manager.value())) + { + throw std::runtime_error("Unable to create shm manager."); + } + } + auto free_shm_manager = rcpputils::make_scope_exit( + [&shm_manager]() { + if (shm_manager.has_value()) { + z_drop(z_move(shm_manager.value())); + } + }); + + close_session.cancel(); + free_shm_manager.cancel(); + + data_ = std::make_shared( + std::move(enclave), + std::move(session), + std::move(shm_manager), + std::move(liveliness_str), + std::move(graph_cache)); + + ret = data_->subscribe_to_ros_graph(); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + } +} + +///============================================================================= +std::string rmw_context_impl_s::enclave() const +{ + std::lock_guard lock(data_->mutex_); + return data_->enclave_; +} + +///============================================================================= +z_session_t rmw_context_impl_s::session() const +{ + std::lock_guard lock(data_->mutex_); + return z_loan(data_->session_); +} + +///============================================================================= +std::optional & rmw_context_impl_s::shm_manager() +{ + std::lock_guard lock(data_->mutex_); + return data_->shm_manager_; +} + +///============================================================================= +rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_guard_condition_.get(); +} + +///============================================================================= +size_t rmw_context_impl_s::get_next_entity_id() +{ + std::lock_guard lock(data_->mutex_); + return data_->next_entity_id_++; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::shutdown() +{ + return data_->shutdown(); +} + +///============================================================================= +bool rmw_context_impl_s::is_shutdown() const +{ + std::lock_guard lock(data_->mutex_); + return data_->is_shutdown_; +} + +///============================================================================= +bool rmw_context_impl_s::session_is_valid() const +{ + std::lock_guard lock(data_->mutex_); + return z_check(data_->session_); +} + +///============================================================================= +std::shared_ptr rmw_context_impl_s::graph_cache() +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_; +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp new file mode 100644 index 00000000..3cb6f474 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -0,0 +1,138 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_ +#define DETAIL__RMW_CONTEXT_IMPL_S_HPP_ + +#include + +# include +#include +#include +#include +#include + +#include "graph_cache.hpp" +#include "guard_condition.hpp" +#include "liveliness_utils.hpp" + +#include "rcutils/types.h" +#include "rmw/rmw.h" + +///============================================================================= +class rmw_context_impl_s final +{ +public: + // Constructor that internally initializees the Zenoh session and other artifacts. + // Throws an std::runtime_error if any of the initializations fail. + // The construction will block until a Zenoh router is detected. + // TODO(Yadunund): Make this a non-blocking call by checking for the Zenoh + // router in a separate thread. Instead block when creating a node if router + // check has not succeeded. + rmw_context_impl_s( + const std::size_t domain_id, + const std::string & enclave); + + // Get a copy of the enclave. + std::string enclave() const; + + // Loan the Zenoh session. + // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to + // create other Zenoh objects. + z_session_t session() const; + + // Get a reference to the shm_manager. + // Note: This is not thread-safe. + // TODO(Yadunund): Remove this API and instead include a publish() API + // that handles the shm_manager once the context manages publishers. + std::optional & shm_manager(); + + // Get the graph guard condition. + rmw_guard_condition_t * graph_guard_condition(); + + // Get a unique id for a new entity. + size_t get_next_entity_id(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Check if the Zenoh session is shutdown. + bool is_shutdown() const; + + // Returns true if the Zenoh session is valid. + bool session_is_valid() const; + + /// Return a shared_ptr to the GraphCache stored in this context. + std::shared_ptr graph_cache(); + +private: + // Bundle all class members into a data struct which can be passed as a + // weak ptr to various threads for thread-safe access without capturing + // "this" ptr by reference. + struct Data : public std::enable_shared_from_this + { + // Constructor. + Data( + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + const std::string & liveliness_str, + std::shared_ptr graph_cache); + + // Subscribe to the ROS graph. + rmw_ret_t subscribe_to_ros_graph(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Destructor. + ~Data(); + + // Mutex to lock when accessing members. + mutable std::mutex mutex_; + // RMW allocator. + const rcutils_allocator_t * allocator_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Liveliness keyexpr string to subscribe to for ROS graph changes. + std::string liveliness_str_; + // Graph cache. + std::shared_ptr graph_cache_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition. + // Guard condition that should be triggered when the graph changes. + std::unique_ptr graph_guard_condition_; + // The GuardCondition data structure. + rmw_zenoh_cpp::GuardCondition guard_condition_data_; + // Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + size_t next_entity_id_; + // True once graph subscriber is initialized. + bool is_initialized_; + }; + + std::shared_ptr data_{nullptr}; + + static void graph_sub_data_handler(const z_sample_t * sample, void * data); +}; + + +#endif // DETAIL__RMW_CONTEXT_IMPL_S_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 51f84b33..5beb6edd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include #include @@ -54,12 +52,6 @@ size_t hash_gid(const rmw_request_id_t & request_id) } } // namespace -///============================================================================= -size_t rmw_context_impl_s::get_next_entity_id() -{ - return next_entity_id_++; -} - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 60e4bd01..5ed480e7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -42,38 +42,6 @@ /// Structs for various type erased data fields. -///============================================================================= -class rmw_context_impl_s final -{ -public: - // Enclave, name used to find security artifacts in a sros2 keystore. - char * enclave; - - // An owned session. - z_owned_session_t session; - - // An optional SHM manager that is initialized of SHM is enabled in the - // zenoh session config. - std::optional shm_manager; - - z_owned_subscriber_t graph_subscriber; - - /// Shutdown flag. - bool is_shutdown; - - // Equivalent to rmw_dds_common::Context's guard condition - /// Guard condition that should be triggered when the graph changes. - rmw_guard_condition_t * graph_guard_condition; - - std::unique_ptr graph_cache; - - size_t get_next_entity_id(); - -private: - // A counter to assign a local id for every entity created in this session. - size_t next_entity_id_{0}; -}; - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 22922d05..3e350542 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -20,6 +20,7 @@ #include "detail/event.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" @@ -41,6 +42,8 @@ rmw_publisher_event_init( static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->entity, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -61,7 +64,7 @@ rmw_publisher_event_init( rmw_event->event_type = event_type; // Register the event with graph cache. - pub_data->context->impl->graph_cache->set_qos_event_callback( + context_impl->graph_cache()->set_qos_event_callback( pub_data->entity, zenoh_event_type, [pub_data, @@ -95,6 +98,8 @@ rmw_subscription_event_init( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->entity, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -121,7 +126,7 @@ rmw_subscription_event_init( return RMW_RET_OK; } - sub_data->context->impl->graph_cache->set_qos_event_callback( + context_impl->graph_cache()->set_qos_event_callback( sub_data->entity, zenoh_event_type, [sub_data, diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index c42295b3..9e838aec 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -14,7 +14,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_subscriber_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, node_name, @@ -71,7 +73,9 @@ rmw_get_publisher_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, node_name, @@ -98,7 +102,9 @@ rmw_get_service_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Service, allocator, node_name, @@ -125,7 +131,9 @@ rmw_get_client_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Client, allocator, node_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 12415ebe..681b8ceb 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -35,8 +35,10 @@ rmw_get_service_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_service_names_and_types( + return context_impl->graph_cache()->get_service_names_and_types( allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index a4c4878d..3d7570cc 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -15,7 +15,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_publishers_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, topic_name, @@ -69,7 +71,9 @@ rmw_get_subscriptions_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, topic_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index db7d972c..9a3fb6c2 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -37,7 +37,9 @@ rmw_get_topic_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_topic_names_and_types( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->graph_cache()->get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 92d913f3..4c6b2f20 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -21,9 +21,9 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" -#include "detail/zenoh_router_check.hpp" #include "rcutils/env.h" #include "detail/logging_macros.hpp" @@ -38,55 +38,6 @@ extern "C" { -// Megabytes of SHM to reserve. -// TODO(clalancette): Make this configurable, or get it from the configuration -#define SHM_BUFFER_SIZE_MB 10 - -namespace -{ -void -graph_sub_data_handler(const z_sample_t * sample, void * data) -{ - static_cast(data); - - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); - - // Get the context impl from data. - rmw_context_impl_s * context_impl = static_cast( - data); - if (context_impl == nullptr) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to convert data into context_impl" - ); - return; - } - - switch (sample->kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - context_impl->graph_cache->parse_put(keystr._cstr); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - context_impl->graph_cache->parse_del(keystr._cstr); - break; - default: - return; - } - - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(context_impl->graph_guard_condition); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition" - ); - } -} -} // namespace - //============================================================================== /// Initialize the middleware with the given options, and yielding an context. rmw_ret_t @@ -123,24 +74,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const rcutils_allocator_t * allocator = &options->allocator; - context->impl = static_cast( - allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl, - "failed to allocate context impl", - return RMW_RET_BAD_ALLOC); - auto free_impl = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW(context->impl, context->impl, return RMW_RET_BAD_ALLOC, rmw_context_impl_t); - auto impl_destructor = rcpputils::make_scope_exit( - [context] { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - context->impl->~rmw_context_impl_t(), rmw_context_impl_t *); - }); - rmw_ret_t ret; if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { // error already set @@ -154,20 +87,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } }); - // Set the enclave. - context->impl->enclave = rcutils_strdup(options->enclave, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "failed to allocate enclave", - return RMW_RET_BAD_ALLOC); - auto free_enclave = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->enclave, allocator->state); - }); - - // Initialize context's implementation - context->impl->is_shutdown = false; - // If not already defined, set the logging environment variable for Zenoh sessions // to warning level by default. // TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values. @@ -176,216 +95,29 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) return RMW_RET_ERROR; } - // Initialize the zenoh configuration. - z_owned_config_t config; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) - { - RMW_SET_ERROR_MSG("Error configuring Zenoh session."); - return ret; - } - - // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); - - // Initialize the zenoh session. - context->impl->session = z_open(z_move(config)); - if (!z_session_check(&context->impl->session)) { - RMW_SET_ERROR_MSG("Error setting up zenoh session"); - return RMW_RET_ERROR; - } - auto close_session = rcpputils::make_scope_exit( - [context]() { - z_close(z_move(context->impl->session)); - }); - - /// Initialize the graph cache. - z_id_t zid = z_info_zid(z_loan(context->impl->session)); - context->impl->graph_cache = std::make_unique(zid); - - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - if (ret != RMW_RET_OK) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Unable to connect to a Zenoh router after %zu retries.", - configured_connection_attempts.value()); - return ret; - } - } - - // Initialize the shm manager if shared_memory is enabled in the config. - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) - { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); - } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - context->impl->shm_manager = - zc_shm_manager_new( - z_loan(context->impl->session), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!context->impl->shm_manager.has_value() || - !zc_shm_manager_check(&context->impl->shm_manager.value())) - { - RMW_SET_ERROR_MSG("Unable to create shm manager."); - return RMW_RET_ERROR; - } - } - auto free_shm_manager = rcpputils::make_scope_exit( - [context]() { - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); - } - }); - - // Initialize the guard condition. - context->impl->graph_guard_condition = - static_cast(allocator->zero_allocate( - 1, sizeof(rmw_guard_condition_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition, - "failed to allocate graph guard condition", - return RMW_RET_BAD_ALLOC); - auto free_guard_condition = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); - }); - - context->impl->graph_guard_condition->implementation_identifier = - rmw_zenoh_cpp::rmw_zenoh_identifier; - - context->impl->graph_guard_condition->data = - allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state); + // Create the context impl. + context->impl = static_cast( + allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition->data, - "failed to allocate graph guard condition data", + context->impl, + "failed to allocate context impl", return RMW_RET_BAD_ALLOC); - auto free_guard_condition_data = rcpputils::make_scope_exit( + auto free_impl = rcpputils::make_scope_exit( [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + allocator->deallocate(context->impl, allocator->state); }); RMW_TRY_PLACEMENT_NEW( - context->impl->graph_guard_condition->data, - context->impl->graph_guard_condition->data, + context->impl, + context->impl, return RMW_RET_BAD_ALLOC, - rmw_zenoh_cpp::GuardCondition); - auto destruct_guard_condition_data = rcpputils::make_scope_exit( - [context]() { - auto gc_data = - static_cast(context->impl->graph_guard_condition->data); - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - gc_data->~GuardCondition(), - rmw_zenoh_cpp::GuardCondition); - }); + rmw_context_impl_t, + context->actual_domain_id, + std::string(options->enclave) + ); - // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - context->actual_domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) - { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - context->impl->graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - printf("[discovery] Received an error\n"); - } - } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - - // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // auto sub_options = z_subscriber_options_default(); - // sub_options.reliability = Z_RELIABILITY_RELIABLE; - // context->impl->graph_subscriber = z_declare_subscriber( - // z_loan(context->impl->session), - // z_keyexpr(liveliness_str.c_str()), - // z_move(callback), - // &sub_options); - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = zc_liveliness_declare_subscriber( - z_loan(context->impl->session), - z_keyexpr(liveliness_str.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [context]() { - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - }); - if (!z_check(context->impl->graph_subscriber)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; - } - - undeclare_z_sub.cancel(); - close_session.cancel(); - destruct_guard_condition_data.cancel(); - impl_destructor.cancel(); - free_guard_condition_data.cancel(); - free_guard_condition.cancel(); - free_enclave.cancel(); free_options.cancel(); - impl_destructor.cancel(); free_impl.cancel(); - free_shm_manager.cancel(); restore_context.cancel(); return RMW_RET_OK; @@ -407,19 +139,10 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); - } - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - - context->impl->is_shutdown = true; + rmw_context_impl_t * context_impl = static_cast(context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_OK; + return context_impl->shutdown(); } //============================================================================== @@ -437,24 +160,13 @@ rmw_context_fini(rmw_context_t * context) context->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - if (!context->impl->is_shutdown) { + if (!context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has not been shutdown"); return RMW_RET_INVALID_ARGUMENT; } const rcutils_allocator_t * allocator = &context->options.allocator; - RMW_TRY_DESTRUCTOR( - static_cast( - context->impl->graph_guard_condition->data)->~GuardCondition(), - rmw_zenoh_cpp::GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); - - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); - context->impl->graph_guard_condition = nullptr; - - allocator->deallocate(context->impl->enclave, allocator->state); - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t *, ); allocator->deallocate(context->impl, allocator->state); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4444fd70..7bffef42 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,6 +36,7 @@ #include "detail/logging_macros.hpp" #include "detail/message_type_support.hpp" #include "detail/qos.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" @@ -166,11 +167,7 @@ rmw_create_node( context->impl, "expected initialized context", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "expected initialized enclave", - return nullptr); - if (context->impl->is_shutdown) { + if (context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has been shutdown"); return nullptr; } @@ -253,13 +250,14 @@ rmw_create_node( // Initialize liveliness token for the node to advertise that a new node is in town. node_data->id = context->impl->get_next_entity_id(); + z_session_t session = context->impl->session(); node_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string(node_data->id), rmw_zenoh_cpp::liveliness::EntityType::Node, rmw_zenoh_cpp::liveliness::NodeInfo{context->actual_domain_id, namespace_, name, - context->impl->enclave}); + context->impl->enclave()}); if (node_data->entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -268,7 +266,7 @@ rmw_create_node( return nullptr; } node_data->token = zc_liveliness_declare_token( - z_loan(context->impl->session), + session, z_keyexpr(node_data->entity->liveliness_keyexpr().c_str()), NULL ); @@ -345,7 +343,7 @@ rmw_node_get_graph_guard_condition(const rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node->context, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, nullptr); - return node->context->impl->graph_guard_condition; + return node->context->impl->graph_guard_condition(); } //============================================================================== @@ -458,11 +456,7 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -571,7 +565,8 @@ rmw_create_publisher( allocator->deallocate(type_hash_c_str, allocator->state); }); - const z_id_t zid = z_info_zid(z_loan(node->context->impl->session)); + z_session_t session = context_impl->session(); + const z_id_t zid = z_info_zid(session); publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( zid, @@ -580,7 +575,7 @@ rmw_create_publisher( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Publisher, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ node->context->actual_domain_id, rmw_publisher->topic_name, @@ -624,7 +619,7 @@ rmw_create_publisher( }); pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); publisher_data->pub_cache = ze_declare_publication_cache( - z_loan(context_impl->session), + session, z_loan(keyexpr), &pub_cache_opts ); @@ -650,7 +645,7 @@ rmw_create_publisher( } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( - z_loan(context_impl->session), + session, z_loan(keyexpr), &opts ); @@ -664,7 +659,7 @@ rmw_create_publisher( }); publisher_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(publisher_data->entity->liveliness_keyexpr().c_str()), NULL ); @@ -735,7 +730,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) } // Remove any event callbacks registered to this publisher. - context_impl->graph_cache->remove_qos_event_callbacks(publisher_data->entity); + context_impl->graph_cache()->remove_qos_event_callbacks(publisher_data->entity); RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); @@ -910,15 +905,15 @@ rmw_publish( }); // Get memory from SHM buffer if available. - if (publisher_data->context->impl->shm_manager.has_value() && - zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) + if (publisher_data->context->impl->shm_manager().has_value() && + zc_shm_manager_check(&publisher_data->context->impl->shm_manager().value())) { shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager.value(), + &publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { - zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); + zc_shm_gc(&publisher_data->context->impl->shm_manager().value()); + shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); @@ -1026,7 +1021,7 @@ rmw_publisher_count_matched_subscriptions( rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->publisher_count_matched_subscriptions( + return context_impl->graph_cache()->publisher_count_matched_subscriptions( publisher, subscription_count); } @@ -1310,11 +1305,7 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -1423,16 +1414,18 @@ rmw_create_subscription( allocator->deallocate(type_hash_c_str, allocator->state); }); + z_session_t session = context_impl->session(); + // Everything above succeeded and is setup properly. Now declare a subscriber // with Zenoh; after this, callbacks may come in at any time. sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Subscription, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ node->context->actual_domain_id, rmw_subscription->topic_name, @@ -1489,7 +1482,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = ze_declare_querying_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1500,7 +1493,7 @@ rmw_create_subscription( } // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. - context_impl->graph_cache->set_querying_subscriber_callback( + context_impl->graph_cache()->set_querying_subscriber_callback( sub_data, [sub_data](const std::string & queryable_prefix) -> void { @@ -1533,7 +1526,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1560,7 +1553,7 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town. sub_data->token = zc_liveliness_declare_token( - z_loan(context_impl->session), + session, z_keyexpr(sub_data->entity->liveliness_keyexpr().c_str()), NULL ); @@ -1639,11 +1632,11 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) ret = RMW_RET_ERROR; } // Also remove the registered callback from the GraphCache. - context_impl->graph_cache->remove_querying_subscriber_callback(sub_data); + context_impl->graph_cache()->remove_querying_subscriber_callback(sub_data); } // Remove any event callbacks registered to this subscription. - context_impl->graph_cache->remove_qos_event_callbacks(sub_data->entity); + context_impl->graph_cache()->remove_qos_event_callbacks(sub_data->entity); RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); @@ -1677,7 +1670,7 @@ rmw_subscription_count_matched_publishers( rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->subscription_count_matched_publishers( + return context_impl->graph_cache()->subscription_count_matched_publishers( subscription, publisher_count); } @@ -1894,7 +1887,7 @@ rmw_take_sequence( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -1954,7 +1947,7 @@ __rmw_take_serialized( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -2109,11 +2102,7 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2298,14 +2287,15 @@ rmw_create_client( allocator->deallocate(type_hash_c_str, allocator->state); }); + z_session_t session = context_impl->session(); client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Client, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ node->context->actual_domain_id, rmw_client->service_name, @@ -2332,7 +2322,7 @@ rmw_create_client( } client_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), NULL ); @@ -2523,7 +2513,7 @@ rmw_send_request( z_owned_closure_reply_t zn_closure_reply = z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), + context_impl->session(), z_loan(client_data->keyexpr), "", z_move(zn_closure_reply), &opts); @@ -2712,11 +2702,7 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2875,14 +2861,16 @@ rmw_create_service( allocator->deallocate(type_hash_c_str, allocator->state); }); + z_session_t session = context_impl->session(); + service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ node->context->actual_domain_id, rmw_service->service_name, @@ -2916,7 +2904,7 @@ rmw_create_service( z_queryable_options_t qable_options = z_queryable_options_default(); qable_options.complete = true; service_data->qable = z_declare_queryable( - z_loan(context_impl->session), + session, z_loan(service_data->keyexpr), z_move(callback), &qable_options); @@ -2931,7 +2919,7 @@ rmw_create_service( }); service_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(service_data->entity->liveliness_keyexpr().c_str()), NULL ); @@ -3682,7 +3670,7 @@ rmw_get_node_names( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->graph_cache()->get_node_names( node_names, node_namespaces, nullptr, allocator); } @@ -3705,7 +3693,7 @@ rmw_get_node_names_with_enclaves( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->graph_cache()->get_node_names( node_names, node_namespaces, enclaves, allocator); } @@ -3736,7 +3724,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_publishers(topic_name, count); + return node->context->impl->graph_cache()->count_publishers(topic_name, count); } //============================================================================== @@ -3766,7 +3754,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_subscriptions(topic_name, count); + return node->context->impl->graph_cache()->count_subscriptions(topic_name, count); } //============================================================================== @@ -3796,7 +3784,7 @@ rmw_count_clients( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_clients(service_name, count); + return node->context->impl->graph_cache()->count_clients(service_name, count); } //============================================================================== @@ -3826,7 +3814,7 @@ rmw_count_services( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_services(service_name, count); + return node->context->impl->graph_cache()->count_services(service_name, count); } //============================================================================== @@ -3926,7 +3914,7 @@ rmw_service_server_is_available( return RMW_RET_INVALID_ARGUMENT; } - return node->context->impl->graph_cache->service_server_is_available( + return node->context->impl->graph_cache()->service_server_is_available( client->service_name, service_type.c_str(), is_available); }