From 33242a4196213f5e2141b645ebf4031afbf54886 Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Fri, 22 Nov 2024 09:26:10 +0700 Subject: [PATCH] chore: resolve comment for PR --- src/ctx.rs | 12 ++++++------ src/lib.rs | 4 ++-- src/peer.rs | 2 +- src/peer/peer_internal.rs | 5 ++--- src/service/metrics_service.rs | 16 +++++++++------- src/tests/metrics.rs | 2 +- 6 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/ctx.rs b/src/ctx.rs index 52b76d1..41f654a 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc::Sender; use crate::{ msg::{BroadcastMsgId, P2pServiceId, PeerMessage}, - peer::{PeerConnectionAlias, PeerMetrics}, + peer::{PeerConnectionAlias, PeerConnectionMetric}, router::{RouteAction, SharedRouterTable}, service::P2pServiceEvent, stream::P2pQuicStream, @@ -18,7 +18,7 @@ use crate::{ #[derive(Debug)] struct SharedCtxInternal { conns: HashMap, - conn_metrics: HashMap, + conn_metrics: HashMap, received_broadcast_msg: LruCache, services: [Option>; 256], } @@ -50,11 +50,11 @@ impl SharedCtxInternal { self.conns.values().cloned().collect::>() } - fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerMetrics) { + fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) { self.conn_metrics.insert(*conn, (peer, metrics)); } - fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerMetrics)> { + fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> { let mut ret = vec![]; for (conn, (peer, metrics)) in self.conn_metrics.clone() { ret.push((conn, peer, metrics)); @@ -115,11 +115,11 @@ impl SharedCtx { self.ctx.read().conns() } - pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerMetrics) { + pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) { self.ctx.write().update_conn_metrics(conn, peer, metrics); } - pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerMetrics)> { + pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> { self.ctx.read().metrics() } diff --git a/src/lib.rs b/src/lib.rs index 26ab07e..89ee652 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ use derive_more::derive::{Deref, Display, From}; use discovery::{PeerDiscovery, PeerDiscoverySync}; use msg::{P2pServiceId, PeerMessage}; use neighbours::NetworkNeighbours; -use peer::{PeerConnection, PeerMetrics}; +use peer::{PeerConnection, PeerConnectionMetric}; use quinn::{Endpoint, Incoming, VarInt}; use router::RouterTableSync; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; @@ -113,7 +113,7 @@ enum InternalEvent { PeerConnected(ConnectionId, PeerId, u16), PeerConnectError(ConnectionId, Option, anyhow::Error), PeerData(ConnectionId, PeerId, PeerMainData), - PeerStats(ConnectionId, PeerId, PeerMetrics), + PeerStats(ConnectionId, PeerId, PeerConnectionMetric), PeerDisconnected(ConnectionId, PeerId), } diff --git a/src/peer.rs b/src/peer.rs index 901f402..c43d304 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -24,7 +24,7 @@ mod peer_alias; mod peer_internal; pub use peer_alias::PeerConnectionAlias; -pub use peer_internal::PeerMetrics; +pub use peer_internal::PeerConnectionMetric; enum PeerConnectionControl { Send(PeerMessage), diff --git a/src/peer/peer_internal.rs b/src/peer/peer_internal.rs index 4ae6e6a..cce2ff3 100644 --- a/src/peer/peer_internal.rs +++ b/src/peer/peer_internal.rs @@ -34,7 +34,7 @@ use crate::{ use super::PeerConnectionControl; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -pub struct PeerMetrics { +pub struct PeerConnectionMetric { pub uptime: u64, pub rtt: u16, pub lost_pkt: u64, @@ -90,9 +90,8 @@ impl PeerConnectionInternal { _ = self.ticker.tick() => { let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16; let connection_stats = self.connection.stats(); - log::info!("connection stats: {:?}", connection_stats); self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms); - let metrics = PeerMetrics { + let metrics = PeerConnectionMetric { uptime: self.started.elapsed().as_secs(), lost_pkt: connection_stats.path.lost_packets, lost_bytes: connection_stats.path.lost_bytes, diff --git a/src/service/metrics_service.rs b/src/service/metrics_service.rs index 501c1c8..829a8de 100644 --- a/src/service/metrics_service.rs +++ b/src/service/metrics_service.rs @@ -3,21 +3,23 @@ use std::{collections::VecDeque, time::Duration}; use serde::{Deserialize, Serialize}; use tokio::{select, time::Interval}; -use crate::{peer::PeerMetrics, ConnectionId, ErrorExt, P2pServiceEvent, PeerId}; +use crate::{peer::PeerConnectionMetric, ConnectionId, ErrorExt, P2pServiceEvent, PeerId}; use super::P2pService; #[derive(Debug, PartialEq, Eq)] pub enum MetricsServiceEvent { - PeerMetrics(PeerId, Vec<(ConnectionId, PeerId, PeerMetrics)>), + OnPeerConnectionMetric(PeerId, Vec<(ConnectionId, PeerId, PeerConnectionMetric)>), } #[derive(Deserialize, Serialize)] enum Message { Scan, - Info(Vec<(ConnectionId, PeerId, PeerMetrics)>), + Info(Vec<(ConnectionId, PeerId, PeerConnectionMetric)>), } +const DEFAULT_COLLECTOR_INTERVAL: u64 = 1; + pub struct MetricsService { is_collector: bool, service: P2pService, @@ -27,7 +29,7 @@ pub struct MetricsService { impl MetricsService { pub fn new(collect_interval: Option, service: P2pService, is_collector: bool) -> Self { - let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(1))); + let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(DEFAULT_COLLECTOR_INTERVAL))); Self { is_collector, @@ -47,7 +49,7 @@ impl MetricsService { _ = self.ticker.tick() => { if self.is_collector { let metrics = self.service.ctx.metrics(); - self.outs.push_back(MetricsServiceEvent::PeerMetrics(self.service.router().local_id(), metrics)); + self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(self.service.router().local_id(), metrics)); let requester = self.service.requester(); tokio::spawn(async move { @@ -63,13 +65,13 @@ impl MetricsService { let metrics = self.service.ctx.metrics(); let requester = self.service.requester(); tokio::spawn(async move { - requester.send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf")) + requester.try_send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf")) .await .print_on_err("send metrics info to collector error"); }); } Message::Info(peer_metrics) => { - self.outs.push_back(MetricsServiceEvent::PeerMetrics(from, peer_metrics)); + self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(from, peer_metrics)); } } } diff --git a/src/tests/metrics.rs b/src/tests/metrics.rs index 23c4352..52a3fba 100644 --- a/src/tests/metrics.rs +++ b/src/tests/metrics.rs @@ -31,7 +31,7 @@ async fn metric_collect() { let event_from_peers: Vec<(PeerId, usize)> = events .iter() .map(|e| match e { - MetricsServiceEvent::PeerMetrics(peer, metrics) => (*peer, metrics.len()), + MetricsServiceEvent::OnPeerConnectionMetric(peer, metrics) => (*peer, metrics.len()), }) .collect();