Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rcl_publisher_get_subscription_count related methods #75

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct Node {
// timers,
timers: Vec<Timer_>,
// and the publishers, whom we allow to be shared.. hmm.
pubs: Vec<Arc<rcl_publisher_t>>,
pubs: Vec<Arc<Publisher_>>,
}

unsafe impl Send for Node {}
Expand Down Expand Up @@ -828,6 +828,10 @@ impl Node {
c.lock().unwrap().poll_available(self.node_handle.as_mut());
}

for p in &self.pubs {
p.poll_has_inter_process_subscribers();
}

let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };

Expand Down Expand Up @@ -1305,9 +1309,9 @@ impl Drop for Node {
s.lock().unwrap().destroy(&mut self.node_handle);
}
while let Some(p) = self.pubs.pop() {
let mut p = wait_until_unwrapped(p);
let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) };
// TODO: check ret
let p = wait_until_unwrapped(p);

p.destroy(self.node_handle.as_mut());
}
unsafe {
rcl_node_fini(self.node_handle.as_mut());
Expand Down
168 changes: 154 additions & 14 deletions r2r/src/publishers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Once;
use std::sync::Weak;
use std::sync::Mutex;
use futures::Future;
use futures::channel::oneshot;
use futures::TryFutureExt;

use crate::error::*;
use crate::msg_types::*;
Expand Down Expand Up @@ -37,6 +41,68 @@ use r2r_rcl::*;

unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}

pub(crate) struct Publisher_ {
handle: rcl_publisher_t,

// TODO use a mpsc to avoid the mutex?
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>
}

impl Publisher_
{
fn get_inter_process_subscription_count(&self) -> Result<usize> {
// See https://github.com/ros2/rclcpp/issues/623

let mut inter_process_subscription_count = 0;

let result = unsafe {
rcl_publisher_get_subscription_count(
&self.handle as *const rcl_publisher_t,
&mut inter_process_subscription_count as *mut usize,
)
};

if result == RCL_RET_OK as i32 {
Ok(inter_process_subscription_count)
} else {
Err(Error::from_rcl_error(result))
}
}

pub(crate) fn poll_has_inter_process_subscribers(&self) {

let mut poll_inter_process_subscriber_channels =
self.poll_inter_process_subscriber_channels.lock().unwrap();

if poll_inter_process_subscriber_channels.is_empty() {
return;
}
let inter_process_subscription_count = self.get_inter_process_subscription_count();
match inter_process_subscription_count {
Ok(0) => {
// not available...
}
Ok(_) => {
// send ok and close channels
while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Err(_) => {
// error, close all channels
poll_inter_process_subscriber_channels.clear();
}
}
}

pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };

// TODO: check ret
}
}


/// A ROS (typed) publisher.
///
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
Expand All @@ -46,7 +112,7 @@ pub struct Publisher<T>
where
T: WrappedTypesupport,
{
handle: Weak<rcl_publisher_t>,
handle: Weak<Publisher_>,
type_: PhantomData<T>,
}

Expand All @@ -58,11 +124,11 @@ unsafe impl Send for PublisherUntyped {}
/// move between threads.
#[derive(Debug, Clone)]
pub struct PublisherUntyped {
handle: Weak<rcl_publisher_t>,
handle: Weak<Publisher_>,
type_: String,
}

pub fn make_publisher<T>(handle: Weak<rcl_publisher_t>) -> Publisher<T>
pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
where
T: WrappedTypesupport,
{
Expand All @@ -72,14 +138,17 @@ where
}
}

pub fn make_publisher_untyped(handle: Weak<rcl_publisher_t>, type_: String) -> PublisherUntyped {
PublisherUntyped { handle, type_ }
pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
PublisherUntyped {
handle,
type_,
}
}

pub fn create_publisher_helper(
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_publisher_t> {
) -> Result<Publisher_> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;

Expand All @@ -95,7 +164,10 @@ pub fn create_publisher_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(publisher_handle)
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new())
})
} else {
Err(Error::from_rcl_error(result))
}
Expand All @@ -116,7 +188,11 @@ impl PublisherUntyped {
native_msg.from_json(msg)?;

let result =
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut())
};

if result == RCL_RET_OK as i32 {
Ok(())
Expand All @@ -125,8 +201,34 @@ impl PublisherUntyped {
Err(Error::from_rcl_error(result))
}
}

/// Gets the number of external subscribers (i.e. it doesn't
/// count subscribers from the same process).
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}

/// Waits for at least one external subscriber to begin subscribing to the
/// topic. It doesn't count subscribers from the same process.
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();

self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);

Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
}


impl<T: 'static> Publisher<T>
where
T: WrappedTypesupport,
Expand All @@ -143,7 +245,11 @@ where
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
let result =
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut())
};

if result == RCL_RET_OK as i32 {
Ok(())
Expand All @@ -163,17 +269,21 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } {
if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
let ret = unsafe {
rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg)
rcl_borrow_loaned_message(
&publisher.handle as *const rcl_publisher_t,
T::get_ts(),
&mut loaned_msg
)
};
if ret != RCL_RET_OK as i32 {
log::error!("Failed getting loaned message");
return Err(Error::from_rcl_error(ret));
}

let handle_box = Box::new(*publisher.as_ref());
let handle_box = Box::new(publisher.handle);
let msg = WrappedNativeMsg::<T>::from_loaned(
loaned_msg as *mut T::CStruct,
Box::new(|msg: *mut T::CStruct| {
Expand Down Expand Up @@ -226,13 +336,17 @@ where

// publish and return loaned message to middleware
rcl_publish_loaned_message(
publisher.as_ref(),
&publisher.handle as *const rcl_publisher_t,
msg.void_ptr_mut(),
std::ptr::null_mut(),
)
}
} else {
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }
unsafe { rcl_publish(
&publisher.handle as *const rcl_publisher_t,
msg.void_ptr(),
std::ptr::null_mut()
) }
};

if result == RCL_RET_OK as i32 {
Expand All @@ -242,4 +356,30 @@ where
Err(Error::from_rcl_error(result))
}
}

/// Gets the number of external subscribers (i.e. it doesn't
/// count subscribers from the same process).
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}

/// Waits for at least one external subscriber to begin subscribing to the
/// topic. It doesn't count subscribers from the same process.
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();

self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);

Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}

}