From e8a0f717fb8216524f9903cc41a0250d5ea599e5 Mon Sep 17 00:00:00 2001 From: Kotauskas Date: Mon, 25 Mar 2024 13:52:04 +0300 Subject: [PATCH] Small named pipe refactor --- src/local_socket.rs | 1 + src/os/windows/file_handle.rs | 40 ++++- src/os/windows/named_pipe.rs | 1 + .../{stream/wrapper_fns.rs => c_wrappers.rs} | 161 ++++++++++-------- src/os/windows/named_pipe/listener.rs | 20 ++- .../named_pipe/listener/collect_options.rs | 97 +++++++++++ src/os/windows/named_pipe/stream.rs | 2 - src/os/windows/named_pipe/stream/error.rs | 6 - src/os/windows/named_pipe/stream/impl.rs | 10 +- src/os/windows/named_pipe/stream/impl/ctor.rs | 6 +- .../windows/named_pipe/stream/impl/handle.rs | 71 ++++---- .../windows/named_pipe/tokio/stream/error.rs | 6 - .../windows/named_pipe/tokio/stream/impl.rs | 5 +- .../named_pipe/tokio/stream/impl/ctor.rs | 6 +- .../named_pipe/tokio/stream/impl/handle.rs | 75 ++++---- 15 files changed, 337 insertions(+), 170 deletions(-) rename src/os/windows/named_pipe/{stream/wrapper_fns.rs => c_wrappers.rs} (80%) create mode 100644 src/os/windows/named_pipe/listener/collect_options.rs diff --git a/src/local_socket.rs b/src/local_socket.rs index 951ab24..8e664de 100644 --- a/src/local_socket.rs +++ b/src/local_socket.rs @@ -132,6 +132,7 @@ pub(crate) use concurrency_detector::*; // TODO extension traits in crate::os for exposing some OS-specific functionality here // TODO remove that whole ImplProperties thing in favor of a new trait-based system // TODO ListenerOptions +// TODO clean up matters of nonblocking listeners use std::io; diff --git a/src/os/windows/file_handle.rs b/src/os/windows/file_handle.rs index 512b459..d6e1728 100644 --- a/src/os/windows/file_handle.rs +++ b/src/os/windows/file_handle.rs @@ -1,7 +1,10 @@ use super::{c_wrappers, downgrade_eof, winprelude::*}; use crate::{OrErrno, TryClone}; use std::{io, mem::MaybeUninit, ptr}; -use windows_sys::Win32::Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile}; +use windows_sys::Win32::{ + Foundation::MAX_PATH, + Storage::FileSystem::{FlushFileBuffers, GetFinalPathNameByHandleW, ReadFile, WriteFile}, +}; /// Newtype wrapper which defines file I/O operations on a handle to a file. #[repr(transparent)] @@ -45,6 +48,41 @@ impl FileHandle { pub fn flush_hndl(handle: HANDLE) -> io::Result<()> { downgrade_eof(unsafe { FlushFileBuffers(handle) }.true_val_or_errno(())) } + + // The second arm is unreachable if cap > len. + #[allow(dead_code, clippy::arithmetic_side_effects)] + pub fn path(handle: BorrowedHandle<'_>) -> io::Result> { + let mut buf = Vec::with_capacity((MAX_PATH + 1) as usize); + match Self::_path(handle.as_int_handle(), &mut buf) { + (_, Ok(true)) => Ok(buf), + (len, Ok(false)) => { + buf.reserve_exact(len - buf.capacity()); + match Self::_path(handle.as_int_handle(), &mut buf) { + (_, Ok(true)) => Ok(buf), + (_, Ok(false)) => unreachable!(), + (_, Err(e)) => Err(e), + } + } + (_, Err(e)) => Err(e), + } + } + #[allow(clippy::arithmetic_side_effects)] // Path lengths can never overflow usize. + fn _path(handle: HANDLE, buf: &mut Vec) -> (usize, io::Result) { + buf.clear(); + let buflen = buf.capacity().try_into().unwrap_or(u32::MAX); + let rslt = unsafe { GetFinalPathNameByHandleW(handle, buf.as_mut_ptr(), buflen, 0) }; + let len = rslt as usize; + let e = if rslt >= buflen { + Ok(false) + } else if rslt == 0 { + Err(io::Error::last_os_error()) + } else { + // +1 to include the nul terminator in the size. + unsafe { buf.set_len(rslt as usize + 1) } + Ok(true) + }; + (len, e) + } } impl TryClone for FileHandle { fn try_clone(&self) -> io::Result { diff --git a/src/os/windows/named_pipe.rs b/src/os/windows/named_pipe.rs index 5e95e15..b4f0933 100644 --- a/src/os/windows/named_pipe.rs +++ b/src/os/windows/named_pipe.rs @@ -64,6 +64,7 @@ pub mod local_socket { } mod atomic_enum; +mod c_wrappers; mod limbo_pool; mod maybe_arc; mod needs_flush; diff --git a/src/os/windows/named_pipe/stream/wrapper_fns.rs b/src/os/windows/named_pipe/c_wrappers.rs similarity index 80% rename from src/os/windows/named_pipe/stream/wrapper_fns.rs rename to src/os/windows/named_pipe/c_wrappers.rs index 9b69cd8..14793b5 100644 --- a/src/os/windows/named_pipe/stream/wrapper_fns.rs +++ b/src/os/windows/named_pipe/c_wrappers.rs @@ -15,10 +15,17 @@ use windows_sys::Win32::{ }, System::Pipes::{ GetNamedPipeHandleStateW, GetNamedPipeInfo, PeekNamedPipe, SetNamedPipeHandleState, - WaitNamedPipeW, PIPE_NOWAIT, PIPE_SERVER_END, PIPE_TYPE_MESSAGE, + WaitNamedPipeW, PIPE_NOWAIT, }, }; +fn optional_out_ptr(outref: Option<&mut T>) -> *mut T { + match outref { + Some(outref) => outref as *mut T, + None => ptr::null_mut(), + } +} + /// Helper for several functions that take a handle and a u32 out-pointer. pub(crate) unsafe fn hget( handle: BorrowedHandle<'_>, @@ -28,25 +35,95 @@ pub(crate) unsafe fn hget( unsafe { f(handle.as_int_handle(), &mut x as *mut _) }.true_val_or_errno(x) } -pub(crate) fn get_flags(handle: BorrowedHandle<'_>) -> io::Result { - let mut flags: u32 = 0; +pub(crate) fn get_np_info( + handle: BorrowedHandle<'_>, + flags: Option<&mut u32>, + in_buf: Option<&mut u32>, + out_buf: Option<&mut u32>, + max_instances: Option<&mut u32>, +) -> io::Result<()> { unsafe { GetNamedPipeInfo( handle.as_int_handle(), - &mut flags as *mut _, - ptr::null_mut(), - ptr::null_mut(), - ptr::null_mut(), + optional_out_ptr(flags), + optional_out_ptr(in_buf), + optional_out_ptr(out_buf), + optional_out_ptr(max_instances), ) } - .true_val_or_errno(flags) + .true_val_or_errno(()) } -pub(crate) fn is_server_from_sys(handle: BorrowedHandle<'_>) -> io::Result { - Ok(get_flags(handle)? & PIPE_SERVER_END != 0) + +pub(crate) fn get_np_handle_state( + handle: BorrowedHandle<'_>, + mode: Option<&mut u32>, + cur_instances: Option<&mut u32>, + max_collection_count: Option<&mut u32>, + collect_data_timeout: Option<&mut u32>, + mut username: Option<&mut [MaybeUninit]>, +) -> io::Result<()> { + // TODO expose the rest of the owl as public API + unsafe { + GetNamedPipeHandleStateW( + handle.as_int_handle(), + optional_out_ptr(mode), + optional_out_ptr(cur_instances), + optional_out_ptr(max_collection_count), + optional_out_ptr(collect_data_timeout), + username + .as_deref_mut() + .map(|s| s.as_mut_ptr().cast()) + .unwrap_or(ptr::null_mut()), + username + .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX)) + .unwrap_or(0), + ) + } + .true_val_or_errno(()) } -pub(crate) fn has_msg_boundaries_from_sys(handle: BorrowedHandle<'_>) -> io::Result { - Ok((get_flags(handle)? & PIPE_TYPE_MESSAGE) != 0) + +pub(crate) fn set_np_handle_state( + handle: BorrowedHandle<'_>, + mode: Option, + max_collection_count: Option, + collect_data_timeout: Option, +) -> io::Result<()> { + let (mut mode_, has_mode) = (mode.unwrap_or_default(), mode.is_some()); + let (mut mcc, has_mcc) = ( + max_collection_count.unwrap_or_default(), + max_collection_count.is_some(), + ); + let (mut cdt, has_cdt) = ( + collect_data_timeout.unwrap_or_default(), + collect_data_timeout.is_some(), + ); + let toptr = |r: &mut u32| r as *mut u32; + let null = ptr::null_mut(); + unsafe { + SetNamedPipeHandleState( + handle.as_int_handle(), + if has_mode { toptr(&mut mode_) } else { null }, + if has_mcc { toptr(&mut mcc) } else { null }, + if has_cdt { toptr(&mut cdt) } else { null }, + ) + } + .true_val_or_errno(()) } + +#[inline] +pub(crate) fn get_flags(handle: BorrowedHandle<'_>) -> io::Result { + let mut flags: u32 = 0; + get_np_info(handle, Some(&mut flags), None, None, None)?; + Ok(flags) +} + +#[allow(dead_code)] +pub(crate) fn get_np_handle_mode(handle: BorrowedHandle<'_>) -> io::Result { + let mut mode = 0_u32; + get_np_handle_state(handle, Some(&mut mode), None, None, None, None)?; + Ok(mode) +} + #[allow(dead_code)] // TODO give this thing a public API pub(crate) fn peek_msg_len(handle: BorrowedHandle<'_>) -> io::Result { let mut msglen: u32 = 0; @@ -108,64 +185,6 @@ pub(crate) fn connect_without_waiting( } } -#[allow(dead_code)] -pub(crate) fn get_named_pipe_handle_state( - handle: BorrowedHandle<'_>, - mode: Option<&mut u32>, - cur_instances: Option<&mut u32>, - max_collection_count: Option<&mut u32>, - collect_data_timeout: Option<&mut u32>, - mut username: Option<&mut [MaybeUninit]>, -) -> io::Result<()> { - // TODO expose the rest of the owl as public API - let toptr = |r: &mut u32| r as *mut u32; - let null = ptr::null_mut(); - unsafe { - GetNamedPipeHandleStateW( - handle.as_int_handle(), - mode.map(toptr).unwrap_or(null), - cur_instances.map(toptr).unwrap_or(null), - max_collection_count.map(toptr).unwrap_or(null), - collect_data_timeout.map(toptr).unwrap_or(null), - username - .as_deref_mut() - .map(|s| s.as_mut_ptr().cast()) - .unwrap_or(ptr::null_mut()), - username - .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX)) - .unwrap_or(0), - ) - } - .true_val_or_errno(()) -} -pub(crate) fn set_named_pipe_handle_state( - handle: BorrowedHandle<'_>, - mode: Option, - max_collection_count: Option, - collect_data_timeout: Option, -) -> io::Result<()> { - let (mut mode_, has_mode) = (mode.unwrap_or_default(), mode.is_some()); - let (mut mcc, has_mcc) = ( - max_collection_count.unwrap_or_default(), - max_collection_count.is_some(), - ); - let (mut cdt, has_cdt) = ( - collect_data_timeout.unwrap_or_default(), - collect_data_timeout.is_some(), - ); - let toptr = |r: &mut u32| r as *mut u32; - let null = ptr::null_mut(); - unsafe { - SetNamedPipeHandleState( - handle.as_int_handle(), - if has_mode { toptr(&mut mode_) } else { null }, - if has_mcc { toptr(&mut mcc) } else { null }, - if has_cdt { toptr(&mut cdt) } else { null }, - ) - } - .true_val_or_errno(()) -} - pub(crate) fn set_nonblocking_given_readmode( handle: BorrowedHandle<'_>, nonblocking: bool, @@ -176,7 +195,7 @@ pub(crate) fn set_nonblocking_given_readmode( if nonblocking { mode |= PIPE_NOWAIT; } - set_named_pipe_handle_state(handle, Some(mode), None, None) + set_np_handle_state(handle, Some(mode), None, None) } pub(crate) fn block_for_server(path: &[u16], timeout: WaitTimeout) -> io::Result<()> { diff --git a/src/os/windows/named_pipe/listener.rs b/src/os/windows/named_pipe/listener.rs index a8c401a..37ce942 100644 --- a/src/os/windows/named_pipe/listener.rs +++ b/src/os/windows/named_pipe/listener.rs @@ -4,7 +4,7 @@ mod options; pub use {incoming::*, options::*}; -use super::{PipeModeTag, PipeStream, PipeStreamRole, RawPipeStream}; +use super::{c_wrappers, PipeModeTag, PipeStream, PipeStreamRole, RawPipeStream}; use crate::{ os::windows::{winprelude::*, FileHandle}, poison_error, LOCK_POISON, @@ -22,7 +22,7 @@ use std::{ }; use windows_sys::Win32::{Foundation::ERROR_PIPE_CONNECTED, System::Pipes::ConnectNamedPipe}; -// TODO add conversion from handle after all +// TODO finish create_instance and add conversion from handles after all /// The server for a named pipe, listening for connections to clients and producing pipe streams. /// @@ -82,7 +82,7 @@ impl PipeListener { // Doesn't actually even need to be atomic to begin with, but it's simpler and more // convenient to do this instead. The mutex takes care of ordering. self.nonblocking.store(nonblocking, Relaxed); - super::set_nonblocking_given_readmode(instance.as_handle(), nonblocking, Rm::MODE)?; + c_wrappers::set_nonblocking_given_readmode(instance.as_handle(), nonblocking, Rm::MODE)?; // Make it clear that the lock survives until this moment. drop(instance); Ok(()) @@ -122,6 +122,20 @@ impl Debug for PipeListener { } } +/// The returned handle is owned by the listener until the next call to +/// `.accept()`/`::next()`, after which it is owned by the returned stream +/// instead. +/// +/// This momentarily locks an internal mutex. +impl AsRawHandle for PipeListener { + fn as_raw_handle(&self) -> RawHandle { + self.stored_instance + .lock() + .expect(LOCK_POISON) + .as_raw_handle() + } +} + impl From> for OwnedHandle { fn from(p: PipeListener) -> Self { p.stored_instance.into_inner().expect(LOCK_POISON).into() diff --git a/src/os/windows/named_pipe/listener/collect_options.rs b/src/os/windows/named_pipe/listener/collect_options.rs new file mode 100644 index 0000000..5957e09 --- /dev/null +++ b/src/os/windows/named_pipe/listener/collect_options.rs @@ -0,0 +1,97 @@ +use super::*; +use crate::{ + os::windows::{ + named_pipe::{pipe_mode, PipeMode, WaitTimeout}, + path_conversion::*, + SecurityDescriptor, + }, + TryClone, +}; +use std::{borrow::Cow, num::NonZeroU8, os::windows::prelude::*}; +use widestring::{u16cstr, U16CStr, U16CString}; +use windows_sys::Win32::System::Pipes::{PIPE_NOWAIT, PIPE_TYPE_MESSAGE}; + +impl PipeListenerOptions<'_> { + // TODO detailed error information like with streams + #[allow(clippy::unwrap_used, clippy::unwrap_in_result)] + pub fn collect_from_handle(handle: BorrowedHandle<'_>) -> io::Result { + let mut rslt = Self::default(); + + let [mut flags, mut max_instances] = [0_u32; 2]; + c_wrappers::get_np_info( + handle, + Some(&mut flags), + Some(&mut rslt.input_buffer_size_hint), + Some(&mut rslt.output_buffer_size_hint), + Some(&mut max_instances), + )?; + rslt.mode = PipeMode::try_from(flags & PIPE_TYPE_MESSAGE).unwrap(); + if max_instances == 255 { + // 255 is sentinel for unlimited instances. We re-sentinel via NonZeroU8. + max_instances = 0; + } + rslt.instance_limit = NonZeroU8::new(u8::try_from(max_instances).unwrap_or(0)); + + // TODO error out if PIPE_SERVER_END in flags, check for REJECT_REMOTE_CLIENTS (its presence + // in the flags is not documented) + + let mode = c_wrappers::get_np_handle_mode(handle)?; + rslt.nonblocking = mode & PIPE_NOWAIT != 0; + + let path = FileHandle::path(handle)?; + rslt.path = Cow::Owned(unsafe { + // SAFETY: Windows will never write interior nuls there. + U16CString::from_vec_unchecked(path) + }); + // TODO security descriptor, inheritable + + Ok(rslt) + } +} + +#[cfg(test)] +mod tests { + use super::*; + // TODO + fn check_collect(original: PipeListenerOptions<'_>) { + let listener = original.create::(); + let collected = PipeListenerOptions::collect_from_handle(todo!("as_handle")) + .expect("failed to collect options"); + + assert_eq!(collected.path, original.path); + assert_eq!(collected.mode, original.mode); + assert_eq!(collected.nonblocking, original.nonblocking); + assert_eq!(collected.instance_limit, original.instance_limit); + assert_eq!(collected.accept_remote, original.accept_remote); + assert_eq!( + collected.input_buffer_size_hint, + original.input_buffer_size_hint + ); + assert_eq!( + collected.output_buffer_size_hint, + original.output_buffer_size_hint + ); + // FIXME can't PartialEq security descriptors + assert!(collected.security_descriptor.is_some()); + assert_eq!(collected.inheritable, original.inheritable); + } + + #[test] + fn collect_duplex_byte() { + let opts = PipeListenerOptions { + path: todo!(), + mode: PipeMode::Bytes, + nonblocking: true, + instance_limit: NonZeroU8::new(250), + write_through: true, + accept_remote: false, + input_buffer_size_hint: 420, + output_buffer_size_hint: 228, + wait_timeout: WaitTimeout::from_raw(1987), + security_descriptor: todo!(), + inheritable: true, + ..Default::default() + }; + check_collect::(opts); + } +} diff --git a/src/os/windows/named_pipe/stream.rs b/src/os/windows/named_pipe/stream.rs index 92dee61..da5a57e 100644 --- a/src/os/windows/named_pipe/stream.rs +++ b/src/os/windows/named_pipe/stream.rs @@ -4,9 +4,7 @@ pub use {enums::*, error::*}; mod r#impl; mod limbo; -mod wrapper_fns; pub(super) use r#impl::*; -pub(crate) use wrapper_fns::*; use super::{MaybeArc, NeedsFlush}; use crate::{ diff --git a/src/os/windows/named_pipe/stream/error.rs b/src/os/windows/named_pipe/stream/error.rs index bd71a5c..fd03da6 100644 --- a/src/os/windows/named_pipe/stream/error.rs +++ b/src/os/windows/named_pipe/stream/error.rs @@ -14,9 +14,6 @@ pub enum FromHandleErrorKind { /// It wasn't possible to determine whether the pipe handle corresponds to a pipe server or a /// pipe client. IsServerCheckFailed, - /// The type being converted into has message semantics, but it wasn't possible to determine - /// whether message boundaries are preserved in the pipe. - MessageBoundariesCheckFailed, /// The type being converted into has message semantics, but message boundaries are not /// preserved in the pipe. NoMessageBoundaries, @@ -26,9 +23,6 @@ impl FromHandleErrorKind { use FromHandleErrorKind::*; match self { IsServerCheckFailed => "failed to determine if the pipe is server-side or not", - MessageBoundariesCheckFailed => { - "failed to make sure that the pipe preserves message boundaries" - } NoMessageBoundaries => "the pipe does not preserve message boundaries", } } diff --git a/src/os/windows/named_pipe/stream/impl.rs b/src/os/windows/named_pipe/stream/impl.rs index d8e685e..b051465 100644 --- a/src/os/windows/named_pipe/stream/impl.rs +++ b/src/os/windows/named_pipe/stream/impl.rs @@ -12,8 +12,12 @@ pub(crate) use send_off::{LIMBO_ERR, REBURY_ERR}; use super::*; use crate::os::windows::{ - c_wrappers, decode_eof, - named_pipe::{needs_flush::NeedsFlushVal, PipeMode}, + decode_eof, + named_pipe::{ + c_wrappers::{self as c_wrappers, hget}, + needs_flush::NeedsFlushVal, + PipeMode, + }, FileHandle, }; use std::{ @@ -108,6 +112,6 @@ impl PipeStream { /// [`.set_nonblocking()`]: super::super::PipeListener::set_nonblocking #[inline] pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { - super::set_nonblocking_given_readmode(self.as_handle(), nonblocking, Rm::MODE) + c_wrappers::set_nonblocking_given_readmode(self.as_handle(), nonblocking, Rm::MODE) } } diff --git a/src/os/windows/named_pipe/stream/impl/ctor.rs b/src/os/windows/named_pipe/stream/impl/ctor.rs index c397b18..054e9b5 100644 --- a/src/os/windows/named_pipe/stream/impl/ctor.rs +++ b/src/os/windows/named_pipe/stream/impl/ctor.rs @@ -34,9 +34,9 @@ impl RawPipeStream { fn _connect(path: &[u16], recv: Option, send: Option) -> io::Result { let handle = loop { - match connect_without_waiting(path, recv, send, false) { + match c_wrappers::connect_without_waiting(path, recv, send, false) { Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - block_for_server(path, WaitTimeout::DEFAULT)?; + c_wrappers::block_for_server(path, WaitTimeout::DEFAULT)?; continue; } els => break els, @@ -44,7 +44,7 @@ impl RawPipeStream { }?; if recv == Some(PipeMode::Messages) { - set_named_pipe_handle_state( + c_wrappers::set_np_handle_state( handle.as_handle(), Some(PIPE_READMODE_MESSAGE), None, diff --git a/src/os/windows/named_pipe/stream/impl/handle.rs b/src/os/windows/named_pipe/stream/impl/handle.rs index f8adb69..ac342ee 100644 --- a/src/os/windows/named_pipe/stream/impl/handle.rs +++ b/src/os/windows/named_pipe/stream/impl/handle.rs @@ -1,5 +1,7 @@ +use windows_sys::Win32::System::Pipes::{PIPE_SERVER_END, PIPE_TYPE_MESSAGE}; + use super::*; -use crate::TryClone; +use crate::{os::windows::c_wrappers::duplicate_handle, TryClone}; use std::mem::ManuallyDrop; impl AsHandle for RawPipeStream { @@ -8,21 +10,31 @@ impl AsHandle for RawPipeStream { self.file_handle().as_handle() } } +derive_asraw!(RawPipeStream); + +impl RawPipeStream { + fn from_handle_given_flags(handle: OwnedHandle, flags: u32) -> Self { + Self::new(FileHandle::from(handle), flags & PIPE_SERVER_END != 0) + } +} + +fn is_server_check_failed_error(cause: io::Error, handle: OwnedHandle) -> FromHandleError { + FromHandleError { + details: FromHandleErrorKind::IsServerCheckFailed, + cause: Some(cause), + source: Some(handle), + } +} + impl TryFrom for RawPipeStream { type Error = FromHandleError; fn try_from(handle: OwnedHandle) -> Result { - let is_server = match is_server_from_sys(handle.as_handle()) { - Ok(b) => b, - Err(e) => { - return Err(FromHandleError { - details: FromHandleErrorKind::IsServerCheckFailed, - cause: Some(e), - source: Some(handle), - }) - } + let flags = match c_wrappers::get_flags(handle.as_handle()) { + Ok(f) => f, + Err(e) => return Err(is_server_check_failed_error(e, handle)), }; - Ok(Self::new(FileHandle::from(handle), is_server)) + Ok(Self::from_handle_given_flags(handle, flags)) } } impl From for OwnedHandle { @@ -34,8 +46,6 @@ impl From for OwnedHandle { } } -derive_asraw!(RawPipeStream); - /// Attempts to unwrap the given stream into the raw owned handle type, returning itself back if /// no ownership over it is available, as is the case when the stream is split. impl TryFrom> for OwnedHandle { @@ -59,35 +69,28 @@ impl TryFrom> for OwnedHand impl TryFrom for PipeStream { type Error = FromHandleError; fn try_from(handle: OwnedHandle) -> Result { - let raw = RawPipeStream::try_from(handle)?; + let flags = match c_wrappers::get_flags(handle.as_handle()) { + Ok(f) => f, + Err(e) => return Err(is_server_check_failed_error(e, handle)), + }; // If the wrapper type tries to receive incoming data as messages, that might break if // the underlying pipe has no message boundaries. Let's check for that. - if Rm::MODE == Some(PipeMode::Messages) { - let msg_bnd = match has_msg_boundaries_from_sys(raw.as_handle()) { - Ok(b) => b, - Err(e) => { - return Err(FromHandleError { - details: FromHandleErrorKind::MessageBoundariesCheckFailed, - cause: Some(e), - source: Some(raw.into()), - }) - } - }; - if !msg_bnd { - return Err(FromHandleError { - details: FromHandleErrorKind::NoMessageBoundaries, - cause: None, - source: Some(raw.into()), - }); - } + if Rm::MODE == Some(PipeMode::Messages) && flags & PIPE_TYPE_MESSAGE == 0 { + return Err(FromHandleError { + details: FromHandleErrorKind::NoMessageBoundaries, + cause: None, + source: Some(handle), + }); } - Ok(Self::new(raw)) + Ok(Self::new(RawPipeStream::from_handle_given_flags( + handle, flags, + ))) } } impl TryClone for PipeStream { fn try_clone(&self) -> io::Result { - let handle = c_wrappers::duplicate_handle(self.as_handle())?; + let handle = duplicate_handle(self.as_handle())?; self.raw.needs_flush.on_clone(); let mut new = RawPipeStream::new(handle.into(), self.is_server()); new.needs_flush = NeedsFlushVal::Always.into(); diff --git a/src/os/windows/named_pipe/tokio/stream/error.rs b/src/os/windows/named_pipe/tokio/stream/error.rs index f536895..1ab7e71 100644 --- a/src/os/windows/named_pipe/tokio/stream/error.rs +++ b/src/os/windows/named_pipe/tokio/stream/error.rs @@ -11,9 +11,6 @@ pub enum FromHandleErrorKind { /// It wasn't possible to determine whether the pipe handle corresponds to a pipe server or a /// pipe client. IsServerCheckFailed, - /// The type being converted into has message semantics, but it wasn't possible to determine - /// whether message boundaries are preserved in the pipe. - MessageBoundariesCheckFailed, /// The type being converted into has message semantics, but message boundaries are not /// preserved in the pipe. NoMessageBoundaries, @@ -29,9 +26,6 @@ impl FromHandleErrorKind { use FromHandleErrorKind::*; match self { IsServerCheckFailed => "failed to determine if the pipe is server-side or not", - MessageBoundariesCheckFailed => { - "failed to make sure that the pipe preserves message boundaries" - } NoMessageBoundaries => "the pipe does not preserve message boundaries", TokioError => "Tokio error", } diff --git a/src/os/windows/named_pipe/tokio/stream/impl.rs b/src/os/windows/named_pipe/tokio/stream/impl.rs index 6576c9c..4e4bd74 100644 --- a/src/os/windows/named_pipe/tokio/stream/impl.rs +++ b/src/os/windows/named_pipe/tokio/stream/impl.rs @@ -18,7 +18,10 @@ mod send_off; use super::*; use crate::os::windows::{ - named_pipe::{has_msg_boundaries_from_sys, hget, is_server_from_sys, PipeMode}, + named_pipe::{ + c_wrappers::{self, hget}, + PipeMode, + }, winprelude::*, }; use std::{ diff --git a/src/os/windows/named_pipe/tokio/stream/impl/ctor.rs b/src/os/windows/named_pipe/tokio/stream/impl/ctor.rs index 87253bb..56426f2 100644 --- a/src/os/windows/named_pipe/tokio/stream/impl/ctor.rs +++ b/src/os/windows/named_pipe/tokio/stream/impl/ctor.rs @@ -1,6 +1,6 @@ use super::*; use crate::os::windows::{ - named_pipe::{connect_without_waiting, stream::block_for_server, NeedsFlushVal, WaitTimeout}, + named_pipe::{NeedsFlushVal, WaitTimeout}, path_conversion::*, }; use std::{ffi::OsStr, mem::take, path::Path}; @@ -22,7 +22,7 @@ impl RawPipeStream { async fn wait_for_server(path: Vec) -> io::Result> { tokio::task::spawn_blocking(move || { - block_for_server(&path, WaitTimeout::DEFAULT)?; + c_wrappers::block_for_server(&path, WaitTimeout::DEFAULT)?; Ok(path) }) .await @@ -52,7 +52,7 @@ impl RawPipeStream { send: Option, ) -> io::Result { let client = loop { - match connect_without_waiting(&path, recv, send, true) { + match c_wrappers::connect_without_waiting(&path, recv, send, true) { Err(e) if e.kind() == io::ErrorKind::WouldBlock => { let path_take = Self::wait_for_server(take(&mut path)).await?; path = path_take; diff --git a/src/os/windows/named_pipe/tokio/stream/impl/handle.rs b/src/os/windows/named_pipe/tokio/stream/impl/handle.rs index 63c4f29..fb96e01 100644 --- a/src/os/windows/named_pipe/tokio/stream/impl/handle.rs +++ b/src/os/windows/named_pipe/tokio/stream/impl/handle.rs @@ -1,3 +1,5 @@ +use windows_sys::Win32::System::Pipes::{PIPE_SERVER_END, PIPE_TYPE_MESSAGE}; + use super::*; use std::mem::ManuallyDrop; @@ -15,27 +17,18 @@ impl AsHandle for RawPipeStream { self.inner().as_handle() } } +derive_asraw!(RawPipeStream); -impl TryFrom for RawPipeStream { - type Error = FromHandleError; - - fn try_from(handle: OwnedHandle) -> Result { - let is_server = match is_server_from_sys(handle.as_handle()) { - Ok(b) => b, - Err(e) => { - return Err(FromHandleError { - details: FromHandleErrorKind::IsServerCheckFailed, - cause: Some(e), - source: Some(handle), - }) - } - }; - +impl RawPipeStream { + fn try_from_handle_given_flags( + handle: OwnedHandle, + flags: u32, + ) -> Result { let rh = handle.as_raw_handle(); let handle = ManuallyDrop::new(handle); let tkresult = unsafe { - match is_server { + match flags & PIPE_SERVER_END != 0 { true => TokioNPServer::from_raw_handle(rh).map(InnerTokio::Server), false => TokioNPClient::from_raw_handle(rh).map(InnerTokio::Client), } @@ -51,8 +44,24 @@ impl TryFrom for RawPipeStream { } } -// Tokio does not implement TryInto -derive_asraw!(RawPipeStream); +fn is_server_check_failed_error(cause: io::Error, handle: OwnedHandle) -> FromHandleError { + FromHandleError { + details: FromHandleErrorKind::IsServerCheckFailed, + cause: Some(cause), + source: Some(handle), + } +} + +impl TryFrom for RawPipeStream { + type Error = FromHandleError; + + fn try_from(handle: OwnedHandle) -> Result { + match c_wrappers::get_flags(handle.as_handle()) { + Ok(flags) => Self::try_from_handle_given_flags(handle, flags), + Err(e) => Err(is_server_check_failed_error(e, handle)), + } + } +} impl AsHandle for PipeStream { fn as_handle(&self) -> BorrowedHandle<'_> { @@ -71,28 +80,20 @@ impl TryFrom for PipeStream Result { + let flags = match c_wrappers::get_flags(handle.as_handle()) { + Ok(f) => f, + Err(e) => return Err(is_server_check_failed_error(e, handle)), + }; // If the wrapper type tries to receive incoming data as messages, that might break if // the underlying pipe has no message boundaries. Let's check for that. - if Rm::MODE == Some(PipeMode::Messages) { - let msg_bnd = match has_msg_boundaries_from_sys(handle.as_handle()) { - Ok(b) => b, - Err(e) => { - return Err(FromHandleError { - details: FromHandleErrorKind::MessageBoundariesCheckFailed, - cause: Some(e), - source: Some(handle), - }) - } - }; - if !msg_bnd { - return Err(FromHandleError { - details: FromHandleErrorKind::NoMessageBoundaries, - cause: None, - source: Some(handle), - }); - } + if Rm::MODE == Some(PipeMode::Messages) && flags & PIPE_TYPE_MESSAGE == 0 { + return Err(FromHandleError { + details: FromHandleErrorKind::NoMessageBoundaries, + cause: None, + source: Some(handle), + }); } - let raw = RawPipeStream::try_from(handle)?; + let raw = RawPipeStream::try_from_handle_given_flags(handle, flags)?; Ok(Self::new(raw)) } }