Skip to content

Commit

Permalink
Merge pull request #340 from ZettaScaleLabs/fix/sub-high-latency
Browse files Browse the repository at this point in the history
Fix the high latency on the subscriber due to zenoh bytes conversion
  • Loading branch information
YuanYuYuan authored Dec 16, 2024
1 parent 9e17da2 commit b62a6e2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
15 changes: 7 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -43,7 +44,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
SubscriptionData::Message::Message(
zenoh::Bytes p,
std::vector<uint8_t> && p,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
Expand Down Expand Up @@ -224,7 +225,7 @@ bool SubscriptionData::init()

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().clone(),
sample.get_payload().as_vector(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down Expand Up @@ -287,8 +288,6 @@ bool SubscriptionData::init()
zenoh::Subscriber<void> sub = context_impl->session()->declare_subscriber(
sub_ke,
[data_wp](const zenoh::Sample & sample) {
zenoh::KeyExpr keystr(std::string(sample.get_keyexpr().as_string_view()));

auto sub_data = data_wp.lock();
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
Expand All @@ -310,10 +309,10 @@ bool SubscriptionData::init()
AttachmentData attachment_data(attachment_value);
sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().clone(),
payload.as_vector(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(keystr.as_string_view()));
std::string(sample.get_keyexpr().as_string_view()));
},
zenoh::closures::none,
std::move(sub_options),
Expand Down Expand Up @@ -491,7 +490,7 @@ rmw_ret_t SubscriptionData::take_one_message(
std::unique_ptr<Message> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

auto payload_data = msg_data->payload.as_vector();
auto & payload_data = msg_data->payload;

if (payload_data.empty()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
Expand Down Expand Up @@ -549,7 +548,7 @@ rmw_ret_t SubscriptionData::take_serialized_message(
std::unique_ptr<Message> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

auto payload_data = msg_data->payload.as_vector();
auto & payload_data = msg_data->payload;

if (payload_data.empty()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
Expand Down
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -50,13 +51,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
zenoh::Bytes p,
std::vector<uint8_t> && p,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();

zenoh::Bytes payload;
std::vector<uint8_t> payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down

0 comments on commit b62a6e2

Please sign in to comment.