Skip to content

Commit

Permalink
Split named_pipe::tokio::stream::impls
Browse files Browse the repository at this point in the history
  • Loading branch information
kotauskas committed Jan 25, 2024
1 parent c085297 commit d80bcb9
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 664 deletions.
9 changes: 1 addition & 8 deletions src/os/windows/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ pub(super) mod winprelude {
pub(crate) use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
}

use std::{
io::{self, ErrorKind::BrokenPipe},
task::Poll,
};
use std::io::{self, ErrorKind::BrokenPipe};
use winprelude::*;

pub(crate) trait AsRawHandleExt: AsRawHandle {
Expand All @@ -31,7 +28,3 @@ pub(super) fn downgrade_eof<T: Default>(r: io::Result<T>) -> io::Result<T> {
els => els,
}
}
#[allow(unused)]
pub(super) fn downgrade_poll_eof<T: Default>(r: Poll<io::Result<T>>) -> Poll<io::Result<T>> {
r.map(downgrade_eof)
}
4 changes: 2 additions & 2 deletions src/os/windows/named_pipe/stream/impl/ctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::os::windows::named_pipe::path_conversion;
use windows_sys::Win32::{Foundation::ERROR_PIPE_BUSY, System::Pipes::PIPE_READMODE_MESSAGE};

impl RawPipeStream {
pub(crate) fn new(handle: FileHandle, is_server: bool) -> Self {
pub(super) fn new(handle: FileHandle, is_server: bool) -> Self {
Self {
handle: Some(handle),
is_server,
Expand All @@ -13,7 +13,7 @@ impl RawPipeStream {
pub(crate) fn new_server(handle: FileHandle) -> Self {
Self::new(handle, true)
}
pub(crate) fn new_client(handle: FileHandle) -> Self {
fn new_client(handle: FileHandle) -> Self {
Self::new(handle, false)
}

Expand Down
2 changes: 1 addition & 1 deletion src/os/windows/named_pipe/tokio/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// TODO message reading disabled due to a lack of support in Mio; we should try to figure something
// out, they need to add first-class message pipe support and handling of ERROR_MORE_DATA
mod impls;
mod r#impl;
mod limbo;

use crate::{
Expand Down
98 changes: 98 additions & 0 deletions src/os/windows/named_pipe/tokio/stream/impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Methods and trait implementations for `PipeStream`.
macro_rules! same_clsrv {
($nm:ident in $var:expr => $e:expr) => {
match $var {
InnerTokio::Server($nm) => $e,
InnerTokio::Client($nm) => $e,
}
};
}

mod ctor;
mod debug;
mod handle;
mod recv_bytes;
mod send;
mod send_off;

use super::*;
use crate::os::windows::{
named_pipe::{has_msg_boundaries_from_sys, hget, is_server_from_sys, MaybeArc, PipeMode},
winprelude::*,
};
use std::{
future::{self, Future},
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::net::windows::named_pipe::{NamedPipeClient as TokioNPClient, NamedPipeServer as TokioNPServer};
use windows_sys::Win32::System::Pipes;

impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeStream<Rm, Sm> {
/// Splits the pipe stream by value, returning a receive half and a send half. The stream is closed when both are
/// dropped, kind of like an `Arc` (which is how it's implemented under the hood).
pub fn split(mut self) -> (RecvPipeStream<Rm>, SendPipeStream<Sm>) {
let (raw_ac, raw_a) = (self.raw.refclone(), self.raw);
(
RecvPipeStream {
raw: raw_a,
flush: None.into(), // PERF the mutex is unnecessary for readers
_phantom: PhantomData,
},
SendPipeStream {
raw: raw_ac,
flush: self.flush,
_phantom: PhantomData,
},
)
}
/// Attempts to reunite a receive half with a send half to yield the original stream back,
/// returning both halves as an error if they belong to different streams (or when using
/// this method on streams that were never split to begin with).
pub fn reunite(rh: RecvPipeStream<Rm>, sh: SendPipeStream<Sm>) -> ReuniteResult<Rm, Sm> {
if !MaybeArc::ptr_eq(&rh.raw, &sh.raw) {
return Err(ReuniteError { rh, sh });
}
let PipeStream { mut raw, flush, .. } = sh;
drop(rh);
raw.try_make_owned();
Ok(PipeStream {
raw,
flush,
_phantom: PhantomData,
})
}
/// Retrieves the process identifier of the client side of the named pipe connection.
#[inline]
pub fn client_process_id(&self) -> io::Result<u32> {
unsafe { hget(self.as_handle(), Pipes::GetNamedPipeClientProcessId) }
}
/// Retrieves the session identifier of the client side of the named pipe connection.
#[inline]
pub fn client_session_id(&self) -> io::Result<u32> {
unsafe { hget(self.as_handle(), Pipes::GetNamedPipeClientSessionId) }
}
/// Retrieves the process identifier of the server side of the named pipe connection.
#[inline]
pub fn server_process_id(&self) -> io::Result<u32> {
unsafe { hget(self.as_handle(), Pipes::GetNamedPipeServerProcessId) }
}
/// Retrieves the session identifier of the server side of the named pipe connection.
#[inline]
pub fn server_session_id(&self) -> io::Result<u32> {
unsafe { hget(self.as_handle(), Pipes::GetNamedPipeServerSessionId) }
}
/// Returns `true` if the stream was created by a listener (server-side), `false` if it was created by connecting to
/// a server (server-side).
#[inline]
pub fn is_server(&self) -> bool {
matches!(self.raw.inner(), &InnerTokio::Server(..))
}
/// Returns `true` if the stream was created by connecting to a server (client-side), `false` if it was created by a
/// listener (server-side).
#[inline]
pub fn is_client(&self) -> bool {
!self.is_server()
}
}
86 changes: 86 additions & 0 deletions src/os/windows/named_pipe/tokio/stream/impl/ctor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use super::*;
use crate::os::windows::named_pipe::{
connect_without_waiting, path_conversion,
stream::{block_for_server, WaitTimeout},
MaybeArc, NeedsFlushVal, PipeMode,
};
use std::ffi::OsStr;

impl RawPipeStream {
pub(super) fn new(inner: InnerTokio) -> Self {
Self {
inner: Some(inner),
needs_flush: NeedsFlush::from(NeedsFlushVal::No),
//recv_msg_state: Mutex::new(RecvMsgState::NotRecving),
}
}
pub(crate) fn new_server(server: TokioNPServer) -> Self {
Self::new(InnerTokio::Server(server))
}
fn new_client(client: TokioNPClient) -> Self {
Self::new(InnerTokio::Client(client))
}

async fn wait_for_server(path: Vec<u16>) -> io::Result<Vec<u16>> {
tokio::task::spawn_blocking(move || {
block_for_server(&path, WaitTimeout::DEFAULT)?;
Ok(path)
})
.await
.expect("waiting for server panicked")
}

async fn connect(
pipename: &OsStr,
hostname: Option<&OsStr>,
read: Option<PipeMode>,
write: Option<PipeMode>,
) -> io::Result<Self> {
// FIXME should probably upstream FILE_WRITE_ATTRIBUTES for PipeMode::Messages to Tokio

let mut path = Some(path_conversion::convert_and_encode_path(pipename, hostname));
let client = loop {
match connect_without_waiting(path.as_ref().unwrap(), read, write, true) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let path_take = Self::wait_for_server(path.take().unwrap()).await?;
path = Some(path_take);
}
not_waiting => break not_waiting?,
}
};
let client = unsafe { TokioNPClient::from_raw_handle(client.into_raw_handle())? };
/* MESSAGE READING DISABLED
if read == Some(PipeMode::Messages) {
set_named_pipe_handle_state(client.as_handle(), Some(PIPE_READMODE_MESSAGE), None, None)?;
}
*/
Ok(Self::new_client(client))
}
}

impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeStream<Rm, Sm> {
/// Connects to the specified named pipe (the `\\.\pipe\` prefix is added automatically),
/// waiting until a server instance is dispatched.
pub async fn connect(pipename: impl AsRef<OsStr>) -> io::Result<Self> {
let raw = RawPipeStream::connect(pipename.as_ref(), None, Rm::MODE, Sm::MODE).await?;
Ok(Self::new(raw))
}
/// Connects to the specified named pipe at a remote computer (the `\\<hostname>\pipe\` prefix
/// is added automatically), blocking until a server instance is dispatched.
pub async fn connect_to_remote(pipename: impl AsRef<OsStr>, hostname: impl AsRef<OsStr>) -> io::Result<Self> {
let raw = RawPipeStream::connect(pipename.as_ref(), Some(hostname.as_ref()), Rm::MODE, Sm::MODE).await?;
Ok(Self::new(raw))
}
}

impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeStream<Rm, Sm> {
/// Internal constructor used by the listener. It's a logic error, but not UB, to create the thing from the wrong
/// kind of thing, but that never ever happens, to the best of my ability.
pub(crate) fn new(raw: RawPipeStream) -> Self {
Self {
raw: MaybeArc::Inline(raw),
flush: Mutex::new(None),
_phantom: PhantomData,
}
}
}
34 changes: 34 additions & 0 deletions src/os/windows/named_pipe/tokio/stream/impl/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use super::*;
use std::fmt::{self, Debug, DebugStruct, Formatter};

impl RawPipeStream {
fn fill_fields<'a, 'b, 'c>(
&self,
dbst: &'a mut DebugStruct<'b, 'c>,
readmode: Option<PipeMode>,
writemode: Option<PipeMode>,
) -> &'a mut DebugStruct<'b, 'c> {
let (tokio_object, is_server) = match self.inner() {
InnerTokio::Server(s) => (s as _, true),
InnerTokio::Client(c) => (c as _, false),
};
if let Some(readmode) = readmode {
dbst.field("read_mode", &readmode);
}
if let Some(writemode) = writemode {
dbst.field("write_mode", &writemode);
}
dbst.field("tokio_object", tokio_object).field("is_server", &is_server)
}
}

impl<Rm: PipeModeTag, Sm: PipeModeTag> Debug for PipeStream<Rm, Sm> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut dbst = f.debug_struct("PipeStream");
self.raw.fill_fields(&mut dbst, Rm::MODE, Sm::MODE);
if Sm::MODE.is_some() {
dbst.field("flush", &self.flush);
}
dbst.finish()
}
}
100 changes: 100 additions & 0 deletions src/os/windows/named_pipe/tokio/stream/impl/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::*;
use std::mem::ManuallyDrop;

impl AsHandle for InnerTokio {
#[inline]
fn as_handle(&self) -> BorrowedHandle<'_> {
same_clsrv!(x in self => x.as_handle())
}
}
derive_asraw!(InnerTokio);

impl AsHandle for RawPipeStream {
#[inline]
fn as_handle(&self) -> BorrowedHandle<'_> {
self.inner().as_handle()
}
}

impl TryFrom<OwnedHandle> for RawPipeStream {
type Error = FromHandleError;

fn try_from(handle: OwnedHandle) -> Result<Self, Self::Error> {
let is_server = match is_server_from_sys(handle.as_handle()) {
Ok(b) => b,
Err(e) => {
return Err(FromHandleError {
details: FromHandleErrorKind::IsServerCheckFailed,
cause: Some(e),
source: Some(handle),
})
}
};

let rh = handle.as_raw_handle();
let handle = ManuallyDrop::new(handle);

let tkresult = unsafe {
match is_server {
true => TokioNPServer::from_raw_handle(rh).map(InnerTokio::Server),
false => TokioNPClient::from_raw_handle(rh).map(InnerTokio::Client),
}
};
match tkresult {
Ok(s) => Ok(Self::new(s)),
Err(e) => Err(FromHandleError {
details: FromHandleErrorKind::TokioError,
cause: Some(e),
source: Some(ManuallyDrop::into_inner(handle)),
}),
}
}
}

// Tokio does not implement TryInto<OwnedHandle>
derive_asraw!(RawPipeStream);

impl<Rm: PipeModeTag, Sm: PipeModeTag> AsHandle for PipeStream<Rm, Sm> {
fn as_handle(&self) -> BorrowedHandle<'_> {
self.raw.as_handle()
}
}

/// Attempts to wrap the given handle into the high-level pipe stream type. If the underlying pipe
/// type is wrong or trying to figure out whether it's wrong or not caused a system call error, the
/// corresponding error condition is returned.
///
/// For more on why this can fail, see [`FromHandleError`]. Most notably, server-side write-only
/// pipes will cause "access denied" errors because they lack permissions to check whether it's a
/// server-side pipe and whether it has message boundaries.
impl<Rm: PipeModeTag, Sm: PipeModeTag> TryFrom<OwnedHandle> for PipeStream<Rm, Sm> {
type Error = FromHandleError;

fn try_from(handle: OwnedHandle) -> Result<Self, Self::Error> {
// If the wrapper type tries to read incoming data as messages, that might break if
// the underlying pipe has no message boundaries. Let's check for that.
if Rm::MODE == Some(PipeMode::Messages) {
let msg_bnd = match has_msg_boundaries_from_sys(handle.as_handle()) {
Ok(b) => b,
Err(e) => {
return Err(FromHandleError {
details: FromHandleErrorKind::MessageBoundariesCheckFailed,
cause: Some(e),
source: Some(handle),
})
}
};
if !msg_bnd {
return Err(FromHandleError {
details: FromHandleErrorKind::NoMessageBoundaries,
cause: None,
source: Some(handle),
});
}
}
let raw = RawPipeStream::try_from(handle)?;
Ok(Self::new(raw))
}
}

derive_asraw!({Rm: PipeModeTag, Sm: PipeModeTag} PipeStream<Rm, Sm>, windows);
29 changes: 29 additions & 0 deletions src/os/windows/named_pipe/tokio/stream/impl/recv_bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::*;
use crate::os::windows::downgrade_eof;
use tokio::io::{AsyncRead, ReadBuf};

impl RawPipeStream {
fn poll_read_readbuf(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
loop {
match downgrade_eof(same_clsrv!(x in self.inner() => x.try_read_buf(buf))) {
Ok(..) => return Poll::Ready(Ok(())),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => return Poll::Ready(Err(e)),
}
ready!(same_clsrv!(x in self.inner() => x.poll_read_ready(cx)))?;
}
}
}

impl<Sm: PipeModeTag> AsyncRead for &PipeStream<pipe_mode::Bytes, Sm> {
#[inline(always)]
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
self.get_mut().raw.poll_read_readbuf(cx, buf)
}
}
impl<Sm: PipeModeTag> AsyncRead for PipeStream<pipe_mode::Bytes, Sm> {
#[inline(always)]
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
AsyncRead::poll_read(Pin::new(&mut &*self), cx, buf)
}
}
Loading

0 comments on commit d80bcb9

Please sign in to comment.