Skip to content

Commit

Permalink
Add and remove subscription from waitset depending on wait state
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiasstarkwayve committed Jun 11, 2024
1 parent 2194815 commit 09f9819
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 28 deletions.
61 changes: 53 additions & 8 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct Node {
// time source that provides simulated time
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
time_source: TimeSource,
// and a guard condition to notify the waitset that it should reload its elements.
// This guard condition must be triggered by any subscriber, service, etc. that changes
// its `is_waiting` state to true
waitset_elements_changed_gc: rcl_guard_condition_t,
}

unsafe impl Send for Node {}
Expand Down Expand Up @@ -182,7 +186,7 @@ impl Node {

/// Creates a ROS node.
pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
let (res, node_handle) = {
let (res, waitset_gc, node_handle) = {
let mut ctx_handle = ctx.context_handle.lock().unwrap();

let c_node_name = CString::new(name).unwrap();
Expand All @@ -199,7 +203,9 @@ impl Node {
&node_options as *const _,
)
};
(res, node_handle)

let waitset_gc = new_guard_condition(ctx_handle.as_mut())?;
(res, waitset_gc, node_handle)
};

if res == RCL_RET_OK as i32 {
Expand All @@ -225,6 +231,7 @@ impl Node {
ros_clock,
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
time_source,
waitset_elements_changed_gc: waitset_gc,
};
node.load_params()?;
Ok(node)
Expand Down Expand Up @@ -552,14 +559,20 @@ impl Node {
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);

let waker = Arc::new(std::sync::Mutex::new(None));
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
sender,
waker: Arc::clone(&waker),
};
self.subscribers.push(Box::new(ws));
Ok(receiver)

Ok(SubscriberStream::<T> {
rcl_handle: subscription_handle,
waker,
waiting_state_changed_gc: self.waitset_elements_changed_gc,
stream_type: std::marker::PhantomData,
})
}

/// Subscribe to a ROS topic.
Expand Down Expand Up @@ -987,7 +1000,7 @@ impl Node {
rcl_wait_set_init(
&mut ws,
self.subscribers.len() + total_action_subs,
0,
1, // for the waitset_elements_changed_gc
self.timers.len() + total_action_timers,
self.clients.len() + total_action_clients,
self.services.len() + total_action_services,
Expand All @@ -1001,9 +1014,27 @@ impl Node {
rcl_wait_set_clear(&mut ws);
}

unsafe {
// First off, add the waitset_elements_changed guard condition.
// Rationale: The code below will add only subscribers that are actively waiting.
// This avoids an endless loop where a busy subscriber keeps waking up the waitset
// even though it doesn't have the capacity to handle the new data. However, it also
// means that a subscriber/service/etc that changes its waiting state needs to update
// the waitset, otherwise it will not be woken up when new data arrives. In that situation
// it shall trigger this guard condition, which will force a wakeup of the waitset and a return
// from this function. On the next call to spin_once, the subscriber will be added.
rcl_wait_set_add_guard_condition(
&mut ws,
&self.waitset_elements_changed_gc as *const rcl_guard_condition_t,
std::ptr::null_mut(),
);
}

for s in &self.subscribers {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
if s.is_waiting() {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
}
}
}

Expand Down Expand Up @@ -1589,3 +1620,17 @@ fn convert_info_array_to_vec(

topic_info_list
}

fn new_guard_condition(ctx: &mut rcl_context_s) -> Result<rcl_guard_condition_t> {
unsafe {
let mut gc = rcl_get_zero_initialized_guard_condition();
match Error::from_rcl_error(rcl_guard_condition_init(
&mut gc,
ctx,
rcl_guard_condition_get_default_options(),
)) {
Error::RCL_RET_OK => Ok(gc),
e => Err(e),
}
}
}
131 changes: 111 additions & 20 deletions r2r/src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,64 @@ use std::ffi::CString;
use crate::{error::*, msg_types::*, qos::QosProfile};
use r2r_rcl::*;
use std::ffi::{c_void, CStr};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use futures::stream::Stream;

pub trait Subscriber_ {
fn handle(&self) -> &rcl_subscription_t;
/// Returns true if the subscriber stream has been dropped.
fn handle_incoming(&mut self) -> bool;
// Returns true if the subscriber is waiting for incoming messages
fn is_waiting(&self) -> bool;
fn destroy(&mut self, node: &mut rcl_node_t);
}

pub struct TypedSubscriber<T>
// TODO(tobias.stark): Implement the new wakeup logic for the other subscriber types as well.
// Let's just take TypedSubscriber as our proof of concept.
pub struct TypedSubscriber {
pub rcl_handle: rcl_subscription_t,
// The waker to call when new data is available
pub waker: Arc<std::sync::Mutex<Option<std::task::Waker>>>,
}

pub struct SubscriberStream<T>
where
T: WrappedTypesupport,
{
pub rcl_handle: rcl_subscription_t,
pub sender: mpsc::Sender<T>,
pub waker: Arc<std::sync::Mutex<Option<std::task::Waker>>>,
pub waiting_state_changed_gc: rcl_guard_condition_t,
// suppress Rust's "unused type" error
pub stream_type: std::marker::PhantomData<T>,
}

impl<T: WrappedTypesupport + 'static> std::marker::Unpin for SubscriberStream<T> {}
unsafe impl<T: WrappedTypesupport + 'static> std::marker::Send for SubscriberStream<T> {}

impl<T: 'static> SubscriberStream<T>
where
T: WrappedTypesupport,
{
fn receive(&mut self) -> Option<T> {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
Some(T::from_native(&msg))
} else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
// No message available
None
} else {
// An unexpected error while reading. The old code just ignored it.
// For now just panic, but we should think about this again
panic!("Error while reading message from subscription: {ret}");
}
}
}

pub struct NativeSubscriber<T>
Expand All @@ -40,33 +84,23 @@ pub struct RawSubscriber {
pub sender: mpsc::Sender<Vec<u8>>,
}

impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where
T: WrappedTypesupport,
{
impl Subscriber_ for TypedSubscriber {
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}

fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
let msg = T::from_native(&msg);
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
log::debug!("error {:?}", e)
}
let locked_waker = self.waker.lock().unwrap();
if let Some(ref waker) = *locked_waker {
waker.wake_by_ref();
}
false
}

fn is_waiting(&self) -> bool {
self.waker.lock().unwrap().is_some()
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -144,6 +178,11 @@ where
false
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): Implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -176,6 +215,11 @@ impl Subscriber_ for UntypedSubscriber {
false
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): Implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -222,6 +266,11 @@ impl Subscriber_ for RawSubscriber {
false
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): Implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -254,3 +303,45 @@ pub fn create_subscription_helper(
Err(Error::from_rcl_error(result))
}
}

impl<T: 'static> Stream for SubscriberStream<T>
where
T: WrappedTypesupport,
{
type Item = T;

// Required method
fn poll_next(
mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>,
) -> Poll<Option<T>> {
if let Some(msg) = self.receive() {
*self.waker.lock().unwrap() = None;
return Poll::Ready(Some(msg));
}

// Update the stored waker, depending on whether the subscriber is now pending or not
let was_waiting = {
let mut stored_waker = self.waker.lock().unwrap();
let was_waiting = stored_waker.is_some();
*stored_waker = Some(cx.waker().clone());
was_waiting
};

// If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription
if !was_waiting {
unsafe {
match Error::from_rcl_error(rcl_trigger_guard_condition(
&mut self.waiting_state_changed_gc,
)) {
Error::RCL_RET_OK => {}
e => {
// This can only fail if the guard condition object was invalid, so panic is the appropriate response
panic!("Failed to trigger guard condition: {e}");
}
}
}
}

Poll::Pending
}
}
5 changes: 5 additions & 0 deletions r2r/src/time_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ impl Subscriber_ for TimeSourceSubscriber {
}
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): Implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.subscriber_handle, node);
Expand Down

0 comments on commit 09f9819

Please sign in to comment.