diff --git a/crates/anemo/src/network/connection_manager.rs b/crates/anemo/src/network/connection_manager.rs index 49fe4803..4762db01 100644 --- a/crates/anemo/src/network/connection_manager.rs +++ b/crates/anemo/src/network/connection_manager.rs @@ -244,27 +244,30 @@ impl ConnectionManager { let fut = async { let connection = connecting.await?; - // Check against limit - if let Some(limit) = config.max_concurrent_connections() { - // We've hit the limit - // TODO maybe have a way to temporatily hold on to a "slot" so that we can ensure - // we don't go over this limit if multiple connections come in simultaneously. - if active_peers.len() >= limit { - // check if this is a high affinity peer to bypass the limit - match known_peers.get(&connection.peer_id()) { - Some(PeerInfo { affinity, .. }) - if matches!(affinity, PeerAffinity::High) => - { - // Do nothing, let the connection through - } - // Connection doesn't meet the requirements to bypass the limit so bail - // TODO close the connection explicitly with a reason once we have machine - // readable errors - _ => { + // TODO close the connection explicitly with a reason once we have machine + // readable errors + match known_peers.get(&connection.peer_id()) { + Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::High) => { + // Do nothing, let the connection through + } + Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::Never) => { + return Err(anyhow::anyhow!( + "rejecting connection from peer {} due to having PeerAffinity::Never", + connection.peer_id() + )); + } + // Check connection Limits + _ => { + if let Some(limit) = config.max_concurrent_connections() { + // We've hit the limit + // TODO maybe have a way to temporatily hold on to a "slot" so that we can ensure + // we don't go over this limit if multiple connections come in simultaneously. + if active_peers.len() >= limit { + // Connection doesn't meet the requirements to bypass the limit so bail return Err(anyhow::anyhow!( "dropping connection from peer {} due to connection limits", connection.peer_id() - )) + )); } } } @@ -368,7 +371,8 @@ impl ConnectionManager { known_peers .values() .filter(|peer_info| { - peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself + !matches!(peer_info.affinity, PeerAffinity::Never) + && peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself && !peer_info.address.is_empty() // The peer has an address we can dial && !active_peers.contains(&peer_info.peer_id) // The node is not already connected. && !self.pending_dials.contains_key(&peer_info.peer_id) // There is no pending dial to this node. diff --git a/crates/anemo/src/network/tests.rs b/crates/anemo/src/network/tests.rs index 3120d92f..ebfb3bdb 100644 --- a/crates/anemo/src/network/tests.rs +++ b/crates/anemo/src/network/tests.rs @@ -210,6 +210,75 @@ async fn max_concurrent_connections_1() -> Result<()> { Ok(()) } +#[tokio::test] +async fn reject_peer_with_affinity_never() -> Result<()> { + let _gaurd = crate::init_tracing_for_testing(); + + let network_1 = build_network()?; + let network_2 = build_network()?; + + // Configure peer 2 with affinity never + let peer_info_2 = crate::types::PeerInfo { + peer_id: network_2.peer_id(), + affinity: crate::types::PeerAffinity::Never, + address: vec![], + }; + network_1.known_peers().insert(peer_info_2); + + // When peer 2 tries to connect peer 1 will reject it + network_2 + .connect_with_peer_id(network_1.local_addr(), network_1.peer_id()) + .await + .unwrap_err(); + + Ok(()) +} + +#[tokio::test] +async fn peers_with_affinity_never_are_not_dialed_in_the_background() -> Result<()> { + let _gaurd = crate::init_tracing_for_testing(); + + let network_1 = build_network()?; + let network_2 = build_network()?; + let network_3 = build_network()?; + + let mut subscriber_1 = network_1.subscribe()?.0; + + // Configure peer 2 with affinity never + let peer_info_2 = crate::types::PeerInfo { + peer_id: network_2.peer_id(), + affinity: crate::types::PeerAffinity::Never, + address: vec![], + }; + network_1.known_peers().insert(peer_info_2); + // Configure peer 3 with high affinity + let peer_info_3 = crate::types::PeerInfo { + peer_id: network_3.peer_id(), + affinity: crate::types::PeerAffinity::High, + address: vec![network_3.local_addr().into()], + }; + network_1.known_peers().insert(peer_info_3); + + // When peer 2 tries to connect peer 1 will reject it + network_2 + .connect_with_peer_id(network_1.local_addr(), network_1.peer_id()) + .await + .unwrap_err(); + + // We only ever see connections being made/lost with peer 3 and not peer 2 + let peer_id_3 = network_3.peer_id(); + assert_eq!(PeerEvent::NewPeer(peer_id_3), subscriber_1.recv().await?); + + drop(network_3); + + assert_eq!( + PeerEvent::LostPeer(peer_id_3, crate::types::DisconnectReason::ConnectionLost), + subscriber_1.recv().await? + ); + + Ok(()) +} + fn build_network() -> Result { build_network_with_addr("localhost:0") } diff --git a/crates/anemo/src/types/mod.rs b/crates/anemo/src/types/mod.rs index f81773e3..70ddc9e8 100644 --- a/crates/anemo/src/types/mod.rs +++ b/crates/anemo/src/types/mod.rs @@ -44,9 +44,12 @@ pub mod header { #[derive(Clone, Copy, Debug)] pub enum PeerAffinity { - /// Always attempt to maintain a connection with this Peer + /// Always attempt to maintain a connection with this Peer. High, - // None, + /// Never attempt to maintain a connection with this Peer. + /// + /// Inbound connection requests from these Peers are rejected. + Never, } #[derive(Clone, Debug)]