From 57df7cd86ea28e8031a388d5426706878018e32b Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Tue, 26 Nov 2024 14:28:15 +0700 Subject: [PATCH] feat: export quinn connection metrics (#23) * log ts for debug secure handshake * feat: add metric collector service * chore: remove manual count stream * chore: resolve comment for PR * chore: change log stats to debug level * feat: add sent_pkt and largest_mtu stats to metrics * chore: export PeerConnectionMetric so that it can be used by other services --- src/ctx.rs | 26 ++++++++++- src/lib.rs | 7 +++ src/peer.rs | 1 + src/peer/peer_internal.rs | 34 +++++++++++++- src/service.rs | 1 + src/service/metrics_service.rs | 84 ++++++++++++++++++++++++++++++++++ src/tests.rs | 1 + src/tests/metrics.rs | 41 +++++++++++++++++ 8 files changed, 192 insertions(+), 3 deletions(-) create mode 100644 src/service/metrics_service.rs create mode 100644 src/tests/metrics.rs diff --git a/src/ctx.rs b/src/ctx.rs index 23cac65..41f654a 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc::Sender; use crate::{ msg::{BroadcastMsgId, P2pServiceId, PeerMessage}, - peer::PeerConnectionAlias, + peer::{PeerConnectionAlias, PeerConnectionMetric}, router::{RouteAction, SharedRouterTable}, service::P2pServiceEvent, stream::P2pQuicStream, @@ -18,6 +18,7 @@ use crate::{ #[derive(Debug)] struct SharedCtxInternal { conns: HashMap, + conn_metrics: HashMap, received_broadcast_msg: LruCache, services: [Option>; 256], } @@ -38,6 +39,7 @@ impl SharedCtxInternal { fn unregister_conn(&mut self, conn: &ConnectionId) { self.conns.remove(conn); + self.conn_metrics.remove(conn); } fn conn(&self, conn: &ConnectionId) -> Option { @@ -48,6 +50,19 @@ impl SharedCtxInternal { self.conns.values().cloned().collect::>() } + fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) { + self.conn_metrics.insert(*conn, (peer, metrics)); + } + + fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> { +
let mut ret = vec![]; + for (conn, (peer, metrics)) in self.conn_metrics.clone() { + ret.push((conn, peer, metrics)); + } + + ret + } + /// check if we already got the message /// if is not, it return true and save to cache list /// if already it return false and do nothing @@ -72,6 +87,7 @@ impl SharedCtx { Self { ctx: Arc::new(RwLock::new(SharedCtxInternal { conns: Default::default(), + conn_metrics: Default::default(), received_broadcast_msg: LruCache::new(8192.try_into().expect("should ok")), services: std::array::from_fn(|_| None), })), @@ -99,6 +115,14 @@ impl SharedCtx { self.ctx.read().conns() } + pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) { + self.ctx.write().update_conn_metrics(conn, peer, metrics); + } + + pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> { + self.ctx.read().metrics() + } + pub fn router(&self) -> &SharedRouterTable { &self.router } diff --git a/src/lib.rs b/src/lib.rs index 80d63e2..402eaf7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,7 @@ mod stream; mod tests; mod utils; +pub use peer::PeerConnectionMetric; pub use requester::P2pNetworkRequester; pub use router::SharedRouterTable; pub use secure::*; @@ -113,6 +114,7 @@ enum InternalEvent { PeerConnected(ConnectionId, PeerId, u16), PeerConnectError(ConnectionId, Option, anyhow::Error), PeerData(ConnectionId, PeerId, PeerMainData), + PeerStats(ConnectionId, PeerId, PeerConnectionMetric), PeerDisconnected(ConnectionId, PeerId), } @@ -270,6 +272,11 @@ impl P2pNetwork { self.neighbours.remove(&conn); Ok(P2pNetworkEvent::PeerDisconnected(conn, peer)) } + InternalEvent::PeerStats(conn, to_peer, metrics) => { + log::debug!("[P2pNetwork] conn {conn} to peer {to_peer} metrics {:?}", metrics); + self.ctx.update_metrics(&conn, to_peer, metrics); + Ok(P2pNetworkEvent::Continue) + } } } diff --git a/src/peer.rs b/src/peer.rs index 588cc8e..c43d304 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -24,6 +24,7 @@ mod 
peer_alias; mod peer_internal; pub use peer_alias::PeerConnectionAlias; +pub use peer_internal::PeerConnectionMetric; enum PeerConnectionControl { Send(PeerMessage), diff --git a/src/peer/peer_internal.rs b/src/peer/peer_internal.rs index fbce22a..d8119d9 100644 --- a/src/peer/peer_internal.rs +++ b/src/peer/peer_internal.rs @@ -5,11 +5,15 @@ //! - Only use async with current connection stream //! - For other communication should use try_send for avoiding blocking -use std::{net::SocketAddr, time::Duration}; +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; use anyhow::anyhow; use futures::{SinkExt, StreamExt}; use quinn::{Connection, RecvStream, SendStream}; +use serde::{Deserialize, Serialize}; use tokio::{ io::copy_bidirectional, select, @@ -29,6 +33,18 @@ use crate::{ use super::PeerConnectionControl; +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct PeerConnectionMetric { + pub uptime: u64, + pub rtt: u16, + pub sent_pkt: u64, + pub lost_pkt: u64, + pub lost_bytes: u64, + pub send_bytes: u64, + pub recv_bytes: u64, + pub current_mtu: u16, +} + pub struct PeerConnectionInternal { conn_id: ConnectionId, to_id: PeerId, @@ -39,6 +55,7 @@ pub struct PeerConnectionInternal { internal_tx: Sender, control_rx: Receiver, ticker: Interval, + started: Instant, } impl PeerConnectionInternal { @@ -65,6 +82,7 @@ impl PeerConnectionInternal { internal_tx, control_rx, ticker: tokio::time::interval(Duration::from_secs(1)), + started: Instant::now(), } } @@ -73,7 +91,19 @@ impl PeerConnectionInternal { select! 
{ _ = self.ticker.tick() => { let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16; + let connection_stats = self.connection.stats(); self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms); + let metrics = PeerConnectionMetric { + uptime: self.started.elapsed().as_secs(), + sent_pkt: connection_stats.path.sent_packets, + lost_pkt: connection_stats.path.lost_packets, + lost_bytes: connection_stats.path.lost_bytes, + rtt: rtt_ms, + send_bytes: connection_stats.udp_tx.bytes, + recv_bytes: connection_stats.udp_rx.bytes, + current_mtu: connection_stats.path.current_mtu, + }; + let _ = self.internal_tx.try_send(InternalEvent::PeerStats(self.conn_id, self.to_id, metrics)); }, open = self.connection.accept_bi() => { let (send, recv) = open?; @@ -86,7 +116,7 @@ impl PeerConnectionInternal { control = self.control_rx.recv() => { let control = control.ok_or(anyhow!("peer control channel ended"))?; self.on_control(control).await?; - }, + } } } } diff --git a/src/service.rs b/src/service.rs index 06cbc6f..6ba9b29 100644 --- a/src/service.rs +++ b/src/service.rs @@ -3,6 +3,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerId}; pub mod alias_service; +pub mod metrics_service; pub mod pubsub_service; pub mod visualization_service; diff --git a/src/service/metrics_service.rs b/src/service/metrics_service.rs new file mode 100644 index 0000000..829a8de --- /dev/null +++ b/src/service/metrics_service.rs @@ -0,0 +1,84 @@ +use std::{collections::VecDeque, time::Duration}; + +use serde::{Deserialize, Serialize}; +use tokio::{select, time::Interval}; + +use crate::{peer::PeerConnectionMetric, ConnectionId, ErrorExt, P2pServiceEvent, PeerId}; + +use super::P2pService; + +#[derive(Debug, PartialEq, Eq)] +pub enum MetricsServiceEvent { + OnPeerConnectionMetric(PeerId, Vec<(ConnectionId, PeerId, PeerConnectionMetric)>), +} + +#[derive(Deserialize, 
Serialize)] +enum Message { + Scan, + Info(Vec<(ConnectionId, PeerId, PeerConnectionMetric)>), +} + +const DEFAULT_COLLECTOR_INTERVAL: u64 = 1; + +pub struct MetricsService { + is_collector: bool, + service: P2pService, + ticker: Interval, + outs: VecDeque, +} + +impl MetricsService { + pub fn new(collect_interval: Option, service: P2pService, is_collector: bool) -> Self { + let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(DEFAULT_COLLECTOR_INTERVAL))); + + Self { + is_collector, + ticker, + service, + outs: VecDeque::new(), + } + } + + pub async fn recv(&mut self) -> anyhow::Result { + loop { + if let Some(out) = self.outs.pop_front() { + return Ok(out); + } + + select! { + _ = self.ticker.tick() => { + if self.is_collector { + let metrics = self.service.ctx.metrics(); + self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(self.service.router().local_id(), metrics)); + + let requester = self.service.requester(); + tokio::spawn(async move { + requester.send_broadcast(bincode::serialize(&Message::Scan).expect("should convert to buf")).await; + }); + } + } + event = self.service.recv() => match event.expect("should work") { + P2pServiceEvent::Unicast(from, data) | P2pServiceEvent::Broadcast(from, data) => { + if let Ok(msg) = bincode::deserialize::(&data) { + match msg { + Message::Scan => { + let metrics = self.service.ctx.metrics(); + let requester = self.service.requester(); + tokio::spawn(async move { + requester.try_send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf")) + .await + .print_on_err("send metrics info to collector error"); + }); + } + Message::Info(peer_metrics) => { + self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(from, peer_metrics)); + } + } + } + } + P2pServiceEvent::Stream(..) 
=> {} + } + } + } + } +} diff --git a/src/tests.rs b/src/tests.rs index 235fd35..f4b4dbd 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -7,6 +7,7 @@ use crate::{P2pNetwork, P2pNetworkConfig, PeerAddress, PeerId, SharedKeyHandshak mod alias; mod cross_nodes; mod discovery; +mod metrics; mod pubsub; mod visualization; diff --git a/src/tests/metrics.rs b/src/tests/metrics.rs new file mode 100644 index 0000000..52a3fba --- /dev/null +++ b/src/tests/metrics.rs @@ -0,0 +1,41 @@ +use std::time::Duration; + +use crate::{ + metrics_service::{MetricsService, MetricsServiceEvent}, + PeerId, +}; +use test_log::test; + +use super::create_node; + +#[test(tokio::test)] +async fn metric_collect() { + let (mut node1, addr1) = create_node(true, 1, vec![]).await; + let mut service1 = MetricsService::new(None, node1.create_service(0.into()), true); + tokio::spawn(async move { while node1.recv().await.is_ok() {} }); + + let (mut node2, _) = create_node(true, 2, vec![addr1.clone()]).await; + let mut service2 = MetricsService::new(None, node2.create_service(0.into()), false); + tokio::spawn(async move { while node2.recv().await.is_ok() {} }); + tokio::spawn(async move { while service2.recv().await.is_ok() {} }); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let events = vec![ + tokio::time::timeout(Duration::from_secs(3), service1.recv()).await.expect("").expect(""), + tokio::time::timeout(Duration::from_secs(3), service1.recv()).await.expect("").expect(""), + tokio::time::timeout(Duration::from_secs(3), service1.recv()).await.expect("").expect(""), + tokio::time::timeout(Duration::from_secs(3), service1.recv()).await.expect("").expect(""), + ]; + + let event_from_peers: Vec<(PeerId, usize)> = events + .iter() + .map(|e| match e { + MetricsServiceEvent::OnPeerConnectionMetric(peer, metrics) => (*peer, metrics.len()), + }) + .collect(); + + println!("{:?}", events); + + assert_eq!(event_from_peers, vec![(PeerId(1), 1), (PeerId(1), 1), (PeerId(2), 1), (PeerId(2), 1)]); +}