Skip to content

Commit

Permalink
Update implementation of publisher to handle wait_for_inter_process_s…
Browse files Browse the repository at this point in the history
…ubscribers
  • Loading branch information
mchhoy committed Dec 27, 2023
1 parent 957a6fc commit b4f905b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 50 deletions.
1 change: 0 additions & 1 deletion r2r/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ r2r_actions = { path = "../r2r_actions", version = "0.8.2" }
r2r_macros = { path = "../r2r_macros", version = "0.8.2" }
uuid = { version = "1.2.2", features = ["serde", "v4"] }
futures = "0.3.25"
futures-timer = "3.0.2"
log = "0.4.18"
phf = "0.11.1"

Expand Down
12 changes: 8 additions & 4 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct Node {
// timers,
timers: Vec<Timer_>,
// and the publishers, whom we allow to be shared.. hmm.
pubs: Vec<Arc<rcl_publisher_t>>,
pubs: Vec<Arc<Publisher_>>,
}

unsafe impl Send for Node {}
Expand Down Expand Up @@ -828,6 +828,10 @@ impl Node {
c.lock().unwrap().poll_available(self.node_handle.as_mut());
}

for p in &self.pubs {
p.poll_has_inter_process_subscribers();
}

let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };

Expand Down Expand Up @@ -1305,9 +1309,9 @@ impl Drop for Node {
s.lock().unwrap().destroy(&mut self.node_handle);
}
while let Some(p) = self.pubs.pop() {
let mut p = wait_until_unwrapped(p);
let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) };
// TODO: check ret
let p = wait_until_unwrapped(p);

p.destroy(self.node_handle.as_mut());
}
unsafe {
rcl_node_fini(self.node_handle.as_mut());
Expand Down
189 changes: 144 additions & 45 deletions r2r/src/publishers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Once;
use std::sync::Weak;
use std::sync::Mutex;
use futures::Future;
use futures::channel::oneshot;
use futures::TryFutureExt;

use crate::error::*;
use crate::msg_types::*;
Expand Down Expand Up @@ -37,6 +41,68 @@ use r2r_rcl::*;

unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}

pub(crate) struct Publisher_ {
handle: rcl_publisher_t,

// TODO use a mpsc to avoid the mutex?
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>
}

impl Publisher_
{
fn get_inter_process_subscription_count(&self) -> Result<usize> {
// See https://github.com/ros2/rclcpp/issues/623

let mut inter_process_subscription_count = 0;

let result = unsafe {
rcl_publisher_get_subscription_count(
&self.handle as *const rcl_publisher_s,
&mut inter_process_subscription_count as *mut usize,
)
};

if result == RCL_RET_OK as i32 {
Ok(inter_process_subscription_count)
} else {
Err(Error::from_rcl_error(result))
}
}

pub(crate) fn poll_has_inter_process_subscribers(&self) {

let mut poll_inter_process_subscriber_channels =
self.poll_inter_process_subscriber_channels.lock().unwrap();

if poll_inter_process_subscriber_channels.is_empty() {
return;
}
let inter_process_subscription_count = self.get_inter_process_subscription_count();
match inter_process_subscription_count {
Ok(0) => {
// not available...
}
Ok(_) => {
// send ok and close channels
while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Err(_) => {
// error, close all channels
poll_inter_process_subscriber_channels.clear();
}
}
}

pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };

// TODO: check ret
}
}


/// A ROS (typed) publisher.
///
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
Expand All @@ -46,7 +112,7 @@ pub struct Publisher<T>
where
T: WrappedTypesupport,
{
handle: Weak<rcl_publisher_t>,
handle: Weak<Publisher_>,
type_: PhantomData<T>,
}

Expand All @@ -58,11 +124,11 @@ unsafe impl Send for PublisherUntyped {}
/// move between threads.
#[derive(Debug, Clone)]
pub struct PublisherUntyped {
handle: Weak<rcl_publisher_t>,
handle: Weak<Publisher_>,
type_: String,
}

pub fn make_publisher<T>(handle: Weak<rcl_publisher_t>) -> Publisher<T>
pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
where
T: WrappedTypesupport,
{
Expand All @@ -72,14 +138,17 @@ where
}
}

pub fn make_publisher_untyped(handle: Weak<rcl_publisher_t>, type_: String) -> PublisherUntyped {
PublisherUntyped { handle, type_ }
pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
PublisherUntyped {
handle,
type_,
}
}

pub fn create_publisher_helper(
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_publisher_t> {
) -> Result<Publisher_> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;

Expand All @@ -95,7 +164,10 @@ pub fn create_publisher_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(publisher_handle)
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new())
})
} else {
Err(Error::from_rcl_error(result))
}
Expand All @@ -116,7 +188,11 @@ impl PublisherUntyped {
native_msg.from_json(msg)?;

let result =
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_s,
native_msg.void_ptr(),
std::ptr::null_mut())
};

if result == RCL_RET_OK as i32 {
Ok(())
Expand All @@ -125,8 +201,34 @@ impl PublisherUntyped {
Err(Error::from_rcl_error(result))
}
}

/// Gets the number of external subscribers (i.e. it doesn't
/// count subscribers from the same process).
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}

/// Waits for at least one external subscriber to begin subscribing to the
/// topic. It doesn't count subscribers from the same process.
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();

self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);

Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
}


impl<T: 'static> Publisher<T>
where
T: WrappedTypesupport,
Expand All @@ -143,7 +245,11 @@ where
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
let result =
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_s,
native_msg.void_ptr(),
std::ptr::null_mut())
};

if result == RCL_RET_OK as i32 {
Ok(())
Expand All @@ -163,17 +269,21 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } {
if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_s) } {
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
let ret = unsafe {
rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg)
rcl_borrow_loaned_message(
&publisher.handle as *const rcl_publisher_s,
T::get_ts(),
&mut loaned_msg
)
};
if ret != RCL_RET_OK as i32 {
log::error!("Failed getting loaned message");
return Err(Error::from_rcl_error(ret));
}

let handle_box = Box::new(*publisher.as_ref());
let handle_box = Box::new(publisher.handle);
let msg = WrappedNativeMsg::<T>::from_loaned(
loaned_msg as *mut T::CStruct,
Box::new(|msg: *mut T::CStruct| {
Expand Down Expand Up @@ -226,13 +336,17 @@ where

// publish and return loaned message to middleware
rcl_publish_loaned_message(
publisher.as_ref(),
&publisher.handle as *const rcl_publisher_s,
msg.void_ptr_mut(),
std::ptr::null_mut(),
)
}
} else {
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_s,
msg.void_ptr(),
std::ptr::null_mut()
) }
};

if result == RCL_RET_OK as i32 {
Expand All @@ -246,41 +360,26 @@ where
/// Gets the number of external subscribers (i.e. it doesn't
/// count subscribers from the same process).
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
// See https://github.com/ros2/rclcpp/issues/623

let mut inter_process_subscription_count = 0;

let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

let result = unsafe {
rcl_publisher_get_subscription_count(
publisher.as_ref(),
&mut inter_process_subscription_count as *mut usize,
)
};

if result == RCL_RET_OK as i32 {
Ok(inter_process_subscription_count)
} else {
Err(Error::from_rcl_error(result))
}
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}

/// Waits for at least one external subscriber to begin subscribing to the
/// topic. It doesn't count subscribers from the same process.
pub async fn wait_for_inter_process_subscribers(&self) -> Result<()> {
// According to this there is no event available:
//
// https://github.com/ros2/rclcpp/issues/623
loop {
if self.get_inter_process_subscription_count()? > 0 {
return Ok(());
}
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();

futures_timer::Delay::new(std::time::Duration::from_millis(100)).await
}
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);

Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}

}

0 comments on commit b4f905b

Please sign in to comment.