diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0d288964..780d644c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -528,7 +528,12 @@ void client_data_handler(const z_loaned_reply_t *reply, void *data) { return; } - if (!z_reply_is_ok(reply)) { + if (z_reply_is_ok(reply)) + { + z_owned_reply_t owned_reply; + z_reply_clone(&owned_reply, reply); + client_data->add_new_reply(std::make_unique(&owned_reply)); + } else { z_view_string_t keystr; z_keyexpr_as_view_string(z_loan(client_data->keyexpr), &keystr); const z_loaned_reply_err_t *err = z_reply_err(reply); @@ -545,15 +550,6 @@ void client_data_handler(const z_loaned_reply_t *reply, void *data) { z_drop(z_move(err_str)); return; } - z_owned_reply_t owned_reply; - z_reply_clone(&owned_reply, reply); - - if (!z_reply_check(&owned_reply)) { - RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "z_reply_check returned False"); - return; - } - - client_data->add_new_reply(std::make_unique(&owned_reply)); } void client_data_drop(void *data) { diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 2a8f8772..7526b7a7 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -355,25 +355,14 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - zc_liveliness_declare_subscriber( + if(zc_liveliness_declare_subscriber( &context->impl->graph_subscriber, z_loan(context->impl->session), z_loan(keyexpr), - z_move(callback), &sub_options); - - const z_loaned_keyexpr_t *sub_ke = z_subscriber_keyexpr(z_loan(context->impl->graph_subscriber)); - z_view_string_t sub_keyexpr; - z_keyexpr_as_view_string(sub_ke, &sub_keyexpr); - - - auto undeclare_z_sub = rcpputils::make_scope_exit([context]() { - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - }); - if (!z_check(context->impl->graph_subscriber)) { + z_move(callback), &sub_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return RMW_RET_ERROR; } - undeclare_z_sub.cancel(); close_session.cancel(); destruct_guard_condition_data.cancel(); impl_destructor.cancel(); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 662ff494..7f18f0b7 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -85,10 +85,11 @@ std::string strip_slashes(const char *const str) { // copies the old string into it. If this becomes a performance problem, we // could consider modifying the topic_name in place. But this means we need to // be much more careful about who owns the string. -z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const std::size_t domain_id, - const char *const topic_name, - const char *const topic_type, - const char *const type_hash) { +bool ros_topic_name_to_zenoh_key(z_owned_keyexpr_t* out_keyexpr, + const std::size_t domain_id, + const char *const topic_name, + const char *const topic_type, + const char *const type_hash) { std::string keyexpr_str = std::to_string(domain_id); keyexpr_str += "/"; keyexpr_str += strip_slashes(topic_name); @@ -98,9 +99,7 @@ z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const std::size_t domain_id, keyexpr_str += type_hash; // TODO(yuyuan): use z_view_keyexpr_t? - z_owned_keyexpr_t keyexpr; - z_keyexpr_from_str(&keyexpr, keyexpr_str.c_str()); - return keyexpr; + return z_keyexpr_from_str(out_keyexpr, keyexpr_str.c_str()) == Z_OK; } //============================================================================== @@ -311,11 +310,6 @@ rmw_node_t *rmw_create_node(rmw_context_t *context, const char *name, auto free_token = rcpputils::make_scope_exit( [node_data]() { z_drop(z_move(node_data->token)); }); - if (!z_check(node_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); - return nullptr; - } node->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; node->context = context; @@ -476,10 +470,6 @@ rmw_publisher_t *rmw_create_publisher( return nullptr); RMW_CHECK_FOR_NULL_WITH_MSG(context_impl->enclave, "expected initialized enclave", return nullptr); - if (!z_check(context_impl->session)) { - RMW_SET_ERROR_MSG("zenoh session is invalid"); - return nullptr; - } rcutils_allocator_t *allocator = &node->context->options.allocator; @@ -580,17 +570,17 @@ rmw_publisher_t *rmw_create_publisher( allocator->deallocate(type_hash_c_str, allocator->state); }); - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + z_owned_keyexpr_t keyexpr; + if (!ros_topic_name_to_zenoh_key( + &keyexpr, node->context->actual_domain_id, topic_name, - publisher_data->type_support->get_name(), type_hash_c_str); - z_view_string_t str; - z_keyexpr_as_view_string(z_loan(keyexpr), &str); + publisher_data->type_support->get_name(), type_hash_c_str)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); }); - if (!z_keyexpr_check(&keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } // Create a Publication Cache if durability is transient_local. if (publisher_data->adapted_qos_profile.durability == @@ -794,15 +784,14 @@ rmw_return_loaned_message_from_publisher(const rmw_publisher_t *publisher, } namespace { -z_owned_bytes_t -create_map_and_set_sequence_num(int64_t sequence_number, +bool +create_map_and_set_sequence_num(z_owned_bytes_t* out_bytes, + int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) { // z_owned_slice_map_t map; // z_slice_map_new(&map); - z_owned_bytes_t bytes; - // auto free_attachment_map = // rcpputils::make_scope_exit([&map]() { z_drop(z_move(map)); }); @@ -845,14 +834,13 @@ create_map_and_set_sequence_num(int64_t sequence_number, rmw_zenoh_cpp::attachement_data_t data(sequence_number, source_timestamp, gid); - if (data.serialize_to_zbytes(&bytes)) { + if (data.serialize_to_zbytes(out_bytes)) { RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Failed to serialize the attachment"); - z_bytes_null(&bytes); - return bytes; + return false; } - return bytes; + return true; } } // namespace @@ -916,7 +904,7 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, &alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); - if (z_check(alloc.buf)) { + if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { shmbuf = std::make_optional(alloc.buf); msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); @@ -951,9 +939,8 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, int64_t sequence_number = publisher_data->get_next_sequence_number(); - z_owned_bytes_t attachment = - create_map_and_set_sequence_num(sequence_number, publisher_data->pub_gid); - if (!z_check(attachment)) { + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num(&attachment, sequence_number, publisher_data->pub_gid)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; } @@ -1073,10 +1060,8 @@ rmw_ret_t rmw_publish_serialized_message( uint64_t sequence_number = publisher_data->get_next_sequence_number(); - z_owned_bytes_t attachment = - create_map_and_set_sequence_num(sequence_number, publisher_data->pub_gid); - - if (!z_check(attachment)) { + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num(&attachment, sequence_number, publisher_data->pub_gid)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; } @@ -1126,10 +1111,6 @@ rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t *publisher) { static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (!zc_liveliness_token_check(&pub_data->token)) { - return RMW_RET_ERROR; - } - return RMW_RET_OK; } @@ -1275,10 +1256,6 @@ rmw_subscription_t *rmw_create_subscription( return nullptr); RMW_CHECK_FOR_NULL_WITH_MSG(context_impl->enclave, "expected initialized enclave", return nullptr); - if (!z_check(context_impl->session)) { - RMW_SET_ERROR_MSG("zenoh session is invalid"); - return nullptr; - } rcutils_allocator_t *allocator = &node->context->options.allocator; @@ -1382,15 +1359,16 @@ rmw_subscription_t *rmw_create_subscription( // with Zenoh; after this, callbacks may come in at any time. z_owned_closure_sample_t callback; z_closure(&callback, rmw_zenoh_cpp::sub_data_handler, nullptr, sub_data); - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + z_owned_keyexpr_t keyexpr; + if (!ros_topic_name_to_zenoh_key( + &keyexpr, node->context->actual_domain_id, topic_name, - sub_data->type_support->get_name(), type_hash_c_str); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); }); - if (!z_keyexpr_check(&keyexpr)) { + sub_data->type_support->get_name(), type_hash_c_str)) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); }); // // TODO(yuyuan): owned_key_str seems to be useless here? // // Instantiate the subscription with suitable options depending on the @@ -1484,14 +1462,8 @@ rmw_subscription_t *rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr for liveness."); return nullptr; } - zc_liveliness_declare_token(&sub_data->token, z_loan(context_impl->session), - z_loan(token_keyexpr), NULL); - auto free_token = rcpputils::make_scope_exit([sub_data]() { - if (sub_data != nullptr) { - z_drop(z_move(sub_data->token)); - } - }); - if (!z_check(sub_data->token)) { + if(zc_liveliness_declare_token(&sub_data->token, z_loan(context_impl->session), + z_loan(token_keyexpr), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the subscription."); @@ -1500,7 +1472,6 @@ rmw_subscription_t *rmw_create_subscription( rmw_subscription->data = sub_data; - free_token.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1981,10 +1952,7 @@ rmw_client_t *rmw_create_client( return nullptr); RMW_CHECK_FOR_NULL_WITH_MSG(context_impl->enclave, "expected initialized enclave", return nullptr); - if (!z_check(context_impl->session)) { - RMW_SET_ERROR_MSG("zenoh session is invalid"); - return nullptr; - } + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); const rmw_zenoh_cpp::rmw_node_data_t *node_data = static_cast(node->data); @@ -2155,16 +2123,16 @@ rmw_client_t *rmw_create_client( allocator->deallocate(type_hash_c_str, allocator->state); }); - client_data->keyexpr = ros_topic_name_to_zenoh_key( + if(!ros_topic_name_to_zenoh_key( + &client_data->keyexpr, node->context->actual_domain_id, rmw_client->service_name, - service_type.c_str(), type_hash_c_str); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { z_keyexpr_drop(z_move(client_data->keyexpr)); }); - if (!z_keyexpr_check(&client_data->keyexpr)) { + service_type.c_str(), type_hash_c_str)) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { z_keyexpr_drop(z_move(client_data->keyexpr)); }); + client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), std::to_string(node_data->id), @@ -2184,31 +2152,24 @@ rmw_client_t *rmw_create_client( } z_owned_keyexpr_t keyexpr; z_keyexpr_from_str(&keyexpr, client_data->entity->keyexpr().c_str()); - zc_liveliness_declare_token(&client_data->token, + if (zc_liveliness_declare_token(&client_data->token, z_loan(node->context->impl->session), - z_loan(keyexpr), NULL); + z_loan(keyexpr), NULL) != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return nullptr; + } // WARN(yuyuan): z_view_keyexpr_t would fail // z_view_keyexpr_t keyexpr; // z_view_keyexpr_from_str(&keyexpr, client_data->entity->keyexpr().c_str()); // zc_liveliness_declare_token(&client_data->token, // z_loan(node->context->impl->session), // z_loan(keyexpr), NULL); - auto free_token = rcpputils::make_scope_exit([client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (!z_check(client_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); - return nullptr; - } - z_drop(z_move(keyexpr)); rmw_client->data = client_data; - free_token.cancel(); free_rmw_client.cancel(); free_client_data.cancel(); destruct_request_type_support.cancel(); @@ -2334,9 +2295,8 @@ rmw_ret_t rmw_send_request(const rmw_client_t *client, const void *ros_request, z_get_options_t opts; z_get_options_default(&opts); - z_owned_bytes_t attachment = - create_map_and_set_sequence_num(*sequence_id, client_data->client_gid); - if (!z_check(attachment)) { + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num(&attachment, *sequence_id, client_data->client_gid)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; } @@ -2550,10 +2510,6 @@ rmw_service_t *rmw_create_service( return nullptr); RMW_CHECK_FOR_NULL_WITH_MSG(context_impl->enclave, "expected initialized enclave", return nullptr); - if (!z_check(context_impl->session)) { - RMW_SET_ERROR_MSG("zenoh session is invalid"); - return nullptr; - } // SERVICE DATA ============================================================== rcutils_allocator_t *allocator = &node->context->options.allocator; @@ -2698,20 +2654,19 @@ rmw_service_t *rmw_create_service( allocator->deallocate(type_hash_c_str, allocator->state); }); - service_data->keyexpr = ros_topic_name_to_zenoh_key( + if(!ros_topic_name_to_zenoh_key( + &service_data->keyexpr, node->context->actual_domain_id, rmw_service->service_name, - service_type.c_str(), type_hash_c_str); + service_type.c_str(), type_hash_c_str)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } auto free_ros_keyexpr = rcpputils::make_scope_exit([service_data]() { if (service_data) { z_drop(z_move(service_data->keyexpr)); } }); - if (!z_keyexpr_check(&service_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - z_owned_closure_query_t callback; z_closure(&callback, rmw_zenoh_cpp::service_data_handler, nullptr, service_data); @@ -3009,9 +2964,9 @@ rmw_ret_t rmw_send_response(const rmw_service_t *service, z_query_reply_options_t options; z_query_reply_options_default(&options); - z_owned_bytes_t attachment = create_map_and_set_sequence_num( - request_header->sequence_number, request_header->writer_guid); - if (!z_check(attachment)) { + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num(&attachment, + request_header->sequence_number, request_header->writer_guid)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; }