Skip to content

Commit

Permalink
sync: oneshot::Receiver::{is_empty, is_closed}
Browse files Browse the repository at this point in the history
this commit proposes two new methods for tokio's
`sync::oneshot::Receiver<T>` type: `is_empty()` and `is_closed()`.

these follow the same general pattern as existing `is_empty()` and
`is_closed()` methods on other channels such as
[`sync::mpsc::Sender::is_closed()`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.is_closed),
[`sync::mpsc::Receiver::is_closed()`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.is_closed),
[`sync::oneshot::Receiver::is_closed()`](https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Sender.html#method.is_closed),
[`sync::broadcast::Receiver::is_empty()`](https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Receiver.html#method.is_empty),
[`sync::mpsc::Receiver::is_empty()`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.is_empty),
...and so forth.

broadly speaking, users of the oneshot channel are encouraged to
`.await` the `Receiver<T>` directly, as it will only yield a single
value.

users that are writing low-level code, implementing `Future`s directly,
may instead poll the receiver via
`<Receiver<T> as Future<Output = Result<T, RecvError>::poll(..)`.

because (a) `poll()` has an `&mut self` receiver that does not take
ownership in the same way as `.await`ing the receiver will, and (b) the
contract of `std::future::Future` states that clients should not poll a
future after it has yielded `Poll::Ready(value)`, users implementing
`Future`s in terms of a oneshot channel do not have a way to inspect
the state of a oneshot channel.

this commit aims to provide such users means to passively inspect the
state of a oneshot channel, to avoid violating the contact of
`Future::poll(..)`, or requiring that a oneshot channel users track this
state themselves externally.
  • Loading branch information
cratelyn committed Feb 3, 2025
1 parent b8ac94e commit 0931406
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 0 deletions.
116 changes: 116 additions & 0 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,109 @@ impl<T> Receiver<T> {
}
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
///
/// ```
/// use futures::task::noop_waker_ref;
/// use tokio::sync::oneshot;
///
/// use std::future::Future;
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
/// assert!(rx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
///
/// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref()));
/// assert_eq!(poll, Poll::Ready(Ok(0)));
/// assert!(rx.is_empty());
/// }
/// ```
pub fn is_empty(&self) -> bool {
if let Some(inner) = self.inner.as_ref() {
let state = State::load(&inner.state, Acquire);
if state.is_complete() {
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
//
// The channel is empty if it does not have a value.
unsafe { !inner.has_value() }
} else if state.is_closed() {
// The receiver closed the channel...
true
} else {
// No value has been sent yet.
true
}
} else {
true
}
}

/// Checks if the channel has been closed.
///
/// This happens when the corresponding sender is either dropped or sends a
/// value, or when this receiver has closed the channel.
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = oneshot::channel::<()>();
/// assert!(!rx.is_closed());
/// drop(tx);
/// assert!(rx.is_closed());
/// }
/// ```
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
/// assert!(!rx.is_closed());
/// rx.close();
/// assert!(rx.is_closed());
/// }
/// ```
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = oneshot::channel();
/// assert!(!rx.is_closed());
///
/// tx.send(0).unwrap();
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&self) -> bool {
if let Some(inner) = self.inner.as_ref() {
let state = State::load(&inner.state, Acquire);
state.is_closed() || state.is_complete()
} else {
true
}
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1233,6 +1336,19 @@ impl<T> Inner<T> {
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}

/// Returns true if there is a value. This function does not check `state`.
///
/// # Safety
///
/// Calling this method concurrently on multiple threads will result in a
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
/// sender *or* the receiver will call this method at a given point in time.
/// If `VALUE_SENT` is not set, then only the sender may call this method;
/// if it is set, then only the receiver may call this method.
unsafe fn has_value(&self) -> bool {
self.value.with(|ptr| (*ptr).is_some())
}
}

unsafe impl<T: Send> Send for Inner<T> {}
Expand Down
68 changes: 68 additions & 0 deletions tokio/tests/sync_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,71 @@ fn sender_changes_task() {

assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}

#[test]
fn receiver_is_closed_send() {
let (tx, rx) = oneshot::channel::<i32>();
assert!(
!rx.is_closed(),
"channel is NOT closed before value is sent"
);
tx.send(17).unwrap();
assert!(rx.is_closed(), "channel IS closed after value is sent");
}

#[test]
fn receiver_is_closed_drop() {
let (tx, rx) = oneshot::channel::<i32>();
assert!(
!rx.is_closed(),
"channel is NOT closed before sender is dropped"
);
drop(tx);
assert!(rx.is_closed(), "channel IS closed after sender is dropped");
}

#[test]
fn receiver_is_closed_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(!rx.is_closed());
rx.close();
assert!(rx.is_closed());
}

#[test]
fn receiver_is_empty_send() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before value is sent");
tx.send(17).unwrap();
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before sender is dropped");
drop(tx);
assert!(rx.is_empty(), "channel IS empty after sender is dropped");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(rx.is_empty());
rx.close();
assert!(rx.is_empty());
}

0 comments on commit 0931406

Please sign in to comment.