Skip to content

Commit

Permalink
Attach sequence number, publisher GID, and source timestamp to public… (
Browse files Browse the repository at this point in the history
#123)

* Attach sequence number, publisher GID, and source timestamp to publications.

That way, the subscriptions can pull them out of the
attachment and pass it to the upper layers.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Mar 5, 2024
1 parent 16ff4d0 commit ae14222
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 164 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
src/detail/guard_condition.cpp
Expand Down
93 changes: 93 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <zenoh.h>

#include <cstdlib>
#include <cstring>
#include <string>

#include "rmw/types.h"

#include "attachment_helpers.hpp"

bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
if (!z_check(*attachment)) {
return false;
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("source_gid"));
if (!z_check(index)) {
return false;
}

if (index.len != RMW_GID_STORAGE_SIZE) {
return false;
}

memcpy(gid, index.start, index.len);

return true;
}

int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name)
{
if (!z_check(*attachment)) {
// A valid request must have had an attachment
return -1;
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str()));
if (!z_check(index)) {
return -1;
}

if (index.len < 1) {
return -1;
}

if (index.len > 19) {
// The number was larger than we expected
return -1;
}

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char int64_str[20];

memcpy(int64_str, index.start, index.len);
int64_str[index.len] = '\0';

errno = 0;
char * endptr;
int64_t num = strtol(int64_str, &endptr, 10);
if (num == 0) {
// This is an error regardless; the client should never send this
return -1;
} else if (endptr == int64_str) {
// No values were converted, this is an error
return -1;
} else if (*endptr != '\0') {
// There was junk after the number
return -1;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
return -1;
}

return num;
}
30 changes: 30 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <zenoh.h>

#include <string>

#include "rmw/types.h"

bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name);

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
53 changes: 47 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,43 @@

#include <zenoh.h>

#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"

///==============================================================================
saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
saved_msg_data::saved_msg_data(
zc_owned_payload_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts)
: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts)
{
memcpy(publisher_gid, pub_gid, 16);
memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE);
}

saved_msg_data::~saved_msg_data()
{
z_drop(z_move(payload));
}

size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
Expand Down Expand Up @@ -253,10 +268,36 @@ void sub_data_handler(
return;
}

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];
if (!get_gid_from_attachment(&sample->attachment, pub_gid)) {
// We failed to get the GID from the attachment. While this isn't fatal,
// it is unusual and so we should report it.
memset(pub_gid, 0, RMW_GID_STORAGE_SIZE);
RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to obtain publisher GID from the attachment.");
}

int64_t sequence_number = get_int64_from_attachment(&sample->attachment, "sequence_number");
if (sequence_number < 0) {
// We failed to get the sequence number from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
sequence_number = 0;
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment.");
}

int64_t source_timestamp = get_int64_from_attachment(&sample->attachment, "source_timestamp");
if (source_timestamp < 0) {
// We failed to get the source timestamp from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
source_timestamp = 0;
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment.");
}

sub_data->add_new_message(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr));
sample->timestamp.time, pub_gid, sequence_number, source_timestamp), z_loan(keystr));
}

ZenohQuery::ZenohQuery(const z_query_t * query)
Expand Down Expand Up @@ -318,8 +359,8 @@ std::optional<z_sample_t> ZenohReply::get_sample() const

size_t rmw_client_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex);
return sequence_number++;
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

//==============================================================================
Expand Down
28 changes: 21 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ struct rmw_node_data_t
};

///==============================================================================
struct rmw_publisher_data_t
class rmw_publisher_data_t final
{
public:
// An owned publisher.
z_owned_publisher_t pub;

Expand All @@ -93,7 +94,13 @@ struct rmw_publisher_data_t
// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_guid[RMW_GID_STORAGE_SIZE];
uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///==============================================================================
Expand All @@ -111,13 +118,20 @@ void sub_data_handler(const z_sample_t * sample, void * sub_data);

struct saved_msg_data
{
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]);
explicit saved_msg_data(
zc_owned_payload_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts);

~saved_msg_data();

zc_owned_payload_t payload;
uint64_t recv_timestamp;
uint8_t publisher_gid[16];
uint8_t publisher_gid[RMW_GID_STORAGE_SIZE];
int64_t sequence_number;
int64_t source_timestamp;
};

///==============================================================================
Expand Down Expand Up @@ -266,7 +280,7 @@ class rmw_client_data_t final

rmw_context_t * context;

uint8_t client_guid[RMW_GID_STORAGE_SIZE];
uint8_t client_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

Expand All @@ -283,8 +297,8 @@ class rmw_client_data_t final
private:
void notify();

size_t sequence_number{1};
std::mutex sequence_number_mutex;
size_t sequence_number_{1};
std::mutex sequence_number_mutex_;

std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
Expand Down
Loading

0 comments on commit ae14222

Please sign in to comment.