From 21785f9b59828ff121924bc95825384f484b96a1 Mon Sep 17 00:00:00 2001 From: Franco Cipollone <53065142+francocipollone@users.noreply.github.com> Date: Fri, 12 Jan 2024 00:32:47 -0300 Subject: [PATCH] Adds base support for services. (#86) * Adds base support for services. Signed-off-by: Franco Cipollone * Addresses Yadu's comments. Signed-off-by: Franco Cipollone * Addresses Yadu's comments. Signed-off-by: Franco Cipollone * Fixes memory leak. Signed-off-by: Franco Cipollone * Removes unnecessary declaration. Signed-off-by: Franco Cipollone * Cleanup services implementation (#88) * Rely on channels for sending requests Signed-off-by: Yadunund * Revert to callback for client with fixes Signed-off-by: Yadunund * Cleanup service cb Signed-off-by: Yadunund * Style Signed-off-by: Yadunund * Cleanup comments Signed-off-by: Yadunund --------- Signed-off-by: Yadunund * Use anynomous space instead of static functions. Signed-off-by: Franco Cipollone * Fix style. Signed-off-by: Franco Cipollone * Use zero_allocate where needed. Signed-off-by: Franco Cipollone * Use a scope_exit to drop the keystr. This just makes sure we always clean it up. Signed-off-by: Chris Lalancette * Rename find_type_support to find_message_type_support. Signed-off-by: Chris Lalancette * Add error checking into ros_topic_name_to_zenoh_key Signed-off-by: Chris Lalancette * Make sure to always free response_bytes. Signed-off-by: Chris Lalancette * Remove unnecessary TODO comment. Signed-off-by: Chris Lalancette * Remember to free request_bytes. Signed-off-by: Chris Lalancette * Small change to take requests from the front of the deque. Signed-off-by: Chris Lalancette * Initial work for attachments and sequence numbers. Signed-off-by: Chris Lalancette * Add in error checking for getting attachments. Signed-off-by: Chris Lalancette * More error checking for attachments. Signed-off-by: Chris Lalancette * Further cleanup. Signed-off-by: Chris Lalancette * Remove unnecessary (and incorrect) copy of sequence_number Signed-off-by: Chris Lalancette * Style Signed-off-by: Yadunund --------- Signed-off-by: Franco Cipollone Signed-off-by: Yadunund Signed-off-by: Chris Lalancette Co-authored-by: Yadu Co-authored-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 3 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 89 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 70 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 1314 ++++++++++++++++--- 4 files changed, 1263 insertions(+), 213 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index a9e99463..35b18c4f 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -216,7 +216,8 @@ void GraphCache::parse_put(const std::string & keyexpr) } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.", + "Unable to add a new node /%s with id %s an " + "existing namespace %s in the graph. Report this bug.", entity.node_name().c_str(), entity.id().c_str(), entity.node_namespace().c_str()); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0458bfd0..d26c9e0d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -17,16 +17,28 @@ #include #include +#include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" #include "rmw_data_types.hpp" ///============================================================================== +saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) +: payload(p), recv_timestamp(recv_ts) +{ + memcpy(publisher_gid, pub_gid, 16); +} + +//============================================================================== void sub_data_handler( const z_sample_t * sample, void * data) { z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto drop_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); auto sub_data = static_cast(data); if (sub_data == nullptr) { @@ -67,6 +79,81 @@ void sub_data_handler( sub_data->condition->notify_one(); } } +} + +//============================================================================== +void service_data_handler(const z_query_t * query, void * data) +{ + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[service_data_handler] triggered" + ); + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); + auto drop_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + rmw_service_data_t * service_data = static_cast(data); + if (service_data == nullptr) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain rmw_service_data_t from data for " + "service for %s", + z_loan(keystr) + ); + return; + } + + // Get the query parameters and payload + { + std::lock_guard lock(service_data->query_queue_mutex); + service_data->query_queue.push_back(z_query_clone(query)); + } + { + // Since we added new data, trigger the guard condition if it is available + std::lock_guard internal_lock(service_data->internal_mutex); + if (service_data->condition != nullptr) { + service_data->condition->notify_one(); + } + } +} - z_drop(z_move(keystr)); +//============================================================================== +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + if (!z_reply_check(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False" + ); + return; + } + { + std::lock_guard msg_lock(client_data->message_mutex); + // Take ownership of the reply. + client_data->replies.emplace_back(*reply); + *reply = z_reply_null(); + } + { + std::lock_guard internal_lock(client_data->internal_mutex); + if (client_data->condition != nullptr) { + client_data->condition->notify_one(); + } + } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index d16e57ce..3f76fd91 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -22,8 +22,10 @@ #include #include #include +#include #include #include +#include #include "rcutils/allocator.h" @@ -31,6 +33,7 @@ #include "graph_cache.hpp" #include "message_type_support.hpp" +#include "service_type_support.hpp" /// Structs for various type erased data fields. @@ -97,11 +100,7 @@ void sub_data_handler(const z_sample_t * sample, void * sub_data); struct saved_msg_data { - explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) - : payload(p), recv_timestamp(recv_ts) - { - memcpy(publisher_gid, pub_gid, 16); - } + explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]); zc_owned_payload_t payload; uint64_t recv_timestamp; @@ -131,4 +130,65 @@ struct rmw_subscription_data_t std::condition_variable * condition{nullptr}; }; + +///============================================================================== + +void service_data_handler(const z_query_t * query, void * service_data); + +void client_data_handler(z_owned_reply_t * reply, void * client_data); + +///============================================================================== + +struct rmw_service_data_t +{ + std::size_t get_new_uid(); + + z_owned_keyexpr_t keyexpr; + z_owned_queryable_t qable; + + const void * request_type_support_impl; + const void * response_type_support_impl; + const char * typesupport_identifier; + RequestTypeSupport * request_type_support; + ResponseTypeSupport * response_type_support; + + rmw_context_t * context; + + // Deque to store the queries in the order they arrive. + std::deque query_queue; + std::mutex query_queue_mutex; + + // Map to store the sequence_number -> query_id + std::map sequence_to_query_map; + std::mutex sequence_to_query_map_mutex; + + std::mutex internal_mutex; + std::condition_variable * condition{nullptr}; +}; + +///============================================================================== + +struct rmw_client_data_t +{ + z_owned_keyexpr_t keyexpr; + + z_owned_closure_reply_t zn_closure_reply; + + std::mutex message_mutex; + std::deque replies; + + const void * request_type_support_impl; + const void * response_type_support_impl; + const char * typesupport_identifier; + RequestTypeSupport * request_type_support; + ResponseTypeSupport * response_type_support; + + rmw_context_t * context; + + std::mutex internal_mutex; + std::condition_variable * condition{nullptr}; + + size_t sequence_number{1}; +}; + #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 1a39f976..38a6c662 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -48,6 +49,105 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +namespace +{ + +//============================================================================== +// A function to take ros topic names and convert them to valid Zenoh keys. +// In particular, Zenoh keys cannot start or end with a /, so this function +// will strip them out. +// The Zenoh key is also prefixed with the ros_domain_id. +// Performance note: at present, this function allocates a new string and copies +// the old string into it. If this becomes a performance problem, we could consider +// modifying the topic_name in place. But this means we need to be much more +// careful about who owns the string. +z_owned_keyexpr_t ros_topic_name_to_zenoh_key( + const char * const topic_name, size_t domain_id, rcutils_allocator_t * allocator) +{ + size_t start_offset = 0; + size_t topic_name_len = strlen(topic_name); + size_t end_offset = topic_name_len; + + if (topic_name_len > 0) { + if (topic_name[0] == '/') { + // Strip the leading '/' + start_offset = 1; + } + if (topic_name[end_offset - 1] == '/') { + // Strip the trailing '/' + end_offset -= 1; + } + } + + std::stringstream domain_ss; + domain_ss << domain_id; + char * stripped_topic_name = rcutils_strndup( + &topic_name[start_offset], end_offset - start_offset, *allocator); + if (stripped_topic_name == nullptr) { + return z_keyexpr_null(); + } + z_owned_keyexpr_t keyexpr = z_keyexpr_join( + z_keyexpr(domain_ss.str().c_str()), z_keyexpr(stripped_topic_name)); + allocator->deallocate(stripped_topic_name, allocator->state); + + return keyexpr; +} + +//============================================================================== +const rosidl_message_type_support_t * find_message_type_support( + const rosidl_message_type_support_t * type_supports) +{ + const rosidl_message_type_support_t * type_support = get_message_typesupport_handle( + type_supports, RMW_ZENOH_CPP_TYPESUPPORT_C); + if (!type_support) { + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_message_typesupport_handle(type_supports, RMW_ZENOH_CPP_TYPESUPPORT_CPP); + if (!type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Type support not from this implementation. Got:\n" + " %s\n" + " %s\n" + "while fetching it", + prev_error_string.str, error_string.str); + return nullptr; + } + } + + return type_support; +} + +//============================================================================== +const rosidl_service_type_support_t * find_service_type_support( + const rosidl_service_type_support_t * type_supports) +{ + const rosidl_service_type_support_t * type_support = get_service_typesupport_handle( + type_supports, RMW_ZENOH_CPP_TYPESUPPORT_C); + if (!type_support) { + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( + type_supports, RMW_ZENOH_CPP_TYPESUPPORT_CPP); + if (!type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Type support not from this implementation. Got:\n" + " %s\n" + " %s\n" + "while fetching it", + prev_error_string.str, error_string.str); + return nullptr; + } + } + + return type_support; +} + +} // namespace + extern "C" { //============================================================================== @@ -318,70 +418,6 @@ rmw_fini_publisher_allocation( return RMW_RET_UNSUPPORTED; } -//============================================================================== -// A function to take ros topic names and convert them to valid Zenoh keys. -// In particular, Zenoh keys cannot start or end with a /, so this function -// will strip them out. -// The Zenoh key is also prefixed with the ros_domain_id. -// Performance note: at present, this function allocates a new string and copies -// the old string into it. If this becomes a performance problem, we could consider -// modifying the topic_name in place. But this means we need to be much more -// careful about who owns the string. -static z_owned_keyexpr_t ros_topic_name_to_zenoh_key( - const char * const topic_name, size_t domain_id, rcutils_allocator_t * allocator) -{ - size_t start_offset = 0; - size_t topic_name_len = strlen(topic_name); - size_t end_offset = topic_name_len; - - if (topic_name_len > 0) { - if (topic_name[0] == '/') { - // Strip the leading '/' - start_offset = 1; - } - if (topic_name[end_offset - 1] == '/') { - // Strip the trailing '/' - end_offset -= 1; - } - } - - std::stringstream domain_ss; - domain_ss << domain_id; - char * stripped_topic_name = rcutils_strndup( - &topic_name[start_offset], end_offset - start_offset, *allocator); - z_owned_keyexpr_t keyexpr = z_keyexpr_join( - z_keyexpr(domain_ss.str().c_str()), z_keyexpr(stripped_topic_name)); - allocator->deallocate(stripped_topic_name, allocator->state); - - return keyexpr; -} - -//============================================================================== -static const rosidl_message_type_support_t * find_type_support( - const rosidl_message_type_support_t * type_supports) -{ - const rosidl_message_type_support_t * type_support = get_message_typesupport_handle( - type_supports, RMW_ZENOH_CPP_TYPESUPPORT_C); - if (!type_support) { - rcutils_error_string_t prev_error_string = rcutils_get_error_string(); - rcutils_reset_error(); - type_support = get_message_typesupport_handle(type_supports, RMW_ZENOH_CPP_TYPESUPPORT_CPP); - if (!type_support) { - rcutils_error_string_t error_string = rcutils_get_error_string(); - rcutils_reset_error(); - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Type support not from this implementation. Got:\n" - " %s\n" - " %s\n" - "while fetching it", - prev_error_string.str, error_string.str); - return nullptr; - } - } - - return type_support; -} - //============================================================================== /// Create a publisher and return a handle to that publisher. rmw_publisher_t * @@ -434,9 +470,9 @@ rmw_create_publisher( } // Get the RMW type support. - const rosidl_message_type_support_t * type_support = find_type_support(type_supports); + const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { - // error was already set by find_type_support + // error was already set by find_message_type_support return nullptr; } @@ -1000,9 +1036,9 @@ rmw_serialize( const rosidl_message_type_support_t * type_support, rmw_serialized_message_t * serialized_message) { - const rosidl_message_type_support_t * ts = find_type_support(type_support); + const rosidl_message_type_support_t * ts = find_message_type_support(type_support); if (ts == nullptr) { - // error was already set by find_type_support + // error was already set by find_message_type_support return RMW_RET_ERROR; } @@ -1035,9 +1071,9 @@ rmw_deserialize( const rosidl_message_type_support_t * type_support, void * ros_message) { - const rosidl_message_type_support_t * ts = find_type_support(type_support); + const rosidl_message_type_support_t * ts = find_message_type_support(type_support); if (ts == nullptr) { - // error was already set by find_type_support + // error was already set by find_message_type_support return RMW_RET_ERROR; } @@ -1115,9 +1151,9 @@ rmw_create_subscription( // return nullptr; // } - const rosidl_message_type_support_t * type_support = find_type_support(type_supports); + const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { - // error was already set by find_type_support + // error was already set by find_message_type_support return nullptr; } @@ -1451,7 +1487,7 @@ static rmw_ret_t __rmw_take( std::unique_ptr msg_data; { - std::unique_lock lock(sub_data->message_queue_mutex); + std::lock_guard lock(sub_data->message_queue_mutex); if (sub_data->message_queue.empty()) { // This tells rcl that the check for a new message was done, but no messages have come in yet. @@ -1630,145 +1666,790 @@ rmw_return_loaned_message_from_subscription( /// Create a service client that can send requests to and receive replies from a service server. rmw_client_t * rmw_create_client( - const rmw_node_t * node, - const rosidl_service_type_support_t * type_support, - const char * service_name, - const rmw_qos_profile_t * qos_profile) -{ - static_cast(node); - static_cast(type_support); - static_cast(service_name); - static_cast(qos_profile); - return nullptr; -} - -//============================================================================== -/// Destroy and unregister a service client from its node. -rmw_ret_t -rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) -{ - static_cast(node); - static_cast(client); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Send a ROS service request. -rmw_ret_t -rmw_send_request( - const rmw_client_t * client, - const void * ros_request, - int64_t * sequence_id) -{ - static_cast(client); - static_cast(ros_request); - static_cast(sequence_id); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Take an incoming ROS service response. -rmw_ret_t -rmw_take_response( - const rmw_client_t * client, - rmw_service_info_t * request_header, - void * ros_response, - bool * taken) -{ - static_cast(client); - static_cast(request_header); - static_cast(ros_response); - static_cast(taken); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Retrieve the actual qos settings of the client's request publisher. -rmw_ret_t -rmw_client_request_publisher_get_actual_qos( - const rmw_client_t * client, - rmw_qos_profile_t * qos) -{ - static_cast(client); - static_cast(qos); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Retrieve the actual qos settings of the client's response subscription. -rmw_ret_t -rmw_client_response_subscription_get_actual_qos( - const rmw_client_t * client, - rmw_qos_profile_t * qos) -{ - static_cast(client); - static_cast(qos); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Create a service server that can receive requests from and send replies to a service client. -rmw_service_t * -rmw_create_service( const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, const char * service_name, - const rmw_qos_profile_t * qos_profiles) + const rmw_qos_profile_t * qos_profile) { - // Interim implementation to suppress type_description service that spins up - // with a node by default. + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_common_cpp", + "[rmw_create_client] %s with queue of depth %ld", + service_name, + qos_profile->depth); RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_identifier, return nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); - if (0 == strlen(service_name)) { - RMW_SET_ERROR_MSG("service_name argument is an empty string"); - return nullptr; - } - RMW_CHECK_ARGUMENT_FOR_NULL(qos_profiles, nullptr); - if (!qos_profiles->avoid_ros_namespace_conventions) { - int validation_result = RMW_TOPIC_VALID; - rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); - if (RMW_RET_OK != ret) { - return nullptr; - } - if (RMW_TOPIC_VALID != validation_result) { - const char * reason = rmw_full_topic_name_validation_result_string(validation_result); - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service_name argument is invalid: %s", reason); - return nullptr; - } - } - // rmw_qos_profile_t adapted_qos_profile = - // rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); - // if (!is_valid_qos(adapted_qos_profile)) { - // RMW_SET_ERROR_MSG("create_service() called with invalid QoS"); - // return nullptr; - // } - rmw_service_t * service = rmw_service_allocate(); - if (!service) { - RMW_SET_ERROR_MSG("failed to allocate rmw_service_t"); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); + if (strlen(service_name) == 0) { + RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } + RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - return service; -} - -//============================================================================== -/// Destroy and unregister a service server from its node. -rmw_ret_t -rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context, + "expected initialized context", + return nullptr); + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context->impl, + "expected initialized context impl", + return nullptr); + rmw_context_impl_s * context_impl = static_cast( + node->context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "unable to get rmw_context_impl_s", + return nullptr); + if (!z_check(context_impl->session)) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + // Validate service name + int validation_result; + + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } + + // client data + rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + 1, + sizeof(rmw_client_t), + allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_client, + "failed to allocate memory for the client", + return nullptr); + + auto free_rmw_client = rcpputils::make_scope_exit( + [rmw_client, allocator]() { + allocator->deallocate(rmw_client, allocator->state); + }); + + auto client_data = static_cast( + allocator->allocate(sizeof(rmw_client_data_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data, + "failed to allocate memory for client data", + return nullptr); + auto free_client_data = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW(client_data, client_data, return nullptr, rmw_client_data_t); + auto destruct_client_data = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->~rmw_client_data_t(), + rmw_client_data_t); + }); + + // Obtain the type support + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support + return nullptr; + } + + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + + client_data->context = node->context; + client_data->typesupport_identifier = type_support->typesupport_identifier; + client_data->request_type_support_impl = request_members; + client_data->response_type_support_impl = response_members; + + // Request type support + client_data->request_type_support = static_cast( + allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data->request_type_support, + "Failed to allocate RequestTypeSupport", + return nullptr); + auto free_request_type_support = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data->request_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + client_data->request_type_support, + client_data->request_type_support, + return nullptr, + RequestTypeSupport, service_members); + auto destruct_request_type_support = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->request_type_support->~RequestTypeSupport(), + RequestTypeSupport); + }); + + // Response type support + client_data->response_type_support = static_cast( + allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data->response_type_support, + "Failed to allocate ResponseTypeSupport", + return nullptr); + auto free_response_type_support = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data->response_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + client_data->response_type_support, + client_data->response_type_support, + return nullptr, + ResponseTypeSupport, service_members); + auto destruct_response_type_support = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->response_type_support->~ResponseTypeSupport(), + ResponseTypeSupport); + }); + + // Populate the rmw_client. + rmw_client->implementation_identifier = rmw_zenoh_identifier; + rmw_client->service_name = rcutils_strdup(service_name, *allocator); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_client->service_name, + "failed to allocate client name", + return nullptr); + auto free_service_name = rcpputils::make_scope_exit( + [rmw_client, allocator]() { + allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); + }); + + client_data->keyexpr = ros_topic_name_to_zenoh_key( + rmw_client->service_name, node->context->actual_domain_id, allocator); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_keyexpr_drop(z_move(client_data->keyexpr)); + }); + if (!z_keyexpr_check(&client_data->keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + rmw_client->data = client_data; + + free_rmw_client.cancel(); + free_client_data.cancel(); + free_request_type_support.cancel(); + destruct_request_type_support.cancel(); + free_response_type_support.cancel(); + destruct_client_data.cancel(); + destruct_response_type_support.cancel(); + free_service_name.cancel(); + free_ros_keyexpr.cancel(); + + return rmw_client; +} + +//============================================================================== +/// Destroy and unregister a service client from its node. +rmw_ret_t +rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) +{ + // ASSERTIONS ================================================================ + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data, + "client implementation pointer is null.", + return RMW_RET_INVALID_ARGUMENT); + + // CLEANUP =================================================================== + z_drop(z_move(client_data->zn_closure_reply)); + z_drop(z_move(client_data->keyexpr)); + for (z_owned_reply_t & reply : client_data->replies) { + z_reply_drop(&reply); + } + client_data->replies.clear(); + + allocator->deallocate(client_data->request_type_support, allocator->state); + allocator->deallocate(client_data->response_type_support, allocator->state); + allocator->deallocate(client->data, allocator->state); + + allocator->deallocate(const_cast(client->service_name), allocator->state); + allocator->deallocate(client, allocator->state); + + return RMW_RET_OK; +} + +static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number) { - // Interim implementation to suppress type_description service that spins up - // with a node by default. - if (node == nullptr || service == nullptr) { + z_owned_bytes_map_t map = z_bytes_map_new(); + if (!z_check(map)) { + RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); + return z_bytes_map_null(); + } + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char seq_id_str[20]; + if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + + return map; +} + +//============================================================================== +/// Send a ROS service request. +rmw_ret_t +rmw_send_request( + const rmw_client_t * client, + const void * ros_request, + int64_t * sequence_id) +{ + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", "[rmw_send_request]"); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data, + "Unable to retrieve client_data from client.", + RMW_RET_INVALID_ARGUMENT); + + rmw_context_impl_s * context_impl = static_cast( + client_data->context->impl); + + // Serialize data + + rcutils_allocator_t * allocator = &(client_data->context->options.allocator); + + size_t max_data_length = ( + client_data->request_type_support->get_estimated_serialized_size( + ros_request, client_data->request_type_support_impl)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); return RMW_RET_ERROR; } - rmw_service_free(service); + auto free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + + // Object that serializes the data + eprosima::fastcdr::Cdr ser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!client_data->request_type_support->serialize_ros_message( + ros_request, + ser, + client_data->request_type_support_impl)) + { + return RMW_RET_ERROR; + } + + size_t data_length = ser.getSerializedDataLength(); + + // TODO(clalancette): Locking for multiple requests at the same time + *sequence_id = client_data->sequence_number++; + + // Send request + z_get_options_t opts = z_get_options_default(); + + z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + opts.attachment = z_bytes_map_as_attachment(&map); + + opts.target = Z_QUERY_TARGET_ALL; + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); + z_get( + z_loan(context_impl->session), z_loan( + client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + + return RMW_RET_OK; +} + +static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment) +{ + // Get the sequence_number out of the attachment + if (!z_check(*attachment)) { + // A valid request must have had an attachment + RMW_SET_ERROR_MSG("Could not get attachment from query"); + return -1; + } + + z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number")); + if (!z_check(sequence_num_index)) { + // A valid request must have had a sequence number attached + RMW_SET_ERROR_MSG("Could not get sequence number from query"); + return -1; + } + + if (sequence_num_index.len < 1) { + RMW_SET_ERROR_MSG("No value specified for the sequence number"); + return -1; + } + + if (sequence_num_index.len > 19) { + // The sequence number was larger than we expected + RMW_SET_ERROR_MSG("Sequence number too large"); + return -1; + } + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char sequence_num_str[20]; + + memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len); + sequence_num_str[sequence_num_index.len] = '\0'; + + errno = 0; + char * endptr; + int64_t seqnum = strtol(sequence_num_str, &endptr, 10); + if (seqnum == 0) { + // This is an error regardless; the client should never send this + RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number"); + return -1; + } else if (endptr == sequence_num_str) { + // No values were converted, this is an error + RMW_SET_ERROR_MSG("No valid numbers available in the sequence number"); + return -1; + } else if (*endptr != '\0') { + // There was junk after the number + RMW_SET_ERROR_MSG("Non-numeric values in the sequence number"); + return -1; + } else if (errno != 0) { + // Some other error occurred, which may include overflow or underflow + RMW_SET_ERROR_MSG( + "An undefined error occurred while getting the sequence number, this may be an overflow"); + return -1; + } + + return seqnum; +} + +//============================================================================== +/// Take an incoming ROS service response. +rmw_ret_t +rmw_take_response( + const rmw_client_t * client, + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + *taken = false; + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + + z_owned_reply_t * latest_reply = nullptr; + + std::lock_guard lock(client_data->message_mutex); + if (client_data->replies.empty()) { + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; + } + latest_reply = &client_data->replies.front(); + z_sample_t sample = z_reply_ok(latest_reply); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(sample.payload.start)), + sample.payload.len); + + // Object that serializes the data + eprosima::fastcdr::Cdr deser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!client_data->response_type_support->deserialize_ros_message( + deser, + ros_response, + client_data->response_type_support_impl)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment); + if (request_header->request_id.sequence_number < 0) { + // get_sequence_num_from_attachment already set an error + return RMW_RET_ERROR; + } + // TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp, + // and writer_guid + + *taken = true; + + client_data->replies.pop_front(); + z_reply_drop(latest_reply); + + return RMW_RET_OK; +} + +//============================================================================== +/// Retrieve the actual qos settings of the client's request publisher. +rmw_ret_t +rmw_client_request_publisher_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos) +{ + // TODO(francocipollone): Fix. + static_cast(client); + static_cast(qos); + return RMW_RET_OK; +} + +//============================================================================== +/// Retrieve the actual qos settings of the client's response subscription. +rmw_ret_t +rmw_client_response_subscription_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos) +{ + // TODO(francocipollone): Fix. + static_cast(client); + static_cast(qos); + return RMW_RET_OK; +} + +//============================================================================== +/// Create a service server that can receive requests from and send replies to a service client. +rmw_service_t * +rmw_create_service( + const rmw_node_t * node, + const rosidl_service_type_support_t * type_supports, + const char * service_name, + const rmw_qos_profile_t * qos_profiles) +{ + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[rmw_create_service] %s", + service_name); + + // ASSERTIONS ================================================================ + RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return nullptr); + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); + if (0 == strlen(service_name)) { + RMW_SET_ERROR_MSG("service_name argument is an empty string"); + return nullptr; + } + RMW_CHECK_ARGUMENT_FOR_NULL(qos_profiles, nullptr); + if (!qos_profiles->avoid_ros_namespace_conventions) { + int validation_result = RMW_TOPIC_VALID; + // TODO(francocipollone): Verify if this is the right way to validate the service name. + rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return nullptr; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service_name argument is invalid: %s", reason); + return nullptr; + } + } + + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context, + "expected initialized context", + return nullptr); + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context->impl, + "expected initialized context impl", + return nullptr); + rmw_context_impl_s * context_impl = static_cast( + node->context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "unable to get rmw_context_impl_s", + return nullptr); + if (!z_check(context_impl->session)) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } + + // SERVICE DATA ============================================================== + rcutils_allocator_t * allocator = &node->context->options.allocator; + + rmw_service_t * rmw_service = static_cast(allocator->zero_allocate( + 1, + sizeof(rmw_service_t), + allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_service, + "failed to allocate memory for the service", + return nullptr); + auto free_rmw_service = rcpputils::make_scope_exit( + [rmw_service, allocator]() { + allocator->deallocate(rmw_service, allocator->state); + }); + + auto service_data = static_cast( + allocator->allocate(sizeof(rmw_service_data_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data, + "failed to allocate memory for service data", + return nullptr); + auto free_service_data = rcpputils::make_scope_exit( + [service_data, allocator]() { + allocator->deallocate(service_data, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW(service_data, service_data, return nullptr, rmw_service_data_t); + auto destruct_service_data = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->~rmw_service_data_t(), + rmw_service_data_t); + }); + + // Get the RMW type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support + return nullptr; + } + + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + + service_data->context = node->context; + service_data->typesupport_identifier = type_support->typesupport_identifier; + service_data->request_type_support_impl = request_members; + service_data->response_type_support_impl = response_members; + + // Request type support + service_data->request_type_support = static_cast( + allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data->request_type_support, + "Failed to allocate RequestTypeSupport", + return nullptr); + auto free_request_type_support = rcpputils::make_scope_exit( + [request_type_support = service_data->request_type_support, allocator]() { + allocator->deallocate(request_type_support, allocator->state); + }); + RMW_TRY_PLACEMENT_NEW( + service_data->request_type_support, + service_data->request_type_support, + return nullptr, + RequestTypeSupport, service_members); + auto destruct_request_type_support = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->request_type_support->~RequestTypeSupport(), + RequestTypeSupport); + }); + + // Response type support + service_data->response_type_support = static_cast( + allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data->response_type_support, + "Failed to allocate ResponseTypeSupport", + return nullptr); + auto free_response_type_support = rcpputils::make_scope_exit( + [response_type_support = service_data->response_type_support, allocator]() { + allocator->deallocate(response_type_support, allocator->state); + }); + RMW_TRY_PLACEMENT_NEW( + service_data->response_type_support, + service_data->response_type_support, + return nullptr, + ResponseTypeSupport, service_members); + auto destruct_response_type_support = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->response_type_support->~ResponseTypeSupport(), + ResponseTypeSupport); + }); + + // Populate the rmw_service. + rmw_service->implementation_identifier = rmw_zenoh_identifier; + rmw_service->service_name = rcutils_strdup(service_name, *allocator); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_service->service_name, + "failed to allocate service name", + return nullptr); + auto free_service_name = rcpputils::make_scope_exit( + [rmw_service, allocator]() { + allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); + }); + service_data->keyexpr = ros_topic_name_to_zenoh_key( + rmw_service->service_name, node->context->actual_domain_id, allocator); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [service_data]() { + if (service_data) { + z_drop(z_move(service_data->keyexpr)); + } + }); + if (!z_check(z_loan(service_data->keyexpr))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); + + service_data->qable = z_declare_queryable( + z_loan(context_impl->session), + z_loan(service_data->keyexpr), + z_move(callback), + nullptr); + + if (!z_check(service_data->qable)) { + RMW_SET_ERROR_MSG("unable to create zenoh queryable"); + return nullptr; + } + auto undeclare_z_queryable = rcpputils::make_scope_exit( + [service_data]() { + z_undeclare_queryable(z_move(service_data->qable)); + }); + + // TODO(francocipollone): Update graph cache. + + rmw_service->data = service_data; + + free_rmw_service.cancel(); + free_service_data.cancel(); + free_service_name.cancel(); + destruct_service_data.cancel(); + destruct_request_type_support.cancel(); + destruct_response_type_support.cancel(); + free_request_type_support.cancel(); + free_response_type_support.cancel(); + free_ros_keyexpr.cancel(); + undeclare_z_queryable.cancel(); + return rmw_service; +} + +//============================================================================== +/// Destroy and unregister a service server from its node. +rmw_ret_t +rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) +{ + // ASSERTIONS ================================================================ + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data, + "Unable to retrieve service_data from service", + return RMW_RET_INVALID_ARGUMENT); + + // CLEANUP ================================================================ + z_drop(z_move(service_data->keyexpr)); + z_drop(z_move(service_data->qable)); + for (z_owned_query_t & query : service_data->query_queue) { + z_drop(z_move(query)); + } + service_data->query_queue.clear(); + service_data->sequence_to_query_map.clear(); + + allocator->deallocate(service_data->request_type_support, allocator->state); + allocator->deallocate(service_data->response_type_support, allocator->state); + allocator->deallocate(service->data, allocator->state); + + allocator->deallocate(const_cast(service->service_name), allocator->state); + allocator->deallocate(service, allocator->state); + + // TODO(francocipollone): Update graph cache. return RMW_RET_OK; } @@ -1781,11 +2462,91 @@ rmw_take_request( void * ros_request, bool * taken) { - static_cast(service); - static_cast(request_header); - static_cast(ros_request); - static_cast(taken); - return RMW_RET_UNSUPPORTED; + *taken = false; + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service->service_name, "service has no service name", RMW_RET_INVALID_ARGUMENT); + + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); + + z_owned_query_t query; + { + std::lock_guard lock(service_data->query_queue_mutex); + if (service_data->query_queue.empty()) { + return RMW_RET_OK; + } + query = service_data->query_queue.front(); + } + + const z_query_t loaned_query = z_query_loan(&query); + + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + + int64_t sequence_number = get_sequence_num_from_attachment(&attachment); + if (sequence_number < 0) { + // get_sequence_number_from_attachment already set the error + return RMW_RET_ERROR; + } + + // Add this query to the map, so that rmw_send_response can quickly look it up later + { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + if (service_data->sequence_to_query_map.find(sequence_number) != + service_data->sequence_to_query_map.end()) + { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query)); + } + + // DESERIALIZE MESSAGE ======================================================== + z_value_t payload_value = z_query_value(&loaned_query); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload_value.payload.start)), + payload_value.payload.len); + + // Object that serializes the data + eprosima::fastcdr::Cdr deser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!service_data->request_type_support->deserialize_ros_message( + deser, + ros_request, + service_data->request_type_support_impl)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } + + // Fill in the request header. + // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, + // and received_timestamp + request_header->request_id.sequence_number = sequence_number; + + service_data->query_queue.pop_front(); + + *taken = true; + + return RMW_RET_OK; } //============================================================================== @@ -1796,10 +2557,95 @@ rmw_send_response( rmw_request_id_t * request_header, void * ros_response) { - static_cast(service); - static_cast(request_header); - static_cast(ros_response); - return RMW_RET_UNSUPPORTED; + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", "[rmw_send_response]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, + "Unable to retrieve service_data from service", + RMW_RET_INVALID_ARGUMENT); + + rmw_service_data_t * service_data = static_cast(service->data); + + rcutils_allocator_t * allocator = &(service_data->context->options.allocator); + + size_t max_data_length = ( + service_data->response_type_support->get_estimated_serialized_size( + ros_response, service_data->response_type_support_impl)); + + // Init serialized message byte array + char * response_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!response_bytes) { + RMW_SET_ERROR_MSG("failed allocate response message bytes"); + return RMW_RET_ERROR; + } + auto free_response_bytes = rcpputils::make_scope_exit( + [response_bytes, allocator]() { + allocator->deallocate(response_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(response_bytes, max_data_length); + + // Object that serializes the data + eprosima::fastcdr::Cdr ser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!service_data->response_type_support->serialize_ros_message( + ros_response, + ser, + service_data->response_type_support_impl)) + { + return RMW_RET_ERROR; + } + + size_t data_length = ser.getSerializedDataLength(); + + // Create the queryable payload + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number); + if (query_it == service_data->sequence_to_query_map.end()) { + RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); + return RMW_RET_ERROR; + } + const z_query_t loaned_query = z_query_loan(&query_it->second); + z_query_reply_options_t options = z_query_reply_options_default(); + + // TODO(clalancette): We also need to fill in and send the writer_guid + z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + options.attachment = z_bytes_map_as_attachment(&map); + + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + z_query_reply( + &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( + response_bytes), data_length, &options); + + z_drop(z_move(query_it->second)); + service_data->sequence_to_query_map.erase(query_it); + return RMW_RET_OK; } //============================================================================== @@ -2057,6 +2903,29 @@ rmw_wait( } } + + if (services) { + // Go through each of the services and attach the wait set condition variable to them. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + serv_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (clients) { + // Go through each of the clients and attach the wait set condition variable to them. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < clients->client_count; ++i) { + rmw_client_data_t * client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + client_data->condition = &wait_set_data->condition_variable; + } + } + } + std::unique_lock lock(wait_set_data->condition_mutex); // According to the RMW documentation, if wait_timeout is NULL that means @@ -2102,6 +2971,37 @@ rmw_wait( } } + if (services) { + // Now detach the condition variable and mutex from each of the services + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + serv_data->condition = nullptr; + if (serv_data->query_queue.empty()) { + // Setting to nullptr lets rcl know that this service is not ready + services->services[i] = nullptr; + } + } + } + } + + if (clients) { + // Now detach the condition variable and mutex from each of the clients + for (size_t i = 0; i < clients->client_count; ++i) { + rmw_client_data_t * client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + client_data->condition = nullptr; + // According to the documentation for rmw_wait in rmw.h, entries in the + // array that have *not* been triggered should be set to NULL + if (client_data->replies.empty()) { + // Setting to nullptr lets rcl know that this client is not ready + clients->clients[i] = nullptr; + } + } + } + } + + return RMW_RET_OK; } @@ -2277,10 +3177,12 @@ rmw_service_server_is_available( const rmw_client_t * client, bool * is_available) { + // TODO(francocipollone): Provide a proper implementation. + // We need graph cache information for this. + *is_available = true; static_cast(node); static_cast(client); - static_cast(is_available); - return RMW_RET_UNSUPPORTED; + return RMW_RET_OK; } //==============================================================================