From 0fa22fe8d54c682b4a355941b98e42ec9004df3f Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 18:21:18 +0800 Subject: [PATCH 1/7] update --- Cargo.toml | 2 +- src/proto/connection.rs | 37 ++++++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 34ca3d68..e7584910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ atomic-waker = "1.0.0" futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } tokio-util = { version = "0.7.1", features = ["codec", "io"] } -tokio = { version = "1", features = ["io-util"] } +tokio = { version = "1", features = ["io-util", "time"] } bytes = "1" http = "1" tracing = { version = "0.1.35", default-features = false, features = ["std"] } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5589fabc..9d862419 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -7,12 +7,14 @@ use crate::proto::*; use bytes::Bytes; use futures_core::Stream; +use std::future::Future; use std::io; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::AsyncRead; +use tokio::time::Sleep; /// An H2 connection #[derive(Debug)] @@ -57,6 +59,9 @@ where /// A `tracing` span tracking the lifetime of the connection. span: tracing::Span, + sleep: Option, + keepalive_timeout: Option, + /// Client or server _phantom: PhantomData

, } @@ -82,6 +87,7 @@ pub(crate) struct Config { pub reset_stream_max: usize, pub remote_reset_stream_max: usize, pub local_error_reset_streams_max: Option, + pub keepalive_timeout: Option, pub settings: frame::Settings, } @@ -135,6 +141,8 @@ where ping_pong: PingPong::new(), settings: Settings::new(config.settings), streams, + sleep: None, + keepalive_timeout: config.keepalive_timeout, span: tracing::debug_span!("Connection", peer = %P::NAME), _phantom: PhantomData, }, @@ -263,22 +271,23 @@ where let _e = span.enter(); let span = tracing::trace_span!("poll"); let _e = span.enter(); - - loop { + 'outer: loop { tracing::trace!(connection.state = ?self.inner.state); // TODO: probably clean up this glob of code match self.inner.state { // When open, continue to poll a frame State::Open => { let result = match self.poll2(cx) { - Poll::Ready(result) => result, + Poll::Ready(result) => { + self.inner.sleep = None; + result + } // The connection is not ready to make progress Poll::Pending => { // Ensure all window updates have been sent. // // This will also handle flushing `self.codec` ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?; - if (self.inner.error.is_some() || self.inner.go_away.should_close_on_idle()) && !self.inner.streams.has_streams() @@ -286,7 +295,25 @@ where self.inner.as_dyn().go_away_now(Reason::NO_ERROR); continue; } - + if !self.inner.streams.has_streams() { + loop { + match (self.inner.sleep.as_mut(), self.inner.keepalive_timeout) + { + (Some(sleep), _) => { + tokio::pin!(sleep); + let x = sl.as_mut(); + ready!(sl.poll(cx)); + self.inner.as_dyn().go_away_now(Reason::NO_ERROR); + continue 'outer; + } + (None, Some(timeout)) => { + let sleep = tokio::time::sleep(timeout); + self.inner.sleep.insert(sleep); + } + _ => break, + } + } + } return Poll::Pending; } }; From 963dbd3e103fa009bbbc97cedd7513963598eca3 Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 19:24:42 +0800 Subject: [PATCH 2/7] sleep --- src/client.rs | 4 ++++ src/proto/connection.rs | 9 ++++----- src/server.rs | 6 +++++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index ffeda607..0748209e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -343,6 +343,8 @@ pub struct Builder { /// /// When this gets exceeded, we issue GOAWAYs. local_max_error_reset_streams: Option, + + keepalive_timeout: Option, } #[derive(Debug)] @@ -661,6 +663,7 @@ impl Builder { initial_target_connection_window_size: None, initial_max_send_streams: usize::MAX, settings: Default::default(), + keepalive_timeout: None, stream_id: 1.into(), local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } @@ -1332,6 +1335,7 @@ where max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, + keepalive_timeout: builder.keepalive_timeout, remote_reset_stream_max: builder.pending_accept_reset_stream_max, local_error_reset_streams_max: builder.local_max_error_reset_streams, settings: builder.settings.clone(), diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9d862419..a8424f71 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -59,7 +59,7 @@ where /// A `tracing` span tracking the lifetime of the connection. span: tracing::Span, - sleep: Option, + sleep: Option>, keepalive_timeout: Option, /// Client or server @@ -300,15 +300,14 @@ where match (self.inner.sleep.as_mut(), self.inner.keepalive_timeout) { (Some(sleep), _) => { - tokio::pin!(sleep); - let x = sl.as_mut(); - ready!(sl.poll(cx)); + let sleep = unsafe { Pin::new_unchecked(&mut **sleep) }; + ready!(sleep.poll(cx)); self.inner.as_dyn().go_away_now(Reason::NO_ERROR); continue 'outer; } (None, Some(timeout)) => { let sleep = tokio::time::sleep(timeout); - self.inner.sleep.insert(sleep); + self.inner.sleep.replace(Box::new(sleep)); } _ => break, } diff --git a/src/server.rs b/src/server.rs index b00bc086..f82a663e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -258,6 +258,9 @@ pub struct Builder { /// /// When this gets exceeded, we issue GOAWAYs. local_max_error_reset_streams: Option, + + /// Keepalive timeout + keepalive_timeout: Option, } /// Send a response back to the client @@ -650,7 +653,7 @@ impl Builder { settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, - + keepalive_timeout: None, local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } } @@ -1379,6 +1382,7 @@ where initial_max_send_streams: 0, max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, + keepalive_timeout: self.builder.keepalive_timeout, reset_stream_max: self.builder.reset_stream_max, remote_reset_stream_max: self.builder.pending_accept_reset_stream_max, local_error_reset_streams_max: self From d01927637c1b19bb4ca22e0d2036e39e0329122b Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 19:33:15 +0800 Subject: [PATCH 3/7] update --- src/client.rs | 5 +++++ src/server.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/client.rs b/src/client.rs index 0748209e..bba24170 100644 --- a/src/client.rs +++ b/src/client.rs @@ -999,6 +999,11 @@ impl Builder { self } + /// Sets the duration connection should be closed when there no stream. + pub fn keepalive_timeout(&mut self, dur: Duration) -> &mut Self { + self.keepalive_timeout = Some(dur); + self + } /// Sets the maximum number of local resets due to protocol errors made by the remote end. /// /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. diff --git a/src/server.rs b/src/server.rs index f82a663e..69734f6a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1018,6 +1018,11 @@ impl Builder { self } + /// Sets the duration connection should be closed when there no stream. + pub fn keepalive_timeout(&mut self, dur: Duration) -> &mut Self { + self.keepalive_timeout = Some(dur); + self + } /// Enables the [extended CONNECT protocol]. /// /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 From ee3ab8443dc28131f3a95b60db66614c37db6b3b Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 19:38:26 +0800 Subject: [PATCH 4/7] update --- src/proto/connection.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index a8424f71..082b16f2 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -59,7 +59,7 @@ where /// A `tracing` span tracking the lifetime of the connection. span: tracing::Span, - sleep: Option>, + keepalive: Option>>, keepalive_timeout: Option, /// Client or server @@ -141,7 +141,7 @@ where ping_pong: PingPong::new(), settings: Settings::new(config.settings), streams, - sleep: None, + keepalive: None, keepalive_timeout: config.keepalive_timeout, span: tracing::debug_span!("Connection", peer = %P::NAME), _phantom: PhantomData, @@ -279,7 +279,7 @@ where State::Open => { let result = match self.poll2(cx) { Poll::Ready(result) => { - self.inner.sleep = None; + self.inner.keepalive = None; result } // The connection is not ready to make progress @@ -297,17 +297,19 @@ where } if !self.inner.streams.has_streams() { loop { - match (self.inner.sleep.as_mut(), self.inner.keepalive_timeout) - { + match ( + self.inner.keepalive.as_mut(), + self.inner.keepalive_timeout, + ) { (Some(sleep), _) => { - let sleep = unsafe { Pin::new_unchecked(&mut **sleep) }; - ready!(sleep.poll(cx)); + ready!(sleep.as_mut().poll(cx)); self.inner.as_dyn().go_away_now(Reason::NO_ERROR); continue 'outer; } (None, Some(timeout)) => { - let sleep = tokio::time::sleep(timeout); - self.inner.sleep.replace(Box::new(sleep)); + self.inner + .keepalive + .replace(Box::pin(tokio::time::sleep(timeout))); } _ => break, } From cb6213c8adcf1a913c4376e41d53887b2579caaa Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 21:04:38 +0800 Subject: [PATCH 5/7] update --- src/frame/reason.rs | 3 +++ src/proto/connection.rs | 2 +- tests/h2-tests/tests/server.rs | 35 ++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/frame/reason.rs b/src/frame/reason.rs index ff5e2012..24a73d4f 100644 --- a/src/frame/reason.rs +++ b/src/frame/reason.rs @@ -58,6 +58,8 @@ impl Reason { pub const INADEQUATE_SECURITY: Reason = Reason(12); /// The endpoint requires that HTTP/1.1 be used instead of HTTP/2. pub const HTTP_1_1_REQUIRED: Reason = Reason(13); + /// The endpoint requires that HTTP/1.1 be used instead of HTTP/2. + pub const KEEPALIVE_TIMEOUT: Reason = Reason(14); /// Get a string description of the error code. pub fn description(&self) -> &str { @@ -79,6 +81,7 @@ impl Reason { 11 => "detected excessive load generating behavior", 12 => "security properties do not meet minimum requirements", 13 => "endpoint requires HTTP/1.1", + 14 => "keepalive timeout reached", _ => "unknown reason", } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 082b16f2..43774c6f 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -303,7 +303,7 @@ where ) { (Some(sleep), _) => { ready!(sleep.as_mut().poll(cx)); - self.inner.as_dyn().go_away_now(Reason::NO_ERROR); + self.inner.as_dyn().go_away_now(Reason::KEEPALIVE_TIMEOUT); continue 'outer; } (None, Some(timeout)) => { diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index c1af5419..fa5c74b1 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -69,6 +69,41 @@ async fn server_builder_set_max_concurrent_streams() { join(client, h2).await; } +#[tokio::test] +async fn server_builder_set_keepalive_timeout() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + let h1 = async { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + client + .recv_frame(frames::headers(1).response(200).eos()) + .await; + }; + + let mut builder = server::Builder::new(); + builder.keepalive_timeout(Duration::from_secs(2)); + let h2 = async move { + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + let res = stream.send_response(rsp, true).unwrap(); + drop(res); + let r1 = srv.accept().await; + println!("rrr {r1:?}"); + assert!(r1.is_some_and(|f| f.is_err_and(|f| f.is_go_away()))); + }; + join(h1, h2).await; +} #[tokio::test] async fn serve_request() { h2_support::trace_init!(); From af3d02681fdc17dad2087b9e3891a7f650e36dbc Mon Sep 17 00:00:00 2001 From: taikulawo Date: Tue, 24 Dec 2024 21:06:51 +0800 Subject: [PATCH 6/7] update --- src/frame/reason.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frame/reason.rs b/src/frame/reason.rs index 24a73d4f..3b366061 100644 --- a/src/frame/reason.rs +++ b/src/frame/reason.rs @@ -58,7 +58,7 @@ impl Reason { pub const INADEQUATE_SECURITY: Reason = Reason(12); /// The endpoint requires that HTTP/1.1 be used instead of HTTP/2. pub const HTTP_1_1_REQUIRED: Reason = Reason(13); - /// The endpoint requires that HTTP/1.1 be used instead of HTTP/2. + /// The endpoint reach keepalive timeout pub const KEEPALIVE_TIMEOUT: Reason = Reason(14); /// Get a string description of the error code. From 4cf6bc5d14d391418f8e2fc8a30836ee1ac774d3 Mon Sep 17 00:00:00 2001 From: taikulawo Date: Wed, 25 Dec 2024 11:13:32 +0800 Subject: [PATCH 7/7] add active stream prototype --- src/client.rs | 1 - src/proto/connection.rs | 8 +++++++- src/proto/streams/streams.rs | 1 - src/server.rs | 9 +++++++++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index bba24170..f6ea04dd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -582,7 +582,6 @@ where } } -#[cfg(feature = "unstable")] impl SendRequest where B: Buf, diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 43774c6f..46dbc316 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -181,6 +181,10 @@ where pub(crate) fn max_recv_streams(&self) -> usize { self.inner.streams.max_recv_streams() } + /// Returns the number of active stream + pub(crate) fn active_streams(&self) -> usize { + self.inner.streams.num_active_streams() + } #[cfg(feature = "unstable")] pub fn num_wired_streams(&self) -> usize { @@ -303,7 +307,9 @@ where ) { (Some(sleep), _) => { ready!(sleep.as_mut().poll(cx)); - self.inner.as_dyn().go_away_now(Reason::KEEPALIVE_TIMEOUT); + self.inner + .as_dyn() + .go_away_now(Reason::KEEPALIVE_TIMEOUT); continue 'outer; } (None, Some(timeout)) => { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 875b6103..b2f7918f 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1008,7 +1008,6 @@ where self.inner.lock().unwrap().counts.max_recv_streams() } - #[cfg(feature = "unstable")] pub fn num_active_streams(&self) -> usize { let me = self.inner.lock().unwrap(); me.store.num_active_streams() diff --git a/src/server.rs b/src/server.rs index 69734f6a..1e74d3f8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -584,6 +584,15 @@ where self.connection.max_recv_streams() } + /// Returns whether has stream alive + pub fn has_streams_or_other_references(&self) -> bool { + self.connection.has_streams_or_other_references() + } + /// Returns the number of current active stream. + pub fn active_stream(&self) -> usize { + self.connection.active_streams() + } + // Could disappear at anytime. #[doc(hidden)] #[cfg(feature = "unstable")]