diff --git a/rclrs/Cargo.toml b/rclrs/Cargo.toml index 69319f659..8ea67974b 100644 --- a/rclrs/Cargo.toml +++ b/rclrs/Cargo.toml @@ -14,15 +14,18 @@ path = "src/lib.rs" # Please keep the list of dependencies alphabetically sorted, # and also state why each dependency is needed. [dependencies] -# Needed for dynamically finding type support libraries +# Needed for dynamically finding type support libraries ament_rs = { version = "0.2", optional = true } + # Needed for uploading documentation to docs.rs cfg-if = "1.0.0" # Needed for clients futures = "0.3" + # Needed for dynamic messages libloading = { version = "0.8", optional = true } + # Needed for the Message trait, among others rosidl_runtime_rs = "0.4" diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index b7db3a9c4..e7b75cba0 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -9,32 +9,38 @@ use rosidl_runtime_rs::Message; use crate::error::{RclReturnCode, ToResult}; use crate::MessageCow; -use crate::{rcl_bindings::*, RclrsError}; +use crate::{rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX}; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_client_t {} -/// Internal struct used by clients. +/// Manage the lifecycle of an `rcl_client_t`, including managing its dependencies +/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are +/// [dropped after][1] the `rcl_client_t`. +/// +/// [1]: pub struct ClientHandle { - rcl_client_mtx: Mutex, - rcl_node_mtx: Arc>, + rcl_client: Mutex, + node_handle: Arc, pub(crate) in_use_by_wait_set: Arc, } impl ClientHandle { pub(crate) fn lock(&self) -> MutexGuard { - self.rcl_client_mtx.lock().unwrap() + self.rcl_client.lock().unwrap() } } impl Drop for ClientHandle { fn drop(&mut self) { - let rcl_client = self.rcl_client_mtx.get_mut().unwrap(); - let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap(); - // SAFETY: No preconditions for this function + let rcl_client = self.rcl_client.get_mut().unwrap(); + let mut rcl_node = self.node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. unsafe { - rcl_client_fini(rcl_client, rcl_node_mtx); + rcl_client_fini(rcl_client, &mut *rcl_node); } } } @@ -74,7 +80,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub(crate) fn new(rcl_node_mtx: Arc>, topic: &str) -> Result + pub(crate) fn new(node_handle: Arc, topic: &str) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_client`], see the struct's documentation for the rationale where @@ -92,24 +98,32 @@ where // SAFETY: No preconditions for this function. let client_options = unsafe { rcl_client_get_default_options() }; - unsafe { - // SAFETY: The rcl_client is zero-initialized as expected by this function. - // The rcl_node is kept alive because it is co-owned by the client. - // The topic name and the options are copied by this function, so they can be dropped - // afterwards. - rcl_client_init( - &mut rcl_client, - &*rcl_node_mtx.lock().unwrap(), - type_support, - topic_c_string.as_ptr(), - &client_options, - ) - .ok()?; + { + let rcl_node = node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + + // SAFETY: + // * The rcl_client was zero-initialized as expected by this function. + // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the client. + // * The topic name and the options are copied by this function, so they can be dropped + // afterwards. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during initialization. + unsafe { + rcl_client_init( + &mut rcl_client, + &*rcl_node, + type_support, + topic_c_string.as_ptr(), + &client_options, + ) + .ok()?; + } } let handle = Arc::new(ClientHandle { - rcl_client_mtx: Mutex::new(rcl_client), - rcl_node_mtx, + rcl_client: Mutex::new(rcl_client), + node_handle, in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); @@ -245,8 +259,8 @@ where /// pub fn service_is_ready(&self) -> Result { let mut is_ready = false; - let client = &mut *self.handle.rcl_client_mtx.lock().unwrap(); - let node = &mut *self.handle.rcl_node_mtx.lock().unwrap(); + let client = &mut *self.handle.rcl_client.lock().unwrap(); + let node = &mut *self.handle.node_handle.rcl_node.lock().unwrap(); unsafe { // SAFETY both node and client are guaranteed to be valid here diff --git a/rclrs/src/context.rs b/rclrs/src/context.rs index 7c5d6a763..6d566433c 100644 --- a/rclrs/src/context.rs +++ b/rclrs/src/context.rs @@ -7,14 +7,31 @@ use std::vec::Vec; use crate::rcl_bindings::*; use crate::{RclrsError, ToResult}; +/// This is locked whenever initializing or dropping any middleware entity +/// because we have found issues in RCL and some RMW implementations that +/// make it unsafe to simultaneously initialize and/or drop middleware +/// entities such as `rcl_context_t` and `rcl_node_t` as well middleware +/// primitives such as `rcl_publisher_t`, `rcl_subscription_t`, etc. +/// It seems these C and C++ based libraries will regularly use +/// unprotected global variables in their object initialization and cleanup. +/// +/// Further discussion with the RCL team may help to improve the RCL +/// documentation to specifically call out where these risks are present. For +/// now we lock this mutex for any RCL function that carries reasonable suspicion +/// of a risk. +pub(crate) static ENTITY_LIFECYCLE_MUTEX: Mutex<()> = Mutex::new(()); + impl Drop for rcl_context_t { fn drop(&mut self) { unsafe { // The context may be invalid when rcl_init failed, e.g. because of invalid command // line arguments. - // SAFETY: No preconditions for this function. + + // SAFETY: No preconditions for rcl_context_is_valid. if rcl_context_is_valid(self) { - // SAFETY: These functions have no preconditions besides a valid rcl_context + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. rcl_shutdown(self); rcl_context_fini(self); } @@ -39,16 +56,26 @@ unsafe impl Send for rcl_context_t {} /// - the allocator used (left as the default by `rclrs`) /// pub struct Context { - pub(crate) rcl_context_mtx: Arc>, + pub(crate) handle: Arc, +} + +/// This struct manages the lifetime and access to the `rcl_context_t`. It will also +/// account for the lifetimes of any dependencies, if we need to add +/// dependencies in the future (currently there are none). It is not strictly +/// necessary to decompose `Context` and `ContextHandle` like this, but we are +/// doing it to be consistent with the lifecycle management of other rcl +/// bindings in this library. +pub(crate) struct ContextHandle { + pub(crate) rcl_context: Mutex, } impl Context { /// Creates a new context. /// - /// Usually, this would be called with `std::env::args()`, analogously to `rclcpp::init()`. + /// Usually this would be called with `std::env::args()`, analogously to `rclcpp::init()`. /// See also the official "Passing ROS arguments to nodes via the command-line" tutorial. /// - /// Creating a context can fail in case the args contain invalid ROS arguments. + /// Creating a context will fail if the args contain invalid ROS arguments. /// /// # Example /// ``` @@ -58,6 +85,21 @@ impl Context { /// assert!(Context::new(invalid_remapping).is_err()); /// ``` pub fn new(args: impl IntoIterator) -> Result { + Self::new_with_options(args, InitOptions::new()) + } + + /// Same as [`Context::new`] except you can additionally provide initialization options. + /// + /// # Example + /// ``` + /// use rclrs::{Context, InitOptions}; + /// let context = Context::new_with_options([], InitOptions::new().with_domain_id(Some(5))).unwrap(); + /// assert_eq!(context.domain_id(), 5); + /// ```` + pub fn new_with_options( + args: impl IntoIterator, + options: InitOptions, + ) -> Result { // SAFETY: Getting a zero-initialized value is always safe let mut rcl_context = unsafe { rcl_get_zero_initialized_context() }; let cstring_args: Vec = args @@ -74,24 +116,26 @@ impl Context { unsafe { // SAFETY: No preconditions for this function. let allocator = rcutils_get_default_allocator(); - // SAFETY: Getting a zero-initialized value is always safe. - let mut rcl_init_options = rcl_get_zero_initialized_init_options(); - // SAFETY: Passing in a zero-initialized value is expected. - // In the case where this returns not ok, there's nothing to clean up. - rcl_init_options_init(&mut rcl_init_options, allocator).ok()?; - // SAFETY: This function does not store the ephemeral init_options and c_args - // pointers. Passing in a zero-initialized rcl_context is expected. - let ret = rcl_init( - c_args.len() as i32, - if c_args.is_empty() { - std::ptr::null() - } else { - c_args.as_ptr() - }, - &rcl_init_options, - &mut rcl_context, - ) - .ok(); + let mut rcl_init_options = options.into_rcl(allocator)?; + // SAFETY: + // * This function does not store the ephemeral init_options and c_args pointers. + // * Passing in a zero-initialized rcl_context is mandatory. + // * The entity lifecycle mutex is locked to protect against the risk of global variables + // in the rmw implementation being unsafely modified during initialization. + let ret = { + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + rcl_init( + c_args.len() as i32, + if c_args.is_empty() { + std::ptr::null() + } else { + c_args.as_ptr() + }, + &rcl_init_options, + &mut rcl_context, + ) + .ok() + }; // SAFETY: It's safe to pass in an initialized object. // Early return will not leak memory, because this is the last fini function. rcl_init_options_fini(&mut rcl_init_options).ok()?; @@ -99,10 +143,31 @@ impl Context { ret?; } Ok(Self { - rcl_context_mtx: Arc::new(Mutex::new(rcl_context)), + handle: Arc::new(ContextHandle { + rcl_context: Mutex::new(rcl_context), + }), }) } + /// Returns the ROS domain ID that the context is using. + /// + /// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1]. + /// It can be set through the `ROS_DOMAIN_ID` environment variable. + /// + /// [1]: https://docs.ros.org/en/rolling/Concepts/About-Domain-ID.html + pub fn domain_id(&self) -> usize { + let mut domain_id: usize = 0; + let ret = unsafe { + rcl_context_get_domain_id( + &mut *self.handle.rcl_context.lock().unwrap(), + &mut domain_id, + ) + }; + + debug_assert_eq!(ret, 0); + domain_id + } + /// Checks if the context is still valid. /// /// This will return `false` when a signal has caused the context to shut down (currently @@ -110,12 +175,65 @@ impl Context { pub fn ok(&self) -> bool { // This will currently always return true, but once we have a signal handler, the signal // handler could call `rcl_shutdown()`, hence making the context invalid. - let rcl_context = &mut *self.rcl_context_mtx.lock().unwrap(); + let rcl_context = &mut *self.handle.rcl_context.lock().unwrap(); // SAFETY: No preconditions for this function. unsafe { rcl_context_is_valid(rcl_context) } } } +/// Additional options for initializing the Context. +#[derive(Default, Clone)] +pub struct InitOptions { + /// The domain ID that should be used by the Context. Set to None to ask for + /// the default behavior, which is to set the domain ID according to the + /// [ROS_DOMAIN_ID][1] environment variable. + /// + /// [1]: https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Domain-ID.html#the-ros-domain-id + domain_id: Option, +} + +impl InitOptions { + /// Create a new InitOptions with all default values. + pub fn new() -> InitOptions { + Self::default() + } + + /// Transform an InitOptions into a new one with a certain domain_id + pub fn with_domain_id(mut self, domain_id: Option) -> InitOptions { + self.domain_id = domain_id; + self + } + + /// Set the domain_id of an InitOptions, or reset it to the default behavior + /// (determined by environment variables) by providing None. + pub fn set_domain_id(&mut self, domain_id: Option) { + self.domain_id = domain_id; + } + + /// Get the domain_id that will be provided by these InitOptions. + pub fn domain_id(&self) -> Option { + self.domain_id + } + + fn into_rcl(self, allocator: rcutils_allocator_s) -> Result { + unsafe { + // SAFETY: Getting a zero-initialized value is always safe. + let mut rcl_init_options = rcl_get_zero_initialized_init_options(); + // SAFETY: Passing in a zero-initialized value is expected. + // In the case where this returns not ok, there's nothing to clean up. + rcl_init_options_init(&mut rcl_init_options, allocator).ok()?; + + // We only need to set the domain_id if the user asked for something + // other than None. When the user asks for None, that is equivalent + // to the default value in rcl_init_options. + if let Some(domain_id) = self.domain_id { + rcl_init_options_set_domain_id(&mut rcl_init_options, domain_id); + } + Ok(rcl_init_options) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index c9469d7a7..2f0b6c4d9 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -42,7 +42,9 @@ impl SingleThreadedExecutor { for node in { self.nodes_mtx.lock().unwrap() } .iter() .filter_map(Weak::upgrade) - .filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }) + .filter(|node| unsafe { + rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap()) + }) { let wait_set = WaitSet::new_for_node(&node)?; let ready_entities = wait_set.wait(timeout)?; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index c89a1ef74..9410e3321 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -13,18 +13,12 @@ pub use self::builder::*; pub use self::graph::*; use crate::rcl_bindings::*; use crate::{ - Client, ClientBase, Clock, Context, GuardCondition, ParameterBuilder, ParameterInterface, - ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase, - Subscription, SubscriptionBase, SubscriptionCallback, TimeSource, ToResult, + Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, ParameterBuilder, + ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, + ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, TimeSource, + ENTITY_LIFECYCLE_MUTEX, }; -impl Drop for rcl_node_t { - fn drop(&mut self) { - // SAFETY: No preconditions for this function - unsafe { rcl_node_fini(self).ok().unwrap() }; - } -} - // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_node_t {} @@ -71,18 +65,35 @@ pub struct Node { pub(crate) subscriptions_mtx: Mutex>>, time_source: TimeSource, parameter: ParameterInterface, - // Note: it's important to have those last since `drop` will be called in order of declaration - // in the struct and both `TimeSource` and `ParameterInterface` contain subscriptions / - // services that will fail to be dropped if the context or node is destroyed first. - pub(crate) rcl_node_mtx: Arc>, - pub(crate) rcl_context_mtx: Arc>, + pub(crate) handle: Arc, +} + +/// This struct manages the lifetime of an `rcl_node_t`, and accounts for its +/// dependency on the lifetime of its `rcl_context_t` by ensuring that this +/// dependency is [dropped after][1] the `rcl_node_t`. +/// +/// [1]: +pub(crate) struct NodeHandle { + pub(crate) rcl_node: Mutex, + pub(crate) context_handle: Arc, +} + +impl Drop for NodeHandle { + fn drop(&mut self) { + let _context_lock = self.context_handle.rcl_context.lock().unwrap(); + let mut rcl_node = self.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. + unsafe { rcl_node_fini(&mut *rcl_node) }; + } } impl Eq for Node {} impl PartialEq for Node { fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.rcl_node_mtx, &other.rcl_node_mtx) + Arc::ptr_eq(&self.handle, &other.handle) } } @@ -182,7 +193,8 @@ impl Node { &self, getter: unsafe extern "C" fn(*const rcl_node_t) -> *const c_char, ) -> String { - unsafe { call_string_getter_with_handle(&self.rcl_node_mtx.lock().unwrap(), getter) } + let rcl_node = self.handle.rcl_node.lock().unwrap(); + unsafe { call_string_getter_with_rcl_node(&rcl_node, getter) } } /// Creates a [`Client`][1]. @@ -193,7 +205,7 @@ impl Node { where T: rosidl_runtime_rs::Service, { - let client = Arc::new(Client::::new(Arc::clone(&self.rcl_node_mtx), topic)?); + let client = Arc::new(Client::::new(Arc::clone(&self.handle), topic)?); { self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -208,8 +220,8 @@ impl Node { /// [1]: crate::GuardCondition /// [2]: crate::spin_once pub fn create_guard_condition(&self) -> Arc { - let guard_condition = Arc::new(GuardCondition::new_with_rcl_context( - &mut self.rcl_context_mtx.lock().unwrap(), + let guard_condition = Arc::new(GuardCondition::new_with_context_handle( + Arc::clone(&self.handle.context_handle), None, )); { self.guard_conditions_mtx.lock().unwrap() } @@ -230,8 +242,8 @@ impl Node { where F: Fn() + Send + Sync + 'static, { - let guard_condition = Arc::new(GuardCondition::new_with_rcl_context( - &mut self.rcl_context_mtx.lock().unwrap(), + let guard_condition = Arc::new(GuardCondition::new_with_context_handle( + Arc::clone(&self.handle.context_handle), Some(Box::new(callback) as Box), )); { self.guard_conditions_mtx.lock().unwrap() } @@ -251,11 +263,7 @@ impl Node { where T: Message, { - let publisher = Arc::new(Publisher::::new( - Arc::clone(&self.rcl_node_mtx), - topic, - qos, - )?); + let publisher = Arc::new(Publisher::::new(Arc::clone(&self.handle), topic, qos)?); Ok(publisher) } @@ -273,7 +281,7 @@ impl Node { F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { let service = Arc::new(Service::::new( - Arc::clone(&self.rcl_node_mtx), + Arc::clone(&self.handle), topic, callback, )?); @@ -296,7 +304,7 @@ impl Node { T: Message, { let subscription = Arc::new(Subscription::::new( - Arc::clone(&self.rcl_node_mtx), + Arc::clone(&self.handle), topic, qos, callback, @@ -358,11 +366,11 @@ impl Node { // add description about this function is for getting actual domain_id // and about override of domain_id via node option pub fn domain_id(&self) -> usize { - let rcl_node = &*self.rcl_node_mtx.lock().unwrap(); + let rcl_node = self.handle.rcl_node.lock().unwrap(); let mut domain_id: usize = 0; let ret = unsafe { // SAFETY: No preconditions for this function. - rcl_node_get_domain_id(rcl_node, &mut domain_id) + rcl_node_get_domain_id(&*rcl_node, &mut domain_id) }; debug_assert_eq!(ret, 0); @@ -438,7 +446,7 @@ impl Node { // function, which is why it's not merged into Node::call_string_getter(). // This function is unsafe since it's possible to pass in an rcl_node_t with dangling // pointers etc. -pub(crate) unsafe fn call_string_getter_with_handle( +pub(crate) unsafe fn call_string_getter_with_rcl_node( rcl_node: &rcl_node_t, getter: unsafe extern "C" fn(*const rcl_node_t) -> *const c_char, ) -> String { diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/builder.rs index 2dd82a79c..cdd4ffc53 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/builder.rs @@ -3,8 +3,8 @@ use std::sync::{Arc, Mutex}; use crate::rcl_bindings::*; use crate::{ - ClockType, Context, Node, ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, - QOS_PROFILE_CLOCK, + ClockType, Context, ContextHandle, Node, NodeHandle, ParameterInterface, QoSProfile, + RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK, }; /// A builder for creating a [`Node`][1]. @@ -42,7 +42,7 @@ use crate::{ /// [1]: crate::Node /// [2]: crate::Node::builder pub struct NodeBuilder { - context: Arc>, + context: Arc, name: String, namespace: String, use_global_arguments: bool, @@ -83,14 +83,14 @@ impl NodeBuilder { /// RclrsError::RclError { code: RclReturnCode::NodeInvalidName, .. } /// )); /// # Ok::<(), RclrsError>(()) - /// ``` - /// + /// ``` + /// /// [1]: crate::Node#naming /// [2]: https://docs.ros2.org/latest/api/rmw/validate__node__name_8h.html#a5690a285aed9735f89ef11950b6e39e3 /// [3]: NodeBuilder::build pub fn new(context: &Context, name: &str) -> NodeBuilder { NodeBuilder { - context: context.rcl_context_mtx.clone(), + context: Arc::clone(&context.handle), name: name.to_string(), namespace: "/".to_string(), use_global_arguments: true, @@ -193,7 +193,7 @@ impl NodeBuilder { /// used in creating the context. /// /// For more details about command line arguments, see [here][2]. - /// + /// /// # Example /// ``` /// # use rclrs::{Context, Node, NodeBuilder, RclrsError}; @@ -261,15 +261,18 @@ impl NodeBuilder { s: self.namespace.clone(), })?; let rcl_node_options = self.create_rcl_node_options()?; - let rcl_context = &mut *self.context.lock().unwrap(); + let rcl_context = &mut *self.context.rcl_context.lock().unwrap(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_node = unsafe { rcl_get_zero_initialized_node() }; unsafe { - // SAFETY: The rcl_node is zero-initialized as expected by this function. - // The strings and node options are copied by this function, so we don't need - // to keep them alive. - // The rcl_context has to be kept alive because it is co-owned by the node. + // SAFETY: + // * The rcl_node is zero-initialized as mandated by this function. + // * The strings and node options are copied by this function, so we don't need to keep them alive. + // * The rcl_context is kept alive by the ContextHandle because it is a dependency of the node. + // * The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); rcl_node_init( &mut rcl_node, node_name.as_ptr(), @@ -280,15 +283,20 @@ impl NodeBuilder { .ok()?; }; - let rcl_node_mtx = Arc::new(Mutex::new(rcl_node)); - let parameter = ParameterInterface::new( - &rcl_node_mtx, - &rcl_node_options.arguments, - &rcl_context.global_arguments, - )?; + let handle = Arc::new(NodeHandle { + rcl_node: Mutex::new(rcl_node), + context_handle: Arc::clone(&self.context), + }); + let parameter = { + let rcl_node = handle.rcl_node.lock().unwrap(); + ParameterInterface::new( + &rcl_node, + &rcl_node_options.arguments, + &rcl_context.global_arguments, + )? + }; let node = Arc::new(Node { - rcl_node_mtx, - rcl_context_mtx: self.context.clone(), + handle, clients_mtx: Mutex::new(vec![]), guard_conditions_mtx: Mutex::new(vec![]), services_mtx: Mutex::new(vec![]), diff --git a/rclrs/src/node/graph.rs b/rclrs/src/node/graph.rs index 968bb5c4b..01ad8fd3d 100644 --- a/rclrs/src/node/graph.rs +++ b/rclrs/src/node/graph.rs @@ -137,8 +137,9 @@ impl Node { // SAFETY: rcl_names_and_types is zero-initialized as expected by this call unsafe { + let rcl_node = self.handle.rcl_node.lock().unwrap(); rcl_get_topic_names_and_types( - &*self.rcl_node_mtx.lock().unwrap(), + &*rcl_node, &mut rcutils_get_default_allocator(), false, &mut rcl_names_and_types, @@ -166,8 +167,9 @@ impl Node { // SAFETY: node_names and node_namespaces are zero-initialized as expected by this call. unsafe { + let rcl_node = self.handle.rcl_node.lock().unwrap(); rcl_get_node_names( - &*self.rcl_node_mtx.lock().unwrap(), + &*rcl_node, rcutils_get_default_allocator(), &mut rcl_names, &mut rcl_namespaces, @@ -213,8 +215,9 @@ impl Node { // SAFETY: The node_names, namespaces, and enclaves are zero-initialized as expected by this call. unsafe { + let rcl_node = self.handle.rcl_node.lock().unwrap(); rcl_get_node_names_with_enclaves( - &*self.rcl_node_mtx.lock().unwrap(), + &*rcl_node, rcutils_get_default_allocator(), &mut rcl_names, &mut rcl_namespaces, @@ -262,12 +265,8 @@ impl Node { // SAFETY: The topic_name string was correctly allocated previously unsafe { - rcl_count_publishers( - &*self.rcl_node_mtx.lock().unwrap(), - topic_name.as_ptr(), - &mut count, - ) - .ok()? + let rcl_node = self.handle.rcl_node.lock().unwrap(); + rcl_count_publishers(&*rcl_node, topic_name.as_ptr(), &mut count).ok()? }; Ok(count) } @@ -282,12 +281,8 @@ impl Node { // SAFETY: The topic_name string was correctly allocated previously unsafe { - rcl_count_subscribers( - &*self.rcl_node_mtx.lock().unwrap(), - topic_name.as_ptr(), - &mut count, - ) - .ok()? + let rcl_node = self.handle.rcl_node.lock().unwrap(); + rcl_count_subscribers(&*rcl_node, topic_name.as_ptr(), &mut count).ok()? }; Ok(count) } @@ -336,8 +331,9 @@ impl Node { // SAFETY: node_name and node_namespace have been zero-initialized. unsafe { + let rcl_node = self.handle.rcl_node.lock().unwrap(); getter( - &*self.rcl_node_mtx.lock().unwrap(), + &*rcl_node, &mut rcutils_get_default_allocator(), node_name.as_ptr(), node_namespace.as_ptr(), @@ -371,8 +367,9 @@ impl Node { // SAFETY: topic has been zero-initialized unsafe { + let rcl_node = self.handle.rcl_node.lock().unwrap(); getter( - &*self.rcl_node_mtx.lock().unwrap(), + &*rcl_node, &mut rcutils_get_default_allocator(), topic.as_ptr(), false, @@ -458,14 +455,39 @@ fn convert_names_and_types( #[cfg(test)] mod tests { use super::*; - use crate::Context; + use crate::{Context, InitOptions}; #[test] fn test_graph_empty() { - let context = Context::new([]).unwrap(); + // cargo test by default will run all test functions in parallel using + // as many threads as the underlying system allows. However, the test + // expectations of test_graph_empty will fail if its detects any other middleware + // activity while it's running. + // + // If we ensure that the Context of test_graph_empty is given a different domain ID + // from the rest of the tests, then we can ensure that it will not observe any other + // middleware activity, and its expectations can pass (as long as the user is not + // running any other ROS executables on their system). + // + // By default we will assign 99 to the domain ID of test_graph_empty's Context. + // However, if the ROS_DOMAIN_ID environment variable was set to 99 by the user, + // then the rest of the tests will be using that value. So here we are detecting + // that situation and setting the domain ID of test_graph_empty's Context to 98 + // in that situation. + // + // 99 and 98 are just chosen as arbitrary valid domain ID values. There is + // otherwise nothing special about either value. + let domain_id: usize = std::env::var("ROS_DOMAIN_ID") + .ok() + .and_then(|value| value.parse().ok()) + .map(|value: usize| if value != 99 { 99 } else { 98 }) + .unwrap_or(99); + + let context = + Context::new_with_options([], InitOptions::new().with_domain_id(Some(domain_id))) + .unwrap(); let node_name = "test_publisher_names_and_types"; let node = Node::new(&context, node_name).unwrap(); - // Test that the graph has no publishers let names_and_topics = node .get_publisher_names_and_types_by_node(node_name, "") diff --git a/rclrs/src/parameter.rs b/rclrs/src/parameter.rs index 75256d869..db3bb3948 100644 --- a/rclrs/src/parameter.rs +++ b/rclrs/src/parameter.rs @@ -5,7 +5,7 @@ pub(crate) use override_map::*; pub use value::*; use crate::rcl_bindings::*; -use crate::{call_string_getter_with_handle, RclrsError}; +use crate::{call_string_getter_with_rcl_node, RclrsError}; use std::collections::{btree_map::Entry, BTreeMap}; use std::fmt::Debug; use std::marker::PhantomData; @@ -776,13 +776,12 @@ pub(crate) struct ParameterInterface { impl ParameterInterface { pub(crate) fn new( - rcl_node_mtx: &Arc>, + rcl_node: &rcl_node_t, node_arguments: &rcl_arguments_t, global_arguments: &rcl_arguments_t, ) -> Result { - let rcl_node = rcl_node_mtx.lock().unwrap(); let override_map = unsafe { - let fqn = call_string_getter_with_handle(&rcl_node, rcl_node_get_fully_qualified_name); + let fqn = call_string_getter_with_rcl_node(rcl_node, rcl_node_get_fully_qualified_name); resolve_parameter_overrides(&fqn, node_arguments, global_arguments)? }; diff --git a/rclrs/src/parameter/value.rs b/rclrs/src/parameter/value.rs index 1802145f5..a56e77ffc 100644 --- a/rclrs/src/parameter/value.rs +++ b/rclrs/src/parameter/value.rs @@ -433,7 +433,7 @@ mod tests { let mut rcl_params = std::ptr::null_mut(); unsafe { rcl_arguments_get_param_overrides( - &ctx.rcl_context_mtx.lock().unwrap().global_arguments, + &ctx.handle.rcl_context.lock().unwrap().global_arguments, &mut rcl_params, ) .ok()?; diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 0f512c5f6..82034f9ae 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -9,6 +9,7 @@ use rosidl_runtime_rs::{Message, RmwMessage}; use crate::error::{RclrsError, ToResult}; use crate::qos::QoSProfile; use crate::rcl_bindings::*; +use crate::{NodeHandle, ENTITY_LIFECYCLE_MUTEX}; mod loaned_message; pub use loaned_message::*; @@ -17,6 +18,28 @@ pub use loaned_message::*; // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_publisher_t {} +/// Manage the lifecycle of an `rcl_publisher_t`, including managing its dependencies +/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are +/// [dropped after][1] the `rcl_publisher_t`. +/// +/// [1]: +struct PublisherHandle { + rcl_publisher: Mutex, + node_handle: Arc, +} + +impl Drop for PublisherHandle { + fn drop(&mut self) { + let mut rcl_node = self.node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. + unsafe { + rcl_publisher_fini(self.rcl_publisher.get_mut().unwrap(), &mut *rcl_node); + } + } +} + /// Struct for sending messages of type `T`. /// /// Multiple publishers can be created for the same topic, in different nodes or the same node. @@ -31,27 +54,11 @@ pub struct Publisher where T: Message, { - rcl_publisher_mtx: Mutex, - rcl_node_mtx: Arc>, // The data pointed to by type_support_ptr has static lifetime; // it is global data in the type support library. type_support_ptr: *const rosidl_message_type_support_t, message: PhantomData, -} - -impl Drop for Publisher -where - T: Message, -{ - fn drop(&mut self) { - unsafe { - // SAFETY: No preconditions for this function (besides the arguments being valid). - rcl_publisher_fini( - self.rcl_publisher_mtx.get_mut().unwrap(), - &mut *self.rcl_node_mtx.lock().unwrap(), - ); - } - } + handle: PublisherHandle, } // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -68,8 +75,8 @@ where /// Creates a new `Publisher`. /// /// Node and namespace changes are always applied _before_ topic remapping. - pub fn new( - rcl_node_mtx: Arc>, + pub(crate) fn new( + node_handle: Arc, topic: &str, qos: QoSProfile, ) -> Result @@ -88,27 +95,35 @@ where // SAFETY: No preconditions for this function. let mut publisher_options = unsafe { rcl_publisher_get_default_options() }; publisher_options.qos = qos.into(); - unsafe { - // SAFETY: The rcl_publisher is zero-initialized as expected by this function. - // The rcl_node is kept alive because it is co-owned by the subscription. - // The topic name and the options are copied by this function, so they can be dropped - // afterwards. - // TODO: type support? - rcl_publisher_init( - &mut rcl_publisher, - &*rcl_node_mtx.lock().unwrap(), - type_support_ptr, - topic_c_string.as_ptr(), - &publisher_options, - ) - .ok()?; + + { + let rcl_node = node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + // SAFETY: + // * The rcl_publisher is zero-initialized as mandated by this function. + // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the publisher. + // * The topic name and the options are copied by this function, so they can be dropped afterwards. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during cleanup. + rcl_publisher_init( + &mut rcl_publisher, + &*rcl_node, + type_support_ptr, + topic_c_string.as_ptr(), + &publisher_options, + ) + .ok()?; + } } Ok(Self { - rcl_publisher_mtx: Mutex::new(rcl_publisher), - rcl_node_mtx, type_support_ptr, message: PhantomData, + handle: PublisherHandle { + rcl_publisher: Mutex::new(rcl_publisher), + node_handle, + }, }) } @@ -121,7 +136,7 @@ where // The unsafe variables created get converted to safe types before being returned unsafe { let raw_topic_pointer = - rcl_publisher_get_topic_name(&*self.rcl_publisher_mtx.lock().unwrap()); + rcl_publisher_get_topic_name(&*self.handle.rcl_publisher.lock().unwrap()); CStr::from_ptr(raw_topic_pointer) .to_string_lossy() .into_owned() @@ -146,7 +161,7 @@ where /// [1]: https://github.com/ros2/ros2/issues/255 pub fn publish<'a, M: MessageCow<'a, T>>(&self, message: M) -> Result<(), RclrsError> { let rmw_message = T::into_rmw_message(message.into_cow()); - let rcl_publisher = &mut *self.rcl_publisher_mtx.lock().unwrap(); + let rcl_publisher = &mut *self.handle.rcl_publisher.lock().unwrap(); unsafe { // SAFETY: The message type is guaranteed to match the publisher type by the type system. // The message does not need to be valid beyond the duration of this function call. @@ -200,7 +215,7 @@ where unsafe { // SAFETY: msg_ptr contains a null ptr as expected by this function. rcl_borrow_loaned_message( - &*self.rcl_publisher_mtx.lock().unwrap(), + &*self.handle.rcl_publisher.lock().unwrap(), self.type_support_ptr, &mut msg_ptr, ) diff --git a/rclrs/src/publisher/loaned_message.rs b/rclrs/src/publisher/loaned_message.rs index 350c1ad23..742b1e7fe 100644 --- a/rclrs/src/publisher/loaned_message.rs +++ b/rclrs/src/publisher/loaned_message.rs @@ -55,7 +55,7 @@ where unsafe { // SAFETY: These two pointers are valid, and the msg_ptr is not used afterwards. rcl_return_loaned_message_from_publisher( - &*self.publisher.rcl_publisher_mtx.lock().unwrap(), + &*self.publisher.handle.rcl_publisher.lock().unwrap(), self.msg_ptr as *mut _, ) .ok() @@ -80,7 +80,7 @@ where unsafe { // SAFETY: These two pointers are valid, and the msg_ptr is not used afterwards. rcl_publish_loaned_message( - &*self.publisher.rcl_publisher_mtx.lock().unwrap(), + &*self.publisher.handle.rcl_publisher.lock().unwrap(), self.msg_ptr as *mut _, std::ptr::null_mut(), ) diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index 6e55094d6..5dfa8f511 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -7,31 +7,38 @@ use rosidl_runtime_rs::Message; use crate::error::{RclReturnCode, ToResult}; use crate::{rcl_bindings::*, MessageCow, RclrsError}; +use crate::{NodeHandle, ENTITY_LIFECYCLE_MUTEX}; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_service_t {} -/// Internal struct used by services. +/// Manage the lifecycle of an `rcl_service_t`, including managing its dependencies +/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are +/// [dropped after][1] the `rcl_service_t`. +/// +/// [1]: pub struct ServiceHandle { - rcl_service_mtx: Mutex, - rcl_node_mtx: Arc>, + rcl_service: Mutex, + node_handle: Arc, pub(crate) in_use_by_wait_set: Arc, } impl ServiceHandle { pub(crate) fn lock(&self) -> MutexGuard { - self.rcl_service_mtx.lock().unwrap() + self.rcl_service.lock().unwrap() } } impl Drop for ServiceHandle { fn drop(&mut self) { - let rcl_service = self.rcl_service_mtx.get_mut().unwrap(); - let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap(); - // SAFETY: No preconditions for this function + let rcl_service = self.rcl_service.get_mut().unwrap(); + let mut rcl_node = self.node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. unsafe { - rcl_service_fini(rcl_service, rcl_node); + rcl_service_fini(rcl_service, &mut *rcl_node); } } } @@ -71,7 +78,7 @@ where { /// Creates a new service. pub(crate) fn new( - rcl_node_mtx: Arc>, + node_handle: Arc, topic: &str, callback: F, ) -> Result @@ -93,24 +100,31 @@ where // SAFETY: No preconditions for this function. let service_options = unsafe { rcl_service_get_default_options() }; - unsafe { - // SAFETY: The rcl_service is zero-initialized as expected by this function. - // The rcl_node is kept alive because it is co-owned by the service. - // The topic name and the options are copied by this function, so they can be dropped - // afterwards. - rcl_service_init( - &mut rcl_service, - &*rcl_node_mtx.lock().unwrap(), - type_support, - topic_c_string.as_ptr(), - &service_options as *const _, - ) - .ok()?; + { + let rcl_node = node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + // SAFETY: + // * The rcl_service is zero-initialized as mandated by this function. + // * The rcl_node is kept alive by the NodeHandle it is a dependency of the service. + // * The topic name and the options are copied by this function, so they can be dropped + // afterwards. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during initialization. + rcl_service_init( + &mut rcl_service, + &*rcl_node, + type_support, + topic_c_string.as_ptr(), + &service_options as *const _, + ) + .ok()?; + } } let handle = Arc::new(ServiceHandle { - rcl_service_mtx: Mutex::new(rcl_service), - rcl_node_mtx, + rcl_service: Mutex::new(rcl_service), + node_handle, in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 7e3439df1..cbc72bc1b 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -8,7 +8,7 @@ use rosidl_runtime_rs::{Message, RmwMessage}; use crate::error::{RclReturnCode, ToResult}; use crate::qos::QoSProfile; -use crate::{rcl_bindings::*, RclrsError}; +use crate::{rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX}; mod callback; mod message_info; @@ -21,26 +21,32 @@ pub use readonly_loaned_message::*; // they are running in. Therefore, this type can be safely sent to another thread. unsafe impl Send for rcl_subscription_t {} -/// Internal struct used by subscriptions. +/// Manage the lifecycle of an `rcl_subscription_t`, including managing its dependencies +/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are +/// [dropped after][1] the `rcl_subscription_t`. +/// +/// [1]: pub struct SubscriptionHandle { - rcl_subscription_mtx: Mutex, - rcl_node_mtx: Arc>, + rcl_subscription: Mutex, + node_handle: Arc, pub(crate) in_use_by_wait_set: Arc, } impl SubscriptionHandle { pub(crate) fn lock(&self) -> MutexGuard { - self.rcl_subscription_mtx.lock().unwrap() + self.rcl_subscription.lock().unwrap() } } impl Drop for SubscriptionHandle { fn drop(&mut self) { - let rcl_subscription = self.rcl_subscription_mtx.get_mut().unwrap(); - let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap(); - // SAFETY: No preconditions for this function (besides the arguments being valid). + let rcl_subscription = self.rcl_subscription.get_mut().unwrap(); + let mut rcl_node = self.node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. unsafe { - rcl_subscription_fini(rcl_subscription, rcl_node); + rcl_subscription_fini(rcl_subscription, &mut *rcl_node); } } } @@ -85,7 +91,7 @@ where { /// Creates a new subscription. pub(crate) fn new( - rcl_node_mtx: Arc>, + node_handle: Arc, topic: &str, qos: QoSProfile, callback: impl SubscriptionCallback, @@ -107,25 +113,31 @@ where // SAFETY: No preconditions for this function. let mut subscription_options = unsafe { rcl_subscription_get_default_options() }; subscription_options.qos = qos.into(); - unsafe { - // SAFETY: The rcl_subscription is zero-initialized as expected by this function. - // The rcl_node is kept alive because it is co-owned by the subscription. - // The topic name and the options are copied by this function, so they can be dropped - // afterwards. - // TODO: type support? - rcl_subscription_init( - &mut rcl_subscription, - &*rcl_node_mtx.lock().unwrap(), - type_support, - topic_c_string.as_ptr(), - &subscription_options, - ) - .ok()?; + + { + let rcl_node = node_handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + // SAFETY: + // * The rcl_subscription is zero-initialized as mandated by this function. + // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription. + // * The topic name and the options are copied by this function, so they can be dropped afterwards. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during cleanup. + rcl_subscription_init( + &mut rcl_subscription, + &*rcl_node, + type_support, + topic_c_string.as_ptr(), + &subscription_options, + ) + .ok()?; + } } let handle = Arc::new(SubscriptionHandle { - rcl_subscription_mtx: Mutex::new(rcl_subscription), - rcl_node_mtx, + rcl_subscription: Mutex::new(rcl_subscription), + node_handle, in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 94811c75a..cd7c51f93 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -15,24 +15,33 @@ // DISTRIBUTION A. Approved for public release; distribution unlimited. // OPSEC #4584. -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use std::vec::Vec; use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}; use crate::rcl_bindings::*; -use crate::{ClientBase, Context, Node, ServiceBase, SubscriptionBase}; +use crate::{ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase}; mod exclusivity_guard; mod guard_condition; use exclusivity_guard::*; pub use guard_condition::*; -/// A struct for waiting on subscriptions and other waitable entities to become ready. -pub struct WaitSet { +/// Manage the lifecycle of an `rcl_wait_set_t`, including managing its dependency +/// on `rcl_context_t` by ensuring that this dependency is [dropped after][1] the +/// `rcl_wait_set_t`. +/// +/// [1]: +struct WaitSetHandle { rcl_wait_set: rcl_wait_set_t, // Used to ensure the context is alive while the wait set is alive. - _rcl_context_mtx: Arc>, + #[allow(dead_code)] + context_handle: Arc, +} + +/// A struct for waiting on subscriptions and other waitable entities to become ready. +pub struct WaitSet { // The subscriptions that are currently registered in the wait set. // This correspondence is an invariant that must be maintained by all functions, // even in the error case. @@ -41,6 +50,7 @@ pub struct WaitSet { // The guard conditions that are currently registered in the wait set. guard_conditions: Vec>>, services: Vec>>, + handle: WaitSetHandle, } /// A list of entities that are ready, returned by [`WaitSet::wait`]. @@ -91,6 +101,7 @@ impl WaitSet { let rcl_wait_set = unsafe { // SAFETY: Getting a zero-initialized value is always safe let mut rcl_wait_set = rcl_get_zero_initialized_wait_set(); + let mut rcl_context = context.handle.rcl_context.lock().unwrap(); // SAFETY: We're passing in a zero-initialized wait set and a valid context. // There are no other preconditions. rcl_wait_set_init( @@ -101,19 +112,21 @@ impl WaitSet { number_of_clients, number_of_services, number_of_events, - &mut *context.rcl_context_mtx.lock().unwrap(), + &mut *rcl_context, rcutils_get_default_allocator(), ) .ok()?; rcl_wait_set }; Ok(Self { - rcl_wait_set, - _rcl_context_mtx: context.rcl_context_mtx.clone(), subscriptions: Vec::new(), guard_conditions: Vec::new(), clients: Vec::new(), services: Vec::new(), + handle: WaitSetHandle { + rcl_wait_set, + context_handle: Arc::clone(&context.handle), + }, }) } @@ -126,7 +139,7 @@ impl WaitSet { let live_guard_conditions = node.live_guard_conditions(); let live_services = node.live_services(); let ctx = Context { - rcl_context_mtx: node.rcl_context_mtx.clone(), + handle: Arc::clone(&node.handle.context_handle), }; let mut wait_set = WaitSet::new( live_subscriptions.len(), @@ -169,7 +182,7 @@ impl WaitSet { // valid, which it always is in our case. Hence, only debug_assert instead of returning // Result. // SAFETY: No preconditions for this function (besides passing in a valid wait set). - let ret = unsafe { rcl_wait_set_clear(&mut self.rcl_wait_set) }; + let ret = unsafe { rcl_wait_set_clear(&mut self.handle.rcl_wait_set) }; debug_assert_eq!(ret, 0); } @@ -196,7 +209,7 @@ impl WaitSet { // for as long as the wait set exists, because it's stored in self.subscriptions. // Passing in a null pointer for the third argument is explicitly allowed. rcl_wait_set_add_subscription( - &mut self.rcl_wait_set, + &mut self.handle.rcl_wait_set, &*subscription.handle().lock(), std::ptr::null_mut(), ) @@ -228,8 +241,8 @@ impl WaitSet { unsafe { // SAFETY: Safe if the wait set and guard condition are initialized rcl_wait_set_add_guard_condition( - &mut self.rcl_wait_set, - &*guard_condition.rcl_guard_condition.lock().unwrap(), + &mut self.handle.rcl_wait_set, + &*guard_condition.handle.rcl_guard_condition.lock().unwrap(), std::ptr::null_mut(), ) .ok()?; @@ -258,7 +271,7 @@ impl WaitSet { // for as long as the wait set exists, because it's stored in self.clients. // Passing in a null pointer for the third argument is explicitly allowed. rcl_wait_set_add_client( - &mut self.rcl_wait_set, + &mut self.handle.rcl_wait_set, &*client.handle().lock() as *const _, core::ptr::null_mut(), ) @@ -288,7 +301,7 @@ impl WaitSet { // for as long as the wait set exists, because it's stored in self.services. // Passing in a null pointer for the third argument is explicitly allowed. rcl_wait_set_add_service( - &mut self.rcl_wait_set, + &mut self.handle.rcl_wait_set, &*service.handle().lock() as *const _, core::ptr::null_mut(), ) @@ -337,7 +350,7 @@ impl WaitSet { // We cannot currently guarantee that the wait sets may not share content, but it is // mentioned in the doc comment for `add_subscription`. // Also, the rcl_wait_set is obviously valid. - match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() { + match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() { Ok(_) => (), Err(error) => match error { RclrsError::RclError { code, msg } => match code { @@ -357,7 +370,7 @@ impl WaitSet { // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is // equivalent to // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 - let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) }; + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.subscriptions.add(i) }; if !wait_set_entry.is_null() { ready_entities .subscriptions @@ -369,7 +382,7 @@ impl WaitSet { // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is // equivalent to // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 - let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) }; + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.clients.add(i) }; if !wait_set_entry.is_null() { ready_entities.clients.push(Arc::clone(&client.waitable)); } @@ -379,7 +392,7 @@ impl WaitSet { // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is // equivalent to // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 - let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) }; + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.guard_conditions.add(i) }; if !wait_set_entry.is_null() { ready_entities .guard_conditions @@ -391,7 +404,7 @@ impl WaitSet { // SAFETY: The `services` entry is an array of pointers, and this dereferencing is // equivalent to // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 - let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) }; + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.services.add(i) }; if !wait_set_entry.is_null() { ready_entities.services.push(Arc::clone(&service.waitable)); } diff --git a/rclrs/src/wait/guard_condition.rs b/rclrs/src/wait/guard_condition.rs index f4d9f651d..e9c4f1148 100644 --- a/rclrs/src/wait/guard_condition.rs +++ b/rclrs/src/wait/guard_condition.rs @@ -1,7 +1,7 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex}; use crate::rcl_bindings::*; -use crate::{Context, RclrsError, ToResult}; +use crate::{Context, ContextHandle, RclrsError, ToResult}; /// A waitable entity used for waking up a wait set manually. /// @@ -45,18 +45,30 @@ use crate::{Context, RclrsError, ToResult}; /// ``` pub struct GuardCondition { /// The rcl_guard_condition_t that this struct encapsulates. - pub(crate) rcl_guard_condition: Mutex, + pub(crate) handle: GuardConditionHandle, /// An optional callback to call when this guard condition is triggered. callback: Option>, /// A flag to indicate if this guard condition has already been assigned to a wait set. pub(crate) in_use_by_wait_set: Arc, } +/// Manage the lifecycle of an `rcl_guard_condition_t`, including managing its dependency +/// on `rcl_context_t` by ensuring that this dependency is [dropped after][1] the +/// `rcl_guard_condition_t`. +/// +/// [1]: +pub(crate) struct GuardConditionHandle { + pub(crate) rcl_guard_condition: Mutex, + /// Keep the context alive for the whole lifecycle of the guard condition + #[allow(dead_code)] + pub(crate) context_handle: Arc, +} + impl Drop for GuardCondition { fn drop(&mut self) { unsafe { // SAFETY: No precondition for this function (besides passing in a valid guard condition) - rcl_guard_condition_fini(&mut *self.rcl_guard_condition.lock().unwrap()); + rcl_guard_condition_fini(&mut *self.handle.rcl_guard_condition.lock().unwrap()); } } } @@ -66,8 +78,8 @@ impl PartialEq for GuardCondition { // Because GuardCondition controls the creation of the rcl_guard_condition, each unique GuardCondition should have a unique // rcl_guard_condition. Thus comparing equality of this member should be enough. std::ptr::eq( - &self.rcl_guard_condition.lock().unwrap().impl_, - &other.rcl_guard_condition.lock().unwrap().impl_, + &self.handle.rcl_guard_condition.lock().unwrap().impl_, + &other.handle.rcl_guard_condition.lock().unwrap().impl_, ) } } @@ -80,7 +92,7 @@ unsafe impl Send for rcl_guard_condition_t {} impl GuardCondition { /// Creates a new guard condition with no callback. pub fn new(context: &Context) -> Self { - Self::new_with_rcl_context(&mut context.rcl_context_mtx.lock().unwrap(), None) + Self::new_with_context_handle(Arc::clone(&context.handle), None) } /// Creates a new guard condition with a callback. @@ -88,8 +100,8 @@ impl GuardCondition { where F: Fn() + Send + Sync + 'static, { - Self::new_with_rcl_context( - &mut context.rcl_context_mtx.lock().unwrap(), + Self::new_with_context_handle( + Arc::clone(&context.handle), Some(Box::new(callback) as Box), ) } @@ -98,23 +110,31 @@ impl GuardCondition { /// Note this function enables calling `Node::create_guard_condition`[1] without providing the Context separately /// /// [1]: Node::create_guard_condition - pub(crate) fn new_with_rcl_context( - context: &mut rcl_context_t, + pub(crate) fn new_with_context_handle( + context_handle: Arc, callback: Option>, ) -> Self { - // SAFETY: Getting a zero initialized value is always safe - let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() }; - unsafe { - // SAFETY: The context must be valid, and the guard condition must be zero-initialized - rcl_guard_condition_init( - &mut guard_condition, - context, - rcl_guard_condition_get_default_options(), - ); - } + let rcl_guard_condition = { + // SAFETY: Getting a zero initialized value is always safe + let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() }; + let mut rcl_context = context_handle.rcl_context.lock().unwrap(); + unsafe { + // SAFETY: The context must be valid, and the guard condition must be zero-initialized + rcl_guard_condition_init( + &mut guard_condition, + &mut *rcl_context, + rcl_guard_condition_get_default_options(), + ); + } + + Mutex::new(guard_condition) + }; Self { - rcl_guard_condition: Mutex::new(guard_condition), + handle: GuardConditionHandle { + rcl_guard_condition, + context_handle, + }, callback, in_use_by_wait_set: Arc::new(AtomicBool::new(false)), } @@ -124,7 +144,8 @@ impl GuardCondition { pub fn trigger(&self) -> Result<(), RclrsError> { unsafe { // SAFETY: The rcl_guard_condition_t is valid. - rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?; + rcl_trigger_guard_condition(&mut *self.handle.rcl_guard_condition.lock().unwrap()) + .ok()?; } if let Some(callback) = &self.callback { callback();