Skip to content

Commit

Permalink
Check for lost messages in sub_data_handler (#169)
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored May 16, 2024
1 parent 369dfad commit e7aad9b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 20 deletions.
7 changes: 1 addition & 6 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,7 @@ class EventsManager
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via data_callback_mgr.set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// Count for
size_t unread_count_ {0};

rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
Expand Down
3 changes: 1 addition & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ static const char PUB_STR[] = "MP";
static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char EMPTY_NAMESPACE_REPLACEMENT = '_';
static const char KEYEXPR_DELIMITER = '/';
static const char SLASH_REPLACEMENT = '%';
static const char QOS_DELIMITER = ':';
Expand Down Expand Up @@ -261,7 +260,7 @@ Entity::Entity(
keyexpr_parts[KeyexprIndex::Id] = id_;
keyexpr_parts[KeyexprIndex::EntityStr] = entity_to_str.at(type_);
// An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//".
// Hence we add an "_" to denote an empty namespace such that splitting the key
// Hence we mangle the empty namespace such that splitting the key
// will always result in 5 parts.
keyexpr_parts[KeyexprIndex::Namespace] = mangle_name(node_info_.ns_);
keyexpr_parts[KeyexprIndex::NodeName] = mangle_name(node_info_.name_);
Expand Down
48 changes: 36 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@
#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"

///=============================================================================
static size_t hash_gid(const uint8_t * gid)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
static size_t hash_gid(const rmw_request_id_t & request_id)
{
return hash_gid(request_id.writer_guid);
}

///=============================================================================
size_t rmw_context_impl_s::get_next_entity_id()
{
Expand Down Expand Up @@ -135,7 +153,24 @@ void rmw_subscription_data_t::add_new_message(
}
}

// TODO(Yadunund): Check for ZENOH_EVENT_MESSAGE_LOST.
// Check for messages lost if the new sequence number is not monotonically increasing.
const size_t gid_hash = hash_gid(msg->publisher_gid);
auto last_known_pub_it = last_known_published_msg_.find(gid_hash);
if (last_known_pub_it != last_known_published_msg_.end()) {
const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second);
if (seq_increment > 1) {
const size_t num_msg_lost = seq_increment - 1;
total_messages_lost_ += num_msg_lost;
auto event_status = std::make_unique<rmw_zenoh_event_status_t>();
event_status->total_count_change = num_msg_lost;
event_status->total_count = total_messages_lost_;
events_mgr.add_new_event(
rmw_zenoh_cpp::ZENOH_EVENT_MESSAGE_LOST,
std::move(event_status));
}
}
// Always update the last known sequence number for the publisher
last_known_published_msg_[gid_hash] = msg->sequence_number;

message_queue_.emplace_back(std::move(msg));

Expand Down Expand Up @@ -211,17 +246,6 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
notify();
}

static size_t hash_gid(const rmw_request_id_t & request_id)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(request_id.writer_guid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ class rmw_subscription_data_t final
std::deque<std::unique_ptr<saved_msg_data>> message_queue_;
mutable std::mutex message_queue_mutex_;

// Map GID of a publisher to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
size_t total_messages_lost_{0};

void notify();

std::condition_variable * condition_{nullptr};
Expand Down

0 comments on commit e7aad9b

Please sign in to comment.