Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement __rmw_take_serialized. #113

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 69 additions & 9 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,69 @@ rmw_take_sequence(
return RMW_RET_UNSUPPORTED;
}

//==============================================================================
static rmw_ret_t __rmw_take_serialized(
const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message,
bool * taken,
rmw_message_info_t * message_info)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(serialized_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (sub_data->context->impl->is_shutdown) {
return RMW_RET_OK;
}

// RETRIEVE SERIALIZED MESSAGE ===============================================

std::unique_ptr<saved_msg_data> msg_data = sub_data->pop_next_message();
if (msg_data == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return RMW_RET_OK;
}

if (serialized_message->buffer_capacity < msg_data->payload.payload.len) {
rmw_ret_t ret =
rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len);
if (ret != RMW_RET_OK) {
return ret; // Error message already set
}
}
serialized_message->buffer_length = msg_data->payload.payload.len;
memcpy(
serialized_message->buffer, msg_data->payload.payload.start,
msg_data->payload.payload.len);

*taken = true;

// TODO(clalancette): fill in source_timestamp
message_info->source_timestamp = 0;
message_info->received_timestamp = msg_data->recv_timestamp;
// TODO(clalancette): fill in publication_sequence_number
message_info->publication_sequence_number = 0;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, 16);
message_info->from_intra_process = false;

return RMW_RET_OK;
}

//==============================================================================
/// Take an incoming ROS message as a byte stream.
rmw_ret_t
Expand All @@ -1643,11 +1706,11 @@ rmw_take_serialized_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

rmw_message_info_t dummy_msg_info;

return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info);
}

//==============================================================================
Expand All @@ -1660,12 +1723,9 @@ rmw_take_serialized_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(message_info);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

return __rmw_take_serialized(subscription, serialized_message, taken, message_info);
}

//==============================================================================
Expand Down
Loading