Skip to content

Commit

Permalink
Fix panic on concurrent read_datagram completion
Browse files Browse the repository at this point in the history
The previous logic assumed that a datagram must be available if the
future was notified, but this might not be the case if e.g. two
futures are concurrently waiting for datagrams, but only one datagram
has been received, since we use notify_waiters to wake all waiting
futures to be robust in the face of cancellation.
  • Loading branch information
Ralith authored and djc committed Nov 29, 2022
1 parent 5c7f7aa commit 478445e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 13 deletions.
29 changes: 16 additions & 13 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl Connection {
/// Receive an application datagram
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
conn: self.0.clone(),
conn: &self.0,
notify: self.0.shared.datagrams.notified(),
}
}
Expand Down Expand Up @@ -706,7 +706,7 @@ fn poll_accept<'a>(
pin_project! {
/// Future produced by [`Connection::read_datagram`]
pub struct ReadDatagram<'a> {
conn: ConnectionRef,
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
Expand All @@ -715,19 +715,22 @@ pin_project! {
impl Future for ReadDatagram<'_> {
type Output = Result<Bytes, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut conn = this.conn.state.lock("ReadDatagram::poll");
let mut this = self.project();
let mut state = this.conn.state.lock("ReadDatagram::poll");
// Check for buffered datagrams before checking `state.error` so that already-received
// datagrams, which are necessarily finite, can be drained from a closed connection.
if let Some(x) = conn.inner.datagrams().recv() {
Poll::Ready(Ok(x))
} else if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else if this.notify.poll(ctx).is_pending() {
// `conn` lock ensures we don't race with readiness
Poll::Pending
} else {
unreachable!("ReadDatagram notified with no datagrams pending");
if let Some(x) = state.inner.datagrams().recv() {
return Poll::Ready(Ok(x));
} else if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
}
loop {
match this.notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this.notify.set(this.conn.shared.datagrams.notified()),
}
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,39 @@ async fn stream_id_flow_control() {
}
);
}

#[tokio::test]
async fn two_datagram_readers() {
let _guard = subscribe();
let endpoint = endpoint();

let (client, server) = tokio::join!(
endpoint
.connect(endpoint.local_addr().unwrap(), "localhost")
.unwrap(),
async { endpoint.accept().await.unwrap().await }
);
let client = client.unwrap();
let server = server.unwrap();

let done = tokio::sync::Notify::new();
let (a, b, ()) = tokio::join!(
async {
let x = client.read_datagram().await.unwrap();
done.notify_waiters();
x
},
async {
let x = client.read_datagram().await.unwrap();
done.notify_waiters();
x
},
async {
server.send_datagram(b"one"[..].into()).unwrap();
done.notified().await;
server.send_datagram(b"two"[..].into()).unwrap();
}
);
assert!(*a == *b"one" || *b == *b"one");
assert!(*a == *b"two" || *b == *b"two");
}

0 comments on commit 478445e

Please sign in to comment.