Skip to content

Commit

Permalink
Some review fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Oct 6, 2024
1 parent 510be97 commit f4cb2a2
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 74 deletions.
46 changes: 26 additions & 20 deletions node/actors/network/src/debug_page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,7 @@ impl Server {
.with_header(2, "Fetch queue")
.with_paragraph(format!(
"Blocks: {:?}",
self.network
.gossip
.fetch_queue
.blocks
.subscribe()
.borrow()
.keys()
.map(|x| x.0)
.collect::<Vec<_>>()
.sort()
self.network.gossip.fetch_queue.current_blocks()
));

// Attester network
Expand All @@ -409,7 +400,7 @@ impl Server {
self.network
.gossip
.attestation
.key
.key()
.clone()
.map_or("None".to_string(), |k| k.public().encode())
));
Expand All @@ -418,27 +409,29 @@ impl Server {
.network
.gossip
.attestation
.state
.state()
.subscribe()
.borrow()
.clone()
{
html = html
.with_paragraph(format!(
"Batch to attest:\nNumber: {}, Hash: {}, Genesis hash: {}",
state.info.batch_to_attest.number,
state.info.batch_to_attest.hash.encode(),
state.info.batch_to_attest.genesis.encode(),
state.info().batch_to_attest.number,
state.info().batch_to_attest.hash.encode(),
state.info().batch_to_attest.genesis.encode(),
))
.with_header(2, "Committee")
.with_paragraph(Self::attester_committee_table(state.info.committee.iter()))
.with_paragraph(Self::attester_committee_table(
state.info().committee.iter(),
))
.with_paragraph(format!(
"Total weight: {}",
state.info.committee.total_weight()
state.info().committee.total_weight()
))
.with_header(2, "Votes")
.with_paragraph(Self::attester_votes_table(state.votes.iter()))
.with_paragraph(format!("Total weight: {}", state.total_weight));
.with_paragraph(Self::attester_votes_table(state.votes().iter()))
.with_paragraph(format!("Total weight: {}", state.total_weight()));
}

// Validator network
Expand Down Expand Up @@ -662,6 +655,19 @@ impl Server {
let minutes = (seconds % 3600) / 60;
let seconds = seconds % 60;

format!("{}d {}h {}m {}s", days, hours, minutes, seconds)
let mut components = Vec::new();

if days > 0 {
components.push(format!("{}d", days));
}
if hours > 0 {
components.push(format!("{}h", hours));
}
if minutes > 0 {
components.push(format!("{}m", minutes));
}

components.push(format!("{}s", seconds));
components.join(" ")
}
}
61 changes: 44 additions & 17 deletions node/actors/network/src/gossip/attestation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,31 @@ pub struct Info {
#[derive(Clone)]
pub struct State {
/// Info about the batch to attest and the committee.
pub info: Arc<Info>,
info: Arc<Info>,
/// Votes collected so far.
pub votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,
votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,
/// Total weight of the votes collected.
pub total_weight: attester::Weight,
total_weight: attester::Weight,
}

/// Diff between 2 states.
pub(crate) struct Diff {
/// New votes.
pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
/// New info, if changed.
pub(crate) info: Option<Arc<Info>>,
}
impl State {
/// Returns a reference to the `info` field.
pub fn info(&self) -> &Arc<Info> {
&self.info
}

impl Diff {
fn is_empty(&self) -> bool {
self.votes.is_empty() && self.info.is_none()
/// Returns a reference to the `votes` field.
pub fn votes(
&self,
) -> &im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>> {
&self.votes
}

/// Returns a reference to the `total_weight` field.
pub fn total_weight(&self) -> &attester::Weight {
&self.total_weight
}
}

impl State {
/// Returns a diff between `self` state and `old` state.
/// Diff contains votes which are present is `self`, but not in `old`.
fn diff(&self, old: &Option<Self>) -> Diff {
Expand Down Expand Up @@ -141,6 +144,20 @@ impl State {
}
}

/// Diff between 2 states.
pub(crate) struct Diff {
/// New votes.
pub(crate) votes: Vec<Arc<attester::Signed<attester::Batch>>>,
/// New info, if changed.
pub(crate) info: Option<Arc<Info>>,
}

impl Diff {
fn is_empty(&self) -> bool {
self.votes.is_empty() && self.info.is_none()
}
}

/// Receiver of state diffs.
pub(crate) struct DiffReceiver {
prev: Option<State>,
Expand Down Expand Up @@ -224,9 +241,9 @@ impl DiffReceiver {
pub struct Controller {
/// Key to automatically vote for batches.
/// None, if the current node is not an attester.
pub(crate) key: Option<attester::SecretKey>,
key: Option<attester::SecretKey>,
/// Internal state of the controller.
pub(crate) state: Watch<Option<State>>,
state: Watch<Option<State>>,
}

impl fmt::Debug for Controller {
Expand All @@ -247,6 +264,16 @@ impl Controller {
}
}

/// Returns a reference to the key, if it exists.
pub fn key(&self) -> Option<&attester::SecretKey> {
self.key.as_ref()
}

/// Returns a reference to the state.
pub fn state(&self) -> &Watch<Option<State>> {
&self.state
}

/// Registers metrics for this controller.
pub(crate) fn register_metrics(self: &Arc<Self>) {
metrics::Metrics::register(Arc::downgrade(self));
Expand Down
15 changes: 14 additions & 1 deletion node/actors/network/src/gossip/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) enum RequestItem {

/// Queue of block fetch request.
pub(crate) struct Queue {
pub(crate) blocks: sync::watch::Sender<BlockInner>,
blocks: sync::watch::Sender<BlockInner>,
batches: sync::watch::Sender<BatchInner>,
}

Expand All @@ -37,6 +37,19 @@ impl Default for Queue {
}

impl Queue {
/// Returns the sorted list of currently requested blocks.
pub(crate) fn current_blocks(&self) -> Vec<u64> {
let mut blocks = self
.blocks
.subscribe()
.borrow()
.keys()
.map(|x| x.0)
.collect::<Vec<_>>();
blocks.sort();
blocks
}

/// Requests a resource from peers and waits until it is stored.
/// Note: in the current implementation concurrent calls for the same resource number are
/// unsupported - second call will override the first call.
Expand Down
74 changes: 38 additions & 36 deletions node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,16 @@ pub struct MeteredStreamStats {
pub established: SystemTime,
/// IP Address and port of current connection.
pub peer_addr: SocketAddr,
/// Circular buffer for sent bytes in the last minute.
sent_buffer: VecDeque<(Instant, u64)>,
/// Circular buffer for received bytes in the last minute.
received_buffer: VecDeque<(Instant, u64)>,
/// Total bytes sent in the current minute.
current_minute_sent: AtomicU64,
/// Total bytes sent in the previous minute.
previous_minute_sent: AtomicU64,
/// Total bytes received in the current minute.
current_minute_received: AtomicU64,
/// Total bytes received in the previous minute.
previous_minute_received: AtomicU64,
/// The start of the current minute.
current_minute_start: Instant,
}

impl MeteredStreamStats {
Expand All @@ -245,58 +251,54 @@ impl MeteredStreamStats {
received: 0.into(),
established: SystemTime::now(),
peer_addr,
sent_buffer: VecDeque::new(),
received_buffer: VecDeque::new(),
current_minute_sent: 0.into(),
previous_minute_sent: 0.into(),
current_minute_received: 0.into(),
previous_minute_received: 0.into(),
current_minute_start: Instant::now(),
}
}

fn read(&mut self, amount: u64) {
self.update_minute();
self.received.fetch_add(amount, Ordering::Relaxed);
self.received_buffer.push_back((Instant::now(), amount));
self.cleanup_buffers();
self.current_minute_received
.fetch_add(amount, Ordering::Relaxed);
}

fn wrote(&mut self, amount: u64) {
self.update_minute();
self.sent.fetch_add(amount, Ordering::Relaxed);
self.sent_buffer.push_back((Instant::now(), amount));
self.cleanup_buffers();
self.current_minute_sent
.fetch_add(amount, Ordering::Relaxed);
}

fn cleanup_buffers(&mut self) {
let one_minute_ago = Instant::now() - Duration::from_secs(60);
fn update_minute(&mut self) {
let now = Instant::now();
let elapsed = now.duration_since(self.current_minute_start);

while let Some(&(timestamp, _)) = self.sent_buffer.front() {
if timestamp < one_minute_ago {
self.sent_buffer.pop_front();
} else {
break;
}
}

while let Some(&(timestamp, _)) = self.received_buffer.front() {
if timestamp < one_minute_ago {
self.received_buffer.pop_front();
} else {
break;
}
if elapsed >= Duration::from_secs(60) {
self.previous_minute_sent.store(
self.current_minute_sent.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.previous_minute_received.store(
self.current_minute_received.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.current_minute_sent.store(0, Ordering::Relaxed);
self.current_minute_received.store(0, Ordering::Relaxed);
self.current_minute_start = now;
}
}

/// Returns the upload throughput of the connection in bytes per second.
pub fn sent_throughput(&self) -> f64 {
self.sent_buffer
.iter()
.map(|&(_, amount)| amount)
.sum::<u64>() as f64
/ 60.0
self.previous_minute_sent.load(Ordering::Relaxed) as f64 / 60.0
}

/// Returns the download throughput of the connection in bytes per second.
pub fn received_throughput(&self) -> f64 {
self.received_buffer
.iter()
.map(|&(_, amount)| amount)
.sum::<u64>() as f64
/ 60.0
self.previous_minute_received.load(Ordering::Relaxed) as f64 / 60.0
}
}

0 comments on commit f4cb2a2

Please sign in to comment.