diff --git a/node/actors/network/src/debug_page/mod.rs b/node/actors/network/src/debug_page/mod.rs index 7a759ba7..663f28f1 100644 --- a/node/actors/network/src/debug_page/mod.rs +++ b/node/actors/network/src/debug_page/mod.rs @@ -17,7 +17,6 @@ use std::{ collections::{HashMap, HashSet}, net::SocketAddr, sync::{atomic::Ordering, Arc}, - time::SystemTime, }; use tls_listener::TlsListener; use tokio::net::TcpListener; @@ -570,10 +569,7 @@ impl Server { .human_count_bytes() .to_string(), Self::human_readable_duration( - SystemTime::now() - .duration_since(connection.stats.established) - .unwrap_or_default() - .as_secs(), + connection.stats.established.elapsed().whole_seconds() as u64, ), ]) } @@ -613,12 +609,7 @@ impl Server { .load(Ordering::Relaxed) .human_count_bytes() .to_string(), - Self::human_readable_duration( - SystemTime::now() - .duration_since(stats.established) - .unwrap_or_default() - .as_secs(), - ), + Self::human_readable_duration(stats.established.elapsed().whole_seconds() as u64), ]) } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 23cff48e..97ca01e3 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -13,12 +13,11 @@ use std::{ Arc, Weak, }, task::{ready, Context, Poll}, - time::{SystemTime, UNIX_EPOCH}, }; use vise::{ Collector, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, GaugeGuard, Metrics, Unit, }; -use zksync_concurrency::{ctx, io, net}; +use zksync_concurrency::{ctx, io, net, time::Instant}; /// Metered TCP stream. #[pin_project::pin_project] @@ -230,8 +229,8 @@ pub struct MeteredStreamStats { pub sent: AtomicU64, /// Total bytes received over the Stream. pub received: AtomicU64, - /// System time since the connection started. - pub established: SystemTime, + /// Time when the connection started. + pub established: Instant, /// IP Address and port of current connection. pub peer_addr: SocketAddr, /// Total bytes sent in the current minute. @@ -242,8 +241,8 @@ pub struct MeteredStreamStats { pub current_minute_received: AtomicU64, /// Total bytes received in the previous minute. pub previous_minute_received: AtomicU64, - /// The start of the current minute (in seconds since Unix epoch). - pub current_minute_start: AtomicU64, + /// Minutes elapsed since the connection started, when this metrics were last updated. + pub minutes_elapsed_last: AtomicU64, } impl MeteredStreamStats { @@ -251,13 +250,13 @@ impl MeteredStreamStats { Self { sent: 0.into(), received: 0.into(), - established: SystemTime::now(), + established: Instant::now(), peer_addr, current_minute_sent: 0.into(), previous_minute_sent: 0.into(), current_minute_received: 0.into(), previous_minute_received: 0.into(), - current_minute_start: 0.into(), + minutes_elapsed_last: 0.into(), } } @@ -276,35 +275,56 @@ impl MeteredStreamStats { } fn update_minute(&self) { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs(); - - let elapsed_secs = now.saturating_sub(self.current_minute_start.load(Ordering::Relaxed)); - - if elapsed_secs >= 60 { - self.previous_minute_sent.store( - self.current_minute_sent.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - self.previous_minute_received.store( - self.current_minute_received.load(Ordering::Relaxed), - Ordering::Relaxed, - ); + let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60; + let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed); + + if elapsed_minutes_now > elapsed_minutes_last { + if elapsed_minutes_now - elapsed_minutes_last > 1 { + self.previous_minute_sent.store(0, Ordering::Relaxed); + self.previous_minute_received.store(0, Ordering::Relaxed); + } else { + self.previous_minute_sent.store( + self.current_minute_sent.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + self.previous_minute_received.store( + self.current_minute_received.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + } + self.current_minute_sent.store(0, Ordering::Relaxed); self.current_minute_received.store(0, Ordering::Relaxed); - self.current_minute_start.store(now, Ordering::Relaxed); + self.minutes_elapsed_last + .store(elapsed_minutes_now, Ordering::Relaxed); } } /// Returns the upload throughput of the connection in bytes per second. pub fn sent_throughput(&self) -> f64 { - self.previous_minute_sent.load(Ordering::Relaxed) as f64 / 60.0 + let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60; + let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed); + + if elapsed_minutes_now - elapsed_minutes_last == 0 { + self.previous_minute_sent.load(Ordering::Relaxed) as f64 / 60.0 + } else if elapsed_minutes_now - elapsed_minutes_last == 1 { + self.current_minute_sent.load(Ordering::Relaxed) as f64 / 60.0 + } else { + 0.0 + } } /// Returns the download throughput of the connection in bytes per second. pub fn received_throughput(&self) -> f64 { - self.previous_minute_received.load(Ordering::Relaxed) as f64 / 60.0 + let elapsed_minutes_now = self.established.elapsed().whole_seconds() as u64 / 60; + let elapsed_minutes_last = self.minutes_elapsed_last.load(Ordering::Relaxed); + + if elapsed_minutes_now - elapsed_minutes_last == 0 { + self.previous_minute_received.load(Ordering::Relaxed) as f64 / 60.0 + } else if elapsed_minutes_now - elapsed_minutes_last == 1 { + self.current_minute_received.load(Ordering::Relaxed) as f64 / 60.0 + } else { + 0.0 + } } }