Skip to content

Commit

Permalink
Merge branch 'ros2:rolling' into rolling
Browse files Browse the repository at this point in the history
  • Loading branch information
imstevenpmwork authored Dec 12, 2024
2 parents 1bea19b + a7187eb commit ebfada8
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 31 deletions.
35 changes: 22 additions & 13 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

#include <algorithm>
#include <array>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -209,29 +211,32 @@ void GraphCache::handle_matched_events_for_put(
EntityEventMap local_entities_with_events = {};
// The entity added may be local with callbacks registered but there
// may be other local entities in the graph that are matched.
std::size_t match_count_for_entity = 0;
int32_t match_count_for_entity = 0;
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
if (is_pub) {
// Count the number of matching subs for each set of qos settings.
match_count_for_entity += topic_data_ptr->subs_.size();
std::size_t sub_size = topic_data_ptr->subs_.size();
if (sub_size > std::numeric_limits<int32_t>::max()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Too many subscriptions on publisher; assuming 0. Report this bug.");
sub_size = 0;
}
match_count_for_entity += static_cast<int32_t>(sub_size);
// Also iterate through the subs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
update_event_counters(topic_info.name_, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
// Update event counters for the new entity->
update_event_counters(
topic_info.name_,
update_event_counters(topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
if (is_entity_local(*entity) && match_count_for_entity > 0) {
Expand All @@ -240,17 +245,21 @@ void GraphCache::handle_matched_events_for_put(
} else {
// Entity is a sub.
// Count the number of matching pubs for each set of qos settings.
match_count_for_entity += topic_data_ptr->pubs_.size();
std::size_t pub_size = topic_data_ptr->pubs_.size();
if (pub_size > std::numeric_limits<int32_t>::max()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Too many publishers on subscription; assuming 0. Report this bug.");
pub_size = 0;
}
match_count_for_entity += static_cast<int32_t>(pub_size);
// Also iterate through the pubs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
update_event_counters(topic_info.name_, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ class rmw_context_impl_s::Data final
throw std::runtime_error("Error configuring Zenoh session.");
}

#ifndef _MSC_VER
// Check if shm is enabled.
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto always_free_shm_enabled = rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});
#endif

// Initialize the zenoh session.
z_owned_session_t raw_session;
Expand Down Expand Up @@ -172,6 +174,7 @@ class rmw_context_impl_s::Data final

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
if (strncmp(
z_string_data(z_loan(shm_enabled)),
"true",
Expand All @@ -195,7 +198,7 @@ class rmw_context_impl_s::Data final
z_drop(z_move(shm_provider_.value()));
}
});

#endif
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
Expand All @@ -222,7 +225,9 @@ class rmw_context_impl_s::Data final
});

close_session.cancel();
#ifndef _MSC_VER
free_shm_provider.cancel();
#endif
undeclare_z_sub.cancel();
}

Expand Down
3 changes: 1 addition & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "rmw/types.h"

///=============================================================================
class rmw_context_impl_s final
struct rmw_context_impl_s final
{
public:
// Constructor that internally initializes the Zenoh session and other artifacts.
Expand Down Expand Up @@ -96,5 +96,4 @@ class rmw_context_impl_s final
std::shared_ptr<Data> data_{nullptr};
};


#endif // DETAIL__RMW_CONTEXT_IMPL_S_HPP_
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ std::optional<uint64_t> zenoh_router_check_attempts()
}
// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtol(envar_value, nullptr, 10);
const int64_t read_value = std::strtoll(envar_value, nullptr, 10);
if (read_value > 0) {
return read_value;
} else if (read_value < 0) {
Expand Down
12 changes: 6 additions & 6 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,16 @@ rmw_take_event(
case rmw_zenoh_cpp::ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: {
auto ei = static_cast<rmw_requested_qos_incompatible_event_status_t *>(event_info);
RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT);
ei->total_count = st->total_count;
ei->total_count_change = st->total_count_change;
ei->total_count = static_cast<int32_t>(st->total_count);
ei->total_count_change = static_cast<int32_t>(st->total_count_change);
*taken = true;
return RMW_RET_OK;
}
case rmw_zenoh_cpp::ZENOH_EVENT_MESSAGE_LOST: {
auto ei = static_cast<rmw_message_lost_status_t *>(event_info);
RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT);
ei->total_count = st->total_count;
ei->total_count_change = st->total_count_change;
ei->total_count = static_cast<int32_t>(st->total_count);
ei->total_count_change = static_cast<int32_t>(st->total_count_change);
*taken = true;
return RMW_RET_OK;
}
Expand All @@ -247,8 +247,8 @@ rmw_take_event(
case rmw_zenoh_cpp::ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: {
auto ei = static_cast<rmw_offered_qos_incompatible_event_status_t *>(event_info);
RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT);
ei->total_count = st->total_count;
ei->total_count_change = st->total_count_change;
ei->total_count = static_cast<int32_t>(st->total_count);
ei->total_count_change = static_cast<int32_t>(st->total_count_change);
*taken = true;
return RMW_RET_OK;
}
Expand Down
5 changes: 1 addition & 4 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
}
});

// If not already defined, set the logging environment variable for Zenoh sessions
// to warning level by default.
// TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values.
if (setenv(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_WARN_LEVEL_STR, 0) != 0) {
if (!rcutils_set_env_overwrite(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_WARN_LEVEL_STR, 0)) {
RMW_SET_ERROR_MSG("Error configuring Zenoh logging.");
return RMW_RET_ERROR;
}
Expand Down
6 changes: 2 additions & 4 deletions rmw_zenoh_cpp/src/zenohd/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rmw/error_handling.h"

#include "rcpputils/scope_exit.hpp"
#include "rcutils/env.h"

static bool running = true;
static std::mutex run_mutex;
Expand Down Expand Up @@ -60,10 +61,7 @@ int main(int argc, char ** argv)
(void)argc;
(void)argv;

// If not already defined, set the logging environment variable for Zenoh router
// to info level by default.
// TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values.
if (setenv(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_INFO_LEVEL_STR, 0) != 0) {
if (!rcutils_set_env_overwrite(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_INFO_LEVEL_STR, 0)) {
RMW_SET_ERROR_MSG("Error configuring Zenoh logging.");
return 1;
}
Expand Down

0 comments on commit ebfada8

Please sign in to comment.