Skip to content

Commit

Permalink
More comments fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Oct 7, 2024
1 parent 7fa5ae1 commit ab7eff6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 38 deletions.
13 changes: 2 additions & 11 deletions node/actors/network/src/debug_page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{atomic::Ordering, Arc},
time::SystemTime,
};
use tls_listener::TlsListener;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -570,10 +569,7 @@ impl Server {
.human_count_bytes()
.to_string(),
Self::human_readable_duration(
SystemTime::now()
.duration_since(connection.stats.established)
.unwrap_or_default()
.as_secs(),
connection.stats.established.elapsed().whole_seconds() as u64,
),
])
}
Expand Down Expand Up @@ -613,12 +609,7 @@ impl Server {
.load(Ordering::Relaxed)
.human_count_bytes()
.to_string(),
Self::human_readable_duration(
SystemTime::now()
.duration_since(stats.established)
.unwrap_or_default()
.as_secs(),
),
Self::human_readable_duration(stats.established.elapsed().whole_seconds() as u64),
])
}

Expand Down
74 changes: 47 additions & 27 deletions node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use std::{
Arc, Weak,
},
task::{ready, Context, Poll},
time::{SystemTime, UNIX_EPOCH},
};
use vise::{
Collector, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, GaugeGuard, Metrics, Unit,
};
use zksync_concurrency::{ctx, io, net};
use zksync_concurrency::{ctx, io, net, time::Instant};

/// Metered TCP stream.
#[pin_project::pin_project]
Expand Down Expand Up @@ -230,8 +229,8 @@ pub struct MeteredStreamStats {
pub sent: AtomicU64,
/// Total bytes received over the Stream.
pub received: AtomicU64,
/// System time since the connection started.
pub established: SystemTime,
/// Time when the connection started.
pub established: Instant,
/// IP Address and port of current connection.
pub peer_addr: SocketAddr,
/// Total bytes sent in the current minute.
Expand All @@ -242,22 +241,22 @@ pub struct MeteredStreamStats {
pub current_minute_received: AtomicU64,
/// Total bytes received in the previous minute.
pub previous_minute_received: AtomicU64,
/// The start of the current minute (in seconds since Unix epoch).
pub current_minute_start: AtomicU64,
/// Minutes elapsed since the connection started, when this metrics were last updated.
pub minutes_elapsed_last: AtomicU64,
}

impl MeteredStreamStats {
fn new(peer_addr: SocketAddr) -> Self {
Self {
sent: 0.into(),
received: 0.into(),
established: SystemTime::now(),
established: Instant::now(),
peer_addr,
current_minute_sent: 0.into(),
previous_minute_sent: 0.into(),
current_minute_received: 0.into(),
previous_minute_received: 0.into(),
current_minute_start: 0.into(),
minutes_elapsed_last: 0.into(),
}
}

Expand All @@ -276,35 +275,56 @@ impl MeteredStreamStats {
}

fn update_minute(&self) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

let elapsed_secs = now.saturating_sub(self.current_minute_start.load(Ordering::Relaxed));

if elapsed_secs >= 60 {
self.previous_minute_sent.store(
self.current_minute_sent.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.previous_minute_received.store(
self.current_minute_received.load(Ordering::Relaxed),
Ordering::Relaxed,
);
let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60;
let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed);

if elapsed_minutes_now > elapsed_minutes_last {
if elapsed_minutes_now - elapsed_minutes_last > 1 {
self.previous_minute_sent.store(0, Ordering::Relaxed);
self.previous_minute_received.store(0, Ordering::Relaxed);
} else {
self.previous_minute_sent.store(
self.current_minute_sent.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.previous_minute_received.store(
self.current_minute_received.load(Ordering::Relaxed),
Ordering::Relaxed,
);
}

self.current_minute_sent.store(0, Ordering::Relaxed);
self.current_minute_received.store(0, Ordering::Relaxed);
self.current_minute_start.store(now, Ordering::Relaxed);
self.minutes_elapsed_last
.store(elapsed_minutes_now, Ordering::Relaxed);
}
}

/// Returns the upload throughput of the connection in bytes per second.
pub fn sent_throughput(&self) -> f64 {
self.previous_minute_sent.load(Ordering::Relaxed) as f64 / 60.0
let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60;
let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed);

if elapsed_minutes_now - elapsed_minutes_last == 0 {
self.previous_minute_sent.load(Ordering::Relaxed) as f64 / 60.0
} else if elapsed_minutes_now - elapsed_minutes_last == 1 {
self.current_minute_sent.load(Ordering::Relaxed) as f64 / 60.0
} else {
0.0
}
}

/// Returns the download throughput of the connection in bytes per second.
pub fn received_throughput(&self) -> f64 {
self.previous_minute_received.load(Ordering::Relaxed) as f64 / 60.0
let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60;
let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed);

if elapsed_minutes_now - elapsed_minutes_last == 0 {
self.previous_minute_received.load(Ordering::Relaxed) as f64 / 60.0
} else if elapsed_minutes_now - elapsed_minutes_last == 1 {
self.current_minute_received.load(Ordering::Relaxed) as f64 / 60.0
} else {
0.0
}
}
}

0 comments on commit ab7eff6

Please sign in to comment.