From 3d6936e70a323f3e5b9304178d0923c01d2ef64b Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Thu, 28 Dec 2023 06:14:09 +0800 Subject: [PATCH] Add rcl_publisher_get_subscription_count related methods (#75) * Add rcl_publisher_get_subscription_count related methods * Update implementation of publisher to handle wait_for_inter_process_subscribers --- r2r/src/nodes.rs | 12 ++- r2r/src/publishers.rs | 168 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 162 insertions(+), 18 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index d7a78b2d7..49b08c1b5 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -51,7 +51,7 @@ pub struct Node { // timers, timers: Vec, // and the publishers, whom we allow to be shared.. hmm. - pubs: Vec>, + pubs: Vec>, } unsafe impl Send for Node {} @@ -828,6 +828,10 @@ impl Node { c.lock().unwrap().poll_available(self.node_handle.as_mut()); } + for p in &self.pubs { + p.poll_has_inter_process_subscribers(); + } + let timeout = timeout.as_nanos() as i64; let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; @@ -1305,9 +1309,9 @@ impl Drop for Node { s.lock().unwrap().destroy(&mut self.node_handle); } while let Some(p) = self.pubs.pop() { - let mut p = wait_until_unwrapped(p); - let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) }; - // TODO: check ret + let p = wait_until_unwrapped(p); + + p.destroy(self.node_handle.as_mut()); } unsafe { rcl_node_fini(self.node_handle.as_mut()); diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 55668c49a..75b668f74 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -4,6 +4,10 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Once; use std::sync::Weak; +use std::sync::Mutex; +use futures::Future; +use futures::channel::oneshot; +use futures::TryFutureExt; use crate::error::*; use crate::msg_types::*; @@ -37,6 +41,68 @@ use r2r_rcl::*; unsafe impl Send for Publisher where T: WrappedTypesupport {} +pub(crate) struct Publisher_ { + handle: rcl_publisher_t, + + // TODO use a mpsc to avoid the mutex? + poll_inter_process_subscriber_channels: Mutex>> +} + +impl Publisher_ +{ + fn get_inter_process_subscription_count(&self) -> Result { + // See https://github.com/ros2/rclcpp/issues/623 + + let mut inter_process_subscription_count = 0; + + let result = unsafe { + rcl_publisher_get_subscription_count( + &self.handle as *const rcl_publisher_t, + &mut inter_process_subscription_count as *mut usize, + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(inter_process_subscription_count) + } else { + Err(Error::from_rcl_error(result)) + } + } + + pub(crate) fn poll_has_inter_process_subscribers(&self) { + + let mut poll_inter_process_subscriber_channels = + self.poll_inter_process_subscriber_channels.lock().unwrap(); + + if poll_inter_process_subscriber_channels.is_empty() { + return; + } + let inter_process_subscription_count = self.get_inter_process_subscription_count(); + match inter_process_subscription_count { + Ok(0) => { + // not available... + } + Ok(_) => { + // send ok and close channels + while let Some(sender) = poll_inter_process_subscriber_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Err(_) => { + // error, close all channels + poll_inter_process_subscriber_channels.clear(); + } + } + } + + pub(crate) fn destroy(mut self, node: &mut rcl_node_t) { + let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) }; + + // TODO: check ret + } +} + + /// A ROS (typed) publisher. /// /// This contains a `Weak Arc` to a typed publisher. As such it is safe to @@ -46,7 +112,7 @@ pub struct Publisher where T: WrappedTypesupport, { - handle: Weak, + handle: Weak, type_: PhantomData, } @@ -58,11 +124,11 @@ unsafe impl Send for PublisherUntyped {} /// move between threads. #[derive(Debug, Clone)] pub struct PublisherUntyped { - handle: Weak, + handle: Weak, type_: String, } -pub fn make_publisher(handle: Weak) -> Publisher +pub fn make_publisher(handle: Weak) -> Publisher where T: WrappedTypesupport, { @@ -72,14 +138,17 @@ where } } -pub fn make_publisher_untyped(handle: Weak, type_: String) -> PublisherUntyped { - PublisherUntyped { handle, type_ } +pub fn make_publisher_untyped(handle: Weak, type_: String) -> PublisherUntyped { + PublisherUntyped { + handle, + type_, + } } pub fn create_publisher_helper( node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t, qos_profile: QosProfile, -) -> Result { +) -> Result { let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -95,7 +164,10 @@ pub fn create_publisher_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(publisher_handle) + Ok(Publisher_ { + handle: publisher_handle, + poll_inter_process_subscriber_channels: Mutex::new(Vec::new()) + }) } else { Err(Error::from_rcl_error(result)) } @@ -116,7 +188,11 @@ impl PublisherUntyped { native_msg.from_json(msg)?; let result = - unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) }; + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_t, + native_msg.void_ptr(), + std::ptr::null_mut()) + }; if result == RCL_RET_OK as i32 { Ok(()) @@ -125,8 +201,34 @@ impl PublisherUntyped { Err(Error::from_rcl_error(result)) } } + + /// Gets the number of external subscribers (i.e. it doesn't + /// count subscribers from the same process). + pub fn get_inter_process_subscription_count(&self) -> Result { + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .get_inter_process_subscription_count() + } + + /// Waits for at least one external subscriber to begin subscribing to the + /// topic. It doesn't count subscribers from the same process. + pub fn wait_for_inter_process_subscribers(&self) -> Result>> { + let (sender, receiver) = oneshot::channel(); + + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .poll_inter_process_subscriber_channels + .lock() + .unwrap() + .push(sender); + + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + } } + impl Publisher where T: WrappedTypesupport, @@ -143,7 +245,11 @@ where .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); let result = - unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) }; + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_t, + native_msg.void_ptr(), + std::ptr::null_mut()) + }; if result == RCL_RET_OK as i32 { Ok(()) @@ -163,17 +269,21 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } { + if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } { let mut loaned_msg: *mut c_void = std::ptr::null_mut(); let ret = unsafe { - rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg) + rcl_borrow_loaned_message( + &publisher.handle as *const rcl_publisher_t, + T::get_ts(), + &mut loaned_msg + ) }; if ret != RCL_RET_OK as i32 { log::error!("Failed getting loaned message"); return Err(Error::from_rcl_error(ret)); } - let handle_box = Box::new(*publisher.as_ref()); + let handle_box = Box::new(publisher.handle); let msg = WrappedNativeMsg::::from_loaned( loaned_msg as *mut T::CStruct, Box::new(|msg: *mut T::CStruct| { @@ -226,13 +336,17 @@ where // publish and return loaned message to middleware rcl_publish_loaned_message( - publisher.as_ref(), + &publisher.handle as *const rcl_publisher_t, msg.void_ptr_mut(), std::ptr::null_mut(), ) } } else { - unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) } + unsafe { rcl_publish( + &publisher.handle as *const rcl_publisher_t, + msg.void_ptr(), + std::ptr::null_mut() + ) } }; if result == RCL_RET_OK as i32 { @@ -242,4 +356,30 @@ where Err(Error::from_rcl_error(result)) } } + + /// Gets the number of external subscribers (i.e. it doesn't + /// count subscribers from the same process). + pub fn get_inter_process_subscription_count(&self) -> Result { + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .get_inter_process_subscription_count() + } + + /// Waits for at least one external subscriber to begin subscribing to the + /// topic. It doesn't count subscribers from the same process. + pub fn wait_for_inter_process_subscribers(&self) -> Result>> { + let (sender, receiver) = oneshot::channel(); + + self.handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)? + .poll_inter_process_subscriber_channels + .lock() + .unwrap() + .push(sender); + + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + } + }