Skip to content

Commit

Permalink
Adds base support for services.
Browse files Browse the repository at this point in the history
Signed-off-by: Franco Cipollone <[email protected]>
  • Loading branch information
francocipollone committed Dec 15, 2023
1 parent d734bc7 commit ab35a71
Show file tree
Hide file tree
Showing 3 changed files with 989 additions and 43 deletions.
87 changes: 87 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,90 @@ void sub_data_handler(

z_drop(z_move(keystr));
}

void service_data_handler(const z_query_t * query, void * service_data) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"service_data_handler triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));

auto rmw_service_data = static_cast<rmw_service_data_t *>(service_data);
if (rmw_service_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
"service for %s",
z_loan(keystr)
);
return;
}

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(rmw_service_data->query_queue_mutex);

const unsigned int client_id = rmw_service_data->get_new_uid();
rmw_service_data->id_query_map.emplace(
std::make_pair(client_id, std::make_unique<saved_queryable_data>(z_query_clone(query))));
rmw_service_data->to_take.push_back(client_id);


// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(rmw_service_data->internal_mutex);
if (rmw_service_data->condition != nullptr) {
rmw_service_data->condition->notify_one();
}
}

z_drop(z_move(keystr));
}

void client_data_handler(z_owned_reply_t * reply, void * client_data) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"client_data_handler triggered"
);
auto rmw_client_data = static_cast<rmw_client_data_t *>(client_data);
if (rmw_client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_client_data_t "
);
return;
}

if (!z_reply_is_ok(reply)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain z_reply_is_ok "
);
return;
}

z_sample_t sample = z_reply_ok(reply);

z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"client_data_handler triggered for %s",
z_loan(keystr)
);

{
std::lock_guard<std::mutex> msg_lock(rmw_client_data->message_mutex);
rmw_client_data->message = std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(&sample),
sample.timestamp.time, sample.timestamp.id.id);
}
{
std::lock_guard<std::mutex> internal_lock(rmw_client_data->internal_mutex);
if (rmw_client_data->condition != nullptr) {
rmw_client_data->condition->notify_one();
}
}

z_drop(z_move(keystr));
z_drop(z_move(reply));
}
84 changes: 84 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "graph_cache.hpp"
#include "message_type_support.hpp"
#include "service_type_support.hpp"

/// Structs for various type erased data fields.

Expand Down Expand Up @@ -131,4 +132,87 @@ struct rmw_subscription_data_t
std::condition_variable * condition{nullptr};
};


///==============================================================================

// z_owned_closure_query_t
void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);


struct saved_queryable_data
{
explicit saved_queryable_data(z_owned_query_t query)
: query(query)
{
}

const z_owned_query_t query;
};

///==============================================================================

struct rmw_service_data_t
{
unsigned int get_new_uid() {
return client_count++;
}

const char * zn_queryable_key;
z_owned_queryable_t zn_queryable;

// Liveliness token for the service.
zc_owned_liveliness_token_t token;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

// Map to store the query id and the query.
// The query handler is saved as it is needed to answer the query later on.
std::unordered_map<unsigned int, std::unique_ptr<saved_queryable_data>> id_query_map;
// The query id's of the queries that need to be processed.
std::deque<unsigned int> to_take;
std::mutex query_queue_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};


unsigned int client_count{};

};

///==============================================================================

struct rmw_client_data_t
{
z_owned_reply_channel_t zn_reply_channel;
// z_get_options_t z_get_options;

z_owned_closure_reply_t zn_closure_reply;

// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::unique_ptr<saved_msg_data> message;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};
};

#endif // DETAIL__RMW_DATA_TYPES_HPP_
Loading

0 comments on commit ab35a71

Please sign in to comment.