Skip to content

Commit

Permalink
Zenoh 1.0.0 Porting (ros2#276)
Browse files Browse the repository at this point in the history
* chore: configure the compiliation

* chore: complete the 1st version

* fix: memory leak

* fix: z_error_t -> z_result_t

* Fix `scouting/*/autoconnect/*` per eclipse-zenoh/zenoh@b31a410 (#3)

* chore: checkout the local zenoh-c

* chore: polish z_open

* feat: `z_bytes_serialize_from_slice` without copy

* Initialize `query_` member of `ZenohQuery`

* refactor: use `z_owned_slice_t` instead

* chore: adapt the latest change of zenoh-c dev/1.0.0

* chore: use `strncmp` to avoid copying

* refactor: use `z_view_keyexpr_t` to avoid copying

* chore: adapt the new changes from zenoh-c and fix the bug in liveliness

* fix: segmentation fault due to the unallocated query memory

* fix: workaround the ZID parsing issue

* fix Zenoh Config read\check

* adopt to recent zenoh-c API changes

* fix: adapt the latest change of batching config

* build: deprecate the zenohc_debug and include the zenohc dependency in the zenoh_c_vendor

* Use main branch for upgrading to Zenoh 1.0

* Increase the delay in scouting (#16)

* ci: fix the argument order in the style CI

* refactor: use `z_id_to_string`

* build: enable the unstable feature flag

* build: bump up the zenoh-c commit

* build: update zenoh-c version

* fix: set the max size of initial query queue to `SIZE_MAX - 1`

* fix: iterator memory leak

* feat: update to zenoh-c 1.0.0.8 changes

* chore(style): address `ament_cpplint` and `ament_uncrustiy`

* fix: initiate zenoh logger

* chore: apply the suggestions

* chore: add the comments for the zenoh logger

* fix: store and destroy the subscriber properly

* chore: improve the null pointer check: NULL => nullptr

* Change liveliness tokens logs from warn to debug level (#22)

* fix: properly clone the pointer of query and reply to resolve the segfault in test_service__rmw_zenoh_cpp

* chore: update to zenoh-c 1.0.0.9 (#23)

* Thread-safe access to graph cache (ros2#258)

* refactor(api): align with latest serialization changes

* chore(deps): bump up zenoh-c to 1.0.0.10

* chore(api): align with latest serialization changes

* fix: correct the sub_ke and selector_ke in the querying_subscriber

* fix: thread-safe publisher

* Enable history option for liveliness subscriber. (#27)

* refactor!: adopt the TLS config renaming

* refactor: allow Zenoh session to close without dropping

* fix: address the failure in rclcpp/test_wait_for_message of declaring a subscriber after the RMW has been shut down

* test: close but not drop the session

* fix: correct the merge

* chore: Explicit false in adminspace config

* fix: enable admin space in rmw router and ros nodes

* Bump zenoh-c version.

* Use the latest zenoh-c which fix some nav2 issues. (#31)

* Update config files according to Zenoh 1.0.0 DEFAULT_CONFIG.json5 (#33)

* chore(zenoh_c_vendor): bumb up zenoh-c version

* refactor: remove the free_attachment

* Fix unset request header writer GUID in `rmw_take_response`

* fix: keyexpr is missing in the service

* Avoid touching Zenoh Session while exiting.

* Register function right after opening Zenoh Session.

* chore(deps): bump up zenoh-c to 1.0.1

* fix: use TRUE value to configure the feature flag

* fix: correct typo `attachement` to `attachment`

* refactor: remove the warning of subscriber reliability QoS

* Fix `z_view_string_t` to `std::string` conversion

* refactor: zc_liveliness_* -> z_liveliness_* and bump up zenoh-c version

* refactor: reorder the cancel functions

* chore: reorder some lines of code

* refactor: add `session_is_valid` check

* fixup! refactor: reorder the cancel functions

* fixup! refactor: zc_liveliness_* -> z_liveliness_* and bump up zenoh-c version

Signed-off-by: Luca Cominardi <[email protected]>
Signed-off-by: ChenYing Kuo <[email protected]>
Signed-off-by: Gabriele Baldoni <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Co-authored-by: Mahmoud Mazouz <[email protected]>
Co-authored-by: yellowhatter <[email protected]>
Co-authored-by: Steven Palma <[email protected]>
Co-authored-by: Julien Enoch <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
6 people authored Dec 6, 2024
1 parent 20af287 commit 0293278
Show file tree
Hide file tree
Showing 30 changed files with 1,243 additions and 814 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: uncrustify
run: /ros_entrypoint.sh ament_uncrustify --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp rmw_zenoh_cpp/
run: /ros_entrypoint.sh ament_uncrustify rmw_zenoh_cpp/ --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp
- name: cpplint
run: /ros_entrypoint.sh ament_cpplint --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp rmw_zenoh_cpp/
run: /ros_entrypoint.sh ament_cpplint rmw_zenoh_cpp/ --exclude rmw_zenoh_cpp/src/detail/ordered_hash.hpp rmw_zenoh_cpp/src/detail/ordered_map.hpp
4 changes: 0 additions & 4 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc_debug QUIET)
if(NOT zenohc_debug_FOUND)
find_package(zenohc REQUIRED)
endif()

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
Expand Down
325 changes: 278 additions & 47 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5

Large diffs are not rendered by default.

338 changes: 286 additions & 52 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5

Large diffs are not rendered by default.

152 changes: 96 additions & 56 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,83 +16,123 @@

#include <cstdlib>
#include <cstring>
#include <string>
#include <stdexcept>
#include <string_view>
#include <utility>

#include "rmw/types.h"

#include "attachment_helpers.hpp"
#include "liveliness_utils.hpp"

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE])
///=============================================================================
AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE])
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp)
{
if (!z_check(*attachment)) {
return false;
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("source_gid"));
if (!z_check(index)) {
return false;
}

if (index.len != RMW_GID_STORAGE_SIZE) {
return false;
}

memcpy(gid, index.start, index.len);
memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE);
gid_hash_ = hash_gid(source_gid_);
}

return true;
///=============================================================================
AttachmentData::AttachmentData(AttachmentData && data)
: sequence_number_(std::move(data.sequence_number_)),
source_timestamp_(std::move(data.source_timestamp_)),
gid_hash_(std::move(data.gid_hash_))
{
memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE);
}

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name)
///=============================================================================
AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment)
{
if (!z_check(*attachment)) {
// A valid request must have had an attachment
return -1;
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

// Deserialize the sequence_number
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
{
throw std::runtime_error("sequence_number is not found in the attachment.");
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str()));
if (!z_check(index)) {
return -1;
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

if (index.len < 1) {
return -1;
// Deserialize the source_timestamp
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "source_timestamp")
{
throw std::runtime_error("source_timestamp is not found in the attachment");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

if (index.len > 19) {
// The number was larger than we expected
return -1;
// Deserialize the source_gid
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
throw std::runtime_error("Invalid attachment: the key source_gid is not found");
}
z_drop(z_move(key));
z_owned_slice_t slice;
if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
throw std::runtime_error("Failed to deserialize the source_gid.");
}
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));
gid_hash_ = hash_gid(this->source_gid_);
}

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char int64_str[20];
///=============================================================================
int64_t AttachmentData::sequence_number() const
{
return sequence_number_;
}

memcpy(int64_str, index.start, index.len);
int64_str[index.len] = '\0';
///=============================================================================
int64_t AttachmentData::source_timestamp() const
{
return source_timestamp_;
}

errno = 0;
char * endptr;
int64_t num = strtol(int64_str, &endptr, 10);
if (num == 0) {
// This is an error regardless; the client should never send this
return -1;
} else if (endptr == int64_str) {
// No values were converted, this is an error
return -1;
} else if (*endptr != '\0') {
// There was junk after the number
return -1;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
return -1;
}
///=============================================================================
void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE);
}

///=============================================================================
size_t AttachmentData::gid_hash() const
{
return gid_hash_;
}

return num;
///=============================================================================
void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment)
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
}
} // namespace rmw_zenoh_cpp
31 changes: 23 additions & 8 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,34 @@

#include <zenoh.h>

#include <string>

#include "rmw/types.h"

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);
///=============================================================================
class AttachmentData final
{
public:
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE]);
explicit AttachmentData(const z_loaned_bytes_t *);
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number() const;
int64_t source_timestamp() const;
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
size_t gid_hash() const;

void serialize_to_zbytes(z_owned_bytes_t *);

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name);
private:
int64_t sequence_number_;
int64_t source_timestamp_;
uint8_t source_gid_[RMW_GID_STORAGE_SIZE];
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
7 changes: 4 additions & 3 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,10 @@ void GraphCache::parse_del(

if (entity->type() == EntityType::Node) {
// Node
// The liveliness tokens to remove pub/subs should be received before the one to remove a node
// given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in
// the node below, we should update the count in graph_topics_.
// When destroying a node, Zenoh does not guarantee that liveliness tokens to remove pub/subs
// arrive before the one to remove the node from the graph despite un-registering those entities
// earlier. In such scenarios, if we find any pub/subs present in the node, we reduce their
// counts in graph_topics_.
const GraphNodePtr graph_node = node_it->second;
if (!graph_node->pubs_.empty() ||
!graph_node->subs_.empty() ||
Expand Down
13 changes: 5 additions & 8 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,11 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
///=============================================================================
std::string zid_to_str(const z_id_t & id)
{
std::stringstream ss;
ss << std::hex;
for (std::size_t i = 0; i < sizeof(id.id); i++) {
// By Zenoh convention a z_id_t is a little endian u128.
const std::size_t le_idx = sizeof(id.id) - 1 - i;
ss << static_cast<int>(id.id[le_idx]);
}
return ss.str();
z_owned_string_t z_str;
z_id_to_string(&id, &z_str);
std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str)));
z_drop(z_move(z_str));
return str;
}

///=============================================================================
Expand Down
Loading

0 comments on commit 0293278

Please sign in to comment.