Skip to content

Commit

Permalink
Use liveliness subscriber with history instead of liveliness get first.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Oct 1, 2024
1 parent afd7f63 commit c1c7781
Showing 1 changed file with 2 additions and 43 deletions.
45 changes: 2 additions & 43 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
// shared_ptr<Data> would live on until the graph_sub_data_handler callback.
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
// Enable history option to get the old graph information before this session was started.
sub_options.history = true;
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, this);
z_view_keyexpr_t keyexpr;
Expand Down Expand Up @@ -233,49 +235,6 @@ rmw_context_impl_s::rmw_context_impl_s(
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(
domain_id);

// Query router/liveliness participants to get graph information before this session was started.
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
// The blocking channel will return when the sender end is closed which is
// the moment the query finishes.
// The non-blocking fifo exists only for the use case where we don't want to
// block the thread between responses (including the request termination response).
// In general, unless we want to cooperatively schedule other tasks on the same
// thread as reading the fifo, the blocking fifo will be more appropriate as
// the code will be simpler, and if we're just going to spin over the non-blocking
// reads until we obtain responses, we'll just be hogging CPU time by convincing
// the OS that we're doing actual work when it could instead park the thread.
z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1);

z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
zc_liveliness_get(
z_loan(session), z_loan(keyexpr),
z_move(closure), NULL);

z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply));
z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);
std::string str(z_string_data(z_loan(key_str)), z_string_len(z_loan(key_str)));
// Ignore tokens from the same session to avoid race conditions from this
// query and the liveliness subscription.
graph_cache->parse_put(str, true);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
z_drop(z_move(reply));
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
Expand Down

0 comments on commit c1c7781

Please sign in to comment.