Skip to content

Commit

Permalink
network/metrics: Expose number of banned peers from peerstore and ena…
Browse files Browse the repository at this point in the history
…ble litep2p metrics (#4977)

This PR extends the metrics exposed by the peerstore with the total
number of banned peers.

The new metric is exposed under
`substrate_sub_libp2p_peerset_num_banned_peers`.

To easily extend metrics in the future, the `fn num_known_peers` is
removed in favor of `fn status`.

While at it, enable the metrics for litep2p:
- total number of peers from peerstore (needed to debug memory
consumption)
- total number of banned peers from peerstore (needed to debug
reputation bans and disconnects)

Have added a couple of tests to validate that the number of banned peers
is exposed properly.

Part of: #4681


### Testing Done
Using [subp2p-explorer](https://github.com/lexnv/subp2p-explorer) have
submitted random data on tx protocol.
The peer gets banned, the num of banned peers is incremented then the
peer is disconnected.

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Dmitry Markin <[email protected]>
  • Loading branch information
lexnv and dmitry-markin authored Jul 23, 2024
1 parent 9974a68 commit 7f905e2
Show file tree
Hide file tree
Showing 22 changed files with 234 additions and 87 deletions.
6 changes: 4 additions & 2 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlo
relay_chain_rpc_client: Arc<BlockChainRpcClient>,
) -> Result<NewMinimalNode, RelayChainError> {
let role = config.role.clone();
let mut net_config =
sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(&config.network);
let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
&config.network,
config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
);
let metrics = Network::register_notification_metrics(
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);
Expand Down
5 changes: 4 additions & 1 deletion cumulus/polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ pub(crate) trait NodeSpec {
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue_service = params.import_queue.service();
let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
let net_config = FullNetworkConfiguration::<_, _, Net>::new(
&parachain_config.network,
prometheus_registry.clone(),
);

let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
build_network(BuildNetworkParams {
Expand Down
8 changes: 5 additions & 3 deletions cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ where
.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

let import_queue_service = params.import_queue.service();
let net_config = FullNetworkConfiguration::<Block, Hash, Net>::new(&parachain_config.network);
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let net_config = FullNetworkConfiguration::<Block, Hash, Net>::new(
&parachain_config.network,
prometheus_registry.clone(),
);

let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
build_network(BuildNetworkParams {
Expand All @@ -372,8 +376,6 @@ where
})
.await?;

let prometheus_registry = parachain_config.prometheus_registry().cloned();

let keystore = params.keystore_container.keystore();
let rpc_builder = {
let client = client.clone();
Expand Down
6 changes: 4 additions & 2 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,10 @@ pub fn new_full<
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();

let mut net_config =
sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(&config.network);
let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
&config.network,
config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
);

let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
let peer_store_handle = net_config.peer_store_handle();
Expand Down
6 changes: 4 additions & 2 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,10 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();

let mut net_config =
sc_network::config::FullNetworkConfiguration::<_, _, N>::new(&config.network);
let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
&config.network,
config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
);

let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
let peer_store_handle = net_config.peer_store_handle();
Expand Down
8 changes: 6 additions & 2 deletions substrate/client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,16 @@ pub struct FullNetworkConfiguration<B: BlockT + 'static, H: ExHashT, N: NetworkB

/// Handle to [`PeerStore`](crate::peer_store::PeerStore).
peer_store_handle: Arc<dyn PeerStoreProvider>,

/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,
}

impl<B: BlockT + 'static, H: ExHashT, N: NetworkBackend<B, H>> FullNetworkConfiguration<B, H, N> {
/// Create new [`FullNetworkConfiguration`].
pub fn new(network_config: &NetworkConfiguration) -> Self {
pub fn new(network_config: &NetworkConfiguration, metrics_registry: Option<Registry>) -> Self {
let bootnodes = network_config.boot_nodes.iter().map(|bootnode| bootnode.peer_id).collect();
let peer_store = N::peer_store(bootnodes);
let peer_store = N::peer_store(bootnodes, metrics_registry.clone());
let peer_store_handle = peer_store.handle();

Self {
Expand All @@ -805,6 +808,7 @@ impl<B: BlockT + 'static, H: ExHashT, N: NetworkBackend<B, H>> FullNetworkConfig
notification_protocols: Vec::new(),
request_response_protocols: Vec::new(),
network_config: network_config.clone(),
metrics_registry,
}
}

Expand Down
7 changes: 5 additions & 2 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
Arc::clone(&self.network_service)
}

fn peer_store(bootnodes: Vec<sc_network_types::PeerId>) -> Self::PeerStore {
Peerstore::new(bootnodes)
fn peer_store(
bootnodes: Vec<sc_network_types::PeerId>,
metrics_registry: Option<Registry>,
) -> Self::PeerStore {
Peerstore::new(bootnodes, metrics_registry)
}

fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
Expand Down
104 changes: 77 additions & 27 deletions substrate/client/network/src/litep2p/peerstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
use crate::{
peer_store::{PeerStoreProvider, ProtocolHandle},
service::traits::PeerStore,
service::{metrics::PeerStoreMetrics, traits::PeerStore},
ObservedRole, ReputationChange,
};

use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use wasm_timer::Delay;

use sc_network_types::PeerId;
Expand Down Expand Up @@ -118,12 +119,22 @@ impl PeerInfo {
pub struct PeerstoreHandleInner {
peers: HashMap<PeerId, PeerInfo>,
protocols: Vec<Arc<dyn ProtocolHandle>>,
metrics: Option<PeerStoreMetrics>,
}

#[derive(Debug, Clone, Default)]
pub struct PeerstoreHandle(Arc<Mutex<PeerstoreHandleInner>>);

impl PeerstoreHandle {
/// Constructs a new [`PeerstoreHandle`].
fn new(
peers: HashMap<PeerId, PeerInfo>,
protocols: Vec<Arc<dyn ProtocolHandle>>,
metrics: Option<PeerStoreMetrics>,
) -> Self {
Self(Arc::new(Mutex::new(PeerstoreHandleInner { peers, protocols, metrics })))
}

/// Add known peer to [`Peerstore`].
pub fn add_known_peer(&self, peer: PeerId) {
self.0
Expand All @@ -150,8 +161,18 @@ impl PeerstoreHandle {

// Retain only entries with non-zero reputation values or not expired ones.
let now = Instant::now();
lock.peers
.retain(|_, info| info.reputation != 0 || info.last_updated + FORGET_AFTER > now);
let mut num_banned_peers = 0;
lock.peers.retain(|_, info| {
if info.is_banned() {
num_banned_peers += 1;
}
info.reputation != 0 || info.last_updated + FORGET_AFTER > now
});

if let Some(metrics) = &lock.metrics {
metrics.num_discovered.set(lock.peers.len() as u64);
metrics.num_banned_peers.set(num_banned_peers);
}
}
}

Expand Down Expand Up @@ -240,14 +261,6 @@ impl PeerStoreProvider for PeerstoreHandle {
.collect::<Vec<_>>()
}

/// Get the number of known peers.
///
/// This number might not include some connected peers in rare cases when their reputation
/// was not updated for one hour, because their entries in [`PeerStore`] were dropped.
fn num_known_peers(&self) -> usize {
self.0.lock().peers.len()
}

/// Add known peer.
fn add_known_peer(&self, peer: PeerId) {
self.0.lock().peers.entry(peer).or_default().last_updated = Instant::now();
Expand All @@ -270,21 +283,23 @@ pub struct Peerstore {

impl Peerstore {
/// Create new [`Peerstore`].
pub fn new(bootnodes: Vec<PeerId>) -> Self {
let peerstore_handle = PeerstoreHandle(Arc::new(Mutex::new(Default::default())));

for bootnode in bootnodes {
peerstore_handle.add_known_peer(bootnode);
}

Self { peerstore_handle }
}

/// Create new [`Peerstore`] from a [`PeerstoreHandle`].
pub fn from_handle(peerstore_handle: PeerstoreHandle, bootnodes: Vec<PeerId>) -> Self {
for bootnode in bootnodes {
peerstore_handle.add_known_peer(bootnode);
}
pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
let metrics = if let Some(registry) = &metrics_registry {
PeerStoreMetrics::register(registry)
.map_err(|err| {
log::error!(target: LOG_TARGET, "Failed to register peer store metrics: {}", err);
err
})
.ok()
} else {
None
};

let peerstore_handle = PeerstoreHandle::new(
bootnodes.iter().map(|peer_id| (*peer_id, PeerInfo::default())).collect(),
Vec::new(),
metrics,
);

Self { peerstore_handle }
}
Expand Down Expand Up @@ -336,7 +351,7 @@ impl PeerStore for Peerstore {

#[cfg(test)]
mod tests {
use super::PeerInfo;
use super::{PeerInfo, PeerStoreProvider, Peerstore};

#[test]
fn decaying_zero_reputation_yields_zero() {
Expand Down Expand Up @@ -403,4 +418,39 @@ mod tests {
peer_info.decay_reputation(SECONDS / 2);
assert_eq!(peer_info.reputation, 0);
}

#[test]
fn report_banned_peers() {
let peer_a = sc_network_types::PeerId::random();
let peer_b = sc_network_types::PeerId::random();
let peer_c = sc_network_types::PeerId::random();

let metrics_registry = prometheus_endpoint::Registry::new();
let mut peerstore = Peerstore::new(
vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect(),
Some(metrics_registry),
);
let metrics = peerstore.peerstore_handle.0.lock().metrics.as_ref().unwrap().clone();
let handle = peerstore.handle();

// Check initial state. Advance time to propagate peers.
handle.progress_time(1);
assert_eq!(metrics.num_discovered.get(), 3);
assert_eq!(metrics.num_banned_peers.get(), 0);

// Report 2 peers with a negative reputation.
handle.report_peer(
peer_a,
sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
);
handle.report_peer(
peer_b,
sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
);

// Advance time to propagate peers.
handle.progress_time(1);
assert_eq!(metrics.num_discovered.get(), 3);
assert_eq!(metrics.num_banned_peers.get(), 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn test_once() {
})
.collect();

let peerstore = Peerstore::new(bootnodes);
let peerstore = Peerstore::new(bootnodes, None);
let peer_store_handle = peerstore.handle();

let (mut peerset, to_peerset) = Peerset::new(
Expand Down
4 changes: 0 additions & 4 deletions substrate/client/network/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ impl PeerStoreProvider for MockPeerStore {
unimplemented!()
}

fn num_known_peers(&self) -> usize {
0usize
}

fn add_known_peer(&self, _peer_id: PeerId) {
unimplemented!()
}
Expand Down
Loading

0 comments on commit 7f905e2

Please sign in to comment.