Skip to content

Commit

Permalink
from_handle_and_options()
Browse files Browse the repository at this point in the history
  • Loading branch information
kotauskas committed Mar 24, 2024
1 parent b37b153 commit a5b0dcd
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
25 changes: 23 additions & 2 deletions src/os/windows/named_pipe/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
};
use windows_sys::Win32::{Foundation::ERROR_PIPE_CONNECTED, System::Pipes::ConnectNamedPipe};

// TODO add conversion from handle after all; allow passing assumed config
// TODO add conversion from handle after all

/// The server for a named pipe, listening for connections to clients and producing pipe streams.
///
Expand Down Expand Up @@ -60,12 +60,14 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeListener<Rm, Sm> {

Ok(PipeStream::new(raw))
}

/// Creates an iterator which accepts connections from clients, blocking each time `next()` is
/// called until one connects.
#[inline(always)]
#[inline]
pub fn incoming(&self) -> Incoming<'_, Rm, Sm> {
Incoming(self)
}

/// Enables or disables the nonblocking mode for all existing instances of the listener and
/// future ones. By default, it is disabled.
///
Expand All @@ -86,6 +88,24 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeListener<Rm, Sm> {
Ok(())
}

/// Creates a listener from a handle and a [`PipeListenerOptions`] table with the assumption
/// that the handle was created with those options.
///
/// The options are necessary to provide because the listener needs to create new instances of
/// the named pipe server in `.accept()`.
// TODO mention TryFrom<OwnedHandle> here
pub fn from_handle_and_options(
handle: OwnedHandle,
options: PipeListenerOptions<'static>,
) -> Self {
Self {
nonblocking: AtomicBool::new(options.nonblocking),
config: options,
stored_instance: Mutex::new(FileHandle::from(handle)),
_phantom: PhantomData,
}
}

fn create_instance(&self, nonblocking: bool) -> io::Result<FileHandle> {
self.config
.create_instance(false, nonblocking, false, Self::STREAM_ROLE, Rm::MODE)
Expand All @@ -101,6 +121,7 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> Debug for PipeListener<Rm, Sm> {
.finish()
}
}

impl<Rm: PipeModeTag, Sm: PipeModeTag> From<PipeListener<Rm, Sm>> for OwnedHandle {
fn from(p: PipeListener<Rm, Sm>) -> Self {
p.stored_instance.into_inner().expect(LOCK_POISON).into()
Expand Down
26 changes: 12 additions & 14 deletions src/os/windows/named_pipe/listener/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ pub struct PipeListenerOptions<'path> {
pub nonblocking: bool,
/// Specifies the maximum amount of instances of the pipe which can be created, i.e. how many
/// clients can be communicated with at once. If set to 1, trying to create multiple instances
/// at the same time will return an error. If set to `None`, no limit is applied. The value 255
/// is not allowed because of Windows limitations.
/// at the same time will return an error (in fact, this breaks `.accept()`). If set to `None`,
/// no limit is applied. The value 255 is not allowed because it is the underlying Windows API's
/// sentinel for not having a limit.
pub instance_limit: Option<NonZeroU8>,
/// Enables write-through mode, which applies only to network connections to the pipe. If
/// enabled, sending to the pipe will always block until all data is delivered to the other end
/// instead of piling up in the kernel's network buffer until a certain amount of data
/// accamulates or a certain period of time passes, which is when the system actually sends the
/// accumulates or a certain period of time passes, which is when the system actually sends the
/// contents of the buffer over the network.
///
/// Not required for pipes which are restricted to local connections only. If debug assertions
/// are enabled, setting this parameter on a local-only pipe will cause a panic when the pipe is
/// created; in release builds, creation will successfully complete without any errors and the
/// flag will be completely ignored.
/// If debug assertions are enabled, setting this parameter on a local-only pipe will cause a
/// panic when the pipe is created; in release builds, creation will successfully complete
/// without any errors and the flag will be completely ignored.
pub write_through: bool,
/// Enables remote machines to connect to the named pipe over the network.
pub accept_remote: bool,
Expand Down Expand Up @@ -165,14 +165,12 @@ impl<'path> PipeListenerOptions<'path> {
pub fn create<Rm: PipeModeTag, Sm: PipeModeTag>(&self) -> io::Result<PipeListener<Rm, Sm>> {
let (owned_config, instance) =
self._create(PipeListener::<Rm, Sm>::STREAM_ROLE, Rm::MODE)?;
let nonblocking = owned_config.nonblocking.into();
Ok(PipeListener {
config: owned_config,
nonblocking,
stored_instance: Mutex::new(instance),
_phantom: PhantomData,
})
Ok(PipeListener::from_handle_and_options(
instance.into(),
owned_config,
))
}

/// Alias for [`.create()`](Self::create) with the same `Rm` and `Sm`.
#[inline]
pub fn create_duplex<M: PipeModeTag>(&self) -> io::Result<PipeListener<M, M>> {
Expand Down
45 changes: 39 additions & 6 deletions src/os/windows/named_pipe/tokio/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,43 @@ impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeListener<Rm, Sm> {
Ok(PipeStream::new(raw))
}

/// Creates a listener from a [corresponding Tokio object](TokioNPServer) and a
/// [`PipeListenerOptions`] table with the assumption that the handle was created with those
/// options.
///
/// The options are necessary to provide because the listener needs to create new instances of
/// the named pipe server in `.accept()`.
// TODO mention TryFrom<OwnedHandle> here
pub fn from_tokio_and_options(
tokio_object: TokioNPServer,
options: PipeListenerOptions<'static>,
) -> Self {
Self {
config: options,
stored_instance: Mutex::new(tokio_object),
_phantom: PhantomData,
}
}

/// Creates a listener from a handle and a [`PipeListenerOptions`] table with the assumption
/// that the handle was created with those options.
///
/// The options are necessary to provide because the listener needs to create new instances of
/// the named pipe server in `.accept()`.
///
/// # Errors
/// Returns an error if called outside a Tokio runtime.
// TODO mention TryFrom<OwnedHandle> here
pub fn from_handle_and_options(
handle: OwnedHandle,
options: PipeListenerOptions<'static>,
) -> io::Result<Self> {
Ok(Self::from_tokio_and_options(
npserver_from_handle(handle)?,
options,
))
}

fn create_instance(&self) -> io::Result<TokioNPServer> {
self.config
.create_instance(false, false, true, Self::STREAM_ROLE, Rm::MODE)
Expand Down Expand Up @@ -177,11 +214,7 @@ impl PipeListenerOptionsExt for PipeListenerOptions<'_> {
fn create_tokio<Rm: PipeModeTag, Sm: PipeModeTag>(&self) -> io::Result<PipeListener<Rm, Sm>> {
let (owned_config, instance) =
_create_tokio(self, PipeListener::<Rm, Sm>::STREAM_ROLE, Rm::MODE)?;
Ok(PipeListener {
config: owned_config,
stored_instance: Mutex::new(instance),
_phantom: PhantomData,
})
Ok(PipeListener::from_tokio_and_options(instance, owned_config))
}
}
impl Sealed for PipeListenerOptions<'_> {}
Expand All @@ -197,7 +230,7 @@ fn _create_tokio(
config.nonblocking = false;

let instance = config
.create_instance(true, config.nonblocking, true, role, recv_mode)
.create_instance(true, false, true, role, recv_mode)
.and_then(npserver_from_handle)?;

Ok((config, instance))
Expand Down

0 comments on commit a5b0dcd

Please sign in to comment.