Skip to content

Commit

Permalink
- refactor buffer serialization code
Browse files Browse the repository at this point in the history
- update default configs to have SHM enabled by default (if compiled with SHM support)
  • Loading branch information
yellowhatter committed Aug 28, 2024
1 parent 3b31861 commit 991a1a5
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 120 deletions.
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
},
/// Shared memory configuration
shared_memory: {
enabled: false,
enabled: true,
},
/// Access control configuration
auth: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@
},
/// Shared memory configuration
shared_memory: {
enabled: false,
enabled: true,
},
/// Access control configuration
auth: {
Expand Down
14 changes: 7 additions & 7 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,10 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto free_shm_= rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});
z_owned_string_t shm_enabled_str;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled_str);
const bool shm_enabled = (strncmp(z_string_data(z_loan(shm_enabled_str)), "true", z_string_len(z_loan(shm_enabled_str))) == 0);
z_drop(z_move(shm_enabled_str));
#endif

// Initialize the zenoh session.
Expand Down Expand Up @@ -221,7 +219,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm manager if shared_memory is enabled in the config.
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
if (shm_enabled) {
printf(">>> SHM is enabled\n");

rmw_context_impl_s::rmw_shm_s shm;
Expand All @@ -248,6 +246,8 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {

// Upon successful provider creation, store it in the context
context->impl->shm = std::make_optional(shm);
} else {
printf(">>> SHM is disabled\n");
}

auto free_shm_provider = rcpputils::make_scope_exit(
Expand Down
270 changes: 159 additions & 111 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,151 @@ create_map_and_set_sequence_num(z_owned_bytes_t* out_bytes,
}
} // namespace

size_t serialize_into(
char *buffer,
size_t size,
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const void *ros_message) {
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(buffer, size);

// Object that serializes the data
rmw_zenoh_cpp::Cdr ser(fastbuffer);

// Serialize message
if (!publisher_data->type_support->serialize_ros_message(
ros_message, ser.get_cdr(), publisher_data->type_support_impl)) {
RMW_SET_ERROR_MSG("could not serialize ROS message");
return 0;
}

return ser.get_serialized_data_length();
}

rmw_ret_t publish(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
z_moved_bytes_t* payload) {

auto free_payload = rcpputils::make_scope_exit(
[&payload]() { z_drop(payload); });

z_owned_bytes_t attachment;
if (!create_map_and_set_sequence_num(
&attachment,
publisher_data->get_next_sequence_number(),
publisher_data->pub_gid)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}

// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.attachment = z_move(attachment);

free_payload.cancel();

if (z_publisher_put(z_loan(publisher_data->pub), payload, &options) != Z_OK) {
RMW_SET_ERROR_MSG("unable to publish message");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

/// Publish as RAW message.
rmw_ret_t publish_raw(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const void *ros_message,
size_t max_data_length) {
// printf(">>> rmw_publish(), Will use RAW\n");

rcutils_allocator_t *allocator =
&(publisher_data->context->options.allocator);

// Get memory from the allocator.
char * msg_bytes = static_cast<char *>(
allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(msg_bytes, "bytes for message is null",
return RMW_RET_BAD_ALLOC);

auto free_msg_bytes =
rcpputils::make_scope_exit([&msg_bytes, allocator]() {
if (msg_bytes) {
allocator->deallocate(msg_bytes, allocator->state);
}
});

// Serialize message into memory
const size_t data_length = serialize_into(msg_bytes, max_data_length, publisher_data, ros_message);
// Return error upon unsuccessful serialization
if(data_length == 0) {
// serialize_into already set the error
return RMW_RET_ERROR;
}

z_owned_bytes_t payload;
if (z_bytes_serialize_from_buf(
&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length) != Z_OK) {
RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload");
return RMW_RET_ERROR;
}

return publish(publisher_data, z_move(payload));
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
/// Publish as SHM message.
rmw_ret_t publish_shm(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const void *ros_message,
size_t max_data_length,
const z_loaned_shm_provider_t *provider) {
// printf(">>> rmw_publish(), Will use SHM\n");

// Allocate SHM bufer
// We use 1-byte alignment
z_buf_layout_alloc_result_t alloc;
{
z_alloc_alignment_t alignment = {0};
z_shm_provider_alloc_gc_defrag_blocking(
&alloc, provider,
max_data_length, alignment);
}

// Check allocation status
if (alloc.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation.");
return RMW_RET_ERROR;
}

// Cleanup SHM buffer upon scope exit
auto always_free_shmbuf = rcpputils::make_scope_exit([&alloc]() {
z_drop(z_move(alloc.buf));
});

// Serialize message into memory
char* msg_bytes = reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
const size_t data_length = serialize_into(msg_bytes, max_data_length, publisher_data, ros_message);
// Return error upon unsuccessful serialization
if(data_length == 0) {
// serialize_into already set the error
return RMW_RET_ERROR;
}

z_owned_bytes_t payload;
if (z_bytes_serialize_from_shm_mut(&payload, z_move(alloc.buf)) != Z_OK) {
RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload");
return RMW_RET_ERROR;
}

return publish(publisher_data, z_move(payload));
}
#endif

//==============================================================================
/// Publish a ROS message.
rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,
Expand All @@ -858,137 +1003,40 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,
static_cast<rmw_zenoh_cpp::rmw_publisher_data_t *>(publisher->data);
RMW_CHECK_FOR_NULL_WITH_MSG(publisher_data, "publisher_data is null",
return RMW_RET_INVALID_ARGUMENT);

rcutils_allocator_t *allocator =
&(publisher_data->context->options.allocator);

// Serialize data.
size_t max_data_length =
publisher_data->type_support->get_estimated_serialized_size(
ros_message, publisher_data->type_support_impl);

// To store serialized message byte array.
char *msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit([&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

auto free_msg_bytes =
rcpputils::make_scope_exit([&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
) {
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (publisher_data->context->impl->shm.has_value() &&
publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) {
// printf(">>> rmw_publish(), Will use SHM\n");

const auto& provider = publisher_data->context->impl->shm.value().shm_provider;

// Allocate SHM bufer
// We use 1-byte alignment
z_buf_layout_alloc_result_t alloc;
z_alloc_alignment_t alignment = {0};
z_shm_provider_alloc_gc_defrag_blocking(
&alloc, z_loan(provider),
max_data_length, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
shmbuf = std::make_optional(alloc.buf);
msg_bytes =
reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
} else {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation.");
return RMW_RET_ERROR;
}

} else
return publish_shm(
publisher_data,
ros_message,
max_data_length,
z_loan(publisher_data->context->impl->shm.value().shm_provider));
} else
#endif
{
// printf(">>> rmw_publish(), Will use RAW\n");
{
return publish_raw(
publisher_data,
ros_message,
max_data_length);
}
}


// Get memory from the allocator.
msg_bytes = static_cast<char *>(
allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(msg_bytes, "bytes for message is null",
return RMW_RET_BAD_ALLOC);
}

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);

// Object that serializes the data
rmw_zenoh_cpp::Cdr ser(fastbuffer);
if (!publisher_data->type_support->serialize_ros_message(
ros_message, ser.get_cdr(), publisher_data->type_support_impl)) {
RMW_SET_ERROR_MSG("could not serialize ROS message");
return RMW_RET_ERROR;
}

const size_t data_length = ser.get_serialized_data_length();

int64_t sequence_number = publisher_data->get_next_sequence_number();

z_owned_bytes_t attachment;
if (!create_map_and_set_sequence_num(&attachment, sequence_number, publisher_data->pub_gid)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
auto free_attachment = rcpputils::make_scope_exit(
[&attachment]() { z_drop(z_move(attachment)); });

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
if (z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())) != Z_OK) {
RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload");
return RMW_RET_ERROR;
}
} else
#endif
{
if (z_bytes_serialize_from_buf(
&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length) != Z_OK) {
RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload");
return RMW_RET_ERROR;
}
}

// we cancel free attachment because attachment will be moved into z_publisher_put_options_t
free_attachment.cancel();

// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.attachment = z_move(attachment);

if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options) != Z_OK) {
RMW_SET_ERROR_MSG("unable to publish message");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

//==============================================================================
/// Publish a loaned ROS message.
Expand Down

0 comments on commit 991a1a5

Please sign in to comment.