From 7b23269c93a03cb200801fadf58971710d60aaa8 Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 24 Apr 2024 16:52:48 -0700 Subject: [PATCH 1/2] http: fix http2 connections that are never closed --- src/proto/h2/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 7cb6c6ed5b..f8df97aeaa 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -340,6 +340,7 @@ where // and then close. trace!("send_request dropped, starting conn shutdown"); drop(this.cancel_tx.take().expect("ConnTask Future polled twice")); + return Poll::Ready(()); } Poll::Pending From f43483f4803fb4df57e51423e2de092582bc40b7 Mon Sep 17 00:00:00 2001 From: John Howard Date: Thu, 25 Apr 2024 10:03:11 -0700 Subject: [PATCH 2/2] Add regression test --- tests/client.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/client.rs b/tests/client.rs index 43e1f08acb..e978e614c1 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2009,6 +2009,74 @@ mod conn { .expect_err("client should be closed"); } + #[tokio::test] + async fn http2_connect_detect_close() { + // Regression test for failure to fully close connections when using HTTP2 CONNECT + // We send a request, read/write some data, then drop the client connection. + use futures_util::future; + let (listener, addr) = setup_tk_test_server().await; + let (tx, rx) = oneshot::channel::<()>(); + const BODY: &[u8] = b"hello world"; + tokio::task::spawn(async move { + use hyper::server::conn::http2; + use hyper::service::service_fn; + + let res = listener.accept().await; + let (stream, _) = res.unwrap(); + let stream = TokioIo::new(stream); + + let service = service_fn(|req: Request| { + tokio::task::spawn(async move { + let io = &mut TokioIo::new(hyper::upgrade::on(req).await.unwrap()); + let mut buf: [u8; BODY.len()] = [0; BODY.len()]; + io.read_exact(&mut buf).await.unwrap(); + io.write_all(BODY).await.unwrap(); + }); + + future::ok::<_, hyper::Error>(Response::new(Empty::::new())) + }); + + tokio::task::spawn(async move { + let conn = http2::Builder::new(TokioExecutor).serve_connection(stream, service); + conn.await.unwrap(); + drop(tx); + }); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .handshake(io) + .await + .expect("http handshake"); + + tokio::task::spawn(async move { + conn.await.expect("client conn"); + }); + + // Sanity check that client is ready + future::poll_fn(|ctx| client.poll_ready(ctx)) + .await + .expect("client poll ready sanity"); + + let req = Request::builder() + .method(Method::CONNECT) + .uri(format!("{}", addr)) + .body(Empty::::new()) + .expect("request builder"); + + let resp = client.send_request(req).await.expect("req1 send"); + assert_eq!(resp.status(), 200); + let io = &mut TokioIo::new(hyper::upgrade::on(resp).await.unwrap()); + + let mut buf: [u8; BODY.len()] = [0; BODY.len()]; + io.write_all(BODY).await.unwrap(); + io.read_exact(&mut buf).await.unwrap(); + drop(client); + let _ = tokio::time::timeout(Duration::from_secs(1), rx) + .await + .unwrap(); + } + #[tokio::test] async fn http2_keep_alive_detects_unresponsive_server() { let (listener, addr) = setup_tk_test_server().await;