diff --git a/tests/client.rs b/tests/client.rs index 6c9d3d7587..89b90796e0 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2122,6 +2122,82 @@ mod conn { .expect_err("client should be closed"); } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn http2_connect_detect_close() { + // Regression test for failure to fully close connections when using HTTP2 CONNECT + // We send 2 requests and then drop them. We should see the connection gracefully close. + use futures_util::future; + let (listener, addr) = setup_tk_test_server().await; + let (tx, rxx) = oneshot::channel::<()>(); + + tokio::task::spawn(async move { + use hyper::server::conn::http2; + use hyper::service::service_fn; + + let res = listener.accept().await; + let (stream, _) = res.unwrap(); + let stream = TokioIo::new(stream); + + let service = service_fn(move |req: Request| { + tokio::task::spawn(async move { + let io = &mut TokioIo::new(hyper::upgrade::on(req).await.unwrap()); + io.write_all(b"hello\n").await.unwrap(); + }); + + future::ok::<_, hyper::Error>(Response::new(Empty::::new())) + }); + + tokio::task::spawn(async move { + let conn = http2::Builder::new(TokioExecutor).serve_connection(stream, service); + let _ = conn.await; + tx.send(()).unwrap(); + }); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .handshake(io) + .await + .expect("http handshake"); + + tokio::task::spawn(async move { + conn.await.expect("client conn"); + }); + + // Sanity check that client is ready + future::poll_fn(|ctx| client.poll_ready(ctx)) + .await + .expect("client poll ready sanity"); + let requests = 2; + let mut clients = vec![client.clone(), client]; + let (tx, rx) = oneshot::channel::<()>(); + let (tx2, rx2) = oneshot::channel::<()>(); + let mut rxs = vec![rx, rx2]; + for _i in 0..requests { + let mut client = clients.pop().unwrap(); + let rx = rxs.pop().unwrap(); + let req = Request::builder() + .method(Method::CONNECT) + .uri(format!("{}", addr)) + .body(Empty::::new()) + .expect("request builder"); + + let resp = client.send_request(req).await.expect("req1 send"); + assert_eq!(resp.status(), 200); + let upgrade = hyper::upgrade::on(resp).await.unwrap(); + tokio::task::spawn(async move { + let _ = rx.await; + drop(upgrade); + }); + } + drop(tx); + drop(tx2); + tokio::time::timeout(Duration::from_secs(1), rxx) + .await + .expect("drop with 1s") + .expect("tx dropped without sending"); + } + #[tokio::test] async fn http2_keep_alive_detects_unresponsive_server() { let (listener, addr) = setup_tk_test_server().await;