Skip to content

Commit

Permalink
chore: resolve comment for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
marverlous811 committed Nov 22, 2024
1 parent 63e2032 commit 33242a4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 20 deletions.
12 changes: 6 additions & 6 deletions src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::mpsc::Sender;

use crate::{
msg::{BroadcastMsgId, P2pServiceId, PeerMessage},
peer::{PeerConnectionAlias, PeerMetrics},
peer::{PeerConnectionAlias, PeerConnectionMetric},
router::{RouteAction, SharedRouterTable},
service::P2pServiceEvent,
stream::P2pQuicStream,
Expand All @@ -18,7 +18,7 @@ use crate::{
#[derive(Debug)]
struct SharedCtxInternal {
conns: HashMap<ConnectionId, PeerConnectionAlias>,
conn_metrics: HashMap<ConnectionId, (PeerId, PeerMetrics)>,
conn_metrics: HashMap<ConnectionId, (PeerId, PeerConnectionMetric)>,
received_broadcast_msg: LruCache<BroadcastMsgId, ()>,
services: [Option<Sender<P2pServiceEvent>>; 256],
}
Expand Down Expand Up @@ -50,11 +50,11 @@ impl SharedCtxInternal {
self.conns.values().cloned().collect::<Vec<_>>()
}

fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerMetrics) {
fn update_conn_metrics(&mut self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) {
self.conn_metrics.insert(*conn, (peer, metrics));
}

fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerMetrics)> {
fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> {
let mut ret = vec![];
for (conn, (peer, metrics)) in self.conn_metrics.clone() {
ret.push((conn, peer, metrics));
Expand Down Expand Up @@ -115,11 +115,11 @@ impl SharedCtx {
self.ctx.read().conns()
}

pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerMetrics) {
pub fn update_metrics(&self, conn: &ConnectionId, peer: PeerId, metrics: PeerConnectionMetric) {
self.ctx.write().update_conn_metrics(conn, peer, metrics);
}

pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerMetrics)> {
pub fn metrics(&self) -> Vec<(ConnectionId, PeerId, PeerConnectionMetric)> {
self.ctx.read().metrics()
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use derive_more::derive::{Deref, Display, From};
use discovery::{PeerDiscovery, PeerDiscoverySync};
use msg::{P2pServiceId, PeerMessage};
use neighbours::NetworkNeighbours;
use peer::{PeerConnection, PeerMetrics};
use peer::{PeerConnection, PeerConnectionMetric};
use quinn::{Endpoint, Incoming, VarInt};
use router::RouterTableSync;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
Expand Down Expand Up @@ -113,7 +113,7 @@ enum InternalEvent {
PeerConnected(ConnectionId, PeerId, u16),
PeerConnectError(ConnectionId, Option<PeerId>, anyhow::Error),
PeerData(ConnectionId, PeerId, PeerMainData),
PeerStats(ConnectionId, PeerId, PeerMetrics),
PeerStats(ConnectionId, PeerId, PeerConnectionMetric),
PeerDisconnected(ConnectionId, PeerId),
}

Expand Down
2 changes: 1 addition & 1 deletion src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod peer_alias;
mod peer_internal;

pub use peer_alias::PeerConnectionAlias;
pub use peer_internal::PeerMetrics;
pub use peer_internal::PeerConnectionMetric;

enum PeerConnectionControl {
Send(PeerMessage),
Expand Down
5 changes: 2 additions & 3 deletions src/peer/peer_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
use super::PeerConnectionControl;

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct PeerMetrics {
pub struct PeerConnectionMetric {
pub uptime: u64,
pub rtt: u16,
pub lost_pkt: u64,
Expand Down Expand Up @@ -90,9 +90,8 @@ impl PeerConnectionInternal {
_ = self.ticker.tick() => {
let rtt_ms = self.connection.rtt().as_millis().min(u16::MAX as u128) as u16;
let connection_stats = self.connection.stats();
log::info!("connection stats: {:?}", connection_stats);
self.ctx.router().set_direct(self.conn_id, self.to_id, rtt_ms);
let metrics = PeerMetrics {
let metrics = PeerConnectionMetric {
uptime: self.started.elapsed().as_secs(),
lost_pkt: connection_stats.path.lost_packets,
lost_bytes: connection_stats.path.lost_bytes,
Expand Down
16 changes: 9 additions & 7 deletions src/service/metrics_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ use std::{collections::VecDeque, time::Duration};
use serde::{Deserialize, Serialize};
use tokio::{select, time::Interval};

use crate::{peer::PeerMetrics, ConnectionId, ErrorExt, P2pServiceEvent, PeerId};
use crate::{peer::PeerConnectionMetric, ConnectionId, ErrorExt, P2pServiceEvent, PeerId};

use super::P2pService;

#[derive(Debug, PartialEq, Eq)]
pub enum MetricsServiceEvent {
PeerMetrics(PeerId, Vec<(ConnectionId, PeerId, PeerMetrics)>),
OnPeerConnectionMetric(PeerId, Vec<(ConnectionId, PeerId, PeerConnectionMetric)>),
}

#[derive(Deserialize, Serialize)]
enum Message {
Scan,
Info(Vec<(ConnectionId, PeerId, PeerMetrics)>),
Info(Vec<(ConnectionId, PeerId, PeerConnectionMetric)>),
}

const DEFAULT_COLLECTOR_INTERVAL: u64 = 1;

pub struct MetricsService {
is_collector: bool,
service: P2pService,
Expand All @@ -27,7 +29,7 @@ pub struct MetricsService {

impl MetricsService {
pub fn new(collect_interval: Option<Duration>, service: P2pService, is_collector: bool) -> Self {
let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(1)));
let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(DEFAULT_COLLECTOR_INTERVAL)));

Self {
is_collector,
Expand All @@ -47,7 +49,7 @@ impl MetricsService {
_ = self.ticker.tick() => {
if self.is_collector {
let metrics = self.service.ctx.metrics();
self.outs.push_back(MetricsServiceEvent::PeerMetrics(self.service.router().local_id(), metrics));
self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(self.service.router().local_id(), metrics));

let requester = self.service.requester();
tokio::spawn(async move {
Expand All @@ -63,13 +65,13 @@ impl MetricsService {
let metrics = self.service.ctx.metrics();
let requester = self.service.requester();
tokio::spawn(async move {
requester.send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf"))
requester.try_send_unicast(from, bincode::serialize(&Message::Info(metrics)).expect("should convert to buf"))
.await
.print_on_err("send metrics info to collector error");
});
}
Message::Info(peer_metrics) => {
self.outs.push_back(MetricsServiceEvent::PeerMetrics(from, peer_metrics));
self.outs.push_back(MetricsServiceEvent::OnPeerConnectionMetric(from, peer_metrics));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn metric_collect() {
let event_from_peers: Vec<(PeerId, usize)> = events
.iter()
.map(|e| match e {
MetricsServiceEvent::PeerMetrics(peer, metrics) => (*peer, metrics.len()),
MetricsServiceEvent::OnPeerConnectionMetric(peer, metrics) => (*peer, metrics.len()),
})
.collect();

Expand Down

0 comments on commit 33242a4

Please sign in to comment.