From ecb23a30e8bed586face206c95f461fdb47dd2f8 Mon Sep 17 00:00:00 2001 From: nerdtomars Date: Mon, 1 Jan 2024 13:04:48 +0800 Subject: [PATCH 1/3] add get_publishers_info_by_topic --- r2r/src/nodes.rs | 93 ++++++++++++++++++ r2r/src/qos.rs | 102 +++++++++++++++++++ r2r/tests/tokio_testing.rs | 195 ++++++++++++++++++++++++------------- 3 files changed, 324 insertions(+), 66 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 02910ae77..00864e265 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -1169,6 +1169,46 @@ impl Node { Ok(res) } + pub fn get_publishers_info_by_topic( + &self, topic_name: &str, no_mangle: bool, + ) -> Result> { + let node = self.node_handle.as_ref(); + + let topic_c_string = + CString::new(topic_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let mut allocator = unsafe { rcutils_get_default_allocator() }; + + let mut info_array: rcl_topic_endpoint_info_array_t = + unsafe { rmw_get_zero_initialized_topic_endpoint_info_array() }; + + let result = unsafe { + rcl_get_publishers_info_by_topic( + node, + &mut allocator, + topic_c_string.as_ptr(), + no_mangle, + &mut info_array, + ) + }; + + if result != RCL_RET_OK as i32 { + unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) }; + return Err(Error::from_rcl_error(result)); + } + + // Convert info_array to Vec + let topic_info_list = convert_info_array_to_vec(&info_array); + + let result = unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) }; + + if result != RCL_RET_OK as i32 { + return Err(Error::from_rcl_error(result)); + } + + Ok(topic_info_list) + } + /// Create a ROS wall timer. /// /// Create a ROS timer that is woken up by spin every `period`. @@ -1324,3 +1364,56 @@ impl Drop for Node { pub trait IsAvailablePollable { fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>; } + +pub struct TopicEndpointInfo { + pub node_name: String, + pub node_namespace: String, + pub topic_type: String, + pub endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize], + pub qos_profile: QosProfile, +} + +impl From for TopicEndpointInfo { + fn from(info: rmw_topic_endpoint_info_s) -> Self { + // Convert C strings to Rust String + let node_name = unsafe { CStr::from_ptr(info.node_name) } + .to_string_lossy() + .into_owned(); + let node_namespace = unsafe { CStr::from_ptr(info.node_namespace) } + .to_string_lossy() + .into_owned(); + let topic_type = unsafe { CStr::from_ptr(info.topic_type) } + .to_string_lossy() + .into_owned(); + + // Copy the endpoint_gid array + let endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize] = info.endpoint_gid; + + // Convert qos_profile + let qos_profile = QosProfile::from(info.qos_profile); // Adjust this line based on how QosProfile is defined + + TopicEndpointInfo { + node_name, + node_namespace, + topic_type, + endpoint_gid, + qos_profile, + } + } +} + +fn convert_info_array_to_vec( + info_array: &rcl_topic_endpoint_info_array_t, +) -> Vec { + let mut topic_info_list = Vec::with_capacity(info_array.size); + + unsafe { + let infos = std::slice::from_raw_parts(info_array.info_array, info_array.size); + for &info in infos { + let endpoint_info = TopicEndpointInfo::from(info); + topic_info_list.push(endpoint_info); + } + } + + topic_info_list +} diff --git a/r2r/src/qos.rs b/r2r/src/qos.rs index abe4c3935..8aca68d43 100644 --- a/r2r/src/qos.rs +++ b/r2r/src/qos.rs @@ -233,6 +233,19 @@ impl From for rmw_qos_history_policy_t { } } +impl From for HistoryPolicy { + fn from(rmw_history_policy: rmw_qos_history_policy_t) -> Self { + match rmw_history_policy { + rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_ALL => HistoryPolicy::KeepAll, + rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_LAST => HistoryPolicy::KeepLast, + rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT => { + HistoryPolicy::SystemDefault + } + rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_UNKNOWN => HistoryPolicy::Unknown, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum ReliabilityPolicy { BestEffort, @@ -259,6 +272,25 @@ impl From for rmw_qos_reliability_policy_t { } } +impl From for ReliabilityPolicy { + fn from(rmw_reliability_policy: rmw_qos_reliability_policy_t) -> Self { + match rmw_reliability_policy { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT => { + ReliabilityPolicy::BestEffort + } + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_RELIABLE => { + ReliabilityPolicy::Reliable + } + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT => { + ReliabilityPolicy::SystemDefault + } + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_UNKNOWN => { + ReliabilityPolicy::Unknown + } + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum DurabilityPolicy { TransientLocal, @@ -286,6 +318,25 @@ impl From for rmw_qos_durability_policy_t { } } +impl From for DurabilityPolicy { + fn from(rmw_durability_policy: rmw_qos_durability_policy_t) -> Self { + match rmw_durability_policy { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL => { + DurabilityPolicy::TransientLocal + } + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_VOLATILE => { + DurabilityPolicy::Volatile + } + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT => { + DurabilityPolicy::SystemDefault + } + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_UNKNOWN => { + DurabilityPolicy::Unknown + } + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum LivelinessPolicy { Automatic, @@ -317,6 +368,28 @@ impl From for rmw_qos_liveliness_policy_t { } } +impl From for LivelinessPolicy { + fn from(rmw_liveliness_policy: rmw_qos_liveliness_policy_t) -> Self { + match rmw_liveliness_policy { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_AUTOMATIC => { + LivelinessPolicy::Automatic + } + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE => { + LivelinessPolicy::ManualByNode + } + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC => { + LivelinessPolicy::ManualByTopic + } + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT => { + LivelinessPolicy::SystemDefault + } + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_UNKNOWN => { + LivelinessPolicy::Unknown + } + } + } +} + /// QoS profile #[derive(Debug, Clone, PartialEq, Eq)] pub struct QosProfile { @@ -719,8 +792,28 @@ impl From for r2r_rcl::rmw_qos_profile_t { } } } + +impl From for QosProfile { + fn from(rmw_qos: r2r_rcl::rmw_qos_profile_t) -> Self { + QosProfile { + history: rmw_qos.history.into(), + depth: rmw_qos.depth, + reliability: rmw_qos.reliability.into(), + durability: rmw_qos.durability.into(), + deadline: Duration::from_rmw_time_t(&rmw_qos.deadline), + lifespan: Duration::from_rmw_time_t(&rmw_qos.lifespan), + liveliness: rmw_qos.liveliness.into(), + liveliness_lease_duration: Duration::from_rmw_time_t( + &rmw_qos.liveliness_lease_duration, + ), + avoid_ros_namespace_conventions: rmw_qos.avoid_ros_namespace_conventions, + } + } +} + pub(crate) trait RclDurationT { fn to_rmw_time_t(&self) -> rmw_time_t; + fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self; } impl RclDurationT for Duration { @@ -730,4 +823,13 @@ impl RclDurationT for Duration { nsec: self.subsec_nanos().into(), } } + + fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self { + assert!( + rmw_time.nsec < 1_000_000_000, + "nsec part of rmw_time_t should be less than 1 billion" + ); + + Duration::new(rmw_time.sec, rmw_time.nsec as u32) + } } diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index aa4077f93..e081f7726 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -9,85 +9,148 @@ const N_TEARDOWN_CYCLES: usize = 2; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn tokio_testing() -> Result<(), Box> { - let mut threads = futures::stream::FuturesUnordered::from_iter( - (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| tokio::spawn(async move { - // Iterate to check for memory corruption on node setup/teardown - for i_cycle in 0..N_TEARDOWN_CYCLES { - - println!("tokio_testing iteration {i_cycle}"); - - let ctx = r2r::Context::create().unwrap(); - // let ctx = std::thread::spawn(|| r2r::Context::create().unwrap()).join().unwrap(); - - let mut node = r2r::Node::create(ctx, &format!("testnode_{i_context}"), "").unwrap(); - let mut s_the_no = - node.subscribe::(&format!("/the_no_{i_context}"), QosProfile::default()).unwrap(); - let mut s_new_no = - node.subscribe::(&format!("/new_no_{i_context}"), QosProfile::default()).unwrap(); - let p_the_no = - node.create_publisher::(&format!("/the_no_{i_context}"), QosProfile::default()).unwrap(); - let p_new_no = - node.create_publisher::(&format!("/new_no_{i_context}"), QosProfile::default()).unwrap(); - let state = Arc::new(Mutex::new(0)); - - task::spawn(async move { - (0..10).for_each(|i| { - p_the_no - .publish(&r2r::std_msgs::msg::Int32 { data: i }) - .unwrap(); - - println!("send {i}"); - + (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| { + tokio::spawn(async move { + // Iterate to check for memory corruption on node setup/teardown + for i_cycle in 0..N_TEARDOWN_CYCLES { + println!("tokio_testing iteration {i_cycle}"); + + let ctx = r2r::Context::create().unwrap(); + // let ctx = std::thread::spawn(|| r2r::Context::create().unwrap()).join().unwrap(); + + let mut node = + r2r::Node::create(ctx, &format!("testnode_{i_context}"), "").unwrap(); + let mut s_the_no = node + .subscribe::( + &format!("/the_no_{i_context}"), + QosProfile::default(), + ) + .unwrap(); + let mut s_new_no = node + .subscribe::( + &format!("/new_no_{i_context}"), + QosProfile::default(), + ) + .unwrap(); + let p_the_no = node + .create_publisher::( + &format!("/the_no_{i_context}"), + QosProfile::default(), + ) + .unwrap(); + let p_new_no = node + .create_publisher::( + &format!("/new_no_{i_context}"), + QosProfile::default(), + ) + .unwrap(); + + let p_float_no = node.create_publisher::( + &format!("/float_no_{i_context}"), + QosProfile::default().best_effort(), + )?; + + let pub_info = node + .get_publishers_info_by_topic(&format!("/float_no_{i_context}"), false)?; + assert_eq!(pub_info.len(), 1); + assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Float32".to_owned()); + assert_eq!( + pub_info[0].qos_profile.reliability, + QosProfile::default().best_effort().reliability + ); + assert_eq!( + pub_info[0].qos_profile.durability, + QosProfile::default().durability + ); + + let pub_info = + node.get_publishers_info_by_topic(&format!("/new_no_{i_context}"), false)?; + assert_eq!(pub_info.len(), 1); + assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Int32".to_owned()); + assert_eq!( + pub_info[0].qos_profile.reliability, + QosProfile::default().reliability + ); + assert_eq!( + pub_info[0].qos_profile.durability, + QosProfile::default().durability + ); + + let state = Arc::new(Mutex::new(0)); + + task::spawn(async move { + (0..10).for_each(|i| { + p_the_no + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); + + println!("send {i}"); + }); }); - }); - - task::spawn(async move { - while let Some(msg) = s_the_no.next().await { - p_new_no - .publish(&r2r::std_msgs::msg::Int32 { - data: msg.data + 10, - }) - .unwrap(); - println!("got {}, send {}", msg.data, msg.data + 10); - } - }); + task::spawn(async move { + while let Some(msg) = s_the_no.next().await { + p_new_no + .publish(&r2r::std_msgs::msg::Int32 { + data: msg.data + 10, + }) + .unwrap(); - let s = state.clone(); - task::spawn(async move { - while let Some(msg) = s_new_no.next().await { + println!("got {}, send {}", msg.data, msg.data + 10); + } + }); - println!("got {}", msg.data); + let s = state.clone(); + task::spawn(async move { + while let Some(msg) = s_new_no.next().await { + println!("got {}", msg.data); - let i = msg.data; + let i = msg.data; - *s.lock().unwrap() = i; - } - }); + *s.lock().unwrap() = i; + } + }); - // std::thread::spawn doesn't work here anymore? - let handle = task::spawn_blocking(move || { - for _ in 1..30 { - node.spin_once(std::time::Duration::from_millis(100)); - let x = state.lock().unwrap(); + task::spawn(async move { + (0..10).for_each(|i| { + p_the_no + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); + }); + }); - println!("rec {}", x); + task::spawn(async move { + (0..10).for_each(|i| { + p_float_no + .publish(&r2r::std_msgs::msg::Float32 { data: i as f32 }) + .unwrap(); + }); + }); - if *x == 19 { - break; - } - } + // std::thread::spawn doesn't work here anymore? + let handle = task::spawn_blocking(move || { + for _ in 1..30 { + node.spin_once(std::time::Duration::from_millis(100)); + let x = state.lock().unwrap(); - *state.lock().unwrap() - }); - let x = handle.await.unwrap(); - assert_eq!(x, 19); + println!("rec {}", x); - println!("tokio_testing finish iteration {i_cycle}"); + if *x == 19 { + break; + } + } - } - }))); + *state.lock().unwrap() + }); + let x = handle.await.unwrap(); + assert_eq!(x, 19); + + println!("tokio_testing finish iteration {i_cycle}"); + } + }) + }), + ); while let Some(thread) = threads.next().await { thread.unwrap(); From ca3a9906e8c9f91b59e9098c749ff23d694a69e1 Mon Sep 17 00:00:00 2001 From: nerdtomars Date: Tue, 9 Jan 2024 23:08:51 +0800 Subject: [PATCH 2/3] change type name and rebase --- r2r/src/nodes.rs | 6 +++--- r2r/tests/tokio_testing.rs | 26 +++++++++++--------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 00864e265..f7e1427b8 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -792,7 +792,7 @@ impl Node { } /// Create a ROS publisher with a type given at runtime, where the data may either be - /// supplied as JSON (using the `publish` method) or a pre-serialized ROS message + /// supplied as JSON (using the `publish` method) or a pre-serialized ROS message /// (i.e. &[u8], using the `publish_raw` method). pub fn create_publisher_untyped( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, @@ -1373,8 +1373,8 @@ pub struct TopicEndpointInfo { pub qos_profile: QosProfile, } -impl From for TopicEndpointInfo { - fn from(info: rmw_topic_endpoint_info_s) -> Self { +impl From for TopicEndpointInfo { + fn from(info: rmw_topic_endpoint_info_t) -> Self { // Convert C strings to Rust String let node_name = unsafe { CStr::from_ptr(info.node_name) } .to_string_lossy() diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index e081f7726..e7e684d76 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -46,13 +46,16 @@ async fn tokio_testing() -> Result<(), Box> { ) .unwrap(); - let p_float_no = node.create_publisher::( - &format!("/float_no_{i_context}"), - QosProfile::default().best_effort(), - )?; + let p_float_no = node + .create_publisher::( + &format!("/float_no_{i_context}"), + QosProfile::default().best_effort(), + ) + .unwrap(); let pub_info = node - .get_publishers_info_by_topic(&format!("/float_no_{i_context}"), false)?; + .get_publishers_info_by_topic(&format!("/float_no_{i_context}"), false) + .unwrap(); assert_eq!(pub_info.len(), 1); assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Float32".to_owned()); assert_eq!( @@ -64,8 +67,9 @@ async fn tokio_testing() -> Result<(), Box> { QosProfile::default().durability ); - let pub_info = - node.get_publishers_info_by_topic(&format!("/new_no_{i_context}"), false)?; + let pub_info = node + .get_publishers_info_by_topic(&format!("/new_no_{i_context}"), false) + .unwrap(); assert_eq!(pub_info.len(), 1); assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Int32".to_owned()); assert_eq!( @@ -112,14 +116,6 @@ async fn tokio_testing() -> Result<(), Box> { } }); - task::spawn(async move { - (0..10).for_each(|i| { - p_the_no - .publish(&r2r::std_msgs::msg::Int32 { data: i }) - .unwrap(); - }); - }); - task::spawn(async move { (0..10).for_each(|i| { p_float_no From 2df3a584f323a77c8f108a34f69f507edeba0727 Mon Sep 17 00:00:00 2001 From: NerdToMars Date: Thu, 11 Jan 2024 12:45:37 +0000 Subject: [PATCH 3/3] hotfix duration for foxy distro --- .dockerignore | 3 +++ r2r/src/qos.rs | 23 +++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..dbf034382 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.git +.github +target diff --git a/r2r/src/qos.rs b/r2r/src/qos.rs index 8aca68d43..8e680bd34 100644 --- a/r2r/src/qos.rs +++ b/r2r/src/qos.rs @@ -825,10 +825,25 @@ impl RclDurationT for Duration { } fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self { - assert!( - rmw_time.nsec < 1_000_000_000, - "nsec part of rmw_time_t should be less than 1 billion" - ); + #[cfg(not(r2r__ros__distro__foxy))] + { + assert!( + rmw_time.nsec < 1_000_000_000, + "nsec part of rmw_time_t should be less than 1 billion" + ); + } + + #[cfg(r2r__ros__distro__foxy)] + { + // FIXME: In foxy, duration data obtained from publisher with default qos profile is + // sec: 7FFFFFFF (2147483647), nsec: FFFFFFFF (4294967295) + if rmw_time.nsec == 4294967295 { + // 0s indicates deadline policies are not tracked or enforced in foxy + return Duration::new(0, 0); + } else if rmw_time.nsec > 1_000_000_000 { + panic!("nsec part of rmw_time_t should be less than 1 billion"); + } + } Duration::new(rmw_time.sec, rmw_time.nsec as u32) }