From c48c2c44a155b88236c6fcdb488bb1d3c22b9561 Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 1 May 2024 11:23:40 -0700 Subject: [PATCH] Make connection tracking resiliant to connection reuse (#991) * Make connection tracking resiliant to connection reuse We have a race where if I open a connection with the same 4 tuple too quickly, we would occasionally fail to track. This is due to ordering: * New connection register * Old connection close * New connection track -- fails Now, we make sure we account (in count) for the new connection in register so we don't have this race. Hard to test in unit tests or even integration; I used netperf TCP_CRR and its pretty easy to trigger. * Track and register together --- src/proxy/connection_manager.rs | 217 ++++++++++++++------------------ src/proxy/outbound.rs | 2 +- 2 files changed, 97 insertions(+), 122 deletions(-) diff --git a/src/proxy/connection_manager.rs b/src/proxy/connection_manager.rs index ca49547f4..b0c5a31ff 100644 --- a/src/proxy/connection_manager.rs +++ b/src/proxy/connection_manager.rs @@ -18,6 +18,7 @@ use crate::state::DemandProxyState; use crate::state::ProxyRbacContext; use drain; use serde::{Serialize, Serializer}; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Formatter; use std::future::Future; @@ -25,7 +26,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::sync::RwLock; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; struct ConnectionDrain { // TODO: this should almost certainly be changed to a type which has counted references exposed. @@ -39,7 +40,7 @@ struct ConnectionDrain { impl ConnectionDrain { fn new() -> Self { let (tx, rx) = drain::channel(); - ConnectionDrain { tx, rx, count: 0 } + ConnectionDrain { tx, rx, count: 1 } } /// drain drops the internal reference to rx and then signals drain on the tx @@ -174,13 +175,15 @@ impl ConnectionManager { ctx: ctx.clone(), dest_service, }; - self.register(&conn); + let Some(watch) = self.register(&conn) else { + warn!("failed to track {conn:?}"); + debug_assert!(false, "failed to track {conn:?}"); + return Err(Error::AuthorizationPolicyRejection); + }; if !state.assert_rbac(ctx).await { + self.release(&conn); return Err(Error::AuthorizationPolicyRejection); } - let Some(watch) = self.track(&conn) else { - return Err(Error::AuthorizationPolicyRejection); - }; Ok(ConnectionGuard { cm: self.clone(), conn, @@ -191,30 +194,19 @@ impl ConnectionManager { // this must be done before a connection can be tracked // allows policy to be asserted against the connection // even no tasks have a receiver channel yet - fn register(&self, c: &InboundConnection) { - self.drains - .write() - .expect("mutex") - .entry(c.clone()) - .or_insert(ConnectionDrain::new()); - } - - // get a channel to receive close on for your connection - // requires that the connection be registered first - // if you receive None this connection is invalid and should close - fn track(&self, c: &InboundConnection) -> Option { - match self - .drains - .write() - .expect("mutex") - .entry(c.to_owned()) - .and_modify(|cd| cd.count += 1) - { - std::collections::hash_map::Entry::Occupied(cd) => { + fn register(&self, c: &InboundConnection) -> Option { + match self.drains.write().expect("mutex").entry(c.clone()) { + Entry::Occupied(mut cd) => { + cd.get_mut().count += 1; let rx = cd.get().rx.clone(); Some(rx) } - std::collections::hash_map::Entry::Vacant(_) => None, + Entry::Vacant(entry) => { + let drain = ConnectionDrain::new(); + let rx = drain.rx.clone(); + entry.insert(drain); + Some(rx) + } } } @@ -341,15 +333,27 @@ mod tests { use crate::xds::istio::security::{Action, Authorization, Scope}; use crate::xds::ProxyStateUpdateMutator; - use super::{ConnectionManager, InboundConnection, PolicyWatcher}; + use super::{ConnectionGuard, ConnectionManager, InboundConnection, PolicyWatcher}; #[tokio::test] async fn test_connection_manager_close() { // setup a new ConnectionManager - let connection_manager = ConnectionManager::default(); + let cm = ConnectionManager::default(); // ensure drains is empty - assert_eq!(connection_manager.drains.read().unwrap().len(), 0); - assert_eq!(connection_manager.connections().len(), 0); + assert_eq!(cm.drains.read().unwrap().len(), 0); + assert_eq!(cm.connections().len(), 0); + + let register = |cm: &ConnectionManager, c: &InboundConnection| { + let cm = cm.clone(); + let c = c.clone(); + + let watch = cm.register(&c).unwrap(); + ConnectionGuard { + cm, + conn: c, + watch: Some(watch), + } + }; // track a new connection let rbac_ctx1 = InboundConnection { @@ -371,36 +375,19 @@ mod tests { dest_service: None, }; - // assert that tracking an unregistered connection is None - let close1 = connection_manager.track(&rbac_ctx1); - assert!(close1.is_none()); - assert_eq!(connection_manager.drains.read().unwrap().len(), 0); - assert_eq!(connection_manager.connections().len(), 0); - - connection_manager.register(&rbac_ctx1); - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone())); - - let close1 = connection_manager - .track(&rbac_ctx1) - .expect("should not be None"); - // ensure drains contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone())); + let mut close1 = register(&cm, &rbac_ctx1); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(rbac_ctx1.clone())); // setup a second track on the same connection - let another_conn1 = rbac_ctx1.clone(); - let another_close1 = connection_manager - .track(&another_conn1) - .expect("should not be None"); + let mut another_close1 = register(&cm, &rbac_ctx1); // ensure drains contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone())); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(rbac_ctx1.clone())); // track a second connection let rbac_ctx2 = InboundConnection { @@ -422,44 +409,52 @@ mod tests { dest_service: None, }; - connection_manager.register(&rbac_ctx2); - let close2 = connection_manager - .track(&rbac_ctx2) - .expect("should not be None"); - + let mut close2 = register(&cm, &rbac_ctx2); // ensure drains contains exactly 2 items - assert_eq!(connection_manager.drains.read().unwrap().len(), 2); - assert_eq!(connection_manager.connections().len(), 2); - let mut connections = connection_manager.connections(); + assert_eq!(cm.drains.read().unwrap().len(), 2); + assert_eq!(cm.connections().len(), 2); + let mut connections = cm.connections(); connections.sort(); // ordering cannot be guaranteed without sorting assert_eq!(connections, vec![rbac_ctx1.clone(), rbac_ctx2.clone()]); // spawn tasks to assert that we close in a timely manner for rbac_ctx1 - tokio::spawn(assert_close(close1)); - tokio::spawn(assert_close(another_close1)); + tokio::spawn(assert_close(close1.watch.take().unwrap())); + tokio::spawn(assert_close(another_close1.watch.take().unwrap())); // close rbac_ctx1 - connection_manager.close(&rbac_ctx1).await; + cm.close(&rbac_ctx1).await; // ensure drains contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(rbac_ctx2.clone())); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(rbac_ctx2.clone())); // spawn a task to assert that we close in a timely manner for rbac_ctx2 - tokio::spawn(assert_close(close2)); + tokio::spawn(assert_close(close2.watch.take().unwrap())); // close rbac_ctx2 - connection_manager.close(&rbac_ctx2).await; + cm.close(&rbac_ctx2).await; // assert that drains is empty again - assert_eq!(connection_manager.drains.read().unwrap().len(), 0); - assert_eq!(connection_manager.connections().len(), 0); + assert_eq!(cm.drains.read().unwrap().len(), 0); + assert_eq!(cm.connections().len(), 0); } #[tokio::test] async fn test_connection_manager_release() { // setup a new ConnectionManager - let connection_manager = ConnectionManager::default(); + let cm = ConnectionManager::default(); // ensure drains is empty - assert_eq!(connection_manager.drains.read().unwrap().len(), 0); - assert_eq!(connection_manager.connections().len(), 0); + assert_eq!(cm.drains.read().unwrap().len(), 0); + assert_eq!(cm.connections().len(), 0); + + let register = |cm: &ConnectionManager, c: &InboundConnection| { + let cm = cm.clone(); + let c = c.clone(); + + let watch = cm.register(&c).unwrap(); + ConnectionGuard { + cm, + conn: c, + watch: Some(watch), + } + }; // create a new connection let conn1 = InboundConnection { @@ -502,70 +497,51 @@ mod tests { }; let another_conn1 = conn1.clone(); - connection_manager.register(&conn1); + let close1 = register(&cm, &conn1); + let another_close1 = register(&cm, &another_conn1); - // watch the connections - let close1 = connection_manager - .track(&conn1) - .expect("should not be None"); - let another_close1 = connection_manager - .track(&another_conn1) - .expect("should not be None"); // ensure drains contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(conn1.clone())); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(conn1.clone())); // release conn1's clone drop(another_close1); - connection_manager.release(&another_conn1); // ensure drains still contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(conn1.clone())); - - connection_manager.register(&conn2); - // track conn2 - let close2 = connection_manager - .track(&conn2) - .expect("should not be None"); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(conn1.clone())); + + let close2 = register(&cm, &conn2); // ensure drains contains exactly 2 items - assert_eq!(connection_manager.drains.read().unwrap().len(), 2); - assert_eq!(connection_manager.connections().len(), 2); - let mut connections = connection_manager.connections(); + assert_eq!(cm.drains.read().unwrap().len(), 2); + assert_eq!(cm.connections().len(), 2); + let mut connections = cm.connections(); connections.sort(); // ordering cannot be guaranteed without sorting assert_eq!(connections, vec![conn1.clone(), conn2.clone()]); // release conn1 drop(close1); - connection_manager.release(&conn1); // ensure drains contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!(connection_manager.connections(), vec!(conn2.clone())); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(conn2.clone())); // clone conn2 and track it let another_conn2 = conn2.clone(); - let another_close2 = connection_manager - .track(&another_conn2) - .expect("should not be None"); - drop(close2); + let another_close2 = register(&cm, &another_conn2); // release tracking on conn2 - connection_manager.release(&conn2); + drop(close2); // ensure drains still contains exactly 1 item - assert_eq!(connection_manager.drains.read().unwrap().len(), 1); - assert_eq!(connection_manager.connections().len(), 1); - assert_eq!( - connection_manager.connections(), - vec!(another_conn2.clone()) - ); + assert_eq!(cm.drains.read().unwrap().len(), 1); + assert_eq!(cm.connections().len(), 1); + assert_eq!(cm.connections(), vec!(another_conn2.clone())); // release tracking on conn2's clone drop(another_close2); - connection_manager.release(&another_conn2); // ensure drains contains exactly 0 items - assert_eq!(connection_manager.drains.read().unwrap().len(), 0); - assert_eq!(connection_manager.connections().len(), 0); + assert_eq!(cm.drains.read().unwrap().len(), 0); + assert_eq!(cm.connections().len(), 0); } #[tokio::test] @@ -612,9 +588,8 @@ mod tests { dest_service: None, }; // watch the connection - connection_manager.register(&conn1); let close1 = connection_manager - .track(&conn1) + .register(&conn1) .expect("should not be None"); // generate policy which denies everything diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index a2b567ab5..7ae31e9b4 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -338,7 +338,7 @@ impl OutboundConnection { req: &Request, connection_stats: &ConnectionResult, ) -> Result<(), Error> { - info!( + debug!( "Proxying to {} using TCP via {} type {:?}", req.destination, req.gateway, req.request_type );