diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs index 94bcea107..ad3e76585 100644 --- a/src/proxy/pool.rs +++ b/src/proxy/pool.rs @@ -642,12 +642,10 @@ mod test { use futures_util::StreamExt; use hyper::body::Incoming; - use http_body_util::BodyExt; use hyper::service::service_fn; use hyper::{Request, Response}; use std::sync::atomic::AtomicU32; use std::time::Duration; - use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::task::{self}; use tokio::time::sleep; @@ -1252,29 +1250,25 @@ mod test { continue; } + //Attempt to wait a bit more, to ensure the connections NOT held open by our persistent client are dropped. + sleep(Duration::from_secs(1)).await; let before_conncount = conn_counter.load(Ordering::Relaxed); let before_dropcount = conn_drop_counter.load(Ordering::Relaxed); assert!( before_conncount == 3, "actual before conncount was {before_conncount}" ); + // At this point, we should still have one conn that hasn't been dropped + // because we haven't ended the persistent client assert!( - before_dropcount == 1, + before_dropcount == 2, "actual before dropcount was {before_dropcount}" ); - // Attempt to wait long enough for pool conns to timeout+evict - sleep(Duration::from_secs(1)).await; - - let real_conncount = conn_counter.load(Ordering::Relaxed); - assert!(real_conncount == 3, "actual conncount was {real_conncount}"); - // At this point, we should still have one conn that hasn't been dropped - // because we haven't ended the persistent client - let real_dropcount = conn_drop_counter.load(Ordering::Relaxed); - assert!(real_dropcount == 2, "actual dropcount was {real_dropcount}"); client_stop_signal.drain().await; assert!(persist_res.await.is_ok(), "PERSIST CLIENT ERROR"); + //Attempt to wait a bit more, to ensure the connections held open by our persistent client is dropped. sleep(Duration::from_secs(1)).await; let after_conncount = conn_counter.load(Ordering::Relaxed); @@ -1331,14 +1325,6 @@ mod test { panic!("SEND ERR: {:#?} sendcount {count}", res); } - let mut okres = res.unwrap(); - const HBONE_MESSAGE: &[u8] = b"hbone\n"; - while let Some(next) = okres.frame().await { - let frame = next.expect("better have a resp body"); - if let Some(chunk) = frame.data_ref() { - assert_eq!(HBONE_MESSAGE, chunk); - } - } if count >= req_count { debug!("CLIENT DONE"); break; @@ -1413,7 +1399,6 @@ mod test { Ok(upgraded) => { let (mut ri, mut wi) = tokio::io::split(hyper_util::rt::TokioIo::new(upgraded)); - wi.write_all(b"hbone\n").await.unwrap(); tcp::handle_stream(tcp::Mode::ReadWrite, &mut ri, &mut wi).await; } Err(e) => panic!("No upgrade {e}"),