Skip to content

Commit

Permalink
Improve logging on receiving blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jan 9, 2025
1 parent 9501424 commit ee4946e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 33 deletions.
8 changes: 1 addition & 7 deletions consensus/config/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,9 @@ impl AuthorityIndex {
}
}

// TODO: re-evaluate formats for production debugging.
impl Display for AuthorityIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.value() < 26 {
let c = (b'A' + self.value() as u8) as char;
f.write_str(c.to_string().as_str())
} else {
write!(f, "[{:02}]", self.value())
}
write!(f, "[{}]", self.value())
}
}

Expand Down
21 changes: 18 additions & 3 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
Expand Down Expand Up @@ -176,11 +177,25 @@ where
registry: Registry,
boot_counter: u64,
) -> Self {
assert!(
committee.is_valid_index(own_index),
"Invalid own index {}",
own_index
);
let own_hostname = &committee.authority(own_index).hostname;
info!(
"Starting consensus authority {} {}, {:?}, boot counter {}",
own_index, own_hostname, protocol_config.version, boot_counter
);
info!(
"Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}",
own_index, committee, parameters, protocol_config.version, boot_counter
"Consensus authorities: {}",
committee
.authorities()
.map(|(i, a)| format!("{}: {}", i, a.hostname))
.join(", ")
);
assert!(committee.is_valid_index(own_index));
info!("Consensus parameters: {:?}", parameters);
info!("Consensus committee: {:?}", committee);
let context = Arc::new(Context::new(
own_index,
committee,
Expand Down
16 changes: 8 additions & 8 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use parking_lot::RwLock;
use sui_macros::fail_point_async;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, warn};

use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
Expand Down Expand Up @@ -114,8 +114,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

trace!("Received block {verified_block} via send block.");
let block_ref = verified_block.reference();
debug!("Received block {} via send block.", block_ref);

// Reject block with timestamp too far in the future.
let now = self.context.clock.timestamp_utc_ms();
Expand All @@ -130,12 +130,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Block timestamp is too far in the future: {} > {}",
verified_block.timestamp_ms(),
Expand All @@ -154,7 +154,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc_by(forward_time_drift.as_millis() as u64);
debug!(
"Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
forward_time_drift.as_millis(),
Expand Down Expand Up @@ -189,12 +189,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
verified_block.reference(),
block_ref,
last_commit_index,
quorum_commit_index,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Last commit index is lagging quorum commit index too much ({} < {})",
last_commit_index, quorum_commit_index,
Expand Down
25 changes: 18 additions & 7 deletions consensus/core/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
.await
{
Ok(blocks) => {
debug!("Subscribed to peer {} after {} attempts", peer, retries);
debug!(
"Subscribed to peer {} {} after {} attempts",
peer, peer_hostname, retries
);
context
.metrics
.node_metrics
Expand All @@ -170,7 +173,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
blocks
}
Err(e) => {
debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
debug!(
"Failed to subscribe to blocks from peer {} {}: {}",
peer, peer_hostname, e
);
context
.metrics
.node_metrics
Expand All @@ -182,7 +188,6 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
};

// Now can consider the subscription successful
let peer_hostname = &context.committee.authority(peer).hostname;
context
.metrics
.node_metrics
Expand All @@ -206,20 +211,26 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
match e {
ConsensusError::BlockRejected { block_ref, reason } => {
debug!(
"Failed to process block from peer {} for block {:?}: {}",
peer, block_ref, reason
"Failed to process block from peer {} {} for block {:?}: {}",
peer, peer_hostname, block_ref, reason
);
}
_ => {
info!("Invalid block received from peer {}: {}", peer, e,);
info!(
"Invalid block received from peer {} {}: {}",
peer, peer_hostname, e
);
}
}
}
// Reset retries when a block is received.
retries = 0;
}
None => {
debug!("Subscription to blocks from peer {} ended", peer);
debug!(
"Subscription to blocks from peer {} {} ended",
peer, peer_hostname
);
retries += 1;
break 'stream;
}
Expand Down
16 changes: 8 additions & 8 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commands_sender: Sender<Command>,
) {
const MAX_RETRIES: u32 = 5;

let peer_hostname = &context.committee.authority(peer_index).hostname;
let mut requests = FuturesUnordered::new();

loop {
Expand All @@ -464,14 +464,14 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commands_sender.clone(),
"live"
).await {
warn!("Error while processing fetched blocks from peer {peer_index}: {err}");
warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
}
},
Err(_) => {
if retries <= MAX_RETRIES {
requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
} else {
warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index}.");
warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
// we don't necessarily need to do, but dropping the guard here to unlock the blocks
drop(blocks_guard);
}
Expand Down Expand Up @@ -558,8 +558,9 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}

debug!(
"Synced missing ancestor blocks {} from peer {peer_index}",
blocks.iter().map(|b| b.reference().to_string()).join(","),
"Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
blocks.len(),
blocks.iter().map(|b| b.reference().to_string()).join(", "),
);

// Now send them to core for processing. Ignore the returned missing blocks as we don't want
Expand Down Expand Up @@ -640,8 +641,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let now = context.clock.timestamp_utc_ms();
if now < verified_block.timestamp_ms() {
warn!(
"Fetched block {} timestamp {} is in the future (now={}). Ignoring.",
verified_block,
"Synced block {} timestamp {} is in the future (now={}). Ignoring.",
verified_block.reference(),
verified_block.timestamp_ms(),
now
);
Expand Down Expand Up @@ -1025,7 +1026,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
results.push((blocks_guard, fetched_blocks, peer_index));

// no more pending requests are left, just break the loop
Expand Down

0 comments on commit ee4946e

Please sign in to comment.