Skip to content

Commit

Permalink
fix: stream flow control insufficient size before ack (#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
dswij authored and seanmonstar committed Mar 15, 2024
1 parent 94e80b1 commit 7ed7686
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 8 deletions.
4 changes: 0 additions & 4 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ where
pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
fn streams_config(config: &Config) -> streams::Config {
streams::Config {
local_init_window_sz: config
.settings
.initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
initial_max_send_streams: config.initial_max_send_streams,
local_max_buffer_size: config.max_send_buffer_size,
local_next_stream_id: config.next_stream_id,
Expand Down
3 changes: 0 additions & 3 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ use std::time::Duration;

#[derive(Debug)]
pub struct Config {
/// Initial window size of locally initiated streams
pub local_init_window_sz: WindowSize,

/// Initial maximum number of locally initiated streams.
/// After receiving a Settings frame from the remote peer,
/// the connection will overwrite this value with the
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Recv {
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();

Recv {
init_window_sz: config.local_init_window_sz,
init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
flow,
in_flight_data: 0 as WindowSize,
next_stream_id: Ok(next_stream_id.into()),
Expand Down
82 changes: 82 additions & 0 deletions tests/h2-tests/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,3 +1415,85 @@ async fn reject_informational_status_header_in_request() {

join(client, srv).await;
}

#[tokio::test]
async fn client_drop_connection_without_close_notify() {
h2_support::trace_init!();

let (io, mut client) = mock::new();
let client = async move {
let _recv_settings = client.assert_server_handshake().await;
client
.send_frame(frames::headers(1).request("GET", "https://example.com/"))
.await;
client.send_frame(frames::data(1, &b"hello"[..])).await;
client.recv_frame(frames::headers(1).response(200)).await;

client.close_without_notify(); // Client closed without notify causing UnexpectedEof
};

let mut builder = server::Builder::new();
builder.max_concurrent_streams(1);

let h2 = async move {
let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake");
let (req, mut stream) = srv.next().await.unwrap().unwrap();

assert_eq!(req.method(), &http::Method::GET);

let rsp = http::Response::builder().status(200).body(()).unwrap();
stream.send_response(rsp, false).unwrap();

// Step the conn state forward and hitting the EOF
// But we have no outstanding request from client to be satisfied, so we should not return
// an error
let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap();
};

join(client, h2).await;
}

#[tokio::test]
async fn init_window_size_smaller_than_default_should_use_default_before_ack() {
h2_support::trace_init!();

let (io, mut client) = mock::new();
let client = async move {
// Client can send in some data before ACK;
// Server needs to make sure the Recv stream has default window size
// as per https://datatracker.ietf.org/doc/html/rfc9113#name-initial-flow-control-window
client.write_preface().await;
client
.send(frame::Settings::default().into())
.await
.unwrap();
client.next().await.expect("unexpected EOF").unwrap();
client
.send_frame(frames::headers(1).request("GET", "https://example.com/"))
.await;
client.send_frame(frames::data(1, &b"hello"[..])).await;
client.send(frame::Settings::ack().into()).await.unwrap();
client.next().await;
client
.recv_frame(frames::headers(1).response(200).eos())
.await;
};

let mut builder = server::Builder::new();
builder.max_concurrent_streams(1);
builder.initial_window_size(1);
let h2 = async move {
let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake");
let (req, mut stream) = srv.next().await.unwrap().unwrap();

assert_eq!(req.method(), &http::Method::GET);

let rsp = http::Response::builder().status(200).body(()).unwrap();
stream.send_response(rsp, true).unwrap();

// Drive the state forward
let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap();
};

join(client, h2).await;
}

0 comments on commit 7ed7686

Please sign in to comment.