Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RMW_EVENT_MESSAGE_LOST #169

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,7 @@ class EventsManager
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via data_callback_mgr.set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// Count for
size_t unread_count_ {0};

rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
Expand Down
3 changes: 1 addition & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ static const char PUB_STR[] = "MP";
static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char EMPTY_NAMESPACE_REPLACEMENT = '_';
static const char KEYEXPR_DELIMITER = '/';
static const char SLASH_REPLACEMENT = '%';
static const char QOS_DELIMITER = ':';
Expand Down Expand Up @@ -261,7 +260,7 @@ Entity::Entity(
keyexpr_parts[KeyexprIndex::Id] = id_;
keyexpr_parts[KeyexprIndex::EntityStr] = entity_to_str.at(type_);
// An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//".
// Hence we add an "_" to denote an empty namespace such that splitting the key
// Hence we mangle the empty namespace such that splitting the key
// will always result in 5 parts.
keyexpr_parts[KeyexprIndex::Namespace] = mangle_name(node_info_.ns_);
keyexpr_parts[KeyexprIndex::NodeName] = mangle_name(node_info_.name_);
Expand Down
48 changes: 36 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@
#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"

///=============================================================================
static size_t hash_gid(const uint8_t * gid)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
static size_t hash_gid(const rmw_request_id_t & request_id)
{
return hash_gid(request_id.writer_guid);
}

///=============================================================================
size_t rmw_context_impl_s::get_next_entity_id()
{
Expand Down Expand Up @@ -135,7 +153,24 @@ void rmw_subscription_data_t::add_new_message(
}
}

// TODO(Yadunund): Check for ZENOH_EVENT_MESSAGE_LOST.
// Check for messages lost if the new sequence number is not monotonically increasing.
const size_t gid_hash = hash_gid(msg->publisher_gid);
auto last_known_pub_it = last_known_published_msg_.find(gid_hash);
if (last_known_pub_it != last_known_published_msg_.end()) {
const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second);
if (seq_increment > 1) {
const size_t num_msg_lost = seq_increment - 1;
total_messages_lost_ += num_msg_lost;
auto event_status = std::make_unique<rmw_zenoh_event_status_t>();
event_status->total_count_change = num_msg_lost;
event_status->total_count = total_messages_lost_;
events_mgr.add_new_event(
rmw_zenoh_cpp::ZENOH_EVENT_MESSAGE_LOST,
std::move(event_status));
}
}
// Always update the last known sequence number for the publisher
last_known_published_msg_[gid_hash] = msg->sequence_number;

message_queue_.emplace_back(std::move(msg));

Expand Down Expand Up @@ -211,17 +246,6 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
notify();
}

static size_t hash_gid(const rmw_request_id_t & request_id)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(request_id.writer_guid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ class rmw_subscription_data_t final
std::deque<std::unique_ptr<saved_msg_data>> message_queue_;
mutable std::mutex message_queue_mutex_;

// Map GID of a publisher to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
size_t total_messages_lost_{0};

void notify();

std::condition_variable * condition_{nullptr};
Expand Down
Loading