From 2d5de80fb16569d449f6f7a5c326ca92aa9d80a2 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sat, 13 Jan 2024 18:22:22 +0900 Subject: [PATCH 1/2] doc: clarify that the default value of initial_max_send_streams is 100 --- src/client.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index 35cfc141..c3afd7a9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -312,9 +312,11 @@ pub struct Builder { reset_stream_duration: Duration, /// Initial maximum number of locally initiated (send) streams. - /// After receiving a Settings frame from the remote peer, + /// After receiving a SETTINGS frame from the remote peer, /// the connection will overwrite this value with the /// MAX_CONCURRENT_STREAMS specified in the frame. + /// If no value is advertised by the remote peer in the initial SETTINGS + /// frame, it will be set to usize::MAX. initial_max_send_streams: usize, /// Initial target window size for new connections. @@ -844,8 +846,10 @@ impl Builder { /// Sets the initial maximum of locally initiated (send) streams. /// /// The initial settings will be overwritten by the remote peer when - /// the Settings frame is received. The new value will be set to the - /// `max_concurrent_streams()` from the frame. + /// the SETTINGS frame is received. The new value will be set to the + /// `max_concurrent_streams()` from the frame. If no value is advertised in + /// the initial SETTINGS frame from the remote peer as part of + /// [HTTP/2 Connection Preface], `usize::MAX` will be set. /// /// This setting prevents the caller from exceeding this number of /// streams that are counted towards the concurrency limit. @@ -855,7 +859,10 @@ impl Builder { /// /// See [Section 5.1.2] in the HTTP/2 spec for more details. /// - /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 + /// The default value is `usize::MAX`. + /// + /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface + /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2 /// /// # Examples /// From c75575256e7cfabbbebe12023281b491dd60b351 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sat, 13 Jan 2024 18:49:46 +0900 Subject: [PATCH 2/2] fix: set MAX_CONCURRENT_STREAMS to usize::MAX if no value is advertised initially --- src/proto/settings.rs | 18 ++- src/proto/streams/counts.rs | 8 +- src/proto/streams/streams.rs | 8 +- tests/h2-tests/tests/client_request.rs | 161 +++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 7 deletions(-) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 28065cc6..72ba11fa 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -12,6 +12,9 @@ pub(crate) struct Settings { /// the socket first then the settings applied **before** receiving any /// further frames. remote: Option, + /// Whether the connection has received the initial SETTINGS frame from the + /// remote peer. + has_received_remote_initial_settings: bool, } #[derive(Debug)] @@ -32,6 +35,7 @@ impl Settings { // the handshake process. local: Local::WaitingAck(local), remote: None, + has_received_remote_initial_settings: false, } } @@ -96,6 +100,15 @@ impl Settings { } } + /// Sets `true` to `self.has_received_remote_initial_settings`. + /// Returns `true` if this method is called for the first time. + /// (i.e. it is the initial SETTINGS frame from the remote peer) + fn mark_remote_initial_settings_as_received(&mut self) -> bool { + let has_received = self.has_received_remote_initial_settings; + self.has_received_remote_initial_settings = true; + !has_received + } + pub(crate) fn poll_send( &mut self, cx: &mut Context, @@ -108,7 +121,7 @@ impl Settings { C: Buf, P: Peer, { - if let Some(settings) = &self.remote { + if let Some(settings) = self.remote.clone() { if !dst.poll_ready(cx)?.is_ready() { return Poll::Pending; } @@ -121,7 +134,8 @@ impl Settings { tracing::trace!("ACK sent; applying settings"); - streams.apply_remote_settings(settings)?; + let is_initial = self.mark_remote_initial_settings_as_received(); + streams.apply_remote_settings(&settings, is_initial)?; if let Some(val) = settings.header_table_size() { dst.set_send_header_table_size(val as usize); diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index add1312e..f8810333 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -147,9 +147,11 @@ impl Counts { self.num_remote_reset_streams -= 1; } - pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { - if let Some(val) = settings.max_concurrent_streams() { - self.max_send_streams = val as usize; + pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { + match settings.max_concurrent_streams() { + Some(val) => self.max_send_streams = val as usize, + None if is_initial => self.max_send_streams = usize::MAX, + None => {} } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 274bf455..b6194ca0 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -186,14 +186,18 @@ where me.poll_complete(&self.send_buffer, cx, dst) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { + pub fn apply_remote_settings( + &mut self, + frame: &frame::Settings, + is_initial: bool, + ) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.apply_remote_settings(frame); + me.counts.apply_remote_settings(frame, is_initial); me.actions.send.apply_remote_settings( frame, diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 88c7df46..13bc5f22 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1661,6 +1661,167 @@ async fn client_builder_header_table_size() { join(srv, h2).await; } +#[tokio::test] +async fn configured_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send empty SETTINGS frame (no MAX_CONCURRENT_STREAMS is provided) + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::Builder::new() + // Configure the initial value to 2024 + .initial_max_send_streams(2024) + .handshake::<_, bytes::Bytes>(io) + .await + .unwrap(); + let mut h2 = std::pin::pin!(h2); + // It should be pre-configured value before it receives the initial + // SETTINGS frame from the server + assert_eq!(h2.max_concurrent_send_streams(), 2024); + h2.as_mut().await.unwrap(); + // If the server's initial SETTINGS frame does not include + // MAX_CONCURRENT_STREAMS, this should be updated to usize::MAX. + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::Builder::new() + // Configure the initial value to 2024 + .initial_max_send_streams(2024) + .handshake::<_, bytes::Bytes>(io) + .await + .unwrap(); + let mut h2 = std::pin::pin!(h2); + // It should be pre-configured value before it receives the initial + // SETTINGS frame from the server + assert_eq!(h2.max_concurrent_send_streams(), 2024); + h2.as_mut().await.unwrap(); + // Now the client has received the initial SETTINGS frame from the + // server, which should update the value accordingly + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // Even though the second SETTINGS frame contained no value for + // MAX_CONCURRENT_STREAMS, update to usize::MAX should not happen + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_non_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings().max_concurrent_streams(2024)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // The most-recently advertised value should be used + assert_eq!(h2.max_concurrent_send_streams(), 2024); + }; + + join(srv, h2).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];