Skip to content

Commit

Permalink
fix: adapt the merge and bump up zenoh-c version
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Nov 18, 2024
1 parent 46ec449 commit 2743344
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 228 deletions.
157 changes: 65 additions & 92 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,29 @@
#include "rmw_client_data.hpp"

#include <fastcdr/FastBuffer.h>
#include <zenoh.h>

#include <chrono>
#include <cinttypes>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
#include "liveliness_utils.hpp"
#include "logging_macros.hpp"
#include "message_type_support.hpp"
#include "qos.hpp"
#include "rmw_context_impl_s.hpp"

#include "rcpputils/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

namespace
{

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
void client_data_handler(z_loaned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
Expand All @@ -59,33 +52,31 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}

if (!z_reply_check(reply)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_check returned False"
);
return;
}
if (!z_reply_is_ok(reply)) {
z_value_t err = z_reply_err(reply);
const z_loaned_reply_err_t * err = z_reply_err(reply);
const z_loaned_bytes_t * err_payload = z_reply_err_payload(err);

z_owned_string_t err_str;
z_bytes_to_string(err_payload, &err_str);

RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
client_data->topic_info().topic_keyexpr_.c_str(),
(int)err.payload.len,
err.payload.start);
static_cast<int>(z_string_len(z_loan(err_str))),
z_string_data(z_loan(err_str)));
z_drop(z_move(err_str));

return;
}

std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

z_owned_reply_t owned_reply;
z_reply_clone(&owned_reply, reply);
client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));

// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
std::make_unique<rmw_zenoh_cpp::ZenohReply>(owned_reply, received_timestamp));
}

///=============================================================================
Expand All @@ -109,7 +100,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
z_session_t session,
const z_loaned_session_t * session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -235,29 +226,32 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(z_session_t session)
bool ClientData::init(const z_loaned_session_t * session)
{
this->keyexpr_ =
z_keyexpr_new(this->entity_->topic_info().value().topic_keyexpr_.c_str());
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->keyexpr_));
});
if (!z_check(z_loan(this->keyexpr_))) {
if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return false;
}

this->token_ = zc_liveliness_declare_token(
session,
z_keyexpr(this->entity_->liveliness_keyexpr().c_str()),
NULL
);
std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
auto free_token = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->token_));
});
if (!z_check(this->token_)) {
if(zc_liveliness_declare_token(
session,
&this->token_,
z_loan(liveliness_ke),
NULL
) != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
Expand All @@ -277,13 +271,6 @@ liveliness::TopicInfo ClientData::topic_info() const
return entity_->topic_info().value();
}

///=============================================================================
bool ClientData::liveliness_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return zc_liveliness_token_check(&token_);
}

///=============================================================================
void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
Expand All @@ -301,14 +288,15 @@ void ClientData::add_new_reply(std::unique_ptr<ZenohReply> reply)
reply_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_));
z_view_string_t keystr;
z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr);
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Query queue depth of %ld reached, discarding oldest Query "
"for client for %s",
"for client for %.*s",
adapted_qos_profile.depth,
z_loan(keystr));
z_drop(z_move(keystr));
static_cast<int>(z_string_len(z_loan(keystr))),
z_string_data(z_loan(keystr)));
reply_queue_.pop_front();
}
reply_queue_.emplace_back(std::move(reply));
Expand Down Expand Up @@ -338,16 +326,18 @@ rmw_ret_t ClientData::take_response(
std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();

std::optional<z_sample_t> sample = latest_reply->get_sample();
if (!sample) {
if (!latest_reply->get_sample().has_value()) {
RMW_SET_ERROR_MSG("invalid reply sample");
return RMW_RET_ERROR;
}
const z_loaned_sample_t * sample = latest_reply->get_sample().value();

// Object that manages the raw buffer
z_owned_slice_t payload;
z_bytes_to_slice(z_sample_payload(sample), &payload);
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(sample->payload.start)),
sample->payload.len);
reinterpret_cast<char *>(const_cast<uint8_t *>(z_slice_data(z_loan(payload)))),
z_slice_len(z_loan(payload)));

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
Expand All @@ -361,28 +351,21 @@ rmw_ret_t ClientData::take_response(
}

// Fill in the request_header
request_header->request_id.sequence_number =
rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number");
rmw_zenoh_cpp::attachement_data_t attachment(z_sample_attachment(sample));
request_header->request_id.sequence_number = attachment.sequence_number;
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp =
rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp");
request_header->source_timestamp = attachment.source_timestamp;
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
if (!rmw_zenoh_cpp::get_gid_from_attachment(
&sample->attachment,
request_header->request_id.writer_guid))
{
RMW_SET_ERROR_MSG("Could not get client gid from attachment");
return RMW_RET_ERROR;
}

memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

z_drop(z_move(payload));
*taken = true;

return RMW_RET_OK;
Expand Down Expand Up @@ -436,28 +419,16 @@ rmw_ret_t ClientData::send_request(
*sequence_id = sequence_number_++;

// Send request
z_get_options_t opts = z_get_options_default();
z_owned_bytes_map_t map = create_map_and_set_sequence_num(
*sequence_id,
[this](z_owned_bytes_map_t * map, const char * key)
{
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
z_bytes_t gid_bytes;
gid_bytes.len = RMW_GID_STORAGE_SIZE;
gid_bytes.start = local_gid;
z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes);
});
if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
auto always_free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});
z_get_options_t opts;
z_get_options_default(&opts);
z_owned_bytes_t attachment;
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
rmw_zenoh_cpp::create_map_and_set_sequence_num(
&attachment, *sequence_id,
local_gid);
opts.attachment = z_move(attachment);

opts.attachment = z_bytes_map_as_attachment(&map);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
Expand All @@ -468,15 +439,20 @@ rmw_ret_t ClientData::send_request(
// which optimizes bandwidth. The default is "None", which imples replies may come in any order
// and any number.
opts.consolidation = z_query_consolidation_latest();
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};

z_owned_bytes_t payload;
z_bytes_copy_from_buf(
&payload, reinterpret_cast<const uint8_t *>(request_bytes), data_length);
opts.payload = z_move(payload);

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
z_owned_closure_reply_t zn_closure_reply =
z_closure(client_data_handler, client_data_drop, this);
z_owned_closure_reply_t callback;
z_closure(&callback, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
z_loan(keyexpr_), "",
z_move(zn_closure_reply),
z_move(callback),
&opts);

return RMW_RET_OK;
Expand Down Expand Up @@ -534,12 +510,9 @@ void ClientData::_shutdown()
}

// Unregister this node from the ROS graph.
if (zc_liveliness_token_check(&token_)) {
zc_liveliness_undeclare_token(z_move(token_));
}
if (z_check(z_loan(keyexpr_))) {
z_drop(z_move(keyexpr_));
}
zc_liveliness_undeclare_token(z_move(token_));

z_drop(z_move(keyexpr_));

is_shutdown_ = true;
}
Expand Down
14 changes: 2 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,13 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "service_type_support.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

#include "rcutils/allocator.h"

#include "rmw/rmw.h"
#include "rmw/ret_types.h"

namespace rmw_zenoh_cpp
Expand All @@ -47,7 +40,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr<ClientData> make(
z_session_t session,
const z_loaned_session_t * session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand All @@ -60,9 +53,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Get a copy of the TopicInfo of this ClientData.
liveliness::TopicInfo topic_info() const;

// Returns true if liveliness token is still valid.
bool liveliness_is_valid() const;

// Copy the GID of this ClientData into an rmw_gid_t.
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

Expand Down Expand Up @@ -119,7 +109,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(z_session_t session);
bool init(const z_loaned_session_t * session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
Expand Down
Loading

0 comments on commit 2743344

Please sign in to comment.