From 9efa17d0a7d889de507cb7baf81c8148889a9b5b Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 13 Dec 2023 23:36:36 +0800 Subject: [PATCH 1/4] Add rcl_publisher_get_subscription_count related methods --- r2r/Cargo.toml | 1 + r2r/src/publishers.rs | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index 39baf8ba6..d72551d61 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -25,6 +25,7 @@ r2r_actions = { path = "../r2r_actions", version = "0.8.2" } r2r_macros = { path = "../r2r_macros", version = "0.8.2" } uuid = { version = "1.2.2", features = ["serde", "v4"] } futures = "0.3.25" +futures-timer = "3.0.2" log = "0.4.18" phf = "0.11.1" diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 55668c49a..9874307f7 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -242,4 +242,36 @@ where Err(Error::from_rcl_error(result)) } } + + /// Gets the number of external subscribers (i.e. it doesn't + /// count subscribers from the same process). + pub fn get_inter_process_subscription_count(&self) -> usize { + // See https://github.com/ros2/rclcpp/issues/623 + + let mut inter_process_subscription_count = 0; + + unsafe { + rcl_publisher_get_subscription_count( + self.handle.as_ptr(), + &mut inter_process_subscription_count as *mut usize, + ) + }; + + inter_process_subscription_count + } + + /// Waits for at least one external subscriber to begin subscribing to the + /// topic. It doesn't count subscribers from the same process. + pub async fn wait_for_inter_process_subscribers(&self) { + // According to this there is no event available: + // + // https://github.com/ros2/rclcpp/issues/623 + loop { + if self.get_inter_process_subscription_count() > 0 { + break; + } + + futures_timer::Delay::new(std::time::Duration::from_millis(100)).await + } + } } From 957a6fcee3858625b4e02d505ed1402e19c0e1e2 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Thu, 14 Dec 2023 08:23:47 +0800 Subject: [PATCH 2/4] Add error handling --- r2r/src/publishers.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 9874307f7..132a3e417 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -245,30 +245,39 @@ where /// Gets the number of external subscribers (i.e. it doesn't /// count subscribers from the same process). - pub fn get_inter_process_subscription_count(&self) -> usize { + pub fn get_inter_process_subscription_count(&self) -> Result { // See https://github.com/ros2/rclcpp/issues/623 let mut inter_process_subscription_count = 0; - unsafe { + let publisher = self + .handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + + let result = unsafe { rcl_publisher_get_subscription_count( - self.handle.as_ptr(), + publisher.as_ref(), &mut inter_process_subscription_count as *mut usize, ) }; - inter_process_subscription_count + if result == RCL_RET_OK as i32 { + Ok(inter_process_subscription_count) + } else { + Err(Error::from_rcl_error(result)) + } } /// Waits for at least one external subscriber to begin subscribing to the /// topic. It doesn't count subscribers from the same process. - pub async fn wait_for_inter_process_subscribers(&self) { + pub async fn wait_for_inter_process_subscribers(&self) -> Result<()> { // According to this there is no event available: // // https://github.com/ros2/rclcpp/issues/623 loop { - if self.get_inter_process_subscription_count() > 0 { - break; + if self.get_inter_process_subscription_count()? > 0 { + return Ok(()); } futures_timer::Delay::new(std::time::Duration::from_millis(100)).await From b4f905b3606cb90fd4c5095eb9933de7835bedb2 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 27 Dec 2023 23:35:28 +0800 Subject: [PATCH 3/4] Update implementation of publisher to handle wait_for_inter_process_subscribers --- r2r/Cargo.toml | 1 - r2r/src/nodes.rs | 12 ++- r2r/src/publishers.rs | 189 ++++++++++++++++++++++++++++++++---------- 3 files changed, 152 insertions(+), 50 deletions(-) diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index d72551d61..39baf8ba6 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -25,7 +25,6 @@ r2r_actions = { path = "../r2r_actions", version = "0.8.2" } r2r_macros = { path = "../r2r_macros", version = "0.8.2" } uuid = { version = "1.2.2", features = ["serde", "v4"] } futures = "0.3.25" -futures-timer = "3.0.2" log = "0.4.18" phf = "0.11.1" diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index d7a78b2d7..49b08c1b5 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -51,7 +51,7 @@ pub struct Node { // timers, timers: Vec, // and the publishers, whom we allow to be shared.. hmm. - pubs: Vec>, + pubs: Vec>, } unsafe impl Send for Node {} @@ -828,6 +828,10 @@ impl Node { c.lock().unwrap().poll_available(self.node_handle.as_mut()); } + for p in &self.pubs { + p.poll_has_inter_process_subscribers(); + } + let timeout = timeout.as_nanos() as i64; let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; @@ -1305,9 +1309,9 @@ impl Drop for Node { s.lock().unwrap().destroy(&mut self.node_handle); } while let Some(p) = self.pubs.pop() { - let mut p = wait_until_unwrapped(p); - let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) }; - // TODO: check ret + let p = wait_until_unwrapped(p); + + p.destroy(self.node_handle.as_mut()); } unsafe { rcl_node_fini(self.node_handle.as_mut()); diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 132a3e417..4da649684 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -4,6 +4,10 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Once; use std::sync::Weak; +use std::sync::Mutex; +use futures::Future; +use futures::channel::oneshot; +use futures::TryFutureExt; use crate::error::*; use crate::msg_types::*; @@ -37,6 +41,68 @@ use r2r_rcl::*; unsafe impl Send for Publisher where T: WrappedTypesupport {} +pub(crate) struct Publisher_ { + handle: rcl_publisher_t, + + // TODO use a mpsc to avoid the mutex? + poll_inter_process_subscriber_channels: Mutex>> +} + +impl Publisher_ +{ + fn get_inter_process_subscription_count(&self) -> Result { + // See https://github.com/ros2/rclcpp/issues/623 + + let mut inter_process_subscription_count = 0; + + let result = unsafe { + rcl_publisher_get_subscription_count( + &self.handle as *const rcl_publisher_s, + &mut inter_process_subscription_count as *mut usize, + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(inter_process_subscription_count) + } else { + Err(Error::from_rcl_error(result)) + } + } + + pub(crate) fn poll_has_inter_process_subscribers(&self) { + + let mut poll_inter_process_subscriber_channels = + self.poll_inter_process_subscriber_channels.lock().unwrap(); + + if poll_inter_process_subscriber_channels.is_empty() { + return; + } + let inter_process_subscription_count = self.get_inter_process_subscription_count(); + match inter_process_subscription_count { + Ok(0) => { + // not available... + } + Ok(_) => { + // send ok and close channels + while let Some(sender) = poll_inter_process_subscriber_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Err(_) => { + // error, close all channels + poll_inter_process_subscriber_channels.clear(); + } + } + } + + pub(crate) fn destroy(mut self, node: &mut rcl_node_t) { + let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) }; + + // TODO: check ret + } +} + + /// A ROS (typed) publisher. /// /// This contains a `Weak Arc` to a typed publisher. As such it is safe to @@ -46,7 +112,7 @@ pub struct Publisher where T: WrappedTypesupport, { - handle: Weak, + handle: Weak, type_: PhantomData, } @@ -58,11 +124,11 @@ unsafe impl Send for PublisherUntyped {} /// move between threads. #[derive(Debug, Clone)] pub struct PublisherUntyped { - handle: Weak, + handle: Weak, type_: String, } -pub fn make_publisher(handle: Weak) -> Publisher +pub fn make_publisher(handle: Weak) -> Publisher where T: WrappedTypesupport, { @@ -72,14 +138,17 @@ where } } -pub fn make_publisher_untyped(handle: Weak, type_: String) -> PublisherUntyped { - PublisherUntyped { handle, type_ } +pub fn make_publisher_untyped(handle: Weak, type_: String) -> PublisherUntyped { + PublisherUntyped { + handle, + type_, + } } pub fn create_publisher_helper( node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t, qos_profile: QosProfile, -) -> Result { +) -> Result { let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -95,7 +164,10 @@ pub fn create_publisher_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(publisher_handle) + Ok(Publisher_ { + handle: publisher_handle, + poll_inter_process_subscriber_channels: Mutex::new(Vec::new()) + }) } else { Err(Error::from_rcl_error(result)) } @@ -116,7 +188,11 @@ impl PublisherUntyped { native_msg.from_json(msg)?; let result = - unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) }; + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_s, + native_msg.void_ptr(), + std::ptr::null_mut()) + }; if result == RCL_RET_OK as i32 { Ok(()) @@ -125,8 +201,34 @@ impl PublisherUntyped { Err(Error::from_rcl_error(result)) } } + + /// Gets the number of external subscribers (i.e. it doesn't + /// count subscribers from the same process). + pub fn get_inter_process_subscription_count(&self) -> Result { + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .get_inter_process_subscription_count() + } + + /// Waits for at least one external subscriber to begin subscribing to the + /// topic. It doesn't count subscribers from the same process. + pub fn wait_for_inter_process_subscribers(&self) -> Result>> { + let (sender, receiver) = oneshot::channel(); + + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .poll_inter_process_subscriber_channels + .lock() + .unwrap() + .push(sender); + + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + } } + impl Publisher where T: WrappedTypesupport, @@ -143,7 +245,11 @@ where .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); let result = - unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) }; + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_s, + native_msg.void_ptr(), + std::ptr::null_mut()) + }; if result == RCL_RET_OK as i32 { Ok(()) @@ -163,17 +269,21 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } { + if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_s) } { let mut loaned_msg: *mut c_void = std::ptr::null_mut(); let ret = unsafe { - rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg) + rcl_borrow_loaned_message( + &publisher.handle as *const rcl_publisher_s, + T::get_ts(), + &mut loaned_msg + ) }; if ret != RCL_RET_OK as i32 { log::error!("Failed getting loaned message"); return Err(Error::from_rcl_error(ret)); } - let handle_box = Box::new(*publisher.as_ref()); + let handle_box = Box::new(publisher.handle); let msg = WrappedNativeMsg::::from_loaned( loaned_msg as *mut T::CStruct, Box::new(|msg: *mut T::CStruct| { @@ -226,13 +336,17 @@ where // publish and return loaned message to middleware rcl_publish_loaned_message( - publisher.as_ref(), + &publisher.handle as *const rcl_publisher_s, msg.void_ptr_mut(), std::ptr::null_mut(), ) } } else { - unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) } + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_s, + msg.void_ptr(), + std::ptr::null_mut() + ) } }; if result == RCL_RET_OK as i32 { @@ -246,41 +360,26 @@ where /// Gets the number of external subscribers (i.e. it doesn't /// count subscribers from the same process). pub fn get_inter_process_subscription_count(&self) -> Result { - // See https://github.com/ros2/rclcpp/issues/623 - - let mut inter_process_subscription_count = 0; - - let publisher = self - .handle - .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - - let result = unsafe { - rcl_publisher_get_subscription_count( - publisher.as_ref(), - &mut inter_process_subscription_count as *mut usize, - ) - }; - - if result == RCL_RET_OK as i32 { - Ok(inter_process_subscription_count) - } else { - Err(Error::from_rcl_error(result)) - } + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .get_inter_process_subscription_count() } /// Waits for at least one external subscriber to begin subscribing to the /// topic. It doesn't count subscribers from the same process. - pub async fn wait_for_inter_process_subscribers(&self) -> Result<()> { - // According to this there is no event available: - // - // https://github.com/ros2/rclcpp/issues/623 - loop { - if self.get_inter_process_subscription_count()? > 0 { - return Ok(()); - } + pub fn wait_for_inter_process_subscribers(&self) -> Result>> { + let (sender, receiver) = oneshot::channel(); - futures_timer::Delay::new(std::time::Duration::from_millis(100)).await - } + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .poll_inter_process_subscriber_channels + .lock() + .unwrap() + .push(sender); + + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) } + } From 46f5cedbafa7b4965199c5ab16f8663f02135739 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 27 Dec 2023 23:39:14 +0800 Subject: [PATCH 4/4] Replace rcl_publisher_s with rcl_publisher_t --- r2r/src/publishers.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 4da649684..75b668f74 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -57,7 +57,7 @@ impl Publisher_ let result = unsafe { rcl_publisher_get_subscription_count( - &self.handle as *const rcl_publisher_s, + &self.handle as *const rcl_publisher_t, &mut inter_process_subscription_count as *mut usize, ) }; @@ -189,7 +189,7 @@ impl PublisherUntyped { let result = unsafe { rcl_publish( - &publisher.handle as *const rcl_publisher_s, + &publisher.handle as *const rcl_publisher_t, native_msg.void_ptr(), std::ptr::null_mut()) }; @@ -246,7 +246,7 @@ where let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); let result = unsafe { rcl_publish( - &publisher.handle as *const rcl_publisher_s, + &publisher.handle as *const rcl_publisher_t, native_msg.void_ptr(), std::ptr::null_mut()) }; @@ -269,11 +269,11 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_s) } { + if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } { let mut loaned_msg: *mut c_void = std::ptr::null_mut(); let ret = unsafe { rcl_borrow_loaned_message( - &publisher.handle as *const rcl_publisher_s, + &publisher.handle as *const rcl_publisher_t, T::get_ts(), &mut loaned_msg ) @@ -336,14 +336,14 @@ where // publish and return loaned message to middleware rcl_publish_loaned_message( - &publisher.handle as *const rcl_publisher_s, + &publisher.handle as *const rcl_publisher_t, msg.void_ptr_mut(), std::ptr::null_mut(), ) } } else { unsafe { rcl_publish( - &publisher.handle as *const rcl_publisher_s, + &publisher.handle as *const rcl_publisher_t, msg.void_ptr(), std::ptr::null_mut() ) }