Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce a mechanism for setting maximum number of connections #19

Merged
merged 8 commits into from
Dec 17, 2022
37 changes: 37 additions & 0 deletions crates/anemo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,39 @@ pub struct Config {
#[serde(skip_serializing_if = "Option::is_none")]
pub connection_backoff_ms: Option<u64>,

/// Set a timeout, in milliseconds, for all inbound and outbound connects.
///
/// In unspecified, this will default to `10,000` milliseconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub connect_timeout_ms: Option<u64>,

/// Maximum number of concurrent connections to attempt to establish at a given point in time.
///
/// If unspecified, this will default to `100`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_concurrent_outstanding_connecting_connections: Option<usize>,

/// Maximum number of concurrent connections to have established at a given point in time.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty for the excellent docs

///
/// This limit is applied in the following ways:
/// - Inbound connections from [`KnownPeers`] with [`PeerAffinity::High`] bypass this limit. All
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small request - Can you reword this to make it clearer whether the "limit bypass" cases count towards the limit, even if they bypass it? (I think yes but not 100% sure)

/// other inbound connections are only accepted if the total number of inbound and outbound
/// connections, irrespective of affinity, is less than this limit.
/// - Outbound connections explicitly made by the application via [`Network::connect`] or
/// [`Network::connect_with_peer_id`] bypass this limit.
/// - Outbound connections made in the background, due to configured [`KnownPeers`], to peers with
/// [`PeerAffinity::High`] bypass this limit and are always attempted, while peers with lower
/// affinity respect this limit.
///
/// If unspecified, there will be no limit on the number of concurrent connections.
///
/// [`KnownPeers`]: crate::KnownPeers
/// [`PeerAffinity::High`]: crate::types::PeerAffinity::High
/// [`Network::connect`]: crate::Network::connect
/// [`Network::connect_with_peer_id`]: crate::Network::connect_with_peer_id
#[serde(skip_serializing_if = "Option::is_none")]
pub max_concurrent_connections: Option<usize>,

/// Size of the broadcast channel use for subscribing to
/// [`PeerEvent`](crate::types::PeerEvent)s via
/// [`Network::subscribe`](crate::Network::subscribe).
Expand Down Expand Up @@ -161,13 +188,23 @@ impl Config {
Duration::from_millis(self.connection_backoff_ms.unwrap_or(CONNECTION_BACKOFF_MS))
}

pub(crate) fn connect_timeout(&self) -> Duration {
const CONNECTION_TIMEOUT_MS: u64 = 10_000; // 10 seconds

Duration::from_millis(self.connect_timeout_ms.unwrap_or(CONNECTION_TIMEOUT_MS))
}

pub(crate) fn max_concurrent_outstanding_connecting_connections(&self) -> usize {
const MAX_CONCURRENT_OUTSTANDING_CONNECTING_CONNECTIONS: usize = 100;

self.max_concurrent_outstanding_connecting_connections
.unwrap_or(MAX_CONCURRENT_OUTSTANDING_CONNECTING_CONNECTIONS)
}

pub(crate) fn max_concurrent_connections(&self) -> Option<usize> {
self.max_concurrent_connections
}

pub(crate) fn peer_event_broadcast_channel_capacity(&self) -> usize {
const PEER_EVENT_BROADCAST_CHANNEL_CAPACITY: usize = 128;

Expand Down
124 changes: 104 additions & 20 deletions crates/anemo/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use crate::{
config::Config,
connection::Connection,
endpoint::{Connecting, Endpoint},
types::{Address, DisconnectReason, PeerEvent, PeerInfo},
types::{Address, DisconnectReason, PeerAffinity, PeerEvent, PeerInfo},
ConnectionOrigin, PeerId, Request, Response, Result,
};
use bytes::Bytes;
use futures::FutureExt;
use std::{
collections::{hash_map::Entry, HashMap},
convert::Infallible,
Expand Down Expand Up @@ -227,13 +226,67 @@ impl ConnectionManager {

fn handle_incoming(&mut self, connecting: Connecting) {
trace!("received new incoming connection");
let connecting = connecting.map(|connecting_result| ConnectingOutput {

self.pending_connections.spawn(Self::handle_incoming_task(
connecting,
self.config.clone(),
self.active_peers.clone(),
self.known_peers.clone(),
));
}

async fn handle_incoming_task(
connecting: Connecting,
config: Arc<Config>,
active_peers: ActivePeers,
known_peers: KnownPeers,
) -> ConnectingOutput {
let fut = async {
let connection = connecting.await?;

// TODO close the connection explicitly with a reason once we have machine
// readable errors. See https://github.com/MystenLabs/anemo/issues/13 for more info.
match known_peers.get(&connection.peer_id()) {
Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::High) => {
// Do nothing, let the connection through
}
Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::Never) => {
return Err(anyhow::anyhow!(
"rejecting connection from peer {} due to having PeerAffinity::Never",
connection.peer_id()
));
}
// Check connection Limits
_ => {
if let Some(limit) = config.max_concurrent_connections() {
// We've hit the limit
// TODO maybe have a way to temporatily hold on to a "slot" so that we can ensure
// we don't go over this limit if multiple connections come in simultaneously.
if active_peers.len() >= limit {
// Connection doesn't meet the requirements to bypass the limit so bail
return Err(anyhow::anyhow!(
"dropping connection from peer {} due to connection limits",
connection.peer_id()
));
}
}
}
}

super::wire::handshake(connection).await
};

let connecting_result = tokio::time::timeout(config.connect_timeout(), fut)
.await
.map_err(Into::into)
.and_then(std::convert::identity);

ConnectingOutput {
connecting_result,
maybe_oneshot: None,
target_address: None,
target_peer_id: None,
});
self.pending_connections.spawn(connecting);
}
}

fn handle_connecting_result(
Expand Down Expand Up @@ -318,7 +371,8 @@ impl ConnectionManager {
known_peers
.values()
.filter(|peer_info| {
peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself
!matches!(peer_info.affinity, PeerAffinity::Never)
&& peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself
&& !peer_info.address.is_empty() // The peer has an address we can dial
&& !active_peers.contains(&peer_info.peer_id) // The node is not already connected.
&& !self.pending_dials.contains_key(&peer_info.peer_id) // There is no pending dial to this node.
Expand Down Expand Up @@ -364,26 +418,48 @@ impl ConnectionManager {
peer_id: Option<PeerId>,
oneshot: oneshot::Sender<Result<PeerId>>,
) {
let target_address = Some(address.clone());
let connecting = if let Some(peer_id) = peer_id {
let target_address = address.clone();
let maybe_connecting = if let Some(peer_id) = peer_id {
self.endpoint
.connect_with_expected_peer_id(address, peer_id)
} else {
self.endpoint.connect(address)
};
let connecting = async move {
let connecting_result = match connecting {
Ok(connecting) => connecting.await,
Err(e) => Err(e),
};
ConnectingOutput {
connecting_result,
maybe_oneshot: Some(oneshot),
target_address,
target_peer_id: peer_id,
}
self.pending_connections.spawn(Self::dial_peer_task(
maybe_connecting,
target_address,
peer_id,
oneshot,
self.config.clone(),
));
}

// TODO maybe look at cloning the endpoint so that we can try multiple addresses in the event
// Address resolves to multiple ips.
async fn dial_peer_task(
maybe_connecting: Result<Connecting>,
target_address: Address,
peer_id: Option<PeerId>,
oneshot: oneshot::Sender<Result<PeerId>>,
config: Arc<Config>,
) -> ConnectingOutput {
let fut = async {
let connection = maybe_connecting?.await?;

super::wire::handshake(connection).await
};
self.pending_connections.spawn(connecting);

let connecting_result = tokio::time::timeout(config.connect_timeout(), fut)
.await
.map_err(Into::into)
.and_then(std::convert::identity);

ConnectingOutput {
connecting_result,
maybe_oneshot: Some(oneshot),
target_address: Some(target_address),
target_peer_id: peer_id,
}
}
}

Expand Down Expand Up @@ -479,6 +555,10 @@ impl ActivePeers {
self.0.write().unwrap()
}

fn len(&self) -> usize {
self.inner().len()
}

pub fn downgrade(&self) -> ActivePeersRef {
ActivePeersRef(Arc::downgrade(&self.0))
}
Expand Down Expand Up @@ -518,6 +598,10 @@ impl ActivePeersInner {
self.connections.keys().copied().collect()
}

fn len(&self) -> usize {
self.connections.len()
}

fn get(&self, peer_id: &PeerId) -> Option<Connection> {
self.connections.get(peer_id).cloned()
}
Expand Down
Loading