Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: oneshot::Receiver::{is_empty, is_closed} #7137

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 174 additions & 3 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,164 @@ impl<T> Receiver<T> {
}
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
/// assert!(rx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
///
/// assert_eq!((&mut rx).await, Ok(0));
/// assert!(rx.is_empty());
/// }
/// ```
pub fn is_empty(&self) -> bool {
if let Some(inner) = self.inner.as_ref() {
let state = State::load(&inner.state, Acquire);
if state.is_complete() {
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
//
// The channel is empty if it does not have a value.
unsafe { !inner.has_value() }
} else if state.is_closed() {
// The receiver closed the channel...
true
} else {
// No value has been sent yet.
true
}
} else {
true
}
}

/// Checks if the channel has been closed.
///
/// This happens when the corresponding sender is either dropped or sends a
/// value, or when this receiver has closed the channel.
Comment on lines +981 to +982
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this feels somewhat different than the semantics of is_closed on other channels, which only returns true when the other side has been dropped. On the other hand, there's a valid argument that this is just because those channels don't have the property that sending a value closes the channel. I'm not sure how I feel about this: I think it might be worth being consistent that "closed" means "dropped by the other side", but I'm open to being convinced otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the Sender::is_closed method currently exists and only inspects the value of the CLOSED bit:

/// Returns `true` if the associated [`Receiver`] handle has been dropped.
///
/// A [`Receiver`] is closed by either calling [`close`] explicitly or the
/// [`Receiver`] value is dropped.
///
/// If `true` is returned, a call to `send` will always result in an error.
///
/// [`Receiver`]: Receiver
/// [`close`]: Receiver::close
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = oneshot::channel();
///
/// assert!(!tx.is_closed());
///
/// drop(rx);
///
/// assert!(tx.is_closed());
/// assert!(tx.send("never received").is_err());
/// }
/// ```
pub fn is_closed(&self) -> bool {
let inner = self.inner.as_ref().unwrap();
let state = State::load(&inner.state, Acquire);
state.is_closed()
}

Of course, that method can't ever be called if a value was sent, as doing so consumes the sender. so. Hm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ... I'm not sure what is the least surprising here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a very interesting conundrum!

to begin by restating the problem directly: we are asking ourselves whether or
not tokio::sync::oneshot::Receiver::is_closed() should return true when
the corresponding tokio::sync::oneshot::Sender has been consumed by sending
a value.

let's consider the alternatives and how they would affect callers using them
to determine whether or not a receiver may be polled.

option a: is_closed() does not include completed channel

in this model, is_closed() only returns true when the sender is dropped, or
when the receiver closes the channel. it does not return true if a value
was sent.

in this case, we would see the following values when inquiring about closedness
and emptiness.

- is_closed() is_empty() safe to poll?
initial false true yes
receiver closes channel true true or false yes
sender was dropped true true yes
sender sent value (unreceived) false false yes
sender sent value (received) false true no

option b: is_closed() does include completed channel

in this model, is_closed() returns true when the receiver closes the channel,
when the sender is dropped, or when the sender is used to send a value.

- is_closed() is_empty() safe to poll?
initial false true yes
receiver closes channel true true or false yes
sender was dropped true true yes
sender sent value (unreceived) true false yes
sender sent value (received) true true no

⚖️ comparing a and b

now, this brings us to @hawkw's note:

Hmm, this feels somewhat different than the semantics of is_closed on other
channels, which only returns true when the other side has been dropped. On
the other hand, there's a valid argument that this is just because those
channels don't have the property that sending a value closes the channel. I'm
not sure how I feel about this: I think it might be worth being consistent
that "closed" means "dropped by the other side", but I'm open to being
convinced otherwise.

first, i would note that we actually have some minor semantic differences
between different channels' respective is_closed methods already, upon
closer inspection. i'd argue that is fine, and inherent to the fact that
different channel have different semantics, and that is bound to be reflected
in methods querying their conceptual state.

as an example, sync::watch::Sender::is_closed returns true if all receivers
are dropped, while sync::mpsc::Sender::is_closed returns true if all
receivers are dropped or when Receiver::close is called. a sync::watch
channel can have many receivers, so it makes sense that its notion of
closedness would be narrower than a channel that conversely has a single
receiver.

it seems like it would be in line with the above for a oneshot channel to
have its own unique caveat related for is_closed, returning true when the
sender has been dropped by virtue of having sent a value.

...a secret third thing?

now, zooming back out for a moment: i found myself curious about this because
i've frequently encountered a pattern when interacting with oneshot channels
wherein they are very frequently wrapped in an Option<T>. once the receiver
is polled and yields a Poll::Ready(_) value of some kind, the caller takes
the receiver out of the Option<T> and mem::drop()s it to prevent from being
polled again.

prior to reading the internals of the oneshot channel, and learning more
about how the sender and receiver interact, my initial intuition was that one
could check that a receiver was finished by checking that the channel was
closed, and empty.

looking at the tables above however, i've found that neither option a or b
provide a surefire way to check that a receiver is finished, or if it is safe
to poll(). for example: an empty, closed channel could indicate that the
sender was dropped, but that doesn't provide information about whether the
Err(RecvError) has been returned yet.

now, looking at the receiver, and its Future implementation:

// tokio/src/sync/oneshot.rs (abbrevated)

#[derive(Debug)]
pub struct Receiver<T> {
    inner: Option<Arc<Inner<T>>>,
    // spans...
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // If `inner` is `None`, then `poll()` has already completed.
        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
            ready!(inner.poll_recv(cx))?
        } else {
            panic!("called after complete");
        };

        self.inner = None;
        Ready(Ok(ret))
    }
}

i see utility for a third method tokio::sync::oneshot::Receiver::is_finished().

impl<T> Receiver<T> {
    /// Checks if this receiver is finished.
    ///
    /// This will return true after this receiver has been polled and yielded
    /// a result.
    fn is_finished(&self) -> bool {
        self.inner.is_none()
    }
}

if this was an option, i would feel personally much less attached to the
decision over whether or not is_closed reports true when a value has been
sent.

this would directly let callers check if polling the receiver would panic, and
in friendly, direct terms. this would also mean that the common pattern of
wrapping receivers in Option<Receiver<T>> would be largely outmoded.
is_finished would let us inquire about the state of the receiver's own
internal Option<T>, without needing to wrap it in another outer option we
can inspect.

what do you think, @Darksonn, and @hawkw?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the intent of this PR is a way to detect whether polling the channel will panic, then having a function that does exactly that sounds good to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the intent of this PR, broadly speaking, is to introduce accessors that allows sync::oneshot::Receiver<T> to inspect the channel's status in the same manner that other channels like sync::mpsc's and sync::broadcast's respective Receiver<T>s can.

detecting panics is one acute example, but i've also felt a need for these interfaces when writing Body implementations for types backed by a oneshot, to provide another example.

a Body backed by a oneshot receiver cannot report proper is_end_stream() hints because the receiver doesn't provide a way to inspect the channel. these felt like relatively agnostic methods to expose in part because they follow patterns exposed by other channels.

91682a9 introduces an is_finished() method. i'd be happy to outline that into a separate PR if we don't feel a disposition to merge is_empty() and is_closed() methods. what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

91682a9 introduces an is_finished() method. i'd be happy to outline that into a separate PR if we don't feel a disposition to merge is_empty() and is_closed() methods. what do you think?

Personally, I'd be inclined to suggest a PR adding a method like that separately, so that the change you actually need isn't blocked on deciding the right semantics for is_closed. I think that change should hopefully be fairly uncontroversial.

Regarding naming for that method, I think "is_finished" is probably fine, but I'll note that the futures crate's FusedFuture trait calls its method with similar semantics is_terminated. Although we probably won't be adding FusedFuture implementations in Tokio due to stability concerns, it might be worth choosing a name that's consistent with the one in futures? I could be convinced either way though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, to be clear, I'd also still like to have is_closed() and is_empty() — I just think we could add an is_finished_and_or_terminated() without having to wait for figuring out is_closed().

I think we could probably also land an is_empty() method separately, if you like. I don't think there's much ambiguity about what "empty" means in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you both for the advice and for your time, @Darksonn and @hawkw! i agree, landing these each separately sounds like a more prudent path forward.

i'm going to close this, and let us consider each of these proposals individually.

see:

///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = oneshot::channel::<()>();
/// assert!(!rx.is_closed());
/// drop(tx);
/// assert!(rx.is_closed());
/// }
/// ```
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
/// assert!(!rx.is_closed());
/// rx.close();
/// assert!(rx.is_closed());
/// }
/// ```
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = oneshot::channel();
/// assert!(!rx.is_closed());
///
/// tx.send(0).unwrap();
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&self) -> bool {
if let Some(inner) = self.inner.as_ref() {
let state = State::load(&inner.state, Acquire);
state.is_closed() || state.is_complete()
} else {
true
}
}

/// Checks if this receiver is finished.
///
/// This returns true if this receiver has already yielded a [`Poll::Ready`] result, whether
/// that was a value `T`, or a [`RecvError`].
///
/// # Examples
///
/// Sending a value and polling it.
///
/// ```
/// use futures::task::noop_waker_ref;
/// use tokio::sync::oneshot;
///
/// use std::future::Future;
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
///
/// // A receiver is not finished when it is initialized.
/// assert!(!rx.is_finished());
///
/// // A receiver is not finished it is polled and is still pending.
/// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref()));
/// assert_eq!(poll, Poll::Pending);
/// assert!(!rx.is_finished());
///
/// // A receiver is not finished if a value has been sent, but not yet read.
/// tx.send(0).unwrap();
/// assert!(!rx.is_finished());
///
/// // A receiver *is* finished after it has been polled and yielded a value.
/// assert_eq!((&mut rx).await, Ok(0));
/// assert!(rx.is_finished());
/// }
/// ```
///
/// Dropping the sender.
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
///
/// // A receiver is not immediately finished when the sender is dropped.
/// drop(tx);
/// assert!(!rx.is_finished());
///
/// // A receiver *is* finished after it has been polled and yielded an error.
/// let _ = (&mut rx).await.unwrap_err();
/// assert!(rx.is_finished());
/// }
/// ```
pub fn is_finished(&self) -> bool {
self.inner.is_none()
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1106,18 +1264,18 @@ impl<T> Future for Receiver<T> {

let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;
let res = ready!(inner.poll_recv(cx)).map_err(Into::into);

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
Ready(ret)
}
}

Expand Down Expand Up @@ -1233,6 +1391,19 @@ impl<T> Inner<T> {
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}

/// Returns true if there is a value. This function does not check `state`.
///
/// # Safety
///
/// Calling this method concurrently on multiple threads will result in a
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
/// sender *or* the receiver will call this method at a given point in time.
/// If `VALUE_SENT` is not set, then only the sender may call this method;
/// if it is set, then only the receiver may call this method.
unsafe fn has_value(&self) -> bool {
self.value.with(|ptr| (*ptr).is_some())
}
}

unsafe impl<T: Send> Send for Inner<T> {}
Expand Down
124 changes: 124 additions & 0 deletions tokio/tests/sync_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,127 @@ fn sender_changes_task() {

assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}

#[test]
fn receiver_is_closed_send() {
let (tx, rx) = oneshot::channel::<i32>();
assert!(
!rx.is_closed(),
"channel is NOT closed before value is sent"
);
tx.send(17).unwrap();
assert!(rx.is_closed(), "channel IS closed after value is sent");
}

#[test]
fn receiver_is_closed_drop() {
let (tx, rx) = oneshot::channel::<i32>();
assert!(
!rx.is_closed(),
"channel is NOT closed before sender is dropped"
);
drop(tx);
assert!(rx.is_closed(), "channel IS closed after sender is dropped");
}

#[test]
fn receiver_is_closed_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(!rx.is_closed());
rx.close();
assert!(rx.is_closed());
}

#[test]
fn receiver_is_empty_send() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before value is sent");
tx.send(17).unwrap();
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before sender is dropped");
drop(tx);
assert!(rx.is_empty(), "channel IS empty after sender is dropped");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(rx.is_empty());
rx.close();
assert!(rx.is_empty());
}

#[test]
fn receiver_is_finished_send() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(
!rx.is_finished(),
"channel is NOT finished before value is sent"
);
tx.send(17).unwrap();
assert!(
!rx.is_finished(),
"channel is NOT finished after value is sent"
);

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));

assert!(rx.is_finished(), "channel IS finished after value is read");
}

#[test]
fn receiver_is_finished_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(
!rx.is_finished(),
"channel is NOT finished before sender is dropped"
);
drop(tx);
assert!(
!rx.is_finished(),
"channel is NOT finished after sender is dropped"
);

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(rx.is_finished(), "channel IS finished after value is read");
}

#[test]
fn receiver_is_finished_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(!rx.is_finished(), "channel is NOT finished before closing");
rx.close();
assert!(!rx.is_finished(), "channel is NOT finished before closing");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(rx.is_finished(), "channel IS finished after value is read");
}