Skip to content

Commit

Permalink
codestyle
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Sep 25, 2024
1 parent 1ce3cf8 commit 01b34c5
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 93 deletions.
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class rmw_context_impl_s final
z_owned_session_t session;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
struct rmw_shm_s {
struct rmw_shm_s
{
z_owned_shm_provider_t shm_provider;
size_t msgsize_threshold;
};
Expand Down
12 changes: 7 additions & 5 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ static const std::unordered_map<ConfigurableEntity,
static const char * router_check_attempts_envar = "ZENOH_ROUTER_CHECK_ATTEMPTS";
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
static const char * zenoh_shm_alloc_size_envar = "ZENOH_SHM_ALLOC_SIZE";
static const size_t zenoh_shm_alloc_size_default = 100*1024*1024;
static const size_t zenoh_shm_alloc_size_default = 1 * 1024 * 1024;
static const char * zenoh_shm_message_size_threshold_envar = "ZENOH_SHM_MESSAGE_SIZE_THRESHOLD";
static const size_t zenoh_shm_message_size_threshold_default = 2*1024;
static const size_t zenoh_shm_message_size_threshold_default = 2 * 1024;
#endif

rmw_ret_t _get_z_config(
Expand Down Expand Up @@ -133,7 +133,8 @@ std::optional<uint64_t> zenoh_router_check_attempts()

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
///=============================================================================
size_t zenoh_shm_alloc_size() {
size_t zenoh_shm_alloc_size()
{
const char * envar_value;

if (NULL != rcutils_get_env(zenoh_shm_alloc_size_envar, &envar_value)) {
Expand All @@ -142,7 +143,7 @@ size_t zenoh_shm_alloc_size() {
zenoh_shm_alloc_size_envar);
return zenoh_shm_alloc_size_default;
}

// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtoull(envar_value, nullptr, 10);
Expand All @@ -160,7 +161,8 @@ size_t zenoh_shm_alloc_size() {
return zenoh_shm_alloc_size_default;
}
///=============================================================================
size_t zenoh_shm_message_size_threshold() {
size_t zenoh_shm_message_size_threshold()
{
const char * envar_value;

if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) {
Expand Down
11 changes: 8 additions & 3 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
z_owned_string_t shm_enabled_str;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled_str);
const bool shm_enabled = (strncmp(z_string_data(z_loan(shm_enabled_str)), "true", z_string_len(z_loan(shm_enabled_str))) == 0);
const bool shm_enabled =
(strncmp(
z_string_data(z_loan(shm_enabled_str)), "true", z_string_len(
z_loan(
shm_enabled_str))) == 0);
z_drop(z_move(shm_enabled_str));
#endif

Expand Down Expand Up @@ -244,13 +248,14 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
if(z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a Layout for SHM provider.");
return RMW_RET_ERROR;
}

// Create SHM provider
const auto provider_creation_result = z_posix_shm_provider_new(&shm.shm_provider, z_loan(layout));
const auto provider_creation_result =
z_posix_shm_provider_new(&shm.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
Expand Down
168 changes: 84 additions & 84 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,17 @@ create_map_and_set_sequence_num(
}

rmw_ret_t publish(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
z_moved_bytes_t* payload) {
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
z_moved_bytes_t* payload) {

auto free_payload = rcpputils::make_scope_exit(
[&payload]() { z_drop(payload); });
[&payload]() { z_drop(payload); });

z_owned_bytes_t attachment;
if (!create_map_and_set_sequence_num(
&attachment,
publisher_data->get_next_sequence_number(),
publisher_data->pub_gid)) {
&attachment,
publisher_data->get_next_sequence_number(),
publisher_data->pub_gid)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
Expand All @@ -161,44 +161,47 @@ rmw_ret_t publish(
}

/// Publish as RAW message.
template<typename Tserializer, typename... SerializerArgs>
template<typename Tserializer, typename ... SerializerArgs>
rmw_ret_t publish_raw(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
SerializerArgs... serializer_args) {
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
SerializerArgs... serializer_args) {
// printf(">>> rmw_publish(), Will use RAW\n");

rcutils_allocator_t *allocator =
&(publisher_data->context->options.allocator);
rcutils_allocator_t * allocator =
&(publisher_data->context->options.allocator);

// Get memory from the allocator.
char * msg_bytes = static_cast<char *>(
allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(msg_bytes, "bytes for message is null",
return RMW_RET_BAD_ALLOC);
allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null",
return RMW_RET_BAD_ALLOC);

auto free_msg_bytes =
rcpputils::make_scope_exit([&msg_bytes, allocator]() {
if (msg_bytes) {
allocator->deallocate(msg_bytes, allocator->state);
}
});
rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
allocator->deallocate(msg_bytes, allocator->state);
}
});

// Serialize message into memory
const size_t data_length = Tserializer::serialize_into(
msg_bytes,
max_data_length,
publisher_data,
serializer_args...);
msg_bytes,
max_data_length,
publisher_data,
serializer_args...);
// Return error upon unsuccessful serialization
if(data_length == 0) {
if (data_length == 0) {
// serialize_into already set the error
return RMW_RET_ERROR;
}

z_owned_bytes_t payload;
if (z_bytes_serialize_from_buf(
&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length) != Z_OK) {
&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length) != Z_OK)
{
RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload");
return RMW_RET_ERROR;
}
Expand All @@ -208,12 +211,12 @@ rmw_ret_t publish_raw(

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
/// Publish as SHM message.
template<typename Tserializer, typename... SerializerArgs>
template<typename Tserializer, typename ... SerializerArgs>
rmw_ret_t publish_shm(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
const z_loaned_shm_provider_t *provider,
SerializerArgs... serializer_args) {
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
const z_loaned_shm_provider_t *provider,
SerializerArgs... serializer_args) {
// printf(">>> rmw_publish(), Will use SHM\n");

// Allocate SHM bufer
Expand All @@ -222,32 +225,33 @@ rmw_ret_t publish_shm(
{
z_alloc_alignment_t alignment = {0};
z_shm_provider_alloc_gc_defrag_blocking(
&alloc, provider,
max_data_length, alignment);
&alloc, provider,
max_data_length, alignment);
}

// Check allocation status
if (alloc.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation.");
"rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation.");
return RMW_RET_ERROR;
}

// Cleanup SHM buffer upon scope exit
auto always_free_shmbuf = rcpputils::make_scope_exit([&alloc]() {
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&alloc]() {
z_drop(z_move(alloc.buf));
});
});

// Serialize message into memory
char* msg_bytes = reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
const size_t data_length = Tserializer::serialize_into(
msg_bytes,
max_data_length,
publisher_data,
serializer_args...);
msg_bytes,
max_data_length,
publisher_data,
serializer_args...);

// Return error upon unsuccessful serialization
if(data_length == 0) {
if (data_length == 0) {
// serialize_into already set the error
return RMW_RET_ERROR;
}
Expand All @@ -265,27 +269,28 @@ rmw_ret_t publish_shm(
#endif

/// Publish using raw or SHM(if applicable) buffer
template<typename Tserializer, typename... SerializerArgs>
template<typename Tserializer, typename ... SerializerArgs>
rmw_ret_t publish_with_method_selection(
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
SerializerArgs... serializer_args) {

rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
size_t max_data_length,
SerializerArgs... serializer_args)
{
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (publisher_data->context->impl->shm.has_value() &&
publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) {
return publish_shm<Tserializer, SerializerArgs...>(
publisher_data,
max_data_length,
z_loan(publisher_data->context->impl->shm.value().shm_provider),
serializer_args...);
} else
publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length)
{
return publish_shm<Tserializer, SerializerArgs...>(
publisher_data,
max_data_length,
z_loan(publisher_data->context->impl->shm.value().shm_provider),
serializer_args...);
} else
#endif
{
return publish_raw<Tserializer, SerializerArgs...>(
publisher_data,
max_data_length,
serializer_args...);
serializer_args ...);
}
}

Expand Down Expand Up @@ -980,12 +985,14 @@ rmw_return_loaned_message_from_publisher(
return RMW_RET_UNSUPPORTED;
}

struct RosMsgSerializer {
struct RosMsgSerializer
{
static size_t serialize_into(
char *buffer,
size_t size,
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const void *ros_message) {
char *buffer,
size_t size,
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const void *ros_message)
{
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(buffer, size);

Expand All @@ -994,7 +1001,8 @@ struct RosMsgSerializer {

// Serialize message
if (!publisher_data->type_support->serialize_ros_message(
ros_message, ser.get_cdr(), publisher_data->type_support_impl)) {
ros_message, ser.get_cdr(), publisher_data->type_support_impl))
{
RMW_SET_ERROR_MSG("could not serialize ROS message");
return 0;
}
Expand All @@ -1004,15 +1012,17 @@ struct RosMsgSerializer {
};


struct RosSerializedMsgSerializer {
struct RosSerializedMsgSerializer
{
static size_t serialize_into(
char *_buffer,
size_t _size,
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const rmw_serialized_message_t *serialized_message) {
char *_buffer,
size_t _size,
rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data,
const rmw_serialized_message_t *serialized_message)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer),
serialized_message->buffer_length);
reinterpret_cast<char *>(serialized_message->buffer),
serialized_message->buffer_length);
rmw_zenoh_cpp::Cdr ser(buffer);
if (!ser.get_cdr().jump(serialized_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
Expand Down Expand Up @@ -1042,31 +1052,21 @@ rmw_publish(
return RMW_RET_INVALID_ARGUMENT);

auto publisher_data =
static_cast<rmw_zenoh_cpp::rmw_publisher_data_t *>(publisher->data);
RMW_CHECK_FOR_NULL_WITH_MSG(publisher_data, "publisher_data is null",
return RMW_RET_INVALID_ARGUMENT);
static_cast<rmw_zenoh_cpp::rmw_publisher_data_t *>(publisher->data);
RMW_CHECK_FOR_NULL_WITH_MSG(
publisher_data, "publisher_data is null",
return RMW_RET_INVALID_ARGUMENT);
// estimate serialized data size
size_t max_data_length =
publisher_data->type_support->get_estimated_serialized_size(
ros_message, publisher_data->type_support_impl);

return publish_with_method_selection<RosMsgSerializer>(
publisher_data,
max_data_length,
ros_message);
publisher_data,
max_data_length,
ros_message);
}












//==============================================================================
/// Publish a loaned ROS message.
rmw_ret_t
Expand Down

0 comments on commit 01b34c5

Please sign in to comment.