diff --git a/crates/anemo/src/config.rs b/crates/anemo/src/config.rs
index 5bf59f1b..aef70162 100644
--- a/crates/anemo/src/config.rs
+++ b/crates/anemo/src/config.rs
@@ -45,12 +45,39 @@ pub struct Config {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub connection_backoff_ms: Option<u64>,
 
+    /// Set a timeout, in milliseconds, for all inbound and outbound connection attempts.
+    ///
+    /// If unspecified, this will default to `10,000` milliseconds.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub connect_timeout_ms: Option<u64>,
+
     /// Maximum number of concurrent connections to attempt to establish at a given point in time.
     ///
     /// If unspecified, this will default to `100`.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub max_concurrent_outstanding_connecting_connections: Option<usize>,
 
+    /// Maximum number of concurrent connections to have established at a given point in time.
+    ///
+    /// This limit is applied in the following ways:
+    /// - Inbound connections from [`KnownPeers`] with [`PeerAffinity::High`] bypass this limit.
+    ///   All other inbound connections are only accepted if the total number of inbound and
+    ///   outbound connections, irrespective of affinity, is less than this limit.
+    /// - Outbound connections explicitly made by the application via [`Network::connect`] or
+    ///   [`Network::connect_with_peer_id`] bypass this limit.
+    /// - Outbound connections made in the background, due to configured [`KnownPeers`], to peers
+    ///   with [`PeerAffinity::High`] bypass this limit and are always attempted, while peers with
+    ///   lower affinity respect this limit.
+    ///
+    /// If unspecified, there will be no limit on the number of concurrent connections.
+    ///
+    /// [`KnownPeers`]: crate::KnownPeers
+    /// [`PeerAffinity::High`]: crate::types::PeerAffinity::High
+    /// [`Network::connect`]: crate::Network::connect
+    /// [`Network::connect_with_peer_id`]: crate::Network::connect_with_peer_id
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_concurrent_connections: Option<usize>,
+
     /// Size of the broadcast channel use for subscribing to
     /// [`PeerEvent`](crate::types::PeerEvent)s via
     /// [`Network::subscribe`](crate::Network::subscribe).
@@ -161,6 +188,12 @@ impl Config {
         Duration::from_millis(self.connection_backoff_ms.unwrap_or(CONNECTION_BACKOFF_MS))
     }
 
+    pub(crate) fn connect_timeout(&self) -> Duration {
+        const CONNECTION_TIMEOUT_MS: u64 = 10_000; // 10 seconds
+
+        Duration::from_millis(self.connect_timeout_ms.unwrap_or(CONNECTION_TIMEOUT_MS))
+    }
+
     pub(crate) fn max_concurrent_outstanding_connecting_connections(&self) -> usize {
         const MAX_CONCURRENT_OUTSTANDING_CONNECTING_CONNECTIONS: usize = 100;
 
@@ -168,6 +201,10 @@ impl Config {
             .unwrap_or(MAX_CONCURRENT_OUTSTANDING_CONNECTING_CONNECTIONS)
     }
 
+    pub(crate) fn max_concurrent_connections(&self) -> Option<usize> {
+        self.max_concurrent_connections
+    }
+
     pub(crate) fn peer_event_broadcast_channel_capacity(&self) -> usize {
         const PEER_EVENT_BROADCAST_CHANNEL_CAPACITY: usize = 128;
 
diff --git a/crates/anemo/src/network/connection_manager.rs b/crates/anemo/src/network/connection_manager.rs
index afdd7aeb..3ead7ffa 100644
--- a/crates/anemo/src/network/connection_manager.rs
+++ b/crates/anemo/src/network/connection_manager.rs
@@ -3,11 +3,10 @@ use crate::{
     config::Config,
     connection::Connection,
     endpoint::{Connecting, Endpoint},
-    types::{Address, DisconnectReason, PeerEvent, PeerInfo},
+    types::{Address, DisconnectReason, PeerAffinity, PeerEvent, PeerInfo},
     ConnectionOrigin, PeerId, Request, Response, Result,
 };
 use bytes::Bytes;
-use futures::FutureExt;
 use std::{
     collections::{hash_map::Entry, HashMap},
     convert::Infallible,
@@ -227,13 +226,67 @@ impl ConnectionManager {
     fn handle_incoming(&mut self, connecting: Connecting) {
         trace!("received new incoming connection");
-        let connecting = connecting.map(|connecting_result| ConnectingOutput {
+
+        self.pending_connections.spawn(Self::handle_incoming_task(
+            connecting,
+            self.config.clone(),
+            self.active_peers.clone(),
+            self.known_peers.clone(),
+        ));
+    }
+
+    async fn handle_incoming_task(
+        connecting: Connecting,
+        config: Arc<Config>,
+        active_peers: ActivePeers,
+        known_peers: KnownPeers,
+    ) -> ConnectingOutput {
+        let fut = async {
+            let connection = connecting.await?;
+
+            // TODO close the connection explicitly with a reason once we have machine
+            // readable errors. See https://github.com/MystenLabs/anemo/issues/13 for more info.
+            match known_peers.get(&connection.peer_id()) {
+                Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::High) => {
+                    // Do nothing, let the connection through
+                }
+                Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::Never) => {
+                    return Err(anyhow::anyhow!(
+                        "rejecting connection from peer {} due to having PeerAffinity::Never",
+                        connection.peer_id()
+                    ));
+                }
+                // Check connection limits
+                _ => {
+                    if let Some(limit) = config.max_concurrent_connections() {
+                        // TODO maybe have a way to temporarily hold on to a "slot" so that we can
+                        // ensure we don't go over this limit if multiple connections come in
+                        // simultaneously.
+                        if active_peers.len() >= limit {
+                            // We've hit the limit and this connection doesn't meet the
+                            // requirements to bypass it, so bail
+                            return Err(anyhow::anyhow!(
+                                "dropping connection from peer {} due to connection limits",
+                                connection.peer_id()
+                            ));
+                        }
+                    }
+                }
+            }
+
+            super::wire::handshake(connection).await
+        };
+
+        let connecting_result = tokio::time::timeout(config.connect_timeout(), fut)
+            .await
+            .map_err(Into::into)
+            .and_then(std::convert::identity);
+
+        ConnectingOutput {
             connecting_result,
             maybe_oneshot: None,
             target_address: None,
             target_peer_id: None,
-        });
-        self.pending_connections.spawn(connecting);
+        }
     }
 
     fn handle_connecting_result(
@@ -318,7 +371,8 @@ impl ConnectionManager {
         known_peers
             .values()
             .filter(|peer_info| {
-                peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself
+                !matches!(peer_info.affinity, PeerAffinity::Never)
+                    && peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself
                 && !peer_info.address.is_empty() // The peer has an address we can dial
                 && !active_peers.contains(&peer_info.peer_id) // The node is not already connected.
                 && !self.pending_dials.contains_key(&peer_info.peer_id) // There is no pending dial to this node.
@@ -364,26 +418,48 @@ impl ConnectionManager {
         peer_id: Option<PeerId>,
         oneshot: oneshot::Sender<Result<PeerId>>,
     ) {
-        let target_address = Some(address.clone());
-        let connecting = if let Some(peer_id) = peer_id {
+        let target_address = address.clone();
+        let maybe_connecting = if let Some(peer_id) = peer_id {
             self.endpoint
                 .connect_with_expected_peer_id(address, peer_id)
         } else {
             self.endpoint.connect(address)
         };
-        let connecting = async move {
-            let connecting_result = match connecting {
-                Ok(connecting) => connecting.await,
-                Err(e) => Err(e),
-            };
-            ConnectingOutput {
-                connecting_result,
-                maybe_oneshot: Some(oneshot),
-                target_address,
-                target_peer_id: peer_id,
-            }
+        self.pending_connections.spawn(Self::dial_peer_task(
+            maybe_connecting,
+            target_address,
+            peer_id,
+            oneshot,
+            self.config.clone(),
+        ));
+    }
+
+    // TODO maybe look at cloning the endpoint so that we can try multiple addresses in the event
+    // the Address resolves to multiple IPs.
+    async fn dial_peer_task(
+        maybe_connecting: Result<Connecting>,
+        target_address: Address,
+        peer_id: Option<PeerId>,
+        oneshot: oneshot::Sender<Result<PeerId>>,
+        config: Arc<Config>,
+    ) -> ConnectingOutput {
+        let fut = async {
+            let connection = maybe_connecting?.await?;
+
+            super::wire::handshake(connection).await
         };
-        self.pending_connections.spawn(connecting);
+
+        let connecting_result = tokio::time::timeout(config.connect_timeout(), fut)
+            .await
+            .map_err(Into::into)
+            .and_then(std::convert::identity);
+
+        ConnectingOutput {
+            connecting_result,
+            maybe_oneshot: Some(oneshot),
+            target_address: Some(target_address),
+            target_peer_id: peer_id,
+        }
     }
 }
 
@@ -479,6 +555,10 @@ impl ActivePeers {
         self.0.write().unwrap()
     }
 
+    fn len(&self) -> usize {
+        self.inner().len()
+    }
+
     pub fn downgrade(&self) -> ActivePeersRef {
         ActivePeersRef(Arc::downgrade(&self.0))
     }
@@ -518,6 +598,10 @@ impl ActivePeersInner {
         self.connections.keys().copied().collect()
     }
 
+    fn len(&self) -> usize {
+        self.connections.len()
+    }
+
     fn get(&self, peer_id: &PeerId) -> Option<Connection> {
         self.connections.get(peer_id).cloned()
     }
diff --git a/crates/anemo/src/network/tests.rs b/crates/anemo/src/network/tests.rs
index 48bd1791..ebfb3bdb 100644
--- a/crates/anemo/src/network/tests.rs
+++ b/crates/anemo/src/network/tests.rs
@@ -1,4 +1,4 @@
-use crate::{Network, NetworkRef, Request, Response, Result};
+use crate::{types::PeerEvent, Network, NetworkRef, Request, Response, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use std::convert::Infallible;
 use tower::{util::BoxCloneService, ServiceExt};
@@ -73,6 +73,52 @@ async fn connect_with_invalid_peer_id() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn connect_with_invalid_peer_id_ensure_server_doesnt_succeed() -> Result<()> {
+    let _gaurd = crate::init_tracing_for_testing();
+
+    let network_1 = build_network()?;
+    let network_2 = build_network()?;
+    let network_3 = build_network()?;
+
+    let (mut subscriber_2, _) = network_2.subscribe().unwrap();
+
+    // Try to dial network 2, but with network 3's peer id
+    network_1
+        .connect_with_peer_id(network_2.local_addr(), network_3.peer_id())
+        .await
+        .unwrap_err();
+
+    tokio::task::yield_now().await;
+
+    // network 2 dialing 3 should succeed
+    network_2
+        .connect_with_peer_id(network_3.local_addr(), network_3.peer_id())
+        .await
+        .unwrap();
+
+    assert_eq!(
+        subscriber_2.try_recv(),
+        Ok(PeerEvent::NewPeer(network_3.peer_id()))
+    );
+
+    drop(network_2);
+
+    assert_eq!(
+        subscriber_2.recv().await,
+        Ok(PeerEvent::LostPeer(
+            network_3.peer_id(),
+            crate::types::DisconnectReason::ConnectionLost
+        )),
+    );
+    assert_eq!(
+        subscriber_2.recv().await,
+        Err(tokio::sync::broadcast::error::RecvError::Closed),
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn connect_with_hostname() -> Result<()> {
     let _gaurd = crate::init_tracing_for_testing();
@@ -100,6 +146,139 @@ async fn connect_with_hostname() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn max_concurrent_connections_0() -> Result<()> {
+    let _gaurd = crate::init_tracing_for_testing();
+
+    // Setup a network which disallows all incoming connections
+    let config = crate::Config {
+        max_concurrent_connections: Some(0),
+        ..Default::default()
+    };
+    let network_1 = Network::bind("localhost:0")
+        .random_private_key()
+        .server_name("test")
+        .config(config)
+        .start(echo_service())?;
+
+    let network_2 = build_network()?;
+
+    network_2
+        .connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
+        .await
+        .unwrap_err();
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn max_concurrent_connections_1() -> Result<()> {
+    let _gaurd = crate::init_tracing_for_testing();
+
+    // Setup a network which allows at most 1 concurrent connection
+    let config = crate::Config {
+        max_concurrent_connections: Some(1),
+        ..Default::default()
+    };
+    let network_1 = Network::bind("localhost:0")
+        .random_private_key()
+        .server_name("test")
+        .config(config)
+        .start(echo_service())?;
+
+    let network_2 = build_network()?;
+    let network_3 = build_network()?;
+
+    // first connection succeeds
+    network_2
+        .connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
+        .await
+        .unwrap();
+
+    // second connection fails
+    network_3
+        .connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
+        .await
+        .unwrap_err();
+
+    // explicitly making an outbound connection bypasses this limit
+    network_1
+        .connect_with_peer_id(network_3.local_addr(), network_3.peer_id())
+        .await
+        .unwrap();
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reject_peer_with_affinity_never() -> Result<()> {
+    let _gaurd = crate::init_tracing_for_testing();
+
+    let network_1 = build_network()?;
+    let network_2 = build_network()?;
+
+    // Configure peer 2 with affinity never
+    let peer_info_2 = crate::types::PeerInfo {
+        peer_id: network_2.peer_id(),
+        affinity: crate::types::PeerAffinity::Never,
+        address: vec![],
+    };
+    network_1.known_peers().insert(peer_info_2);
+
+    // When peer 2 tries to connect, peer 1 will reject it
+    network_2
+        .connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
+        .await
+        .unwrap_err();
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn peers_with_affinity_never_are_not_dialed_in_the_background() -> Result<()> {
+    let _gaurd = crate::init_tracing_for_testing();
+
+    let network_1 = build_network()?;
+    let network_2 = build_network()?;
+    let network_3 = build_network()?;
+
+    let mut subscriber_1 = network_1.subscribe()?.0;
+
+    // Configure peer 2 with affinity never
+    let peer_info_2 = crate::types::PeerInfo {
+        peer_id: network_2.peer_id(),
+        affinity: crate::types::PeerAffinity::Never,
+        address: vec![],
+    };
+    network_1.known_peers().insert(peer_info_2);
+    // Configure peer 3 with high affinity
+    let peer_info_3 = crate::types::PeerInfo {
+        peer_id: network_3.peer_id(),
+        affinity: crate::types::PeerAffinity::High,
+        address: vec![network_3.local_addr().into()],
+    };
+    network_1.known_peers().insert(peer_info_3);
+
+    // When peer 2 tries to connect, peer 1 will reject it
+    network_2
+        .connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
+        .await
+        .unwrap_err();
+
+    // We only ever see connections being made/lost with peer 3 and not peer 2
+    let peer_id_3 = network_3.peer_id();
+    assert_eq!(PeerEvent::NewPeer(peer_id_3), subscriber_1.recv().await?);
+
+    drop(network_3);
+
+    assert_eq!(
+        PeerEvent::LostPeer(peer_id_3, crate::types::DisconnectReason::ConnectionLost),
+        subscriber_1.recv().await?
+    );
+
+    Ok(())
+}
+
 fn build_network() -> Result<Network> {
     build_network_with_addr("localhost:0")
 }
diff --git a/crates/anemo/src/network/wire.rs b/crates/anemo/src/network/wire.rs
index 9f407025..622cd085 100644
--- a/crates/anemo/src/network/wire.rs
+++ b/crates/anemo/src/network/wire.rs
@@ -29,6 +29,33 @@ pub(crate) fn network_message_frame_codec() -> LengthDelimitedCodec {
         .new_codec()
 }
 
+/// Anemo requires mTLS in order to ensure that both sides of the connection are authenticated by
+/// the other. This is specifically required so that regardless of which peer initiates a
+/// connection, both sides will be able to know the PeerId of the other side. One challenge with
+/// this is that due to the ordering of how certs are exchanged, the client side may think the
+/// connection is fully established when in reality the server may still reject the connection. To
+/// handle this, anemo has a very brief handshake, essentially an ACK, that is initiated by the
+/// server side to inform the client that the server has finished establishing the connection.
+///
+/// Performing this small handshake will also enable the server side to make decisions about
+/// whether to keep the connection based on things like the client side's PeerId.
+pub(crate) async fn handshake(
+    connection: crate::connection::Connection,
+) -> Result<crate::connection::Connection> {
+    match connection.origin() {
+        crate::ConnectionOrigin::Inbound => {
+            let mut send_stream = connection.open_uni().await?;
+            write_version_frame(&mut send_stream, Version::V1).await?;
+            send_stream.finish().await?;
+        }
+        crate::ConnectionOrigin::Outbound => {
+            let mut recv_stream = connection.accept_uni().await?;
+            read_version_frame(&mut recv_stream).await?;
+        }
+    }
+    Ok(connection)
+}
+
 pub(crate) async fn read_version_frame<T: AsyncRead + Unpin>(
     recv_stream: &mut T,
 ) -> Result<Version> {
diff --git a/crates/anemo/src/types/mod.rs b/crates/anemo/src/types/mod.rs
index f81773e3..70ddc9e8 100644
--- a/crates/anemo/src/types/mod.rs
+++ b/crates/anemo/src/types/mod.rs
@@ -44,9 +44,12 @@ pub mod header {
 
 #[derive(Clone, Copy, Debug)]
 pub enum PeerAffinity {
-    /// Always attempt to maintain a connection with this Peer
+    /// Always attempt to maintain a connection with this Peer.
     High,
-    // None,
+    /// Never attempt to maintain a connection with this Peer.
+    ///
+    /// Inbound connection requests from these Peers are rejected.
+    Never,
 }
 
 #[derive(Clone, Debug)]
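For downstream users, here is a minimal sketch of how the new configuration knobs and `PeerAffinity::Never` might be wired together, based on the builder calls exercised in the tests above (`Network::bind`, `random_private_key`, `server_name`, `config`, `start`, and `known_peers().insert`). The `my_rpc_service()` and `blocked_peer_id()` helpers are hypothetical placeholders for an application's tower service and a peer id it wants to reject; everything else mirrors the API shown in this diff.

```rust
use anemo::{
    types::{PeerAffinity, PeerInfo},
    Network, Result,
};

fn build_limited_network() -> Result<Network> {
    // Both new fields are optional: leaving them unset keeps the defaults documented above
    // (a 10,000 ms connect timeout and no cap on concurrent connections).
    let config = anemo::Config {
        connect_timeout_ms: Some(5_000),
        max_concurrent_connections: Some(64),
        ..Default::default()
    };

    let network = Network::bind("localhost:0")
        .random_private_key()
        .server_name("my-app")
        .config(config)
        .start(my_rpc_service())?; // hypothetical placeholder for the application's tower service

    // A peer marked `PeerAffinity::Never` is neither dialed in the background nor
    // accepted when it initiates an inbound connection.
    network.known_peers().insert(PeerInfo {
        peer_id: blocked_peer_id(), // hypothetical placeholder for a PeerId to reject
        affinity: PeerAffinity::Never,
        address: vec![],
    });

    Ok(network)
}
```

Connections made explicitly via `Network::connect`/`connect_with_peer_id`, and peers configured with `PeerAffinity::High`, bypass the `max_concurrent_connections` cap, as described in the `Config` docs above.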