Skip to content

Commit

Permalink
Make connection tracking resilient to connection reuse (istio#991)
Browse files Browse the repository at this point in the history
* Make connection tracking resilient to connection reuse

We have a race: if a new connection with the same 4-tuple is opened too
quickly after the old one, we occasionally fail to track it. This is due to ordering:

* New connection register
* Old connection close
* New connection track -- fails

Now we account for the new connection (in `count`) as part of register,
so we no longer have this race.

Hard to test in unit tests or even integration tests; I used netperf TCP_CRR
and it's pretty easy to trigger.

* Track and register together
  • Loading branch information
howardjohn authored May 1, 2024
1 parent 44d842c commit c48c2c4
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 122 deletions.
217 changes: 96 additions & 121 deletions src/proxy/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use crate::state::DemandProxyState;
use crate::state::ProxyRbacContext;
use drain;
use serde::{Serialize, Serializer};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::future::Future;
use std::net::SocketAddr;

use std::sync::Arc;
use std::sync::RwLock;
use tracing::{debug, info};
use tracing::{debug, info, warn};

struct ConnectionDrain {
// TODO: this should almost certainly be changed to a type which has counted references exposed.
Expand All @@ -39,7 +40,7 @@ struct ConnectionDrain {
impl ConnectionDrain {
fn new() -> Self {
let (tx, rx) = drain::channel();
ConnectionDrain { tx, rx, count: 0 }
ConnectionDrain { tx, rx, count: 1 }
}

/// drain drops the internal reference to rx and then signals drain on the tx
Expand Down Expand Up @@ -174,13 +175,15 @@ impl ConnectionManager {
ctx: ctx.clone(),
dest_service,
};
self.register(&conn);
let Some(watch) = self.register(&conn) else {
warn!("failed to track {conn:?}");
debug_assert!(false, "failed to track {conn:?}");
return Err(Error::AuthorizationPolicyRejection);
};
if !state.assert_rbac(ctx).await {
self.release(&conn);
return Err(Error::AuthorizationPolicyRejection);
}
let Some(watch) = self.track(&conn) else {
return Err(Error::AuthorizationPolicyRejection);
};
Ok(ConnectionGuard {
cm: self.clone(),
conn,
Expand All @@ -191,30 +194,19 @@ impl ConnectionManager {
// this must be done before a connection can be tracked
// allows policy to be asserted against the connection
// even if no tasks have a receiver channel yet
fn register(&self, c: &InboundConnection) {
self.drains
.write()
.expect("mutex")
.entry(c.clone())
.or_insert(ConnectionDrain::new());
}

// get a channel to receive close on for your connection
// requires that the connection be registered first
// if you receive None this connection is invalid and should close
fn track(&self, c: &InboundConnection) -> Option<drain::Watch> {
match self
.drains
.write()
.expect("mutex")
.entry(c.to_owned())
.and_modify(|cd| cd.count += 1)
{
std::collections::hash_map::Entry::Occupied(cd) => {
fn register(&self, c: &InboundConnection) -> Option<drain::Watch> {
match self.drains.write().expect("mutex").entry(c.clone()) {
Entry::Occupied(mut cd) => {
cd.get_mut().count += 1;
let rx = cd.get().rx.clone();
Some(rx)
}
std::collections::hash_map::Entry::Vacant(_) => None,
Entry::Vacant(entry) => {
let drain = ConnectionDrain::new();
let rx = drain.rx.clone();
entry.insert(drain);
Some(rx)
}
}
}

Expand Down Expand Up @@ -341,15 +333,27 @@ mod tests {
use crate::xds::istio::security::{Action, Authorization, Scope};
use crate::xds::ProxyStateUpdateMutator;

use super::{ConnectionManager, InboundConnection, PolicyWatcher};
use super::{ConnectionGuard, ConnectionManager, InboundConnection, PolicyWatcher};

#[tokio::test]
async fn test_connection_manager_close() {
// setup a new ConnectionManager
let connection_manager = ConnectionManager::default();
let cm = ConnectionManager::default();
// ensure drains is empty
assert_eq!(connection_manager.drains.read().unwrap().len(), 0);
assert_eq!(connection_manager.connections().len(), 0);
assert_eq!(cm.drains.read().unwrap().len(), 0);
assert_eq!(cm.connections().len(), 0);

let register = |cm: &ConnectionManager, c: &InboundConnection| {
let cm = cm.clone();
let c = c.clone();

let watch = cm.register(&c).unwrap();
ConnectionGuard {
cm,
conn: c,
watch: Some(watch),
}
};

// track a new connection
let rbac_ctx1 = InboundConnection {
Expand All @@ -371,36 +375,19 @@ mod tests {
dest_service: None,
};

// assert that tracking an unregistered connection is None
let close1 = connection_manager.track(&rbac_ctx1);
assert!(close1.is_none());
assert_eq!(connection_manager.drains.read().unwrap().len(), 0);
assert_eq!(connection_manager.connections().len(), 0);

connection_manager.register(&rbac_ctx1);
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone()));

let close1 = connection_manager
.track(&rbac_ctx1)
.expect("should not be None");

// ensure drains contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone()));
let mut close1 = register(&cm, &rbac_ctx1);
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(rbac_ctx1.clone()));

// setup a second track on the same connection
let another_conn1 = rbac_ctx1.clone();
let another_close1 = connection_manager
.track(&another_conn1)
.expect("should not be None");
let mut another_close1 = register(&cm, &rbac_ctx1);

// ensure drains contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(rbac_ctx1.clone()));
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(rbac_ctx1.clone()));

// track a second connection
let rbac_ctx2 = InboundConnection {
Expand All @@ -422,44 +409,52 @@ mod tests {
dest_service: None,
};

connection_manager.register(&rbac_ctx2);
let close2 = connection_manager
.track(&rbac_ctx2)
.expect("should not be None");

let mut close2 = register(&cm, &rbac_ctx2);
// ensure drains contains exactly 2 items
assert_eq!(connection_manager.drains.read().unwrap().len(), 2);
assert_eq!(connection_manager.connections().len(), 2);
let mut connections = connection_manager.connections();
assert_eq!(cm.drains.read().unwrap().len(), 2);
assert_eq!(cm.connections().len(), 2);
let mut connections = cm.connections();
connections.sort(); // ordering cannot be guaranteed without sorting
assert_eq!(connections, vec![rbac_ctx1.clone(), rbac_ctx2.clone()]);

// spawn tasks to assert that we close in a timely manner for rbac_ctx1
tokio::spawn(assert_close(close1));
tokio::spawn(assert_close(another_close1));
tokio::spawn(assert_close(close1.watch.take().unwrap()));
tokio::spawn(assert_close(another_close1.watch.take().unwrap()));
// close rbac_ctx1
connection_manager.close(&rbac_ctx1).await;
cm.close(&rbac_ctx1).await;
// ensure drains contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(rbac_ctx2.clone()));
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(rbac_ctx2.clone()));

// spawn a task to assert that we close in a timely manner for rbac_ctx2
tokio::spawn(assert_close(close2));
tokio::spawn(assert_close(close2.watch.take().unwrap()));
// close rbac_ctx2
connection_manager.close(&rbac_ctx2).await;
cm.close(&rbac_ctx2).await;
// assert that drains is empty again
assert_eq!(connection_manager.drains.read().unwrap().len(), 0);
assert_eq!(connection_manager.connections().len(), 0);
assert_eq!(cm.drains.read().unwrap().len(), 0);
assert_eq!(cm.connections().len(), 0);
}

#[tokio::test]
async fn test_connection_manager_release() {
// setup a new ConnectionManager
let connection_manager = ConnectionManager::default();
let cm = ConnectionManager::default();
// ensure drains is empty
assert_eq!(connection_manager.drains.read().unwrap().len(), 0);
assert_eq!(connection_manager.connections().len(), 0);
assert_eq!(cm.drains.read().unwrap().len(), 0);
assert_eq!(cm.connections().len(), 0);

let register = |cm: &ConnectionManager, c: &InboundConnection| {
let cm = cm.clone();
let c = c.clone();

let watch = cm.register(&c).unwrap();
ConnectionGuard {
cm,
conn: c,
watch: Some(watch),
}
};

// create a new connection
let conn1 = InboundConnection {
Expand Down Expand Up @@ -502,70 +497,51 @@ mod tests {
};
let another_conn1 = conn1.clone();

connection_manager.register(&conn1);
let close1 = register(&cm, &conn1);
let another_close1 = register(&cm, &another_conn1);

// watch the connections
let close1 = connection_manager
.track(&conn1)
.expect("should not be None");
let another_close1 = connection_manager
.track(&another_conn1)
.expect("should not be None");
// ensure drains contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(conn1.clone()));
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(conn1.clone()));

// release conn1's clone
drop(another_close1);
connection_manager.release(&another_conn1);
// ensure drains still contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(conn1.clone()));

connection_manager.register(&conn2);
// track conn2
let close2 = connection_manager
.track(&conn2)
.expect("should not be None");
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(conn1.clone()));

let close2 = register(&cm, &conn2);
// ensure drains contains exactly 2 items
assert_eq!(connection_manager.drains.read().unwrap().len(), 2);
assert_eq!(connection_manager.connections().len(), 2);
let mut connections = connection_manager.connections();
assert_eq!(cm.drains.read().unwrap().len(), 2);
assert_eq!(cm.connections().len(), 2);
let mut connections = cm.connections();
connections.sort(); // ordering cannot be guaranteed without sorting
assert_eq!(connections, vec![conn1.clone(), conn2.clone()]);

// release conn1
drop(close1);
connection_manager.release(&conn1);
// ensure drains contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(connection_manager.connections(), vec!(conn2.clone()));
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(conn2.clone()));

// clone conn2 and track it
let another_conn2 = conn2.clone();
let another_close2 = connection_manager
.track(&another_conn2)
.expect("should not be None");
drop(close2);
let another_close2 = register(&cm, &another_conn2);
// release tracking on conn2
connection_manager.release(&conn2);
drop(close2);
// ensure drains still contains exactly 1 item
assert_eq!(connection_manager.drains.read().unwrap().len(), 1);
assert_eq!(connection_manager.connections().len(), 1);
assert_eq!(
connection_manager.connections(),
vec!(another_conn2.clone())
);
assert_eq!(cm.drains.read().unwrap().len(), 1);
assert_eq!(cm.connections().len(), 1);
assert_eq!(cm.connections(), vec!(another_conn2.clone()));

// release tracking on conn2's clone
drop(another_close2);
connection_manager.release(&another_conn2);
// ensure drains contains exactly 0 items
assert_eq!(connection_manager.drains.read().unwrap().len(), 0);
assert_eq!(connection_manager.connections().len(), 0);
assert_eq!(cm.drains.read().unwrap().len(), 0);
assert_eq!(cm.connections().len(), 0);
}

#[tokio::test]
Expand Down Expand Up @@ -612,9 +588,8 @@ mod tests {
dest_service: None,
};
// watch the connection
connection_manager.register(&conn1);
let close1 = connection_manager
.track(&conn1)
.register(&conn1)
.expect("should not be None");

// generate policy which denies everything
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl OutboundConnection {
req: &Request,
connection_stats: &ConnectionResult,
) -> Result<(), Error> {
info!(
debug!(
"Proxying to {} using TCP via {} type {:?}",
req.destination, req.gateway, req.request_type
);
Expand Down

0 comments on commit c48c2c4

Please sign in to comment.