Skip to content

Commit

Permalink
streams: track the push task separately
Browse files Browse the repository at this point in the history
The push task is a separate task from the recv task, so its state needs
to be tracked separately for waking. I don't know how to be systematic
about ensuring that notify_push is called in all the right places, but
this is an initial attempt.

In order to test this works, we manually utilize FuturesUnordered which
does fine-grained task wake tracking. The added test failed before
making the other changes.
  • Loading branch information
ajwerner committed Aug 6, 2024
1 parent 7dbb5c5 commit 0f10650
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 15 deletions.
5 changes: 1 addition & 4 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,7 @@ impl Prioritize {
}),
None => {
if let Some(reason) = stream.state.get_scheduled_reset() {
let stream_id = stream.id;
stream
.state
.set_reset(stream_id, reason, Initiator::Library);
stream.set_reset(reason, Initiator::Library);

let frame = frame::Reset::new(stream.id, reason);
Frame::Reset(frame)
Expand Down
6 changes: 5 additions & 1 deletion src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Recv {
let is_open = stream.state.ensure_recv_open()?;

if is_open {
stream.recv_task = Some(cx.waker().clone());
stream.push_task = Some(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(None)
Expand Down Expand Up @@ -760,6 +760,7 @@ impl Recv {
.pending_recv
.push_back(&mut self.buffer, Event::Headers(Server(req)));
stream.notify_recv();
stream.notify_push();
Ok(())
}

Expand Down Expand Up @@ -814,6 +815,7 @@ impl Recv {

stream.notify_send();
stream.notify_recv();
stream.notify_push();

Ok(())
}
Expand All @@ -826,6 +828,7 @@ impl Recv {
// If a receiver is waiting, notify it
stream.notify_send();
stream.notify_recv();
stream.notify_push();
}

pub fn go_away(&mut self, last_processed_id: StreamId) {
Expand All @@ -837,6 +840,7 @@ impl Recv {
stream.state.recv_eof();
stream.notify_send();
stream.notify_recv();
stream.notify_push();
}

pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
Expand Down
5 changes: 1 addition & 4 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ impl Send {
}

// Transition the state to reset no matter what.
stream.state.set_reset(stream_id, reason, initiator);
// Notify the recv task if it's waiting, because it'll
// want to hear about the reset.
stream.notify_recv();
stream.set_reset(reason, initiator);

// If closed AND the send queue is flushed, then the stream cannot be
// reset explicitly, either. Implicit resets can still be queued.
Expand Down
20 changes: 20 additions & 0 deletions src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::Reason;

use super::*;

use std::task::{Context, Waker};
Expand Down Expand Up @@ -104,6 +106,9 @@ pub(super) struct Stream {
/// Task tracking receiving frames
pub recv_task: Option<Waker>,

/// Task tracking pushed promises.
pub push_task: Option<Waker>,

/// The stream's pending push promises
pub pending_push_promises: store::Queue<NextAccept>,

Expand Down Expand Up @@ -186,6 +191,7 @@ impl Stream {
pending_recv: buffer::Deque::new(),
is_recv: true,
recv_task: None,
push_task: None,
pending_push_promises: store::Queue::new(),
content_length: ContentLength::Omitted,
}
Expand Down Expand Up @@ -369,6 +375,20 @@ impl Stream {
task.wake();
}
}

pub(super) fn notify_push(&mut self) {
if let Some(task) = self.push_task.take() {
task.wake();
}
}

/// Set the stream's state to `Closed` with the given reason and initiator.
/// Notify the send and receive tasks, if they exist.
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
self.state.set_reset(self.id, reason, initiator);
self.notify_push();
self.notify_recv();
}
}

impl store::Next for NextAccept {
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ impl Inner {

let parent = &mut self.store.resolve(parent_key);
parent.pending_push_promises = ppp;
parent.notify_recv();
parent.notify_push();
};

Ok(())
Expand Down
17 changes: 12 additions & 5 deletions tests/h2-tests/tests/push_promise.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::join;
use futures::{StreamExt, TryStreamExt};
use std::iter::FromIterator;

use futures::{future::join, FutureExt as _, StreamExt, TryStreamExt};
use h2_support::prelude::*;

#[tokio::test]
Expand Down Expand Up @@ -51,9 +52,15 @@ async fn recv_push_works() {
let ps: Vec<_> = p.collect().await;
assert_eq!(1, ps.len())
};

h2.drive(join(check_resp_status, check_pushed_response))
.await;
// Use a FuturesUnordered to poll both tasks but only poll them
// if they have been notified.
let tasks = futures::stream::FuturesUnordered::from_iter([
check_resp_status.boxed(),
check_pushed_response.boxed(),
])
.collect::<()>();

h2.drive(tasks).await;
};

join(mock, h2).await;
Expand Down

0 comments on commit 0f10650

Please sign in to comment.