Skip to content

Commit

Permalink
For now, just do the foolproof collision check
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Apr 18, 2024
1 parent a0e5f26 commit c398d25
Showing 1 changed file with 57 additions and 23 deletions.
80 changes: 57 additions & 23 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct WorkloadHBONEPool {
max_streamcount: u16,
// this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
// and has no actual hyper/http/connection logic.
connected_pool: Arc<pingora_pool::ConnectionPool<Client>>,
connected_pool: Arc<pingora_pool::ConnectionPool<ConnClient>>,
cfg: config::Config,
socket_factory: Arc<dyn SocketFactory + Send + Sync>,
cert_manager: Arc<SecretManager>,
Expand Down Expand Up @@ -120,7 +120,7 @@ impl WorkloadHBONEPool {
//
// If many `connects` request a connection to the same dest at once, all will wait until exactly
// one connection is created, before deciding if they should create more or just use that one.
pub async fn connect(&mut self, key: WorkloadKey) -> Result<Client, Error> {
pub async fn connect(&mut self, key: WorkloadKey) -> Result<ConnClient, Error> {
debug!("pool connect START");
// TODO BML this may not be collision resistant/slow. It should be resistant enough for workloads tho.
let mut s = DefaultHasher::new();
Expand Down Expand Up @@ -220,11 +220,12 @@ impl WorkloadHBONEPool {
debug!("appears we need a new conn, retaining connlock");
debug!("nothing else is creating a conn, make one");
let pool_conn = self.spawn_new_pool_conn(key.clone()).await;
let client = Client(
pool_conn?,
Arc::new(AtomicU16::new(0)),
self.max_streamcount,
);
let client = ConnClient{
sender: pool_conn?,
stream_count: Arc::new(AtomicU16::new(0)),
stream_count_max: self.max_streamcount,
wl_key: key.clone(),
};

debug!(
"starting new conn for key {:#?} with pk {:#?}",
Expand Down Expand Up @@ -311,25 +312,41 @@ impl WorkloadHBONEPool {
None => {
debug!("spawning new conn for key {:#?} to replace", key);
let pool_conn = self.spawn_new_pool_conn(key.clone()).await;
let r_conn = Client(
pool_conn?,
Arc::new(AtomicU16::new(0)),
self.max_streamcount,
);

let r_conn = ConnClient{
sender: pool_conn?,
stream_count: Arc::new(AtomicU16::new(0)),
stream_count_max: self.max_streamcount,
wl_key: key.clone(),
};
self.checkin_conn(r_conn.clone(), pool_key.clone());
Ok(r_conn)
}
}
// END take outer readlock
}
}.and_then(|conn| {
// Finally, we either have a conn or an error.
// Just for safety's sake, since we are using a hash thanks to pingora supporting arbitrary Eq, Hash
// types, do a deep equality test before returning the conn, returning an error if the conn's key does
// not equal the provided key
//
// this is a final safety check for collisions, we will throw up our hands and refuse to return the conn
match conn.is_for_workload(key) {
Ok(()) => {
return Ok(conn)
}
Err(e) => {
return Err(e)
}
}
})

}
async fn first_checkout_conn_from_pool(
&self,
key: &WorkloadKey,
hash_key: u64,
pool_key: &pingora_pool::ConnectionMeta,
) -> Option<Client> {
) -> Option<ConnClient> {
debug!("first checkout READLOCK");
let map_read_lock = self.established_conn_writelock.read().await;
match map_read_lock.get(&hash_key) {
Expand Down Expand Up @@ -372,7 +389,7 @@ impl WorkloadHBONEPool {
//
// Note that this simply removes the client ref from this pool - if other things hold client/streamrefs refs,
// they must also drop those before the underlying connection is fully closed.
fn checkin_conn(&self, conn: Client, pool_key: pingora_pool::ConnectionMeta) {
fn checkin_conn(&self, conn: ConnClient, pool_key: pingora_pool::ConnectionMeta) {
let (evict, pickup) = self.connected_pool.put(&pool_key, conn);
let rx = self.timeout_recv.clone();
let pool_ref = self.connected_pool.clone();
Expand Down Expand Up @@ -439,13 +456,21 @@ impl WorkloadHBONEPool {
}

#[derive(Debug, Clone)]
pub struct Client(http2::SendRequest<Empty<Bytes>>, Arc<AtomicU16>, u16);
// A sort of faux-client, that represents a single checked-out 'request sender' which might
// send requests over some underlying stream using some underlying http/2 client
pub struct ConnClient {
sender: http2::SendRequest<Empty<Bytes>>,
stream_count: Arc<AtomicU16>, // the current streamcount for this client conn.
stream_count_max: u16, // the max streamcount associated with this client.
// A WL key may have many clients, but every client has no more than one WL key
wl_key: WorkloadKey, // the WL key associated with this client.
}

impl Client {
impl ConnClient {
pub fn at_max_streamcount(&self) -> bool {
let curr_count = self.1.load(Ordering::Relaxed);
let curr_count = self.stream_count.load(Ordering::Relaxed);
debug!("checking streamcount: {curr_count}");
if curr_count >= self.2 {
if curr_count >= self.stream_count_max {
return true;
}
false
Expand All @@ -456,12 +481,21 @@ impl Client {
req: Request<Empty<Bytes>>,
) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
// TODO should we enforce streamcount per-sent-request? This would be slow.
self.1.fetch_add(1, Ordering::Relaxed);
self.0.send_request(req)
self.stream_count.fetch_add(1, Ordering::Relaxed);
self.sender.send_request(req)
}

pub fn is_for_workload(&self, wl_key: WorkloadKey) -> Result<(), crate::proxy::Error> {
if !(self.wl_key == wl_key) {
return Err(crate::proxy::Error::Generic("fetched connection does not match workload key!".into()))
} else {
return Ok(())
}
}

}

impl Drop for Client {
impl Drop for ConnClient {
fn drop(&mut self) {
println!("Dropping Client!");
}
Expand Down

0 comments on commit c398d25

Please sign in to comment.