From df9a2621138abb5c7733466c66e0da71314506b8 Mon Sep 17 00:00:00 2001 From: dswij Date: Fri, 2 Feb 2024 21:38:20 +0800 Subject: [PATCH] fix: stream flow control insufficient size before ack (#746) --- src/proto/connection.rs | 4 -- src/proto/streams/mod.rs | 3 -- src/proto/streams/recv.rs | 2 +- tests/h2-tests/tests/server.rs | 83 ++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 8 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5d6b9d2b..7a14111b 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -106,10 +106,6 @@ where pub fn new(codec: Codec>, config: Config) -> Connection { fn streams_config(config: &Config) -> streams::Config { streams::Config { - local_init_window_sz: config - .settings - .initial_window_size() - .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, local_max_buffer_size: config.max_send_buffer_size, local_next_stream_id: config.next_stream_id, diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index b347442a..c4a83234 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -33,9 +33,6 @@ use std::time::Duration; #[derive(Debug)] pub struct Config { - /// Initial window size of locally initiated streams - pub local_init_window_sz: WindowSize, - /// Initial maximum number of locally initiated streams. /// After receiving a Settings frame from the remote peer, /// the connection will overwrite this value with the diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0063942a..76d19722 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -93,7 +93,7 @@ impl Recv { flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap(); Recv { - init_window_sz: config.local_init_window_sz, + init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, flow, in_flight_data: 0 as WindowSize, next_stream_id: Ok(next_stream_id.into()), diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 6075c7dc..ca116221 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1415,3 +1415,86 @@ async fn reject_informational_status_header_in_request() { join(client, srv).await; } + +#[tokio::test] +async fn client_drop_connection_without_close_notify() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + let client = async move { + let _recv_settings = client.assert_server_handshake().await; + client + .send_frame(frames::headers(1).request("GET", "https://example.com/")) + .await; + client.send_frame(frames::data(1, &b"hello"[..])).await; + client.recv_frame(frames::headers(1).response(200)).await; + + client.close_without_notify(); // Client closed without notify causing UnexpectedEof + }; + + let mut builder = server::Builder::new(); + builder.max_concurrent_streams(1); + + let h2 = async move { + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, false).unwrap(); + + // Step the conn state forward and hitting the EOF + // But we have no outstanding request from client to be satisfied, so we should not return + // an error + let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap(); + }; + + join(client, h2).await; +} + +#[tokio::test] +async fn init_window_size_smaller_than_default_should_use_default_before_ack() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + let client = async move { + // Client can send in some data before ACK; + // Server needs to make sure the Recv stream has default window size + // as per https://datatracker.ietf.org/doc/html/rfc9113#name-initial-flow-control-window + client.write_preface().await; + client + .send(frame::Settings::default().into()) + .await + .unwrap(); + client.next().await.expect("unexpected EOF").unwrap(); + client + .send_frame(frames::headers(1).request("GET", "https://example.com/")) + .await; + client.send_frame(frames::data(1, &b"hello"[..])).await; + client.send(frame::Settings::ack().into()).await.unwrap(); + client.next().await; + client + .recv_frame(frames::headers(1).response(200).eos()) + .await; + }; + + let mut builder = server::Builder::new(); + builder.max_concurrent_streams(1); + builder.initial_window_size(1); + let h2 = async move { + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + // Drive the state forward + let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap(); + }; + + join(client, h2).await; +} +