Skip to content

Commit

Permalink
Use liveliness subscriber with history instead of liveliness get first.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Sep 11, 2024
1 parent 5bf7cc0 commit 5630ac3
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 46 deletions.
49 changes: 4 additions & 45 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) {
if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) {
++connection_attempts;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand Down Expand Up @@ -299,50 +299,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
const std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(
context->actual_domain_id);

// Query router/liveliness participants to get graph information before this session was started.

// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
// The blocking channel will return when the sender end is closed which is
// the moment the query finishes.
// The non-blocking fifo exists only for the use case where we don't want to
// block the thread between responses (including the request termination response).
// In general, unless we want to cooperatively schedule other tasks on the same
// thread as reading the fifo, the blocking fifo will be more appropriate as
// the code will be simpler, and if we're just going to spin over the non-blocking
// reads until we obtain responses, we'll just be hogging CPU time by convincing
// the OS that we're doing actual work when it could instead park the thread.
z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX);

z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
zc_liveliness_get(
z_loan(context->impl->session), z_loan(keyexpr),
z_move(closure), NULL);

z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply));
z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);
std::string str(z_string_data(z_loan(key_str)), z_string_len(z_loan(key_str)));
// Ignore tokens from the same session to avoid race conditions from this
// query and the liveliness subscription.
context->impl->graph_cache->parse_put(str, true);
} else {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[discovery] Received an error\n");
}
z_drop(z_move(reply));
}

z_drop(z_move(handler));

// TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available.

// Uncomment and rely on #if #endif blocks to enable this feature when building with
Expand All @@ -356,9 +312,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// &sub_options);
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
// Enable history option to get the old graph information before this session was started.
sub_options.history = true;
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, context->impl);

z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());

auto undeclare_z_sub = rcpputils::make_scope_exit(
Expand Down
2 changes: 1 addition & 1 deletion zenoh_c_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memor
# - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers)
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION ecad7f358fabdf55c4d6c35118415b5c457c8f20
VCS_VERSION edf0bee4f420179fe39ba3563f696c2c7923d56c
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand Down

0 comments on commit 5630ac3

Please sign in to comment.