diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs
index 4f8c278e5..2700b0d25 100644
--- a/src/proxy/pool.rs
+++ b/src/proxy/pool.rs
@@ -32,7 +32,7 @@
 use tokio::sync::watch;
 use tokio::task;
 use tokio::sync::Mutex;
-use tracing::{debug, error};
+use tracing::{debug, error, trace};
 
 use crate::config;
 use crate::identity::{Identity, SecretManager};
@@ -41,6 +41,8 @@
 use flurry::HashMap;
 use pingora_pool;
 
+// This is merely a counter to track the overall number of conns this pool spawns
+// to ensure we get unique poolkeys-per-new-conn; it is not a limit
 static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);
 
 // A relatively nonstandard HTTP/2 connection pool designed to allow multiplexing proxied workload connections
@@ -53,8 +55,6 @@ static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);
 // by flow control throttling.
 #[derive(Clone)]
 pub struct WorkloadHBONEPool {
-    // this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
-    // and has no actual hyper/http/connection logic.
     cfg: config::Config,
     socket_factory: Arc<dyn SocketFactory + Send + Sync>,
     cert_manager: Arc<SecretManager>,
@@ -63,6 +63,7 @@ pub struct WorkloadHBONEPool {
     max_streamcount: u16,
 }
 
+// PoolState is effectively the gnarly inner state stuff that needs thread/task sync, and should be wrapped in a Mutex.
 struct PoolState {
     pool_notifier: watch::Sender<bool>, // This is already impl clone? rustc complains that it isn't, tho
     timeout_tx: watch::Sender<bool>,    // This is already impl clone? rustc complains that it isn't, tho
@@ -70,7 +71,7 @@ struct PoolState {
     // this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
    // and has no actual hyper/http/connection logic.
     connected_pool: Arc<pingora_pool::ConnectionPool<ConnClient>>,
-    // this must be a readlockable list-of-locks, so we can lock per-key, not globally, and avoid holding up all conn attempts
+    // this must be an atomic/concurrent-safe list-of-locks, so we can lock per-key, not globally, and avoid holding up all conn attempts
     established_conn_writelock: HashMap<u64, Option<Arc<Mutex<()>>>>,
     close_pollers: futures::stream::FuturesUnordered<task::JoinHandle<()>>,
     pool_unused_release_timeout: Duration,
@@ -86,7 +87,8 @@ impl PoolState {
    //
    // Idle poppers are safe to invoke if the conn they are popping is already gone
    // from the inner queue, so we will start one for every insert, let them run or terminate on their own,
-    // and poll them to completion on shutdown.
+    // and poll them to completion on shutdown - any duplicates from repeated checkouts/checkins of the same conn
+    // will simply resolve as a no-op in order.
    //
    // Note that "idle" in the context of this pool means "no one has asked for it or dropped it in X time, so prune it".
    //
@@ -114,6 +116,19 @@ impl PoolState {
         let _ = self.pool_notifier.send(true);
     }
 
+    // Does an initial, naive check to see if a conn exists for this key.
+    //
+    // If it does, WRITELOCK the mutex for that key, clone (or create), check in the clone,
+    // and return the other reference for use.
+    //
+    // Otherwise, return None.
+    //
+    // It is important that the *initial* check here is authoritative, hence the locks, as
+    // we must know if this is a connection for a key *nobody* has tried to start yet,
+    // or if other things have already established conns for this key.
+    //
+    // This is so we can backpressure correctly if 1000 tasks all demand a new connection
+    // to the same key at once, and not eagerly open 1000 tunnel connections.
     async fn first_checkout_conn_from_pool(
         &self,
         workload_key: &WorkloadKey,
@@ -122,38 +137,30 @@ impl PoolState {
         debug!("first checkout READGUARD");
 
         let found_conn = {
-            // BEGIN take outer readlock
-            debug!("pool connect MAP OUTER READ/WRITE GUARD");
+            trace!("pool connect outer map - take guard");
             let guard = self.established_conn_writelock.guard();
-            debug!("pool connect MAP OUTER READ/WRITE START");
-            //OLD
-            // debug!("pool connect MAP OUTER READLOCK START");
-            // let map_read_lock = self.state.established_conn_writelock.read().await;
-            // debug!("pool connect MAP OUTER READLOCK END");
+            trace!("pool connect outer map - check for keyed mutex");
             let exist_conn_lock = self.established_conn_writelock.get(&pool_key.key, &guard);
-            // BEGIN take inner writelock
-            debug!("pool connect MAP INNER WRITELOCK START");
             match exist_conn_lock {
                 Some(e_conn_lock) => e_conn_lock.clone(),
                 None => None,
             }
-            // exist_conn_lock.as_ref().unwrap().clone()
         };
-        // let guard = self.established_conn_writelock.guard();
-        // debug!("pool connect MAP OUTER READGUARD GOT");
 
         match found_conn {
-            // match map_read_lock.get(&pool_key.key) {
             Some(exist_conn_lock) => {
-                debug!("first checkout INNER WRITELOCK");
+                debug!("first checkout - found mutex for key, waiting for writelock");
                 let _conn_lock = exist_conn_lock.as_ref().lock().await;
 
                 debug!(
-                    "getting conn for key {:#?} and hash {:#?}",
+                    "first checkout - got writelock for conn with key {:#?} and hash {:#?}",
                     workload_key, pool_key.key
                 );
                 self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
-                    debug!("got existing conn for key {:#?}", workload_key);
+                    trace!(
+                        "first checkout - inner pool - got existing conn for key {:#?}",
+                        workload_key
+                    );
                     if e_conn.at_max_streamcount() {
                         debug!(
                             "got conn for key {:#?}, but streamcount is maxed",
@@ -171,9 +178,11 @@ impl PoolState {
     }
 }
 
+// When the Arc-wrapped PoolState is finally dropped, trigger the drain,
+// which will terminate all connection driver spawns, as well as cancel all outstanding eviction timeout spawns
 impl Drop for PoolState {
     fn drop(&mut self) {
-        debug!("poolstate dropping, cancelling all outstanding pool eviction timeout spawns");
+        debug!("poolstate dropping, stopping all connection drivers and cancelling all outstanding eviction timeout spawns");
         let _ = self.timeout_tx.send(true);
     }
 }
@@ -221,6 +230,7 @@ impl WorkloadHBONEPool {
     pub async fn connect(&mut self, workload_key: WorkloadKey) -> Result<ConnClient, Error> {
         debug!("pool connect START");
         // TODO BML this may not be collision resistant/slow. It should be resistant enough for workloads tho.
+        // We are doing a deep-equals check at the end to mitigate any collisions; will see about bumping Pingora
         let mut s = DefaultHasher::new();
         workload_key.hash(&mut s);
         let hash_key = s.finish();
         let pool_key = pingora_pool::ConnectionMeta::new(
             hash_key,
             GLOBAL_CONN_COUNT.fetch_add(1, Ordering::Relaxed),
         );
-
-        debug!("pool connect GET EXISTING");
+        debug!("initial attempt - try to get existing conn from pool");
         // First, see if we can naively just check out a connection.
         // This should be the common case, except for the first establishment of a new connection/key.
         // This will be done under outer readlock (nonexclusive)/inner keyed writelock (exclusive).
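Aside, not part of the patch: a minimal sketch of the keying scheme the connect() path above relies on. The names here (a two-field WorkloadKey, pool_keys) are hypothetical stand-ins; only the hash-plus-counter shape is taken from the diff. The hash of the workload key is the pool's group key, so all conns to the same workload find each other, while the global counter only makes each new conn's id unique within that group.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicI32, Ordering};

// Hypothetical stand-in for the real WorkloadKey in this patch.
#[derive(Hash)]
struct WorkloadKey {
    src_identity: String,
    dst: String,
}

// Counts conns ever spawned; used only to make each conn's id unique, not to cap anything.
static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);

// Returns (group_key, conn_id): every conn to the same workload shares the same
// group_key (the hash), so checkouts for that workload find each other, while
// conn_id distinguishes individual conns within the group.
fn pool_keys(wl: &WorkloadKey) -> (u64, i32) {
    let mut s = DefaultHasher::new();
    wl.hash(&mut s);
    (s.finish(), GLOBAL_CONN_COUNT.fetch_add(1, Ordering::Relaxed))
}
```

Because two distinct workload keys can in principle hash to the same u64, the patch notes it relies on a deep-equals check of the checked-out conn at the end of connect() to reject collisions.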
@@ -245,37 +254,27 @@ impl WorkloadHBONEPool {
             .first_checkout_conn_from_pool(&workload_key, &pool_key)
             .await;
 
-        debug!("pool connect GOT EXISTING");
         if existing_conn.is_some() {
-            debug!("using existing conn, connect future will be dropped on the floor");
+            debug!("initial attempt - found existing conn, done");
             Ok(existing_conn.unwrap())
         } else {
             // We couldn't get a conn. This means either nobody has tried to establish any conns for this key yet,
             // or they have, but no conns are currently available
             // (because someone else has checked all of them out and not put any back yet)
             //
-            // So, we wil writelock the outer lock to see if an inner lock entry exists for our key.
-            //
-            // critical block - this writelocks the entire pool for all tasks/threads
-            // as we check to see if anyone has ever inserted a sharded mutex for this key.
+            // So, we will take a nonexclusive readlock on the lockmap, to see if an inner lock
+            // exists for our key.
             //
             // If not, we insert one.
-            //
-            // We want to hold this for as short as possible a time, and drop it
-            // before we hold it over an await.
-            //
-            // this is is the ONLY block where we should hold a writelock on the whole mutex map
-            // for the rest, a readlock (nonexclusive) is sufficient.
             {
-                debug!("pool connect MAP OUTER READ/WRITE GUARD");
+                debug!("didn't find a connection for key {:#?}, making sure lockmap has entry", hash_key);
                 let guard = self.state.established_conn_writelock.guard();
-                debug!("pool connect MAP OUTER READ/WRITE START");
                 match self.state.established_conn_writelock.try_insert(hash_key, Some(Arc::new(Mutex::new(()))), &guard) {
                     Ok(_) => {
-                        debug!("inserting conn mutex for key {:#?}", hash_key);
+                        debug!("inserting conn mutex for key {:#?} into lockmap", hash_key);
                     }
                     Err(_) => {
-                        debug!("already have conn for key {:#?}", hash_key);
+                        debug!("already have conn for key {:#?} in lockmap", hash_key);
                     }
                 }
             }
@@ -285,7 +284,7 @@ impl WorkloadHBONEPool {
             // 2. We can now, under readlock(nonexclusive) in the outer map, attempt to
             // take the inner writelock for our specific key (exclusive).
             //
-            // This unblocks other tasks spawning connections against other keys, but blocks other
+            // This doesn't block other tasks spawning connections against other keys, but DOES block other
             // tasks spawning connections against THIS key - which is what we want.
 
             // NOTE: This inner, key-specific mutex is a tokio::async::Mutex, and not a stdlib sync mutex.
@@ -296,37 +295,29 @@ impl WorkloadHBONEPool {
             // hold it over does not resolve.
             //
             // HOWEVER. Here we know this connection will either establish or timeout
-            // and we WANT other tasks to go back to sleep if there is an outstanding lock.
-            // So the downsides are actually useful (we WANT task contention -
-            // to block other parallel tasks from trying to spawn a connection if we are already doing so)
+            // and we WANT other tasks to go back to sleep if a task is already trying to create a new connection for this key.
             //
-            let found_conn = {
-                // BEGIN take outer readlock
-                debug!("pool connect MAP OUTER READ/WRITE GUARD");
+            // So the downsides are actually useful (we WANT task contention -
+            // to block other parallel tasks from trying to spawn a connection for this key if we are already doing so)
+            let inner_conn_lock = {
+                trace!("fallback attempt - getting keyed lock out of lockmap");
                 let guard = self.state.established_conn_writelock.guard();
-                debug!("pool connect MAP OUTER READ/WRITE START");
-
-                //OLD
-                // debug!("pool connect MAP OUTER READLOCK START");
-                // let map_read_lock = self.state.established_conn_writelock.read().await;
-                // debug!("pool connect MAP OUTER READLOCK END");
                 let exist_conn_lock = self.state.established_conn_writelock.get(&hash_key, &guard).unwrap();
-                // BEGIN take inner writelock
-                debug!("pool connect MAP INNER WRITELOCK START");
+                trace!("fallback attempt - got keyed lock out of lockmap");
                 exist_conn_lock.as_ref().unwrap().clone()
             };
-            // drop(guard);
 
-            let got_conn = match found_conn.try_lock() {
+            debug!("appears we need a new conn, attempting to win connlock for wl key {:#?}", workload_key);
+            let got_conn = match inner_conn_lock.try_lock() {
                 Ok(_guard) => {
+                    // BEGIN take inner writelock
                     // If we get here, it means the following are true:
                     // 1. We did not get a connection for our key.
                     // 2. We have the exclusive inner writelock to create a new connection for our key.
                    //
                    // So, carry on doing that.
-                    debug!("appears we need a new conn, retaining connlock");
-                    debug!("nothing else is creating a conn, make one");
+                    debug!("nothing else is creating a conn and we won the lock, make one");
                     let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await;
 
                     let client = ConnClient{
                         sender: pool_conn?,
@@ -339,12 +330,11 @@ impl WorkloadHBONEPool {
                         "starting new conn for key {:#?} with pk {:#?}",
                         workload_key, pool_key
                     );
-                    debug!("dropping lock");
                     Some(client)
                     // END take inner writelock
                 }
                 Err(_) => {
-                    debug!("something else is creating a conn, wait for it");
+                    debug!("we didn't win the lock, something else is creating a conn, wait for it");
                     // If we get here, it means the following are true:
                     // 1. At one point, there was a preexisting conn in the pool for this key.
                     // 2. When we checked, we got nothing for that key.
@@ -356,7 +346,7 @@ impl WorkloadHBONEPool {
                     loop {
                         match self.pool_watcher.changed().await {
                             Ok(_) => {
-                                debug!(
+                                trace!(
                                     "notified a new conn was enpooled, checking for hash {:#?}",
                                     hash_key
                                 );
@@ -365,7 +355,7 @@ impl WorkloadHBONEPool {
                                let existing_conn = self.state.connected_pool.get(&hash_key);
                                match existing_conn {
                                    None => {
-                                        debug!("got nothing");
+                                        trace!("woke up on pool notification, but didn't find a conn for {:#?} yet", hash_key);
                                        continue;
                                    }
                                    Some(e_conn) => {
@@ -376,13 +366,11 @@ impl WorkloadHBONEPool {
                                            debug!("found existing conn for key {:#?}, but streamcount is maxed", workload_key);
                                            break None;
                                        }
-                                        debug!("pool connect LOOP END");
                                        break Some(e_conn);
                                    }
                                }
                            }
                            Err(_) => {
-                                // END take outer readlock
                                return Err(Error::WorkloadHBONEPoolDraining);
                            }
                        }
@@ -430,7 +418,6 @@ impl WorkloadHBONEPool {
                Ok(r_conn)
            }
        }
-        // END take outer readlock
        }.and_then(|conn| {
            // Finally, we either have a conn or an error.
            // Just for safety's sake, since we are using a hash thanks to pingora supporting arbitrary Eq, Hash
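Aside, not part of the patch: a simplified, self-contained sketch of the win-or-wait pattern the fallback path above uses. It assumes a plain tokio Mutex-guarded HashMap in place of the flurry map and pingora pool, a unit Conn type standing in for ConnClient, and hypothetical names (SharedPool, checkout_or_create). The task that wins try_lock on the per-key mutex creates and checks in the connection, then notifies the watch channel; every task that loses parks on the watch receiver and re-checks the pool for its key each time a check-in is announced.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};

// Hypothetical stand-ins for the real ConnClient and pool types in this patch.
#[derive(Clone)]
struct Conn;

struct SharedPool {
    conns: Mutex<HashMap<u64, Conn>>,               // stand-in for the pingora pool
    key_locks: Mutex<HashMap<u64, Arc<Mutex<()>>>>, // per-key creation locks
    notify_tx: watch::Sender<bool>,                 // fires whenever a conn is checked in
}

async fn checkout_or_create(
    pool: Arc<SharedPool>,
    mut notify_rx: watch::Receiver<bool>,
    key: u64,
) -> Option<Conn> {
    // Fast path: something is already pooled for this key.
    if let Some(c) = pool.conns.lock().await.get(&key) {
        return Some(c.clone());
    }
    // Get (or lazily insert) the per-key creation lock.
    let key_lock = {
        let mut locks = pool.key_locks.lock().await;
        locks
            .entry(key)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    };
    match key_lock.try_lock() {
        // We won the lock: create the conn, check it in, and announce it to waiters.
        Ok(_guard) => {
            let c = Conn; // stand-in for actually spawning a new connection
            pool.conns.lock().await.insert(key, c.clone());
            let _ = pool.notify_tx.send(true);
            Some(c)
        }
        // Someone else is already creating a conn for this key: sleep until a
        // check-in is announced, then re-check the pool for our key.
        Err(_) => loop {
            if notify_rx.changed().await.is_err() {
                return None; // sender dropped: the pool is draining
            }
            if let Some(c) = pool.conns.lock().await.get(&key) {
                return Some(c.clone());
            }
        },
    }
}
```

The patch gets the same effect with flurry's concurrent map for the outer per-key lock lookup and pingora's pool for storage; the watch channel pair corresponds to pool_notifier/pool_watcher in the diff, and the draining error path corresponds to Error::WorkloadHBONEPoolDraining.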