Skip to content

Commit

Permalink
Don't copy contiguous bytes on reception
Browse files Browse the repository at this point in the history
This uses the slices iterator API of zenoh-cpp to avoid unecessarily
copying bytes into a vecotr, if and only if the bytes is made up of
exactly one slice.
  • Loading branch information
fuzzypixelz committed Dec 18, 2024
1 parent df6ec26 commit b9dc578
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 5 deletions.
89 changes: 89 additions & 0 deletions rmw_zenoh_cpp/src/detail/payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__PAYLOAD_HPP_
#define DETAIL__PAYLOAD_HPP_

#include <zenoh.h>

#include <variant>
#include <utility>

namespace rmw_zenoh_cpp
{
///=============================================================================
class Payload
{
public:
explicit Payload(const zenoh::Bytes & bytes)
{
auto slices = bytes.slice_iter();
auto slice = slices.next();
if (!slice.has_value()) {
bytes_ = nullptr;
} else {
if (!slices.next().has_value()) {
Contiguous contiguous = {};
contiguous.slice = slice.value();
contiguous.bytes = bytes.clone();
} else {
bytes_ = bytes.as_vector();
}
}
}

~Payload() = default;

const uint8_t * data()
{
if (std::holds_alternative<Empty>(bytes_)) {
return nullptr;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).data();
} else {
return std::get<Contiguous>(bytes_).slice.data;
}
}

size_t size()
{
if (std::holds_alternative<Empty>(bytes_)) {
return 0;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).size();
} else {
return std::get<Contiguous>(bytes_).slice.len;
}
}

bool empty()
{
return std::holds_alternative<Empty>(bytes_);
}

private:
struct Contiguous
{
zenoh::Slice slice;
zenoh::Bytes bytes;
};
using NonContiguous = std::vector<uint8_t>;
using Empty = std::nullptr_t;
// Is `z_owned_slice_t` in case of a non-contiguous `bytes`
// and `z_view_slice_t` plus a `z_owned_bytes_t` otherwise.
std::variant<NonContiguous, Contiguous, Empty> bytes_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__PAYLOAD_HPP_
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
SubscriptionData::Message::Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & p,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
}

Expand Down Expand Up @@ -225,7 +225,7 @@ bool SubscriptionData::init()

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "attachment_helpers.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"
#include "payload.hpp"

#include "rcutils/allocator.h"

Expand All @@ -51,13 +52,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & bytes,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();

std::vector<uint8_t> payload;
Payload payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down

0 comments on commit b9dc578

Please sign in to comment.