Skip to content

Commit

Permalink
Add test for HTTP2 CONNECT termination (#3655)
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn authored May 17, 2024
1 parent ac84af6 commit a8f9e06
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2122,6 +2122,82 @@ mod conn {
.expect_err("client should be closed");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http2_connect_detect_close() {
// Regression test for failure to fully close connections when using HTTP2 CONNECT
// We send 2 requests and then drop them. We should see the connection gracefully close.
use futures_util::future;
let (listener, addr) = setup_tk_test_server().await;
let (tx, rxx) = oneshot::channel::<()>();

tokio::task::spawn(async move {
use hyper::server::conn::http2;
use hyper::service::service_fn;

let res = listener.accept().await;
let (stream, _) = res.unwrap();
let stream = TokioIo::new(stream);

let service = service_fn(move |req: Request<hyper::body::Incoming>| {
tokio::task::spawn(async move {
let io = &mut TokioIo::new(hyper::upgrade::on(req).await.unwrap());
io.write_all(b"hello\n").await.unwrap();
});

future::ok::<_, hyper::Error>(Response::new(Empty::<Bytes>::new()))
});

tokio::task::spawn(async move {
let conn = http2::Builder::new(TokioExecutor).serve_connection(stream, service);
let _ = conn.await;
tx.send(()).unwrap();
});
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
.expect("http handshake");

tokio::task::spawn(async move {
conn.await.expect("client conn");
});

// Sanity check that client is ready
future::poll_fn(|ctx| client.poll_ready(ctx))
.await
.expect("client poll ready sanity");
let requests = 2;
let mut clients = vec![client.clone(), client];
let (tx, rx) = oneshot::channel::<()>();
let (tx2, rx2) = oneshot::channel::<()>();
let mut rxs = vec![rx, rx2];
for _i in 0..requests {
let mut client = clients.pop().unwrap();
let rx = rxs.pop().unwrap();
let req = Request::builder()
.method(Method::CONNECT)
.uri(format!("{}", addr))
.body(Empty::<Bytes>::new())
.expect("request builder");

let resp = client.send_request(req).await.expect("req1 send");
assert_eq!(resp.status(), 200);
let upgrade = hyper::upgrade::on(resp).await.unwrap();
tokio::task::spawn(async move {
let _ = rx.await;
drop(upgrade);
});
}
drop(tx);
drop(tx2);
tokio::time::timeout(Duration::from_secs(1), rxx)
.await
.expect("drop with 1s")
.expect("tx dropped without sending");
}

#[tokio::test]
async fn http2_keep_alive_detects_unresponsive_server() {
let (listener, addr) = setup_tk_test_server().await;
Expand Down

0 comments on commit a8f9e06

Please sign in to comment.