From d2f09fb757e5be928a6720da416dbe8bd37aa208 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sat, 13 Jan 2024 18:49:46 +0900 Subject: [PATCH] fix: set MAX_CONCURRENT_STREAMS to usize::MAX if no value is advertised initially --- src/proto/settings.rs | 18 ++- src/proto/streams/counts.rs | 8 +- src/proto/streams/streams.rs | 8 +- tests/h2-tests/tests/client_request.rs | 161 +++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 7 deletions(-) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 28065cc6..72ba11fa 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -12,6 +12,9 @@ pub(crate) struct Settings { /// the socket first then the settings applied **before** receiving any /// further frames. remote: Option, + /// Whether the connection has received the initial SETTINGS frame from the + /// remote peer. + has_received_remote_initial_settings: bool, } #[derive(Debug)] @@ -32,6 +35,7 @@ impl Settings { // the handshake process. local: Local::WaitingAck(local), remote: None, + has_received_remote_initial_settings: false, } } @@ -96,6 +100,15 @@ impl Settings { } } + /// Sets `true` to `self.has_received_remote_initial_settings`. + /// Returns `true` if this method is called for the first time. + /// (i.e. it is the initial SETTINGS frame from the remote peer) + fn mark_remote_initial_settings_as_received(&mut self) -> bool { + let has_received = self.has_received_remote_initial_settings; + self.has_received_remote_initial_settings = true; + !has_received + } + pub(crate) fn poll_send( &mut self, cx: &mut Context, @@ -108,7 +121,7 @@ impl Settings { C: Buf, P: Peer, { - if let Some(settings) = &self.remote { + if let Some(settings) = self.remote.clone() { if !dst.poll_ready(cx)?.is_ready() { return Poll::Pending; } @@ -121,7 +134,8 @@ impl Settings { tracing::trace!("ACK sent; applying settings"); - streams.apply_remote_settings(settings)?; + let is_initial = self.mark_remote_initial_settings_as_received(); + streams.apply_remote_settings(&settings, is_initial)?; if let Some(val) = settings.header_table_size() { dst.set_send_header_table_size(val as usize); diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index add1312e..f8810333 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -147,9 +147,11 @@ impl Counts { self.num_remote_reset_streams -= 1; } - pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { - if let Some(val) = settings.max_concurrent_streams() { - self.max_send_streams = val as usize; + pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { + match settings.max_concurrent_streams() { + Some(val) => self.max_send_streams = val as usize, + None if is_initial => self.max_send_streams = usize::MAX, + None => {} } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 274bf455..b6194ca0 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -186,14 +186,18 @@ where me.poll_complete(&self.send_buffer, cx, dst) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { + pub fn apply_remote_settings( + &mut self, + frame: &frame::Settings, + is_initial: bool, + ) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.apply_remote_settings(frame); + me.counts.apply_remote_settings(frame, is_initial); me.actions.send.apply_remote_settings( frame, diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 88c7df46..13bc5f22 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1661,6 +1661,167 @@ async fn client_builder_header_table_size() { join(srv, h2).await; } +#[tokio::test] +async fn configured_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send empty SETTINGS frame (no MAX_CONCURRENT_STREAMS is provided) + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::Builder::new() + // Configure the initial value to 2024 + .initial_max_send_streams(2024) + .handshake::<_, bytes::Bytes>(io) + .await + .unwrap(); + let mut h2 = std::pin::pin!(h2); + // It should be pre-configured value before it receives the initial + // SETTINGS frame from the server + assert_eq!(h2.max_concurrent_send_streams(), 2024); + h2.as_mut().await.unwrap(); + // If the server's initial SETTINGS frame does not include + // MAX_CONCURRENT_STREAMS, this should be updated to usize::MAX. + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::Builder::new() + // Configure the initial value to 2024 + .initial_max_send_streams(2024) + .handshake::<_, bytes::Bytes>(io) + .await + .unwrap(); + let mut h2 = std::pin::pin!(h2); + // It should be pre-configured value before it receives the initial + // SETTINGS frame from the server + assert_eq!(h2.max_concurrent_send_streams(), 2024); + h2.as_mut().await.unwrap(); + // Now the client has received the initial SETTINGS frame from the + // server, which should update the value accordingly + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // Even though the second SETTINGS frame contained no value for + // MAX_CONCURRENT_STREAMS, update to usize::MAX should not happen + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_non_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings().max_concurrent_streams(2024)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // The most-recently advertised value should be used + assert_eq!(h2.max_concurrent_send_streams(), 2024); + }; + + join(srv, h2).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];