fix(torture): mark comms stream non-blocking
tokio's UnixStream is expected to be non-blocking. To quote the
[doc] for `UnixStream::from_std`:

    The caller is responsible for ensuring that the stream is in
    non-blocking mode. Otherwise all I/O operations on the stream
    will block the thread, which will cause unexpected behavior.

    Non-blocking mode can be set using set_nonblocking.

And oh boy, were we observing unexpected behavior. First, because some
tokio worker thread was blocked, we had problems receiving from a
oneshot channel, which required a nasty workaround (a fix will follow
later). Second, both the supervisor and the agent deadlocked while the
former was waiting on LLDB to take a backtrace of the agent. LLDB itself
was hanging, for reasons we have not investigated.

This changeset should finally fix this issue.

[doc]: https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html#method.from_std
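
To illustrate what non-blocking mode changes, here is a minimal sketch that uses only the standard library's `UnixStream`, not tokio. The helper names `nonblocking_pair` and `read_would_block` are hypothetical and do not appear in the changeset. A read on an empty non-blocking socket returns `WouldBlock` right away instead of parking the thread.

```rust
use std::io::{ErrorKind, Read};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: create a connected socket pair and switch both
/// ends to non-blocking mode.
fn nonblocking_pair() -> std::io::Result<(UnixStream, UnixStream)> {
    let (a, b) = UnixStream::pair()?;
    a.set_nonblocking(true)?;
    b.set_nonblocking(true)?;
    Ok((a, b))
}

/// Hypothetical helper: try to read one byte and report whether the call
/// failed with `WouldBlock`, which is what a non-blocking socket returns
/// when no data is available yet.
fn read_would_block(stream: &mut UnixStream) -> bool {
    let mut buf = [0u8; 1];
    matches!(stream.read(&mut buf), Err(e) if e.kind() == ErrorKind::WouldBlock)
}
```

Without the `set_nonblocking(true)` calls, the same read would block the calling thread until the peer writes or closes its end, which is the kind of stall the quoted documentation warns about.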
pepyakin committed Feb 21, 2025
1 parent 11c2ea9 commit dd6d283
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions torture/src/spawn.rs
@@ -28,39 +28,56 @@ use tracing::trace;
/// We pick a high number to avoid conflicts with other file descriptors.
const CANARY_SOCKET_FD: RawFd = 1000;

/// Check whether the given file descriptor is valid.
fn is_valid_fd(fd: RawFd) -> bool {
unsafe { libc::fcntl(fd, libc::F_GETFD) != -1 }
}

/// Check whether the file descriptor is set to non-blocking mode.
fn is_nonblocking(fd: RawFd) -> bool {
unsafe { libc::fcntl(fd, libc::F_GETFL) & libc::O_NONBLOCK == libc::O_NONBLOCK }
}

/// Check if the file descriptor corresponds to a Unix domain socket.
/// In our case, we're verifying that the socket type is SOCK_STREAM.
fn is_unix_socket(fd: RawFd) -> bool {
let mut sock_type: libc::c_int = 0;
let mut type_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
unsafe {
libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_TYPE,
&mut sock_type as *mut _ as *mut _,
&mut type_len,
) == 0
&& sock_type == libc::SOCK_STREAM
}
}

/// Checks for evidence that this process is a child of a parent process that spawned it.
///
/// Returns a UnixStream if the process is a child, otherwise returns None.
///
/// Panics if called more than once.
pub fn am_spawned() -> Option<UnixStream> {
static CALLED: AtomicBool = AtomicBool::new(false);

// Only take ownership of the fd if we haven't already
if CALLED.swap(true, Ordering::SeqCst) {
return None;
// This function should not be called more than once to protect against multiple ownership
// of the file descriptor.
panic!();
}

let is_valid_fd = unsafe { libc::fcntl(CANARY_SOCKET_FD, libc::F_GETFD) != -1 };
if !is_valid_fd {
if !is_valid_fd(CANARY_SOCKET_FD) {
return None;
}

// Check if it's actually a Unix domain socket
let mut type_: libc::c_int = 0;
let mut type_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;

let is_unix_socket = unsafe {
libc::getsockopt(
CANARY_SOCKET_FD,
libc::SOL_SOCKET,
libc::SO_TYPE,
&mut type_ as *mut _ as *mut _,
&mut type_len,
) == 0
&& type_ == libc::SOCK_STREAM
};
if !is_unix_socket(CANARY_SOCKET_FD) {
panic!("not unix socket");
}

if !is_unix_socket {
return None;
if !is_nonblocking(CANARY_SOCKET_FD) {
panic!("socket is not in non-blocking mode");
}

let stream = unsafe {
@@ -78,6 +95,11 @@ pub fn am_spawned() -> Option<UnixStream> {
pub fn spawn_child() -> Result<(Child, UnixStream)> {
let (sock1, sock2) = UnixStream::pair()?;

// Those sockets are going to be used in tokio and as such they should be both set to
// non-blocking mode.
sock1.set_nonblocking(true)?;
sock2.set_nonblocking(true)?;

let child = spawn_child_with_sock(sock2.as_raw_fd())?;
drop(sock2); // Close parent's end in child
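
The call-once guard that `am_spawned` relies on can be sketched in isolation. This is a simplified illustration, not the code from the changeset: the helper name `first_call` is hypothetical, and the flag is passed in as a parameter rather than kept in a function-local `static`, so that the behavior is easy to check.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical helper: returns true only for the first caller.
///
/// `swap` atomically stores `true` and returns the previous value, so only
/// one caller ever observes `false`, even if several threads race.
fn first_call(flag: &AtomicBool) -> bool {
    !flag.swap(true, Ordering::SeqCst)
}
```

In `am_spawned` the same `swap` guards ownership of the file descriptor; the changeset turns a second call into a panic rather than a silent `None`.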
