Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use liveliness subscriber with history instead of liveliness get first. #17

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
// shared_ptr<Data> would live on until the graph_sub_data_handler callback.
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
// Enable history option to get the old graph information before this session was started.
sub_options.history = true;
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, this);
z_view_keyexpr_t keyexpr;
Expand Down Expand Up @@ -233,49 +235,6 @@ rmw_context_impl_s::rmw_context_impl_s(
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(
domain_id);

// Query router/liveliness participants to get graph information before this session was started.
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
// The blocking channel will return when the sender end is closed which is
// the moment the query finishes.
// The non-blocking fifo exists only for the use case where we don't want to
// block the thread between responses (including the request termination response).
// In general, unless we want to cooperatively schedule other tasks on the same
// thread as reading the fifo, the blocking fifo will be more appropriate as
// the code will be simpler, and if we're just going to spin over the non-blocking
// reads until we obtain responses, we'll just be hogging CPU time by convincing
// the OS that we're doing actual work when it could instead park the thread.
z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1);

z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
zc_liveliness_get(
z_loan(session), z_loan(keyexpr),
z_move(closure), NULL);

z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply));
z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);
std::string str(z_string_data(z_loan(key_str)), z_string_len(z_loan(key_str)));
// Ignore tokens from the same session to avoid race conditions from this
// query and the liveliness subscription.
graph_cache->parse_put(str, true);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
z_drop(z_move(reply));
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
Expand Down