From 09f981980ad377f2291a8f8e3551bd4dc5ed7d7f Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Thu, 30 May 2024 16:04:33 +0000 Subject: [PATCH] Add and remove subscription from waitset depending on wait state --- r2r/src/nodes.rs | 61 ++++++++++++++++--- r2r/src/subscribers.rs | 131 ++++++++++++++++++++++++++++++++++------- r2r/src/time_source.rs | 5 ++ 3 files changed, 169 insertions(+), 28 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index f8404f750..39e483d7d 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -65,6 +65,10 @@ pub struct Node { // time source that provides simulated time #[cfg(r2r__rosgraph_msgs__msg__Clock)] time_source: TimeSource, + // and a guard condition to notify the waitset that it should reload its elements. + // This guard condition must be triggered by any subscriber, service, etc. that changes + // its `is_waiting` state to true + waitset_elements_changed_gc: rcl_guard_condition_t, } unsafe impl Send for Node {} @@ -182,7 +186,7 @@ impl Node { /// Creates a ROS node. pub fn create(ctx: Context, name: &str, namespace: &str) -> Result { - let (res, node_handle) = { + let (res, waitset_gc, node_handle) = { let mut ctx_handle = ctx.context_handle.lock().unwrap(); let c_node_name = CString::new(name).unwrap(); @@ -199,7 +203,9 @@ impl Node { &node_options as *const _, ) }; - (res, node_handle) + + let waitset_gc = new_guard_condition(ctx_handle.as_mut())?; + (res, waitset_gc, node_handle) }; if res == RCL_RET_OK as i32 { @@ -225,6 +231,7 @@ impl Node { ros_clock, #[cfg(r2r__rosgraph_msgs__msg__Clock)] time_source, + waitset_elements_changed_gc: waitset_gc, }; node.load_params()?; Ok(node) @@ -552,14 +559,20 @@ impl Node { { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let (sender, receiver) = mpsc::channel::(10); + let waker = Arc::new(std::sync::Mutex::new(None)); let ws = TypedSubscriber { rcl_handle: subscription_handle, - sender, + waker: Arc::clone(&waker), }; self.subscribers.push(Box::new(ws)); - Ok(receiver) + + Ok(SubscriberStream:: { + rcl_handle: subscription_handle, + waker, + waiting_state_changed_gc: self.waitset_elements_changed_gc, + stream_type: std::marker::PhantomData, + }) } /// Subscribe to a ROS topic. @@ -987,7 +1000,7 @@ impl Node { rcl_wait_set_init( &mut ws, self.subscribers.len() + total_action_subs, - 0, + 1, // for the waitset_elements_changed_gc self.timers.len() + total_action_timers, self.clients.len() + total_action_clients, self.services.len() + total_action_services, @@ -1001,9 +1014,27 @@ impl Node { rcl_wait_set_clear(&mut ws); } + unsafe { + // First off, add the waitset_elements_changed guard condition. + // Rationale: The code below will add only subscribers that are actively waiting. + // This avoids an endless loop where a busy subscriber keeps waking up the waitset + // even though it doesn't have the capacity to handle the new data. However, it also + // means that a subscriber/service/etc that changes its waiting state needs to update + // the waitset, otherwise it will not be woken up when new data arrives. In that situation + // it shall trigger this guard condition, which will force a wakeup of the waitset and a return + // from this function. On the next call to spin_once, the subscriber will be added. + rcl_wait_set_add_guard_condition( + &mut ws, + &self.waitset_elements_changed_gc as *const rcl_guard_condition_t, + std::ptr::null_mut(), + ); + } + for s in &self.subscribers { - unsafe { - rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + if s.is_waiting() { + unsafe { + rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + } } } @@ -1589,3 +1620,17 @@ fn convert_info_array_to_vec( topic_info_list } + +fn new_guard_condition(ctx: &mut rcl_context_s) -> Result { + unsafe { + let mut gc = rcl_get_zero_initialized_guard_condition(); + match Error::from_rcl_error(rcl_guard_condition_init( + &mut gc, + ctx, + rcl_guard_condition_get_default_options(), + )) { + Error::RCL_RET_OK => Ok(gc), + e => Err(e), + } + } +} diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 0d3f34fc8..c89b52052 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -4,20 +4,64 @@ use std::ffi::CString; use crate::{error::*, msg_types::*, qos::QosProfile}; use r2r_rcl::*; use std::ffi::{c_void, CStr}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use futures::stream::Stream; pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; /// Returns true if the subscriber stream has been dropped. fn handle_incoming(&mut self) -> bool; + // Returns true if the subscriber is waiting for incoming messages + fn is_waiting(&self) -> bool; fn destroy(&mut self, node: &mut rcl_node_t); } -pub struct TypedSubscriber +// TODO(tobias.stark): Implement the new wakeup logic for the other subscriber types as well. +// Let's just take TypedSubscriber as our proof of concept. +pub struct TypedSubscriber { + pub rcl_handle: rcl_subscription_t, + // The waker to call when new data is available + pub waker: Arc>>, +} + +pub struct SubscriberStream where T: WrappedTypesupport, { pub rcl_handle: rcl_subscription_t, - pub sender: mpsc::Sender, + pub waker: Arc>>, + pub waiting_state_changed_gc: rcl_guard_condition_t, + // suppress Rust's "unused type" error + pub stream_type: std::marker::PhantomData, +} + +impl std::marker::Unpin for SubscriberStream {} +unsafe impl std::marker::Send for SubscriberStream {} + +impl SubscriberStream +where + T: WrappedTypesupport, +{ + fn receive(&mut self) -> Option { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + Some(T::from_native(&msg)) + } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + // No message available + None + } else { + // An unexpected error while reading. The old code just ignored it. + // For now just panic, but we should think about this again + panic!("Error while reading message from subscription: {ret}"); + } + } } pub struct NativeSubscriber @@ -40,33 +84,23 @@ pub struct RawSubscriber { pub sender: mpsc::Sender>, } -impl Subscriber_ for TypedSubscriber -where - T: WrappedTypesupport, -{ +impl Subscriber_ for TypedSubscriber { fn handle(&self) -> &rcl_subscription_t { &self.rcl_handle } fn handle_incoming(&mut self) -> bool { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) - }; - if ret == RCL_RET_OK as i32 { - let msg = T::from_native(&msg); - if let Err(e) = self.sender.try_send(msg) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; - } - log::debug!("error {:?}", e) - } + let locked_waker = self.waker.lock().unwrap(); + if let Some(ref waker) = *locked_waker { + waker.wake_by_ref(); } false } + fn is_waiting(&self) -> bool { + self.waker.lock().unwrap().is_some() + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -144,6 +178,11 @@ where false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -176,6 +215,11 @@ impl Subscriber_ for UntypedSubscriber { false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -222,6 +266,11 @@ impl Subscriber_ for RawSubscriber { false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -254,3 +303,45 @@ pub fn create_subscription_helper( Err(Error::from_rcl_error(result)) } } + +impl Stream for SubscriberStream +where + T: WrappedTypesupport, +{ + type Item = T; + + // Required method + fn poll_next( + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> Poll> { + if let Some(msg) = self.receive() { + *self.waker.lock().unwrap() = None; + return Poll::Ready(Some(msg)); + } + + // Update the stored waker, depending on whether the subscriber is now pending or not + let was_waiting = { + let mut stored_waker = self.waker.lock().unwrap(); + let was_waiting = stored_waker.is_some(); + *stored_waker = Some(cx.waker().clone()); + was_waiting + }; + + // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription + if !was_waiting { + unsafe { + match Error::from_rcl_error(rcl_trigger_guard_condition( + &mut self.waiting_state_changed_gc, + )) { + Error::RCL_RET_OK => {} + e => { + // This can only fail if the guard condition object was invalid, so panic is the appropriate response + panic!("Failed to trigger guard condition: {e}"); + } + } + } + } + + Poll::Pending + } +} diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index 91b1a09c6..b25a95d8f 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -247,6 +247,11 @@ impl Subscriber_ for TimeSourceSubscriber { } } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.subscriber_handle, node);