Skip to content

Commit

Permalink
Small named pipe refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kotauskas committed Mar 25, 2024
1 parent 31d7fda commit e8a0f71
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 170 deletions.
1 change: 1 addition & 0 deletions src/local_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub(crate) use concurrency_detector::*;
// TODO extension traits in crate::os for exposing some OS-specific functionality here
// TODO remove that whole ImplProperties thing in favor of a new trait-based system
// TODO ListenerOptions
// TODO clean up matters of nonblocking listeners

use std::io;

Expand Down
40 changes: 39 additions & 1 deletion src/os/windows/file_handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use super::{c_wrappers, downgrade_eof, winprelude::*};
use crate::{OrErrno, TryClone};
use std::{io, mem::MaybeUninit, ptr};
use windows_sys::Win32::Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile};
use windows_sys::Win32::{
Foundation::MAX_PATH,
Storage::FileSystem::{FlushFileBuffers, GetFinalPathNameByHandleW, ReadFile, WriteFile},
};

/// Newtype wrapper which defines file I/O operations on a handle to a file.
#[repr(transparent)]
Expand Down Expand Up @@ -45,6 +48,41 @@ impl FileHandle {
pub fn flush_hndl(handle: HANDLE) -> io::Result<()> {
downgrade_eof(unsafe { FlushFileBuffers(handle) }.true_val_or_errno(()))
}

// The second arm is unreachable if cap > len.
#[allow(dead_code, clippy::arithmetic_side_effects)]
pub fn path(handle: BorrowedHandle<'_>) -> io::Result<Vec<u16>> {
let mut buf = Vec::with_capacity((MAX_PATH + 1) as usize);
match Self::_path(handle.as_int_handle(), &mut buf) {
(_, Ok(true)) => Ok(buf),
(len, Ok(false)) => {
buf.reserve_exact(len - buf.capacity());
match Self::_path(handle.as_int_handle(), &mut buf) {
(_, Ok(true)) => Ok(buf),
(_, Ok(false)) => unreachable!(),
(_, Err(e)) => Err(e),
}
}
(_, Err(e)) => Err(e),
}
}
#[allow(clippy::arithmetic_side_effects)] // Path lengths can never overflow usize.
fn _path(handle: HANDLE, buf: &mut Vec<u16>) -> (usize, io::Result<bool>) {
buf.clear();
let buflen = buf.capacity().try_into().unwrap_or(u32::MAX);
let rslt = unsafe { GetFinalPathNameByHandleW(handle, buf.as_mut_ptr(), buflen, 0) };
let len = rslt as usize;
let e = if rslt >= buflen {
Ok(false)
} else if rslt == 0 {
Err(io::Error::last_os_error())
} else {
// +1 to include the nul terminator in the size.
unsafe { buf.set_len(rslt as usize + 1) }
Ok(true)
};
(len, e)
}
}
impl TryClone for FileHandle {
fn try_clone(&self) -> io::Result<Self> {
Expand Down
1 change: 1 addition & 0 deletions src/os/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub mod local_socket {
}

mod atomic_enum;
mod c_wrappers;
mod limbo_pool;
mod maybe_arc;
mod needs_flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ use windows_sys::Win32::{
},
System::Pipes::{
GetNamedPipeHandleStateW, GetNamedPipeInfo, PeekNamedPipe, SetNamedPipeHandleState,
WaitNamedPipeW, PIPE_NOWAIT, PIPE_SERVER_END, PIPE_TYPE_MESSAGE,
WaitNamedPipeW, PIPE_NOWAIT,
},
};

fn optional_out_ptr<T>(outref: Option<&mut T>) -> *mut T {
match outref {
Some(outref) => outref as *mut T,
None => ptr::null_mut(),
}
}

/// Helper for several functions that take a handle and a u32 out-pointer.
pub(crate) unsafe fn hget(
handle: BorrowedHandle<'_>,
Expand All @@ -28,25 +35,95 @@ pub(crate) unsafe fn hget(
unsafe { f(handle.as_int_handle(), &mut x as *mut _) }.true_val_or_errno(x)
}

pub(crate) fn get_flags(handle: BorrowedHandle<'_>) -> io::Result<u32> {
let mut flags: u32 = 0;
pub(crate) fn get_np_info(
handle: BorrowedHandle<'_>,
flags: Option<&mut u32>,
in_buf: Option<&mut u32>,
out_buf: Option<&mut u32>,
max_instances: Option<&mut u32>,
) -> io::Result<()> {
unsafe {
GetNamedPipeInfo(
handle.as_int_handle(),
&mut flags as *mut _,
ptr::null_mut(),
ptr::null_mut(),
ptr::null_mut(),
optional_out_ptr(flags),
optional_out_ptr(in_buf),
optional_out_ptr(out_buf),
optional_out_ptr(max_instances),
)
}
.true_val_or_errno(flags)
.true_val_or_errno(())
}
pub(crate) fn is_server_from_sys(handle: BorrowedHandle<'_>) -> io::Result<bool> {
Ok(get_flags(handle)? & PIPE_SERVER_END != 0)

pub(crate) fn get_np_handle_state(
handle: BorrowedHandle<'_>,
mode: Option<&mut u32>,
cur_instances: Option<&mut u32>,
max_collection_count: Option<&mut u32>,
collect_data_timeout: Option<&mut u32>,
mut username: Option<&mut [MaybeUninit<u16>]>,
) -> io::Result<()> {
// TODO expose the rest of the owl as public API
unsafe {
GetNamedPipeHandleStateW(
handle.as_int_handle(),
optional_out_ptr(mode),
optional_out_ptr(cur_instances),
optional_out_ptr(max_collection_count),
optional_out_ptr(collect_data_timeout),
username
.as_deref_mut()
.map(|s| s.as_mut_ptr().cast())
.unwrap_or(ptr::null_mut()),
username
.map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
.unwrap_or(0),
)
}
.true_val_or_errno(())
}
pub(crate) fn has_msg_boundaries_from_sys(handle: BorrowedHandle<'_>) -> io::Result<bool> {
Ok((get_flags(handle)? & PIPE_TYPE_MESSAGE) != 0)

pub(crate) fn set_np_handle_state(
handle: BorrowedHandle<'_>,
mode: Option<u32>,
max_collection_count: Option<u32>,
collect_data_timeout: Option<u32>,
) -> io::Result<()> {
let (mut mode_, has_mode) = (mode.unwrap_or_default(), mode.is_some());
let (mut mcc, has_mcc) = (
max_collection_count.unwrap_or_default(),
max_collection_count.is_some(),
);
let (mut cdt, has_cdt) = (
collect_data_timeout.unwrap_or_default(),
collect_data_timeout.is_some(),
);
let toptr = |r: &mut u32| r as *mut u32;
let null = ptr::null_mut();
unsafe {
SetNamedPipeHandleState(
handle.as_int_handle(),
if has_mode { toptr(&mut mode_) } else { null },
if has_mcc { toptr(&mut mcc) } else { null },
if has_cdt { toptr(&mut cdt) } else { null },
)
}
.true_val_or_errno(())
}

#[inline]
pub(crate) fn get_flags(handle: BorrowedHandle<'_>) -> io::Result<u32> {
let mut flags: u32 = 0;
get_np_info(handle, Some(&mut flags), None, None, None)?;
Ok(flags)
}

#[allow(dead_code)]
pub(crate) fn get_np_handle_mode(handle: BorrowedHandle<'_>) -> io::Result<u32> {
let mut mode = 0_u32;
get_np_handle_state(handle, Some(&mut mode), None, None, None, None)?;
Ok(mode)
}

#[allow(dead_code)] // TODO give this thing a public API
pub(crate) fn peek_msg_len(handle: BorrowedHandle<'_>) -> io::Result<usize> {
let mut msglen: u32 = 0;
Expand Down Expand Up @@ -108,64 +185,6 @@ pub(crate) fn connect_without_waiting(
}
}

#[allow(dead_code)]
pub(crate) fn get_named_pipe_handle_state(
handle: BorrowedHandle<'_>,
mode: Option<&mut u32>,
cur_instances: Option<&mut u32>,
max_collection_count: Option<&mut u32>,
collect_data_timeout: Option<&mut u32>,
mut username: Option<&mut [MaybeUninit<u16>]>,
) -> io::Result<()> {
// TODO expose the rest of the owl as public API
let toptr = |r: &mut u32| r as *mut u32;
let null = ptr::null_mut();
unsafe {
GetNamedPipeHandleStateW(
handle.as_int_handle(),
mode.map(toptr).unwrap_or(null),
cur_instances.map(toptr).unwrap_or(null),
max_collection_count.map(toptr).unwrap_or(null),
collect_data_timeout.map(toptr).unwrap_or(null),
username
.as_deref_mut()
.map(|s| s.as_mut_ptr().cast())
.unwrap_or(ptr::null_mut()),
username
.map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
.unwrap_or(0),
)
}
.true_val_or_errno(())
}
pub(crate) fn set_named_pipe_handle_state(
handle: BorrowedHandle<'_>,
mode: Option<u32>,
max_collection_count: Option<u32>,
collect_data_timeout: Option<u32>,
) -> io::Result<()> {
let (mut mode_, has_mode) = (mode.unwrap_or_default(), mode.is_some());
let (mut mcc, has_mcc) = (
max_collection_count.unwrap_or_default(),
max_collection_count.is_some(),
);
let (mut cdt, has_cdt) = (
collect_data_timeout.unwrap_or_default(),
collect_data_timeout.is_some(),
);
let toptr = |r: &mut u32| r as *mut u32;
let null = ptr::null_mut();
unsafe {
SetNamedPipeHandleState(
handle.as_int_handle(),
if has_mode { toptr(&mut mode_) } else { null },
if has_mcc { toptr(&mut mcc) } else { null },
if has_cdt { toptr(&mut cdt) } else { null },
)
}
.true_val_or_errno(())
}

pub(crate) fn set_nonblocking_given_readmode(
handle: BorrowedHandle<'_>,
nonblocking: bool,
Expand All @@ -176,7 +195,7 @@ pub(crate) fn set_nonblocking_given_readmode(
if nonblocking {
mode |= PIPE_NOWAIT;
}
set_named_pipe_handle_state(handle, Some(mode), None, None)
set_np_handle_state(handle, Some(mode), None, None)
}

pub(crate) fn block_for_server(path: &[u16], timeout: WaitTimeout) -> io::Result<()> {
Expand Down
20 changes: 17 additions & 3 deletions src/os/windows/named_pipe/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod options;

pub use {incoming::*, options::*};

use super::{PipeModeTag, PipeStream, PipeStreamRole, RawPipeStream};
use super::{c_wrappers, PipeModeTag, PipeStream, PipeStreamRole, RawPipeStream};
use crate::{
os::windows::{winprelude::*, FileHandle},
poison_error, LOCK_POISON,
Expand All @@ -22,7 +22,7 @@ use std::{
};
use windows_sys::Win32::{Foundation::ERROR_PIPE_CONNECTED, System::Pipes::ConnectNamedPipe};

// TODO add conversion from handle after all
// TODO finish create_instance and add conversion from handles after all

/// The server for a named pipe, listening for connections to clients and producing pipe streams.
///
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeListener<Rm, Sm> {
// Doesn't actually even need to be atomic to begin with, but it's simpler and more
// convenient to do this instead. The mutex takes care of ordering.
self.nonblocking.store(nonblocking, Relaxed);
super::set_nonblocking_given_readmode(instance.as_handle(), nonblocking, Rm::MODE)?;
c_wrappers::set_nonblocking_given_readmode(instance.as_handle(), nonblocking, Rm::MODE)?;
// Make it clear that the lock survives until this moment.
drop(instance);
Ok(())
Expand Down Expand Up @@ -122,6 +122,20 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> Debug for PipeListener<Rm, Sm> {
}
}

/// The returned handle is owned by the listener until the next call to
/// `.accept()`/`<Incoming as Iterator>::next()`, after which it is owned by the returned stream
/// instead.
///
/// This momentarily locks an internal mutex.
impl<Rm: PipeModeTag, Sm: PipeModeTag> AsRawHandle for PipeListener<Rm, Sm> {
fn as_raw_handle(&self) -> RawHandle {
self.stored_instance
.lock()
.expect(LOCK_POISON)
.as_raw_handle()
}
}

impl<Rm: PipeModeTag, Sm: PipeModeTag> From<PipeListener<Rm, Sm>> for OwnedHandle {
fn from(p: PipeListener<Rm, Sm>) -> Self {
p.stored_instance.into_inner().expect(LOCK_POISON).into()
Expand Down
Loading

0 comments on commit e8a0f71

Please sign in to comment.