Skip to content

Commit

Permalink
Fix incompatibility with Iron and Rolling (fix #21) (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch authored Nov 30, 2023
1 parent a485c63 commit 38cfa93
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 19 deletions.
42 changes: 41 additions & 1 deletion zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::{AtomicU32, Ordering};
use std::{
env::VarError,
sync::atomic::{AtomicU32, Ordering},
};

use cyclors::{
dds_entity_t,
Expand All @@ -29,7 +32,12 @@ use crate::{config::Config, dds_utils::get_guid, ke_for_sure};
pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal";
pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray";

// ROS_DISTRO value assumed if the environment variable is not set
pub const ASSUMED_ROS_DISTRO: &str = "iron";

lazy_static::lazy_static!(
pub static ref ROS_DISTRO: String = get_ros_distro();

pub static ref KE_SUFFIX_ACTION_SEND_GOAL: &'static keyexpr = ke_for_sure!("_action/send_goal");
pub static ref KE_SUFFIX_ACTION_CANCEL_GOAL: &'static keyexpr = ke_for_sure!("_action/cancel_goal");
pub static ref KE_SUFFIX_ACTION_GET_RESULT: &'static keyexpr = ke_for_sure!("_action/get_result");
Expand All @@ -41,6 +49,38 @@ lazy_static::lazy_static!(
pub static ref QOS_DEFAULT_ACTION_STATUS: Qos = ros2_action_status_default_qos();
);

pub fn get_ros_distro() -> String {
match std::env::var("ROS_DISTRO") {
Ok(s) if !s.is_empty() => {
log::debug!("ROS_DISTRO detected: {s}");
s
}
Ok(_) | Err(VarError::NotPresent) => {
log::warn!(
"ROS_DISTRO environment variable is not set. \
Assuming '{ASSUMED_ROS_DISTRO}', but this could lead to errors on 'ros_discovery_info' \
(see https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/issues/21)"
);
ASSUMED_ROS_DISTRO.to_string()
}
Err(VarError::NotUnicode(s)) => {
log::warn!(
"ROS_DISTRO environment variable is invalid ('{s:?}'). \
Assuming '{ASSUMED_ROS_DISTRO}', but this could lead to errors on 'ros_discovery_info' \
(see https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/issues/21)"
);
ASSUMED_ROS_DISTRO.to_string()
}
}
}

/// Check if the ROS_DISTRO is older than `distro`, comparing the 1st char.
/// None is returned if ROS_DISTRO is not set.
pub fn ros_distro_is_less_than(distro: &str) -> bool {
assert!(!distro.is_empty());
ROS_DISTRO.chars().next() < distro.chars().next()
}

/// Convert ROS2 interface name to a Zenoh key expression,
/// prefixing with "namespace" if configured
pub fn ros2_name_to_key_expr(ros2_name: &str, config: &Config) -> OwnedKeyExpr {
Expand Down
117 changes: 99 additions & 18 deletions zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::dds_types::DDSRawSample;
use crate::ros2_utils::ros_distro_is_less_than;
use crate::{ChannelEvent, ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS};
//
// Copyright (c) 2022 ZettaScale Technology
Expand Down Expand Up @@ -284,8 +285,8 @@ impl RosDiscoveryInfoMgr {
Ok(i) => Some(i),
Err(e) => {
log::warn!(
"Error receiving ParticipantEntitiesInfo on ros_discovery_info: {}",
e
"Error receiving ParticipantEntitiesInfo on ros_discovery_info: {} - payload: {}",
e, sample.hex_encode()
);
None
}
Expand Down Expand Up @@ -471,11 +472,11 @@ fn serialize_ros_gid<S>(gid: &Gid, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if serializer.is_human_readable() {
if serializer.is_human_readable() || !ros_distro_is_less_than("iron") {
gid.serialize(serializer)
} else {
// Data size for gid in ROS messages in 24 bytes, while a DDS gid is 16 bytes.
// Rely on "impl Serialize for Gid" for the 16 bytes, and add the last 8 bytes.
// #21: prior to iron the Gid type in ROS messages was 'char[24] data'.
// Then 8 bytes shall be added since here it's defined as 16 bytes (as per DDS spec)
Serialize::serialize(&(gid, &BYTES_8), serializer)
}
}
Expand All @@ -484,12 +485,12 @@ fn deserialize_ros_gid<'de, D>(deserializer: D) -> Result<Gid, D::Error>
where
D: Deserializer<'de>,
{
if deserializer.is_human_readable() {
if deserializer.is_human_readable() || !ros_distro_is_less_than("iron") {
// Rely on impl<'de> Deserialize<'de> for Gid
Deserialize::deserialize(deserializer)
} else {
// Data size for gid in ROS messages in 24 bytes, while a DDS gid is 16 bytes.
// Rely on "impl<'de> Deserialize<'de> for Gid" for the 16 bytes, and ignore the last 8 bytes
// #21: prior to iron the Gid type in ROS messages was 'char[24] data'.
// then 8 bytes shall be removed since here it's defined as 16 bytes (as per DDS spec)
let (result, _ignore): (Gid, [u8; 8]) = Deserialize::deserialize(deserializer)?;
Ok(result)
}
Expand All @@ -502,11 +503,11 @@ where
let is_human_readable = serializer.is_human_readable();
let mut seq: <S as Serializer>::SerializeSeq = serializer.serialize_seq(Some(gids.len()))?;
for gid in gids {
if is_human_readable {
if is_human_readable || !ros_distro_is_less_than("iron") {
seq.serialize_element(gid)?;
} else {
// Data size for gid in ROS messages in 24 bytes, while a DDS gid is 16 bytes.
// Rely on "impl Serialize for Gid" for the 16 bytes, and add the last 8 bytes.
// #21: prior to iron the Gid type in ROS messages was 'char[24] data'.
// Then 8 bytes shall be added since here it's defined as 16 bytes (as per DDS spec)
seq.serialize_element(&(gid, &BYTES_8))?;
}
}
Expand All @@ -517,16 +518,16 @@ fn deserialize_ros_gids<'de, D>(deserializer: D) -> Result<HashSet<Gid>, D::Erro
where
D: Deserializer<'de>,
{
if deserializer.is_human_readable() {
if deserializer.is_human_readable() || !ros_distro_is_less_than("iron") {
Deserialize::deserialize(deserializer)
} else {
// Data size for gid in ROS messages in 24 bytes, while a DDS gid is 16 bytes.
// Deserialize as Vec<[u8; 24]>, consider 16 bytes only for each
// #21: prior to iron the Gid type in ROS messages was 'char[24] data'.
// then 8 bytes shall be removed since here it's defined as 16 bytes (as per DDS spec)
let ros_gids: Vec<[u8; 24]> = Deserialize::deserialize(deserializer)?;
// NOTE: a DDS gid is 16 bytes only. ignore the last 8 bytes
Ok(ros_gids
.iter()
.map(|ros_gid| {
// Ignore the last 8 bytes
TryInto::<&[u8; 16]>::try_into(&ros_gid[..16])
.unwrap()
.into()
Expand Down Expand Up @@ -564,16 +565,21 @@ where
}

mod tests {

#[test]
fn test_serde() {
#[ignore]
// Test ignored as it cannot be run at the same time than test_serde_after_iron()
// Both need different ROS_DISTRO env var, that cannot be changed between the 2 tests
// Run this test individually or with `cargo test -- --ignored``
fn test_serde_prior_to_iron() {
use super::*;
use crate::ros2_utils::get_ros_distro;
use std::str::FromStr;

// ros_discovery_message sent by a component_container node started as such:
// ros_discovery_message sent by a component_container node on Humble started as such:
// - ros2 run rclcpp_components component_container --ros-args --remap __ns:=/TEST
// - ros2 component load /TEST/ComponentManager composition composition::Listener
// - ros2 component load /TEST/ComponentManager composition composition::Talker
std::env::set_var("ROS_DISTRO", "humble");
let ros_discovery_info_cdr: Vec<u8> = hex::decode(
"000100000110de17b1eaf995400c9ac8000001c1000000000000000003000000\
060000002f5445535400000011000000436f6d706f6e656e744d616e61676572\
Expand Down Expand Up @@ -613,6 +619,7 @@ mod tests {
)
.unwrap();

println!("ROS_DISTRO={}", get_ros_distro());
let part_info: ParticipantEntitiesInfo = cdr::deserialize(&ros_discovery_info_cdr).unwrap();
println!("{:?}", part_info);

Expand Down Expand Up @@ -643,4 +650,78 @@ mod tests {
assert_eq!(node_talker.reader_gid_seq.len(), 7);
assert_eq!(node_talker.writer_gid_seq.len(), 9);
}

#[test]
fn test_serde_after_iron() {
use super::*;
use crate::ros2_utils::get_ros_distro;
use std::str::FromStr;

// ros_discovery_message sent by a component_container node on Iron started as such:
// - ros2 run rclcpp_components component_container --ros-args --remap __ns:=/TEST
// - ros2 component load /TEST/ComponentManager composition composition::Listener
// - ros2 component load /TEST/ComponentManager composition composition::Talker
std::env::set_var("ROS_DISTRO", "iron");
let ros_discovery_info_cdr: Vec<u8> = hex::decode(
"00010000010f20a26b2fbd8000000000000001c103000000060000002f544553\
5400000011000000436f6d706f6e656e744d616e616765720000000005000000\
010f20a26b2fbd800000000000000404010f20a26b2fbd800000000000000504\
010f20a26b2fbd800000000000000704010f20a26b2fbd800000000000000904\
010f20a26b2fbd800000000000000b0405000000010f20a26b2fbd8000000000\
00000303010f20a26b2fbd800000000000000603010f20a26b2fbd8000000000\
00000803010f20a26b2fbd800000000000000a03010f20a26b2fbd8000000000\
00000c03020000002f000000090000006c697374656e65720000000009000000\
010f20a26b2fbd800000000000000e04010f20a26b2fbd800000000000001004\
010f20a26b2fbd800000000000001204010f20a26b2fbd800000000000001404\
010f20a26b2fbd800000000000001604010f20a26b2fbd800000000000001804\
010f20a26b2fbd800000000000001b04010f20a26b2fbd800000000000001c04\
010f20a26b2fbd800000000000001e0409000000010f20a26b2fbd8000000000\
00000d03010f20a26b2fbd800000000000000f03010f20a26b2fbd8000000000\
00001103010f20a26b2fbd800000000000001303010f20a26b2fbd8000000000\
00001503010f20a26b2fbd800000000000001703010f20a26b2fbd8000000000\
00001903010f20a26b2fbd800000000000001a03010f20a26b2fbd8000000000\
00001d03020000002f0000000700000074616c6b6572000008000000010f20a2\
6b2fbd800000000000002004010f20a26b2fbd800000000000002204010f20a2\
6b2fbd800000000000002404010f20a26b2fbd800000000000002604010f20a2\
6b2fbd800000000000002804010f20a26b2fbd800000000000002a04010f20a2\
6b2fbd800000000000002d04010f20a26b2fbd800000000000002e040a000000\
010f20a26b2fbd800000000000001f03010f20a26b2fbd800000000000002103\
010f20a26b2fbd800000000000002303010f20a26b2fbd800000000000002503\
010f20a26b2fbd800000000000002703010f20a26b2fbd800000000000002903\
010f20a26b2fbd800000000000002b03010f20a26b2fbd800000000000002c03\
010f20a26b2fbd800000000000002f03010f20a26b2fbd800000000000003003",
)
.unwrap();

println!("ROS_DISTRO={}", get_ros_distro());
let part_info: ParticipantEntitiesInfo = cdr::deserialize(&ros_discovery_info_cdr).unwrap();
println!("{:?}", part_info);

assert_eq!(
part_info.gid,
Gid::from_str("010f20a26b2fbd8000000000000001c1").unwrap()
);
assert_eq!(part_info.node_entities_info_seq.len(), 3);

let node_componentmgr = part_info
.node_entities_info_seq
.get("/TEST/ComponentManager")
.unwrap();
assert_eq!(node_componentmgr.node_namespace, "/TEST");
assert_eq!(node_componentmgr.node_name, "ComponentManager");
assert_eq!(node_componentmgr.reader_gid_seq.len(), 5);
assert_eq!(node_componentmgr.writer_gid_seq.len(), 5);

let node_listener = part_info.node_entities_info_seq.get("/listener").unwrap();
assert_eq!(node_listener.node_namespace, "/");
assert_eq!(node_listener.node_name, "listener");
assert_eq!(node_listener.reader_gid_seq.len(), 9);
assert_eq!(node_listener.writer_gid_seq.len(), 9);

let node_talker = part_info.node_entities_info_seq.get("/talker").unwrap();
assert_eq!(node_talker.node_namespace, "/");
assert_eq!(node_talker.node_name, "talker");
assert_eq!(node_talker.reader_gid_seq.len(), 8);
assert_eq!(node_talker.writer_gid_seq.len(), 10);
}
}

0 comments on commit 38cfa93

Please sign in to comment.