Skip to content

Commit

Permalink
lints
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Apr 26, 2024
1 parent e364069 commit 0def96d
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ impl PoolState {
};
match found_conn {
Some(exist_conn_lock) => {
debug!("checkout - found mutex for key, waiting for writelock");
debug!(
"checkout - found mutex for pool key {:#?}, waiting for writelock",
pool_key
);
let _conn_lock = exist_conn_lock.as_ref().lock().await;

trace!(
Expand Down Expand Up @@ -1199,7 +1202,7 @@ mod test {
let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
pool_max_streams_per_conn: 50,
pool_unused_release_timeout: Duration::from_secs(2),
pool_unused_release_timeout: Duration::from_secs(1),
..crate::config::parse_config().unwrap()
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
Expand All @@ -1217,7 +1220,7 @@ mod test {
let mut tasks = futures::stream::FuturesUnordered::new();
loop {
count += 1;
tasks.push(spawn_client(pool.clone(), key1.clone(), server_addr, 25));
tasks.push(spawn_client(pool.clone(), key1.clone(), server_addr, 1));

if count == client_count {
break;
Expand All @@ -1228,6 +1231,21 @@ mod test {
let persist_res =
spawn_persistent_client(pool.clone(), key1.clone(), server_addr, client_stop);

// TODO we spawn clients too fast (and they have little to do) and they actually break the
// local "fake" test server, causing it to start returning "conn refused/peer refused the connection"
// when the pool tries to create new connections for that caller
//
// (the pool will just pass that conn refused back to the caller)
//
// In the real world this is fine, since we aren't hitting a local server,
// servers can refuse connections - in synthetic tests it leads to flakes.
//
// It is worth considering whether the pool should throttle how frequently it allows itself to create
// connections to real upstreams (e.g. "I created a conn for this key 10ms ago and you've already burned through
// your streamcount, chill out, you're gonna overload the dest")
//
// For now, streamcount is an inexact flow control for this.
sleep(Duration::from_millis(500)).await;
// loop through the non-persistent clients and wait for them to finish
while let Some(Err(res)) = tasks.next().await {
assert!(!res.is_panic(), "CLIENT PANICKED!");
Expand All @@ -1241,12 +1259,12 @@ mod test {
"actual before conncount was {before_conncount}"
);
assert!(
before_dropcount == 0,
before_dropcount == 1,
"actual before dropcount was {before_dropcount}"
);

// Attempt to wait long enough for pool conns to timeout+evict
sleep(Duration::from_secs(2)).await;
sleep(Duration::from_secs(1)).await;

let real_conncount = conn_counter.load(Ordering::Relaxed);
assert!(real_conncount == 3, "actual conncount was {real_conncount}");
Expand All @@ -1257,7 +1275,7 @@ mod test {
client_stop_signal.drain().await;
assert!(persist_res.await.is_ok(), "PERSIST CLIENT ERROR");

sleep(Duration::from_secs(2)).await;
sleep(Duration::from_secs(1)).await;

let after_conncount = conn_counter.load(Ordering::Relaxed);
assert!(
Expand Down Expand Up @@ -1311,8 +1329,6 @@ mod test {

if res.is_err() {
panic!("SEND ERR: {:#?} sendcount {count}", res);
} else if !res.is_ok() {
panic!("CLIENT RETURNED ERROR")
}

let mut okres = res.unwrap();
Expand All @@ -1327,7 +1343,6 @@ mod test {
debug!("CLIENT DONE");
break;
}

}
})
}
Expand Down Expand Up @@ -1361,17 +1376,16 @@ mod test {
start.elapsed().as_millis()
);

let mut count = 0u32;
// send forever, once we get a conn, until someone signals us to stop
let send_loop = async move {
//send once, then hold the conn open until signaled
let res = c1.send_request(req()).await;
if res.is_err() {
panic!("SEND ERR: {:#?}", res);
}
loop {
count += 1;
let res = c1.send_request(req()).await;
if res.is_err() {
panic!("SEND ERR: {:#?} sendcount {count}", res);
} else if res.unwrap().status() != 200 {
panic!("CLIENT RETURNED ERROR")
}
debug!("persistent client yielding");
sleep(Duration::from_millis(1)).await; //yield may be enough
tokio::task::yield_now().await;
}
};

Expand Down

0 comments on commit 0def96d

Please sign in to comment.