Skip to content

Commit

Permalink
Merge commit 'f60c2a2a2090311d803de4a7e2151f7ab6ce1433'
Browse files Browse the repository at this point in the history
Conflicts:
	zenoh_c_vendor/CMakeLists.txt
  • Loading branch information
yellowhatter committed Aug 29, 2024
2 parents abbe1be + f60c2a2 commit d696a60
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 60 deletions.
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ install(
)

add_executable(rmw_zenohd
src/zenohd/main.cpp
src/detail/zenoh_config.cpp
src/detail/liveliness_utils.cpp
src/detail/logging.cpp
src/detail/qos.cpp
src/detail/zenoh_config.cpp
src/zenohd/main.cpp
)

target_link_libraries(rmw_zenohd
Expand Down
13 changes: 10 additions & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,16 @@
data_low: 4,
background: 4,
},
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: 100,
/// Perform batching of messages if they are smaller of the batch_size
batching: {
/// Perform adaptive batching of messages if they are smaller of the batch_size.
/// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
/// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput
/// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
// Number of threads dedicated to transmission
// By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
Expand Down
13 changes: 10 additions & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,16 @@
data_low: 4,
background: 4,
},
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: 100,
/// Perform batching of messages if they are smaller of the batch_size
batching: {
/// Perform adaptive batching of messages if they are smaller of the batch_size.
/// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
/// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput
/// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
// Number of threads dedicated to transmission
// By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
Expand Down
145 changes: 97 additions & 48 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#include <optional>
#include <sstream>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include "logging_macros.hpp"
#include "qos.hpp"

#include "rcpputils/scope_exit.hpp"

Expand Down Expand Up @@ -167,48 +169,112 @@ std::vector<std::string> split_keyexpr(
result.push_back(keyexpr.substr(start));
return result;
}

///=============================================================================
// Helper function to convert string to size_t.
// The function is templated to enable conversion to size_t or std::size_t.
template<typename T>
std::optional<T> str_to_size_t(const std::string & str, const T default_value)
{
if (str.empty()) {
return default_value;
}
errno = 0;
char * endptr;
// TODO(Yadunund): strtoul returns an unsigned long, not size_t.
// Depending on the architecture and platform, these may not be the same size.
// Further, if the incoming str is a signed integer, storing it in a size_t is incorrect.
// We should fix this piece of code to deal with both of those situations.
size_t num = strtoul(str.c_str(), &endptr, 10);
if (endptr == str.c_str()) {
// No values were converted, this is an error
RMW_SET_ERROR_MSG("no valid numbers available");
return std::nullopt;
} else if (*endptr != '\0') {
// There was junk after the number
RMW_SET_ERROR_MSG("non-numeric values");
return std::nullopt;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
RMW_SET_ERROR_MSG(
"an undefined error occurred while getting the number, this may be an overflow");
return std::nullopt;
}
return num;
}
} // namespace

///=============================================================================
// TODO(Yadunund): Rely on maps to retrieve strings.
std::string qos_to_keyexpr(rmw_qos_profile_t qos)
std::string qos_to_keyexpr(const rmw_qos_profile_t & qos)
{
std::string keyexpr = "";
const rmw_qos_profile_t & default_qos = QoS::get().default_qos();

// Reliability.
keyexpr += std::to_string(qos.reliability);
if (qos.reliability != default_qos.reliability) {
keyexpr += std::to_string(qos.reliability);
}
keyexpr += QOS_DELIMITER;

// Durability.
keyexpr += std::to_string(qos.durability);
if (qos.durability != default_qos.durability) {
keyexpr += std::to_string(qos.durability);
}
keyexpr += QOS_DELIMITER;

// History.
keyexpr += std::to_string(qos.history);
if (qos.history != default_qos.history) {
keyexpr += std::to_string(qos.history);
}
keyexpr += QOS_COMPONENT_DELIMITER;
keyexpr += std::to_string(qos.depth);
if (qos.depth != default_qos.depth) {
keyexpr += std::to_string(qos.depth);
}
keyexpr += QOS_DELIMITER;

// Deadline.
keyexpr += std::to_string(qos.deadline.sec);
if (qos.deadline.sec != default_qos.deadline.sec) {
keyexpr += std::to_string(qos.deadline.sec);
}
keyexpr += QOS_COMPONENT_DELIMITER;
keyexpr += std::to_string(qos.deadline.nsec);
if (qos.deadline.nsec != default_qos.deadline.nsec) {
keyexpr += std::to_string(qos.deadline.nsec);
}
keyexpr += QOS_DELIMITER;

// Lifespan.
keyexpr += std::to_string(qos.lifespan.sec);
if (qos.lifespan.sec != default_qos.lifespan.sec) {
keyexpr += std::to_string(qos.lifespan.sec);
}
keyexpr += QOS_COMPONENT_DELIMITER;
keyexpr += std::to_string(qos.lifespan.nsec);
if (qos.lifespan.nsec != default_qos.lifespan.nsec) {
keyexpr += std::to_string(qos.lifespan.nsec);
}
keyexpr += QOS_DELIMITER;

// Liveliness.
keyexpr += std::to_string(qos.liveliness);
if (qos.liveliness != default_qos.liveliness) {
keyexpr += std::to_string(qos.liveliness);
}
keyexpr += QOS_COMPONENT_DELIMITER;
keyexpr += std::to_string(qos.liveliness_lease_duration.sec);
if (qos.liveliness_lease_duration.sec != default_qos.liveliness_lease_duration.sec) {
keyexpr += std::to_string(qos.liveliness_lease_duration.sec);
}
keyexpr += QOS_COMPONENT_DELIMITER;
keyexpr += std::to_string(qos.liveliness_lease_duration.nsec);
if (qos.liveliness_lease_duration.nsec != default_qos.liveliness_lease_duration.nsec) {
keyexpr += std::to_string(qos.liveliness_lease_duration.nsec);
}

return keyexpr;
}

///=============================================================================
std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
{
const rmw_qos_profile_t & default_qos = QoS::get().default_qos();
rmw_qos_profile_t qos;

const std::vector<std::string> parts = split_keyexpr(keyexpr, QOS_DELIMITER);
if (parts.size() < 6) {
return std::nullopt;
Expand All @@ -232,46 +298,29 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
}

try {
qos.history = str_to_qos_history.at(history_parts[0]);
qos.reliability = str_to_qos_reliability.at(parts[0]);
qos.durability = str_to_qos_durability.at(parts[1]);
qos.liveliness = str_to_qos_liveliness.at(liveliness_parts[0]);
qos.history = history_parts[0].empty() ? default_qos.history : str_to_qos_history.at(
history_parts[0]);
qos.reliability = parts[0].empty() ? default_qos.reliability : str_to_qos_reliability.at(
parts[0]);
qos.durability = parts[1].empty() ? default_qos.durability : str_to_qos_durability.at(parts[1]);
qos.liveliness =
liveliness_parts[0].empty() ? default_qos.liveliness : str_to_qos_liveliness.at(
liveliness_parts[0]);
} catch (const std::exception & e) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Error setting QoS values from strings: %s", e.what());
return std::nullopt;
}

// Helper function to convert string to size_t.
auto str_to_size_t =
[](const std::string & str) -> std::optional<size_t>
{
errno = 0;
char * endptr;
size_t num = strtoul(str.c_str(), &endptr, 10);
if (endptr == str.c_str()) {
// No values were converted, this is an error
RMW_SET_ERROR_MSG("no valid numbers available");
return std::nullopt;
} else if (*endptr != '\0') {
// There was junk after the number
RMW_SET_ERROR_MSG("non-numeric values");
return std::nullopt;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
RMW_SET_ERROR_MSG(
"an undefined error occurred while getting the number, this may be an overflow");
return std::nullopt;
}
return num;
};

const auto maybe_depth = str_to_size_t(history_parts[1]);
const auto maybe_deadline_s = str_to_size_t(deadline_parts[0]);
const auto maybe_deadline_ns = str_to_size_t(deadline_parts[1]);
const auto maybe_lifespan_s = str_to_size_t(lifespan_parts[0]);
const auto maybe_lifespan_ns = str_to_size_t(lifespan_parts[1]);
const auto maybe_liveliness_s = str_to_size_t(liveliness_parts[1]);
const auto maybe_liveliness_ns = str_to_size_t(liveliness_parts[2]);
const auto maybe_depth = str_to_size_t(history_parts[1], default_qos.depth);
const auto maybe_deadline_s = str_to_size_t(deadline_parts[0], default_qos.deadline.sec);
const auto maybe_deadline_ns = str_to_size_t(deadline_parts[1], default_qos.deadline.nsec);
const auto maybe_lifespan_s = str_to_size_t(lifespan_parts[0], default_qos.lifespan.sec);
const auto maybe_lifespan_ns = str_to_size_t(lifespan_parts[1], default_qos.lifespan.nsec);
const auto maybe_liveliness_s = str_to_size_t(
liveliness_parts[1],
default_qos.liveliness_lease_duration.sec);
const auto maybe_liveliness_ns = str_to_size_t(
liveliness_parts[2],
default_qos.liveliness_lease_duration.nsec);
if (maybe_depth == std::nullopt ||
maybe_deadline_s == std::nullopt ||
maybe_deadline_ns == std::nullopt ||
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ std::string demangle_name(const std::string & input);
*
* See rmw/types.h for the values of each policy enum.
*/
std::string qos_to_keyexpr(rmw_qos_profile_t qos);
std::string qos_to_keyexpr(const rmw_qos_profile_t & qos);

///=============================================================================
/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid.
Expand Down
7 changes: 4 additions & 3 deletions zenoh_c_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c
VCS_VERSION dev/1.0.0
CMAKE_ARGS
-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}
-DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}
-DZENOHC_BUILD_WITH_UNSTABLE_API=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=true"
"-DZENOHC_CUSTOM_TARGET=${ZENOHC_CUSTOM_TARGET}"
)

ament_package()

0 comments on commit d696a60

Please sign in to comment.