Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/rolling' into ahcorde/dev/1.0.0-cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
ahcorde committed Dec 6, 2024
2 parents a0943dd + 0293278 commit e63355a
Show file tree
Hide file tree
Showing 20 changed files with 225 additions and 97 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ jobs:
# Jazzy (binary)
- ROS_DISTRO: jazzy
BUILD_TYPE: binary
# Iron (binary)
- ROS_DISTRO: iron
BUILD_TYPE: binary
env:
ROS2_REPOS_FILE_URL: 'https://raw.githubusercontent.com/ros2/ros2/${{ matrix.ROS_DISTRO }}/ros2.repos'
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
distro: ['iron', 'jazzy', 'rolling']
distro: ['jazzy', 'rolling']
container:
image: ros:${{ matrix.distro }}-ros-base
timeout-minutes: 30
Expand Down
56 changes: 41 additions & 15 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,63 @@
#include "rmw/types.h"

#include "attachment_helpers.hpp"
#include "liveliness_utils.hpp"

namespace rmw_zenoh_cpp
{

AttachmentData::AttachmentData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
const std::vector<uint8_t> _source_gid)
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
source_gid.push_back(_source_gid[RMW_GID_STORAGE_SIZE - 1 - i]);
}
sequence_number_ = _sequence_number;
source_timestamp_ = _source_timestamp;
source_gid_ = _source_gid;
gid_hash_ = hash_gid(source_gid_);
}

AttachmentData::AttachmentData(AttachmentData && data)
{
sequence_number = std::move(data.sequence_number);
source_timestamp = std::move(data.source_timestamp);
source_gid = data.source_gid;
gid_hash_ = std::move(data.gid_hash_);
sequence_number_ = std::move(data.sequence_number_);
source_timestamp_ = std::move(data.source_timestamp_);
source_gid_ = data.source_gid_;
}

///=============================================================================
int64_t AttachmentData::sequence_number() const
{
return sequence_number_;
}

///=============================================================================
int64_t AttachmentData::source_timestamp() const
{
return source_timestamp_;
}

///=============================================================================
std::vector<uint8_t> AttachmentData::copy_gid() const
{
return source_gid_;
}

///=============================================================================
size_t AttachmentData::gid_hash() const
{
return gid_hash_;
}

zenoh::Bytes AttachmentData::serialize_to_zbytes()
{
auto serializer = zenoh::ext::Serializer();
serializer.serialize(std::string("sequence_number"));
serializer.serialize(this->sequence_number);
serializer.serialize(this->sequence_number_);
serializer.serialize(std::string("source_timestamp"));
serializer.serialize(this->source_timestamp);
serializer.serialize(this->source_timestamp_);
serializer.serialize(std::string("source_gid"));
serializer.serialize(this->source_gid);
serializer.serialize(this->source_gid_);
return std::move(serializer).finish();
}

Expand All @@ -67,18 +92,19 @@ AttachmentData::AttachmentData(const zenoh::Bytes & attachment)
if (sequence_number_str != "sequence_number") {
throw std::runtime_error("sequence_number is not found in the attachment.");
}
this->sequence_number = deserializer.deserialize<int64_t>();
this->sequence_number_ = deserializer.deserialize<int64_t>();

const auto source_timestamp_str = deserializer.deserialize<std::string>();
if (source_timestamp_str != "source_timestamp") {
throw std::runtime_error("source_timestamp is not found in the attachment.");
}
this->source_timestamp = deserializer.deserialize<int64_t>();
this->source_timestamp_ = deserializer.deserialize<int64_t>();

const auto source_gid_str = deserializer.deserialize<std::string>();
if (source_gid_str != "source_gid") {
throw std::runtime_error("source_gid is not found in the attachment.");
}
this->source_gid = deserializer.deserialize<std::vector<uint8_t>>();
this->source_gid_ = deserializer.deserialize<std::vector<uint8_t>>();
gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,28 @@

namespace rmw_zenoh_cpp
{

class AttachmentData final
{
public:
explicit AttachmentData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);
const std::vector<uint8_t> _source_gid);

explicit AttachmentData(const zenoh::Bytes & bytes);
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number;
int64_t source_timestamp;
std::vector<uint8_t> source_gid;
int64_t sequence_number() const;
int64_t source_timestamp() const;
std::vector<uint8_t> copy_gid() const;
size_t gid_hash() const;

zenoh::Bytes serialize_to_zbytes();
private:
int64_t sequence_number_;
int64_t source_timestamp_;
std::vector<uint8_t> source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp

Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}

memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE);
entity->copy_gid(ep.endpoint_gid);
memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE);

endpoints.push_back(ep);
}
Expand Down
22 changes: 17 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,9 @@ Entity::Entity(
// returned to the RMW layer as necessary.
simplified_XXH128_hash_t keyexpr_gid =
simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length());
memcpy(this->gid_, &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
memcpy(this->gid_ + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64));
this->gid_.reserve(RMW_GID_STORAGE_SIZE);
memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64));

// We also hash the liveliness keyexpression into a size_t that we use to index into our maps.
this->keyexpr_hash_ = hash_gid(this->gid_);
Expand Down Expand Up @@ -630,9 +631,9 @@ std::string Entity::liveliness_keyexpr() const
}

///=============================================================================
void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
std::vector<uint8_t> Entity::copy_gid() const
{
memcpy(out_gid, gid_, RMW_GID_STORAGE_SIZE);
return gid_;
}

///=============================================================================
Expand Down Expand Up @@ -671,7 +672,18 @@ std::string demangle_name(const std::string & input)
} // namespace liveliness

///=============================================================================
size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE])
size_t hash_gid(const std::vector<uint8_t> gid)
{
std::stringstream hash_str;
hash_str << std::hex;
for (const auto & g: gid) {
hash_str << static_cast<int>(g);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::stringstream hash_str;
hash_str << std::hex;
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class Entity

void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

std::vector<uint8_t> copy_gid() const;

private:
Entity(
std::string zid,
Expand All @@ -191,7 +193,7 @@ class Entity
NodeInfo node_info_;
std::optional<TopicInfo> topic_info_;
std::string liveliness_keyexpr_;
uint8_t gid_[RMW_GID_STORAGE_SIZE];
std::vector<uint8_t> gid_;
};

///=============================================================================
Expand Down Expand Up @@ -235,7 +237,9 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);

///=============================================================================
/// Generate a hash for a given GID.
size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]);
size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE]);

size_t hash_gid(const std::vector<uint8_t> gid);
} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
40 changes: 32 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ ClientData::ClientData(
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
{
// Do nothing.
Expand All @@ -177,7 +178,7 @@ bool ClientData::init(const std::shared_ptr<zenoh::Session> & session)
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);

// TODO(ahcorde) check KeyExpr
std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
zenoh::ZResult err;
this->token_ = session->liveliness_declare_token(
Expand All @@ -191,6 +192,8 @@ bool ClientData::init(const std::shared_ptr<zenoh::Session> & session)
return false;
}

initialized_ = true;

return true;
}

Expand All @@ -202,10 +205,19 @@ liveliness::TopicInfo ClientData::topic_info() const
}

///=============================================================================
void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
bool ClientData::liveliness_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
entity_->copy_gid(out_gid);
// The z_check function is now internal in zenoh-1.0.0 so we assume
// the liveliness token is still initialized as long as this entity has
// not been shutdown.
return !is_shutdown_;
}

///=============================================================================
std::vector<uint8_t> ClientData::copy_gid() const
{
return entity_->copy_gid();
}

///=============================================================================
Expand Down Expand Up @@ -293,19 +305,19 @@ rmw_ret_t ClientData::take_response(
return RMW_RET_ERROR;
}
rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get()));
request_header->request_id.sequence_number = attachment.sequence_number;
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp;
request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
memcpy(
request_header->request_id.writer_guid,
attachment.source_gid.data(),
attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

Expand Down Expand Up @@ -369,8 +381,8 @@ rmw_ret_t ClientData::send_request(

// Send request
zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
std::vector<uint8_t> local_gid;
local_gid = entity_->copy_gid();
opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
Expand Down Expand Up @@ -428,6 +440,18 @@ rmw_ret_t ClientData::send_request(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
},
zenoh::closures::none,
// TODO(ahcorde): Try this
// [client_data]() {
// auto sub_data = client_data.lock();
// if (sub_data == nullptr) {
// RMW_ZENOH_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to obtain ClientData from data for %s.",
// std::string(sample.get_keyexpr().as_string_view()));
// return;
// }
// client_data->decrement_in_flight_and_conditionally_remove();
// },
std::move(opts),
&err);
return RMW_RET_OK;
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Get a copy of the TopicInfo of this ClientData.
liveliness::TopicInfo topic_info() const;

// Returns true if liveliness token is still valid.
bool liveliness_is_valid() const;

// Copy the GID of this ClientData into an rmw_gid_t.
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
std::vector<uint8_t> copy_gid() const;

// Add a new ZenohReply to the queue.
void add_new_reply(std::unique_ptr<rmw_zenoh_cpp::ZenohReply> reply);
Expand Down Expand Up @@ -147,6 +150,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
size_t sequence_number_;
// Shutdown flag.
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
Expand Down
27 changes: 15 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class rmw_context_impl_s::Data final
if (reply.is_ok()) {
const auto & sample = reply.get_ok();
graph_cache_->parse_put(std::string(sample.get_keyexpr().as_string_view()), true);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
}

Expand Down Expand Up @@ -237,18 +240,18 @@ class rmw_context_impl_s::Data final
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
node_it->second->id(),
ret
);
}
}
// TODO (ahcorde): review this
// // Shutdown all the nodes in this context.
// for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
// ret = node_it->second->shutdown();
// if (ret != RMW_RET_OK) {
// RMW_ZENOH_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
// node_it->second->id(),
// ret
// );
// }

zenoh::ZResult err;
std::move(graph_subscriber_cpp_).value().undeclare(&err);
Expand Down
Loading

0 comments on commit e63355a

Please sign in to comment.