Skip to content

Commit

Permalink
Add and remove subscription from waitset depending on wait state
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiasstarkwayve committed Jun 11, 2024
1 parent cc84beb commit 5fa069d
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 28 deletions.
62 changes: 54 additions & 8 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct Node {
timers: Vec<Timer_>,
// and the publishers, whom we allow to be shared.. hmm.
pubs: Vec<Arc<rcl_publisher_t>>,
// and a guard condition to notify the waitset that it should reload its elements.
// This guard condition must be triggered by any subscriber, service, etc. that changes
// its `is_waiting` state to true
waitset_elements_changed_gc: rcl_guard_condition_t,
}

unsafe impl Send for Node {}
Expand Down Expand Up @@ -156,7 +160,7 @@ impl Node {

/// Creates a ROS node.
pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
let (res, node_handle) = {
let (res, waitset_gc, node_handle) = {
let mut ctx_handle = ctx.context_handle.lock().unwrap();

let c_node_name = CString::new(name).unwrap();
Expand All @@ -173,7 +177,9 @@ impl Node {
&node_options as *const _,
)
};
(res, node_handle)

let waitset_gc = new_guard_condition(ctx_handle.as_mut())?;
(res, waitset_gc, node_handle)
};

if res == RCL_RET_OK as i32 {
Expand All @@ -188,6 +194,7 @@ impl Node {
action_servers: Vec::new(),
timers: Vec::new(),
pubs: Vec::new(),
waitset_elements_changed_gc: waitset_gc,
};
node.load_params()?;
Ok(node)
Expand Down Expand Up @@ -297,14 +304,20 @@ impl Node {
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);

let waker = Arc::new(std::sync::Mutex::new(None));
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
sender,
waker: Arc::clone(&waker),
};
self.subscribers.push(Box::new(ws));
Ok(receiver)

Ok(SubscriberStream::<T> {
rcl_handle: subscription_handle,
waker,
waiting_state_changed_gc: self.waitset_elements_changed_gc,
stream_type: std::marker::PhantomData,
})
}

/// Subscribe to a ROS topic.
Expand Down Expand Up @@ -667,7 +680,7 @@ impl Node {
rcl_wait_set_init(
&mut ws,
self.subscribers.len() + total_action_subs,
0,
1, // for the waitset_elements_changed_gc
self.timers.len() + total_action_timers,
self.clients.len() + total_action_clients,
self.services.len() + total_action_services,
Expand All @@ -681,9 +694,28 @@ impl Node {
rcl_wait_set_clear(&mut ws);
}

unsafe {
// First off, add the waitset_elements_changed guard condition.
// Rationale: The code below will add only subscribers that are actively waiting.
// This avoids an endless loop where a busy subscriber keeps waking up the waitset
// even though the subscriber doesn't have the capacity to handle the new data. However, it also
// means that a subscriber that changes its waiting state needs to update
// the waitset, otherwise the waitset will not wake up when new data arrives. In that situation
// the subscriber shall trigger this guard condition, which will force a wakeup of the waitset and a return
// from this function. On the next call to spin_once, the subscriber will be added.
// (The same holds analogously for services, etc.)
rcl_wait_set_add_guard_condition(
&mut ws,
&self.waitset_elements_changed_gc as *const rcl_guard_condition_t,
std::ptr::null_mut(),
);
}

for s in &self.subscribers {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
if s.is_waiting() {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
}
}
}

Expand Down Expand Up @@ -1081,3 +1113,17 @@ impl Drop for Node {
pub trait IsAvailablePollable {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
}

fn new_guard_condition(ctx: &mut rcl_context_s) -> Result<rcl_guard_condition_t> {
unsafe {
let mut gc = rcl_get_zero_initialized_guard_condition();
match Error::from_rcl_error(rcl_guard_condition_init(
&mut gc,
ctx,
rcl_guard_condition_get_default_options(),
)) {
Error::RCL_RET_OK => Ok(gc),
e => Err(e),
}
}
}
125 changes: 105 additions & 20 deletions r2r/src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,63 @@ use crate::qos::QosProfile;
use r2r_rcl::*;
use std::ffi::c_void;
use std::ffi::CStr;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

pub trait Subscriber_ {
fn handle(&self) -> &rcl_subscription_t;
/// Returns true if the subscriber stream has been dropped.
fn handle_incoming(&mut self) -> bool;
// Returns true if the subscriber is waiting for incoming messages
fn is_waiting(&self) -> bool;
fn destroy(&mut self, node: &mut rcl_node_t);
}

pub struct TypedSubscriber<T>
// TODO(tobias.stark): Implement the new wakeup logic for the other subscriber types as well.
// Let's just take TypedSubscriber as our proof of concept.
pub struct TypedSubscriber {
pub rcl_handle: rcl_subscription_t,
// The waker to call when new data is available
pub waker: Arc<std::sync::Mutex<Option<std::task::Waker>>>,
}

pub struct SubscriberStream<T>
where
T: WrappedTypesupport,
{
pub rcl_handle: rcl_subscription_t,
pub sender: mpsc::Sender<T>,
pub waker: Arc<std::sync::Mutex<Option<std::task::Waker>>>,
pub waiting_state_changed_gc: rcl_guard_condition_t,
// suppress Rust's "unused type" error
pub stream_type: std::marker::PhantomData<T>,
}

impl<T: WrappedTypesupport + 'static> std::marker::Unpin for SubscriberStream<T> {}
unsafe impl<T: WrappedTypesupport + 'static> std::marker::Send for SubscriberStream<T> {}

impl<T: 'static> SubscriberStream<T>
where
T: WrappedTypesupport,
{
fn receive(&mut self) -> Option<T> {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
Some(T::from_native(&msg))
} else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
// No message available
None
} else {
// An unexpected error while reading. The old code just ignored it.
// For now just panic, but we should think about this again
panic!("Error while reading message from subscription: {ret}");
}
}
}

pub struct NativeSubscriber<T>
Expand All @@ -37,33 +80,23 @@ pub struct UntypedSubscriber {
pub sender: mpsc::Sender<Result<serde_json::Value>>,
}

impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where
T: WrappedTypesupport,
{
impl Subscriber_ for TypedSubscriber {
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}

fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
let msg = T::from_native(&msg);
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
log::debug!("error {:?}", e)
}
let locked_waker = self.waker.lock().unwrap();
if let Some(ref waker) = *locked_waker {
waker.wake_by_ref();
}
false
}

fn is_waiting(&self) -> bool {
self.waker.lock().unwrap().is_some()
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -140,6 +173,11 @@ where
false
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -172,6 +210,11 @@ impl Subscriber_ for UntypedSubscriber {
false
}

fn is_waiting(&self) -> bool {
// TODO(tobiasstark): Implement
true
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
Expand Down Expand Up @@ -203,3 +246,45 @@ pub fn create_subscription_helper(
Err(Error::from_rcl_error(result))
}
}

impl<T: 'static> tokio_stream::Stream for SubscriberStream<T>
where
T: WrappedTypesupport,
{
type Item = T;

// Required method
fn poll_next(
mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(msg) = self.receive() {
self.waker.lock().unwrap() = None;
return Poll::Ready(Some(msg));
}

// Update the stored waker, depending on whether the subscriber is now pending or not
let was_waiting = {
let mut stored_waker = self.waker.lock().unwrap();
let was_waiting = stored_waker.is_some();
*stored_waker = Some(cx.waker().clone());
was_waiting
};

// If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription
if !was_waiting {
unsafe {
match Error::from_rcl_error(rcl_trigger_guard_condition(
&mut self.waiting_state_changed_gc,
)) {
Error::RCL_RET_OK => {}
e => {
// This can only fail if the guard condition object was invalid, so panic is the appropriate response
panic!("Failed to trigger guard condition: {e}");
}
}
}
}

Poll::Pending
}
}

0 comments on commit 5fa069d

Please sign in to comment.