Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct rcl entity lifecycles and fix spurious test failures #386

Merged
merged 20 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ path = "src/lib.rs"
# Please keep the list of dependencies alphabetically sorted,
# and also state why each dependency is needed.
[dependencies]
# Needed for dynamically finding type support libraries
# Needed for dynamically finding type support libraries
ament_rs = { version = "0.2", optional = true }

# Needed for uploading documentation to docs.rs
cfg-if = "1.0.0"

# Needed for clients
futures = "0.3"

# Needed for dynamic messages
libloading = { version = "0.8", optional = true }

# Needed for the Message trait, among others
rosidl_runtime_rs = "0.4"

Expand Down
68 changes: 41 additions & 27 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,38 @@ use rosidl_runtime_rs::Message;

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::{rcl_bindings::*, RclrsError};
use crate::{rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_client_t {}

/// Internal struct used by clients.
/// Manage the lifecycle of an `rcl_client_t`, including managing its dependencies
/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are
/// [dropped after][1] the `rcl_client_t`.
///
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
pub struct ClientHandle {
rcl_client_mtx: Mutex<rcl_client_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
rcl_client: Mutex<rcl_client_t>,
node_handle: Arc<NodeHandle>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ClientHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
self.rcl_client_mtx.lock().unwrap()
self.rcl_client.lock().unwrap()
}
}

impl Drop for ClientHandle {
fn drop(&mut self) {
let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
// SAFETY: No preconditions for this function
let rcl_client = self.rcl_client.get_mut().unwrap();
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
unsafe {
rcl_client_fini(rcl_client, rcl_node_mtx);
rcl_client_fini(rcl_client, &mut *rcl_node);
}
}
}
Expand Down Expand Up @@ -74,7 +80,7 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
Expand All @@ -92,24 +98,32 @@ where
// SAFETY: No preconditions for this function.
let client_options = unsafe { rcl_client_get_default_options() };

unsafe {
// SAFETY: The rcl_client is zero-initialized as expected by this function.
// The rcl_node is kept alive because it is co-owned by the client.
// The topic name and the options are copied by this function, so they can be dropped
// afterwards.
rcl_client_init(
&mut rcl_client,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

// SAFETY:
// * The rcl_client was zero-initialized as expected by this function.
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the client.
// * The topic name and the options are copied by this function, so they can be dropped
// afterwards.
// * The entity lifecycle mutex is locked to protect against the risk of global
// variables in the rmw implementation being unsafely modified during initialization.
unsafe {
rcl_client_init(
&mut rcl_client,
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
}
}

let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx,
rcl_client: Mutex::new(rcl_client),
node_handle,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down Expand Up @@ -245,8 +259,8 @@ where
///
pub fn service_is_ready(&self) -> Result<bool, RclrsError> {
let mut is_ready = false;
let client = &mut *self.handle.rcl_client_mtx.lock().unwrap();
let node = &mut *self.handle.rcl_node_mtx.lock().unwrap();
let client = &mut *self.handle.rcl_client.lock().unwrap();
let node = &mut *self.handle.node_handle.rcl_node.lock().unwrap();

unsafe {
// SAFETY both node and client are guaranteed to be valid here
Expand Down
168 changes: 143 additions & 25 deletions rclrs/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,31 @@ use std::vec::Vec;
use crate::rcl_bindings::*;
use crate::{RclrsError, ToResult};

/// This is locked whenever initializing or dropping any middleware entity
/// because we have found issues in RCL and some RMW implementations that
/// make it unsafe to simultaneously initialize and/or drop middleware
/// entities such as `rcl_context_t` and `rcl_node_t` as well middleware
/// primitives such as `rcl_publisher_t`, `rcl_subscription_t`, etc.
/// It seems these C and C++ based libraries will regularly use
/// unprotected global variables in their object initialization and cleanup.
///
/// Further discussion with the RCL team may help to improve the RCL
/// documentation to specifically call out where these risks are present. For
/// now we lock this mutex for any RCL function that carries reasonable suspicion
/// of a risk.
pub(crate) static ENTITY_LIFECYCLE_MUTEX: Mutex<()> = Mutex::new(());

impl Drop for rcl_context_t {
fn drop(&mut self) {
unsafe {
// The context may be invalid when rcl_init failed, e.g. because of invalid command
// line arguments.
// SAFETY: No preconditions for this function.

// SAFETY: No preconditions for rcl_context_is_valid.
if rcl_context_is_valid(self) {
// SAFETY: These functions have no preconditions besides a valid rcl_context
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
rcl_shutdown(self);
rcl_context_fini(self);
}
Expand All @@ -39,16 +56,26 @@ unsafe impl Send for rcl_context_t {}
/// - the allocator used (left as the default by `rclrs`)
///
pub struct Context {
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) handle: Arc<ContextHandle>,
}

/// This struct manages the lifetime and access to the `rcl_context_t`. It will also
/// account for the lifetimes of any dependencies, if we need to add
/// dependencies in the future (currently there are none). It is not strictly
/// necessary to decompose `Context` and `ContextHandle` like this, but we are
/// doing it to be consistent with the lifecycle management of other rcl
/// bindings in this library.
pub(crate) struct ContextHandle {
pub(crate) rcl_context: Mutex<rcl_context_t>,
}

impl Context {
/// Creates a new context.
///
/// Usually, this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// Usually this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// See also the official "Passing ROS arguments to nodes via the command-line" tutorial.
///
/// Creating a context can fail in case the args contain invalid ROS arguments.
/// Creating a context will fail if the args contain invalid ROS arguments.
///
/// # Example
/// ```
Expand All @@ -58,6 +85,21 @@ impl Context {
/// assert!(Context::new(invalid_remapping).is_err());
/// ```
pub fn new(args: impl IntoIterator<Item = String>) -> Result<Self, RclrsError> {
Self::new_with_options(args, InitOptions::new())
}

/// Same as [`Context::new`] except you can additionally provide initialization options.
///
/// # Example
/// ```
/// use rclrs::{Context, InitOptions};
/// let context = Context::new_with_options([], InitOptions::new().with_domain_id(Some(5))).unwrap();
/// assert_eq!(context.domain_id(), 5);
/// ````
pub fn new_with_options(
args: impl IntoIterator<Item = String>,
options: InitOptions,
) -> Result<Self, RclrsError> {
// SAFETY: Getting a zero-initialized value is always safe
let mut rcl_context = unsafe { rcl_get_zero_initialized_context() };
let cstring_args: Vec<CString> = args
Expand All @@ -74,48 +116,124 @@ impl Context {
unsafe {
// SAFETY: No preconditions for this function.
let allocator = rcutils_get_default_allocator();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;
// SAFETY: This function does not store the ephemeral init_options and c_args
// pointers. Passing in a zero-initialized rcl_context is expected.
let ret = rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok();
let mut rcl_init_options = options.into_rcl(allocator)?;
// SAFETY:
// * This function does not store the ephemeral init_options and c_args pointers.
// * Passing in a zero-initialized rcl_context is mandatory.
// * The entity lifecycle mutex is locked to protect against the risk of global variables
// in the rmw implementation being unsafely modified during initialization.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This would actually be in rcl, not an rmw implementation right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No the comment is correct as-is. To be more concrete about it: Most of the segfaults we were seeing are coming from FastDDS via rmw_fastrtps_cpp, which is an RMW implementation.

The segfaults happen because that particular RMW implementation makes unsafe use of global variables. That problem would exist whether or not RCL is thread-safe.

Note that "RMW implementation" =/= "the implementation of the rmw library", although the human language aspect of all this is admittedly very confusing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in the unsafe block below, we are calling rcl_init(), not an RMW implementation function. I'm not as familiar with the internals of rcl as I should be, but perhaps rcl_init() is then calling an RMW implementation function?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but perhaps rcl_init() is then calling an RMW implementation function?

Correct, client libraries like rclrs do not ever call any rmw functions directly. Instead rcl is the abstraction layer that all client libraries are supposed to interface with. Many (but not all) rcl functions will then call rmw functions as needed, and those rmw functions will hook the calls into whatever RMW implementation has been loaded.

As a rule of thumb, any rcl function that involves sending information out over a middleware (including to form connections and perform discovery) will call into the RMW implementation. But this is something that I would recommend rcl to officially document.

let ret = {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok()
};
// SAFETY: It's safe to pass in an initialized object.
// Early return will not leak memory, because this is the last fini function.
rcl_init_options_fini(&mut rcl_init_options).ok()?;
// Move the check after the last fini()
ret?;
}
Ok(Self {
rcl_context_mtx: Arc::new(Mutex::new(rcl_context)),
handle: Arc::new(ContextHandle {
rcl_context: Mutex::new(rcl_context),
}),
})
}

/// Returns the ROS domain ID that the context is using.
///
/// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1].
/// It can be set through the `ROS_DOMAIN_ID` environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/About-Domain-ID.html
pub fn domain_id(&self) -> usize {
let mut domain_id: usize = 0;
let ret = unsafe {
rcl_context_get_domain_id(
&mut *self.handle.rcl_context.lock().unwrap(),
&mut domain_id,
)
};

debug_assert_eq!(ret, 0);
domain_id
}

/// Checks if the context is still valid.
///
/// This will return `false` when a signal has caused the context to shut down (currently
/// unimplemented).
pub fn ok(&self) -> bool {
// This will currently always return true, but once we have a signal handler, the signal
// handler could call `rcl_shutdown()`, hence making the context invalid.
let rcl_context = &mut *self.rcl_context_mtx.lock().unwrap();
let rcl_context = &mut *self.handle.rcl_context.lock().unwrap();
// SAFETY: No preconditions for this function.
unsafe { rcl_context_is_valid(rcl_context) }
}
}

/// Additional options for initializing the Context.
#[derive(Default, Clone)]
pub struct InitOptions {
/// The domain ID that should be used by the Context. Set to None to ask for
/// the default behavior, which is to set the domain ID according to the
/// [ROS_DOMAIN_ID][1] environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Domain-ID.html#the-ros-domain-id
domain_id: Option<usize>,
}

impl InitOptions {
/// Create a new InitOptions with all default values.
pub fn new() -> InitOptions {
Self::default()
}

/// Transform an InitOptions into a new one with a certain domain_id
pub fn with_domain_id(mut self, domain_id: Option<usize>) -> InitOptions {
self.domain_id = domain_id;
self
}

/// Set the domain_id of an InitOptions, or reset it to the default behavior
/// (determined by environment variables) by providing None.
pub fn set_domain_id(&mut self, domain_id: Option<usize>) {
self.domain_id = domain_id;
}

/// Get the domain_id that will be provided by these InitOptions.
pub fn domain_id(&self) -> Option<usize> {
self.domain_id
}

fn into_rcl(self, allocator: rcutils_allocator_s) -> Result<rcl_init_options_t, RclrsError> {
unsafe {
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;

// We only need to set the domain_id if the user asked for something
// other than None. When the user asks for None, that is equivalent
// to the default value in rcl_init_options.
if let Some(domain_id) = self.domain_id {
rcl_init_options_set_domain_id(&mut rcl_init_options, domain_id);
}
Ok(rcl_init_options)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 3 additions & 1 deletion rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl SingleThreadedExecutor {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
.filter(|node| unsafe {
rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap())
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
Expand Down
Loading
Loading