diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index b32d35d6..8532e7db 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -220,8 +220,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { ++connection_attempts; + std::this_thread::sleep_for(std::chrono::seconds(1)); } - std::this_thread::sleep_for(std::chrono::seconds(1)); } if (ret != RMW_RET_OK) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -299,50 +299,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( context->actual_domain_id); - // Query router/liveliness participants to get graph information before this session was started. - - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_fifo_handler_reply_t handler; - z_owned_closure_reply_t closure; - z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); - - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - zc_liveliness_get( - z_loan(context->impl->session), z_loan(keyexpr), - z_move(closure), NULL); - - z_owned_reply_t reply; - while (z_recv(z_loan(handler), &reply) == Z_OK) { - if (z_reply_is_ok(z_loan(reply))) { - const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); - z_view_string_t key_str; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str); - std::string str(z_string_data(z_loan(key_str)), z_string_len(z_loan(key_str))); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - context->impl->graph_cache->parse_put(str, true); - } else { - RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[discovery] Received an error\n"); - } - z_drop(z_move(reply)); - } - - z_drop(z_move(handler)); - // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. // Uncomment and rely on #if #endif blocks to enable this feature when building with @@ -356,9 +312,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // &sub_options); zc_liveliness_subscriber_options_t sub_options; zc_liveliness_subscriber_options_default(&sub_options); + // Enable history option to get the old graph information before this session was started. + sub_options.history = true; z_owned_closure_sample_t callback; z_closure(&callback, graph_sub_data_handler, nullptr, context->impl); + z_view_keyexpr_t keyexpr; z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); auto undeclare_z_sub = rcpputils::make_scope_exit( diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 6e648da3..4621afca 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION ecad7f358fabdf55c4d6c35118415b5c457c8f20 + VCS_VERSION edf0bee4f420179fe39ba3563f696c2c7923d56c CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"