Skip to content

Commit

Permalink
feat: export quinn connection metrics (#23)
Browse files Browse the repository at this point in the history
* log ts for debug secure handshake

* feat: add metric collector service

* chore: remove manual count stream

* chore: resolve comment for PR

* chore: change log stats to debug level

* feat: add sent_pkt and largest_mtu stats to metrics

* chore: export PeerConnectionMetric so that it can be used by other services
  • Loading branch information
marverlous811 authored Nov 26, 2024
1 parent 5e1d3b4 commit 57df7cd
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 3 deletions.
26 changes: 25 additions & 1 deletion src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::mpsc::Sender;

use crate::{
msg::{BroadcastMsgId, P2pServiceId, PeerMessage},
peer::PeerConnectionAlias,
peer::{PeerConnectionAlias, PeerConnectionMetric},
router::{RouteAction, SharedRouterTable},
service::P2pServiceEvent,
stream::P2pQuicStream,
Expand All @@ -18,6 +18,7 @@ use crate::{
/// Mutable state that `SharedCtx` keeps behind its `RwLock`.
#[derive(Debug)]
struct SharedCtxInternal {
// Registered connections, keyed by connection id.
conns: HashMap<ConnectionId, PeerConnectionAlias>,
// Most recent metrics snapshot per connection, stored with the remote peer id.
conn_metrics: HashMap<ConnectionId, (PeerId, PeerConnectionMetric)>,
// LRU cache keyed by broadcast message id (presumably used for de-duplication — confirm at the call site).
received_broadcast_msg: LruCache<BroadcastMsgId, ()>,
// 256 slots, each optionally holding the event sender of a service.
services: [Option<Sender<P2pServiceEvent>>; 256],
}
Expand All @@ -38,6 +39,7 @@ impl SharedCtxInternal {

/// Forget a connection: drops its cached metrics entry and its alias entry.
fn unregister_conn(&mut self, conn: &ConnectionId) {
    let _ = self.conn_metrics.remove(conn);
    let _ = self.conns.remove(conn);
}

fn conn(&self, conn: &ConnectionId) -> Option<PeerConnectionAlias> {
Expand All @@ -48,6 +50,19 @@ impl SharedCtxInternal {
self.conns.values().cloned().collect::<Vec<_>>()
}

/// Store the newest metrics snapshot for `conn`, replacing any previous one.
fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) {
    let conn_id = *conn;
    let snapshot = (peer, metrics);
    self.conn_metrics.insert(conn_id, snapshot);
}

/// Snapshot of the metrics of every tracked connection.
///
/// Returns one `(connection, peer, metrics)` tuple per entry of `conn_metrics`.
/// The order follows the hash map's iteration order and is therefore unspecified.
fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> {
    // Clone each value while collecting instead of cloning the whole map first
    // and then moving its entries into a second allocation.
    self.conn_metrics
        .iter()
        .map(|(conn, (peer, metrics))| (*conn, *peer, metrics.clone()))
        .collect()
}

/// Check whether we have already received the message.
/// If not, return true and save it to the cache list.
/// If it was already received, return false and do nothing.
Expand All @@ -72,6 +87,7 @@ impl SharedCtx {
Self {
ctx: Arc::new(RwLock::new(SharedCtxInternal {
conns: Default::default(),
conn_metrics: Default::default(),
received_broadcast_msg: LruCache::new(8192.try_into().expect("should ok")),
services: std::array::from_fn(|_| None),
})),
Expand Down Expand Up @@ -99,6 +115,14 @@ impl SharedCtx {
self.ctx.read().conns()
}

/// Record the latest metrics reported for the connection `conn` to `peer`.
pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) {
    let mut inner = self.ctx.write();
    inner.update_conn_metrics(conn, peer, metrics);
}

/// Return a copy of the stored metrics of all connections.
pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> {
    let inner = self.ctx.read();
    inner.metrics()
}

/// Borrow the shared router table held by this context.
pub fn router(&self) -> &SharedRouterTable {
&self.router
}
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod stream;
mod tests;
mod utils;

pub use peer::PeerConnectionMetric;
pub use requester::P2pNetworkRequester;
pub use router::SharedRouterTable;
pub use secure::*;
Expand Down Expand Up @@ -113,6 +114,7 @@ enum InternalEvent {
PeerConnected(ConnectionId, PeerId, u16),
PeerConnectError(ConnectionId, Option<PeerId>, anyhow::Error),
PeerData(ConnectionId, PeerId, PeerMainData),
PeerStats(ConnectionId, PeerId, PeerConnectionMetric),
PeerDisconnected(ConnectionId, PeerId),
}

Expand Down Expand Up @@ -270,6 +272,11 @@ impl<SECURE: HandshakeProtocol> P2pNetwork<SECURE> {
self.neighbours.remove(&conn);
Ok(P2pNetworkEvent::PeerDisconnected(conn, peer))
}
InternalEvent::PeerStats(conn, to_peer, metrics) => {
log::debug!("[P2pNetwork] conn {conn} to peer {to_peer} metrics {:?}", metrics);
self.ctx.update_metrics(&conn, to_peer, metrics);
Ok(P2pNetworkEvent::Continue)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod peer_alias;
mod peer_internal;

pub use peer_alias::PeerConnectionAlias;
pub use peer_internal::PeerConnectionMetric;

enum PeerConnectionControl {
Send(PeerMessage),
Expand Down
34 changes: 32 additions & 2 deletions src/peer/peer_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
//! - Only use async with current connection stream
//! - For other communication should use try_send for avoiding blocking
use std::{net::SocketAddr, time::Duration};
use std::{
net::SocketAddr,
time::{Duration, Instant},
};

use anyhow::anyhow;
use futures::{SinkExt, StreamExt};
use quinn::{Connection, RecvStream, SendStream};
use serde::{Deserialize, Serialize};
use tokio::{
io::copy_bidirectional,
select,
Expand All @@ -29,6 +33,18 @@ use crate::{

use super::PeerConnectionControl;

/// Metrics snapshot of a single peer connection, sampled on each tick of the
/// connection's ticker.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct PeerConnectionMetric {
// Whole seconds elapsed since the `PeerConnectionInternal` was created.
pub uptime: u64,
// Round-trip time in milliseconds, saturated at `u16::MAX`.
pub rtt: u16,
// `path.sent_packets` from the quinn connection stats.
pub sent_pkt: u64,
// `path.lost_packets` from the quinn connection stats.
pub lost_pkt: u64,
// `path.lost_bytes` from the quinn connection stats.
pub lost_bytes: u64,
// `udp_tx.bytes` from the quinn connection stats.
pub send_bytes: u64,
// `udp_rx.bytes` from the quinn connection stats.
pub recv_bytes: u64,
// `path.current_mtu` from the quinn connection stats.
pub current_mtu: u16,
}

pub struct PeerConnectionInternal {
conn_id: ConnectionId,
to_id: PeerId,
Expand All @@ -39,6 +55,7 @@ pub struct PeerConnectionInternal {
internal_tx: Sender<InternalEvent>,
control_rx: Receiver<PeerConnectionControl>,
ticker: Interval,
started: Instant,
}

impl PeerConnectionInternal {
Expand All @@ -65,6 +82,7 @@ impl PeerConnectionInternal {
internal_tx,
control_rx,
ticker: tokio::time::interval(Duration::from_secs(1)),
started: Instant::now(),
}
}

Expand All @@ -73,7 +91,19 @@ impl PeerConnectionInternal {
select! {
_ = self.ticker.tick() => {
let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16;
let connection_stats = self.connection.stats();
self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms);
let metrics = PeerConnectionMetric {
uptime: self.started.elapsed().as_secs(),
sent_pkt: connection_stats.path.sent_packets,
lost_pkt: connection_stats.path.lost_packets,
lost_bytes: connection_stats.path.lost_bytes,
rtt: rtt_ms,
send_bytes: connection_stats.udp_tx.bytes,
recv_bytes: connection_stats.udp_rx.bytes,
current_mtu: connection_stats.path.current_mtu,
};
let _ = self.internal_tx.try_send(InternalEvent::PeerStats(self.conn_id, self.to_id, metrics));
},
open = self.connection.accept_bi() => {
let (send, recv) = open?;
Expand All @@ -86,7 +116,7 @@ impl PeerConnectionInternal {
control = self.control_rx.recv() => {
let control = control.ok_or(anyhow!("peer control channel ended"))?;
self.on_control(control).await?;
},
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerId};

pub mod alias_service;
pub mod metrics_service;
pub mod pubsub_service;
pub mod visualization_service;

Expand Down
84 changes: 84 additions & 0 deletions src/service/metrics_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::{collections::VecDeque, time::Duration};

use serde::{Deserialize, Serialize};
use tokio::{select, time::Interval};

use crate::{peer::PeerConnectionMetric, ConnectionId, ErrorExt, P2pServiceEvent, PeerId};

use super::P2pService;

/// Events produced by `MetricsService::recv`.
#[derive(Debug, PartialEq, Eq)]
pub enum MetricsServiceEvent {
// Metrics reported by a peer: the reporting peer id (the local id for the
// collector's own sample) and that peer's per-connection metrics.
OnPeerConnectionMetric(PeerId, Vec<(ConnectionId, PeerId, PeerConnectionMetric)>),
}

/// Wire messages exchanged between metrics services (encoded with bincode).
#[derive(Deserialize, Serialize)]
enum Message {
// Broadcast by a collector on each tick to request metrics.
Scan,
// Unicast reply to a `Scan`, carrying the sender's connection metrics.
Info(Vec<(ConnectionId, PeerId, PeerConnectionMetric)>),
}

/// Default ticker period in seconds, used when `MetricsService::new` is given no interval.
const DEFAULT_COLLECTOR_INTERVAL: u64 = 1;

/// Service that answers metric scans and, when acting as a collector,
/// periodically gathers metrics from the network.
pub struct MetricsService {
// When true, each tick emits the local metrics and broadcasts a `Scan`.
is_collector: bool,
// Underlying p2p service used to send and receive messages.
service: P2pService,
// Drives the periodic collection.
ticker: Interval,
// Events queued for delivery by `recv`.
outs: VecDeque<MetricsServiceEvent>,
}

impl MetricsService {
    /// Create a metrics service on top of `service`.
    ///
    /// * `collect_interval` - ticker period; falls back to
    ///   `DEFAULT_COLLECTOR_INTERVAL` seconds when `None`.
    /// * `service` - the p2p service used to exchange metric messages.
    /// * `is_collector` - when true, every tick emits the local metrics and
    ///   broadcasts a `Scan` request to the network.
    pub fn new(collect_interval: Option<Duration>, service: P2pService, is_collector: bool) -> Self {
        let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(DEFAULT_COLLECTOR_INTERVAL)));

        Self {
            is_collector,
            ticker,
            service,
            outs: VecDeque::new(),
        }
    }

    /// Wait for the next metrics event.
    ///
    /// Queued events are returned first. Otherwise this drives the ticker
    /// (collector only) and handles incoming `Scan` / `Info` messages until an
    /// event becomes available.
    ///
    /// # Errors
    ///
    /// Returns an error when the underlying service stops yielding events
    /// (previously this case panicked).
    pub async fn recv(&mut self) -> anyhow::Result<MetricsServiceEvent> {
        loop {
            if let Some(out) = self.outs.pop_front() {
                return Ok(out);
            }

            select! {
                _ = self.ticker.tick() => {
                    if self.is_collector {
                        // Report our own connections first, then ask every other node for theirs.
                        let metrics = self.service.ctx.metrics();
                        self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(self.service.router().local_id(), metrics));

                        let requester = self.service.requester();
                        tokio::spawn(async move {
                            requester.send_broadcast(bincode::serialize(&Message::Scan).expect("should convert to buf")).await;
                        });
                    }
                }
                // Propagate a closed event channel as an error instead of panicking;
                // the fully-qualified call avoids needing an extra `use`.
                event = self.service.recv() => match anyhow::Context::context(event, "metrics service event channel ended")? {
                    P2pServiceEvent::Unicast(from, data) | P2pServiceEvent::Broadcast(from, data) => {
                        // Payloads that do not decode as `Message` are ignored.
                        if let Ok(msg) = bincode::deserialize::<Message>(&data) {
                            match msg {
                                Message::Scan => {
                                    // Reply to the requester with our current metrics.
                                    let metrics = self.service.ctx.metrics();
                                    let requester = self.service.requester();
                                    tokio::spawn(async move {
                                        requester.try_send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf"))
                                            .await
                                            .print_on_err("send metrics info to collector error");
                                    });
                                }
                                Message::Info(peer_metrics) => {
                                    self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(from, peer_metrics));
                                }
                            }
                        }
                    }
                    P2pServiceEvent::Stream(..) => {}
                }
            }
        }
    }
}
1 change: 1 addition & 0 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{P2pNetwork, P2pNetworkConfig, PeerAddress, PeerId, SharedKeyHandshak
mod alias;
mod cross_nodes;
mod discovery;
mod metrics;
mod pubsub;
mod visualization;

Expand Down
41 changes: 41 additions & 0 deletions src/tests/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::time::Duration;

use crate::{
metrics_service::{MetricsService, MetricsServiceEvent},
PeerId,
};
use test_log::test;

use super::create_node;

/// Two connected nodes: node1 runs a collector, node2 only answers scans.
/// The first four events seen by the collector must alternate between its own
/// metrics and the metrics reported by node2, each with a single connection.
#[test(tokio::test)]
async fn metric_collect() {
    let (mut node1, addr1) = create_node(true, 1, vec![]).await;
    let mut collector = MetricsService::new(None, node1.create_service(0.into()), true);
    tokio::spawn(async move { while node1.recv().await.is_ok() {} });

    let (mut node2, _) = create_node(true, 2, vec![addr1.clone()]).await;
    let mut reporter = MetricsService::new(None, node2.create_service(0.into()), false);
    tokio::spawn(async move { while node2.recv().await.is_ok() {} });
    tokio::spawn(async move { while reporter.recv().await.is_ok() {} });

    tokio::time::sleep(Duration::from_secs(1)).await;

    // Pull exactly four events from the collector, each bounded by a 3s timeout.
    let mut events = Vec::with_capacity(4);
    for _ in 0..4 {
        let event = tokio::time::timeout(Duration::from_secs(3), collector.recv()).await.expect("").expect("");
        events.push(event);
    }

    // Reduce every event to (reporting peer, number of connections reported).
    let mut event_from_peers: Vec<(PeerId, usize)> = Vec::with_capacity(events.len());
    for MetricsServiceEvent::OnPeerConnectionMetric(peer, metrics) in &events {
        event_from_peers.push((*peer, metrics.len()));
    }

    println!("{:?}", events);

    assert_eq!(event_from_peers, vec![(PeerId(1), 1), (PeerId(1), 1), (PeerId(2), 1), (PeerId(2), 1)]);
}

0 comments on commit 57df7cd

Please sign in to comment.