[consensus] improve logging when receiving blocks #20825

Merged · merged 2 commits · Jan 9, 2025

Changes from 1 commit
8 changes: 1 addition & 7 deletions consensus/config/src/committee.rs

@@ -175,15 +175,9 @@ impl AuthorityIndex {
     }
 }
 
-// TODO: re-evaluate formats for production debugging.
 impl Display for AuthorityIndex {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        if self.value() < 26 {
-            let c = (b'A' + self.value() as u8) as char;
-            f.write_str(c.to_string().as_str())
-        } else {
-            write!(f, "[{:02}]", self.value())
-        }
+        write!(f, "[{}]", self.value())
     }
 }
 
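Side note on the change above: the old Display impl rendered indices 0 through 25 as letters ("A", "B", ...) and only larger indices as bracketed numbers, while the new impl uses one bracketed format for every index. A minimal standalone sketch of the two behaviors, reimplemented here with plain tuple structs rather than the crate's AuthorityIndex:

```rust
use std::fmt::{self, Display, Formatter};

struct OldIndex(u32);
struct NewIndex(u32);

impl Display for OldIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        if self.0 < 26 {
            // Indices 0..=25 used to render as letters: 0 -> "A", 1 -> "B", ...
            let c = (b'A' + self.0 as u8) as char;
            f.write_str(c.to_string().as_str())
        } else {
            // ...and larger indices as zero-padded bracketed numbers.
            write!(f, "[{:02}]", self.0)
        }
    }
}

impl Display for NewIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // One uniform format for every index: 0 -> "[0]", 30 -> "[30]".
        write!(f, "[{}]", self.0)
    }
}

fn main() {
    assert_eq!(OldIndex(0).to_string(), "A");
    assert_eq!(NewIndex(0).to_string(), "[0]");
    assert_eq!(OldIndex(30).to_string(), "[30]");
    assert_eq!(NewIndex(30).to_string(), "[30]");
}
```

The uniform format is what forces the test-expectation updates in dag_state.rs further down (e.g. "slot A8" becoming "slot [0]8").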
21 changes: 18 additions & 3 deletions consensus/core/src/authority_node.rs

@@ -4,6 +4,7 @@
 use std::{sync::Arc, time::Instant};
 
 use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
+use itertools::Itertools;
 use parking_lot::RwLock;
 use prometheus::Registry;
 use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
@@ -176,11 +177,25 @@ where
         registry: Registry,
         boot_counter: u64,
     ) -> Self {
+        assert!(
+            committee.is_valid_index(own_index),
+            "Invalid own index {}",
+            own_index
+        );
+        let own_hostname = &committee.authority(own_index).hostname;
+        info!(
+            "Starting consensus authority {} {}, {:?}, boot counter {}",
+            own_index, own_hostname, protocol_config.version, boot_counter
+        );
         info!(
-            "Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}",
-            own_index, committee, parameters, protocol_config.version, boot_counter
+            "Consensus authorities: {}",
+            committee
+                .authorities()
+                .map(|(i, a)| format!("{}: {}", i, a.hostname))
+                .join(", ")
         );
-        assert!(committee.is_valid_index(own_index));
+        info!("Consensus parameters: {:?}", parameters);
+        info!("Consensus committee: {:?}", committee);
         let context = Arc::new(Context::new(
             own_index,
             committee,
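The "Consensus authorities" line above is built with itertools' `join` over formatted index/hostname pairs. A small self-contained sketch of the same pattern, assuming a simplified Authority type and `enumerate()` in place of the committee's AuthorityIndex iteration (requires the itertools crate):

```rust
use itertools::Itertools;

struct Authority {
    hostname: String,
}

fn main() {
    let authorities = vec![
        Authority { hostname: "node-a.example".to_string() },
        Authority { hostname: "node-b.example".to_string() },
    ];
    // Mirrors committee.authorities().map(|(i, a)| ...).join(", "),
    // with enumerate() standing in for the AuthorityIndex iterator.
    let summary = authorities
        .iter()
        .enumerate()
        .map(|(i, a)| format!("[{}]: {}", i, a.hostname))
        .join(", ");
    // Prints: Consensus authorities: [0]: node-a.example, [1]: node-b.example
    println!("Consensus authorities: {}", summary);
}
```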
16 changes: 8 additions & 8 deletions consensus/core/src/authority_service.rs

@@ -11,7 +11,7 @@ use parking_lot::RwLock;
 use sui_macros::fail_point_async;
 use tokio::{sync::broadcast, time::sleep};
 use tokio_util::sync::ReusableBoxFuture;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, warn};
 
 use crate::{
     block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
@@ -114,8 +114,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
             return Err(e);
         }
         let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
-
-        trace!("Received block {verified_block} via send block.");
+        let block_ref = verified_block.reference();
+        debug!("Received block {} via send block.", block_ref);
 
         // Reject block with timestamp too far in the future.
         let now = self.context.clock.timestamp_utc_ms();
@@ -130,12 +130,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
                 .inc();
             debug!(
                 "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
-                verified_block.reference(),
+                block_ref,
                 verified_block.timestamp_ms(),
                 now,
             );
             return Err(ConsensusError::BlockRejected {
-                block_ref: verified_block.reference(),
+                block_ref,
                 reason: format!(
                     "Block timestamp is too far in the future: {} > {}",
                     verified_block.timestamp_ms(),
@@ -154,7 +154,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
                 .inc_by(forward_time_drift.as_millis() as u64);
             debug!(
                 "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
-                verified_block.reference(),
+                block_ref,
                 verified_block.timestamp_ms(),
                 now,
                 forward_time_drift.as_millis(),
@@ -189,12 +189,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
                 .inc();
             debug!(
                 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
-                verified_block.reference(),
+                block_ref,
                 last_commit_index,
                 quorum_commit_index,
            );
             return Err(ConsensusError::BlockRejected {
-                block_ref: verified_block.reference(),
+                block_ref,
                 reason: format!(
                     "Last commit index is lagging quorum commit index too much ({} < {})",
                     last_commit_index, quorum_commit_index,
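To make the control flow above concrete: a block whose timestamp is too far ahead of local time is rejected outright, while a small forward drift is waited out before the block is accepted. A compact sketch of that decision, where `MAX_FORWARD_DRIFT_MS` is an assumed illustrative threshold, not a value taken from this PR:

```rust
use std::time::Duration;

// Assumed illustrative threshold; the real limit is not shown in this diff.
const MAX_FORWARD_DRIFT_MS: u64 = 30_000;

enum Verdict {
    Accept,
    Wait(Duration),
    Reject(String),
}

fn check_block_timestamp(block_ts_ms: u64, now_ms: u64) -> Verdict {
    if block_ts_ms > now_ms + MAX_FORWARD_DRIFT_MS {
        // Far-future timestamp: reject, as with ConsensusError::BlockRejected above.
        Verdict::Reject(format!(
            "Block timestamp is too far in the future: {} > {}",
            block_ts_ms, now_ms
        ))
    } else if block_ts_ms > now_ms {
        // Small forward drift: wait until the timestamp becomes current.
        Verdict::Wait(Duration::from_millis(block_ts_ms - now_ms))
    } else {
        Verdict::Accept
    }
}

fn main() {
    match check_block_timestamp(5_000, 1_000) {
        Verdict::Wait(d) => println!("waiting {}ms before accepting", d.as_millis()),
        Verdict::Accept => println!("accepted"),
        Verdict::Reject(reason) => println!("rejected: {}", reason),
    }
}
```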
10 changes: 6 additions & 4 deletions consensus/core/src/dag_state.rs

@@ -1378,7 +1378,9 @@ mod test {
     }
 
     #[tokio::test]
-    #[should_panic(expected = "Attempted to check for slot A8 that is <= the last evicted round 8")]
+    #[should_panic(
+        expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
+    )]
     async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
         /// Only keep elements up to 2 rounds before the last committed round
         const CACHED_ROUNDS: Round = 2;
@@ -1420,7 +1422,7 @@
 
     #[tokio::test]
     #[should_panic(
-        expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3"
+        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
     )]
     async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
         /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
@@ -2036,7 +2038,7 @@
 
     #[tokio::test]
     #[should_panic(
-        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
+        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
     )]
     async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
         // GIVEN
@@ -2083,7 +2085,7 @@
 
     #[tokio::test]
    #[should_panic(
-        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
+        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
     )]
     async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
         // GIVEN
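These expectation updates work because `#[should_panic(expected = ...)]` matches `expected` as a substring of the panic message, so the strings must track the Display format exactly. A minimal runnable illustration (a hypothetical test, not one from the crate):

```rust
// `expected` is matched as a *substring* of the panic message, which is why
// switching the index format from "A"/"B"/"C" to "[0]"/"[1]"/"[2]" forces
// every expectation string to change in lockstep.
#[cfg(test)]
mod tests {
    #[test]
    #[should_panic(expected = "slot [0]8")]
    fn panic_message_uses_new_index_format() {
        let (index, round) = (0u32, 8u32);
        // With the new Display impl, index 0 renders as "[0]" rather than "A".
        panic!(
            "Attempted to check for slot [{}]{} that is <= the last evicted round 8",
            index, round
        );
    }
}
```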
25 changes: 18 additions & 7 deletions consensus/core/src/subscriber.rs

@@ -160,7 +160,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
             .await
         {
             Ok(blocks) => {
-                debug!("Subscribed to peer {} after {} attempts", peer, retries);
+                debug!(
+                    "Subscribed to peer {} {} after {} attempts",
+                    peer, peer_hostname, retries
+                );
                 context
                     .metrics
                     .node_metrics
@@ -170,7 +173,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
                 blocks
             }
             Err(e) => {
-                debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
+                debug!(
+                    "Failed to subscribe to blocks from peer {} {}: {}",
+                    peer, peer_hostname, e
+                );
                 context
                     .metrics
                     .node_metrics
@@ -182,7 +188,6 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
         };
 
        // Now can consider the subscription successful
-        let peer_hostname = &context.committee.authority(peer).hostname;
         context
             .metrics
             .node_metrics
@@ -206,20 +211,26 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
                         match e {
                             ConsensusError::BlockRejected { block_ref, reason } => {
                                 debug!(
-                                    "Failed to process block from peer {} for block {:?}: {}",
-                                    peer, block_ref, reason
+                                    "Failed to process block from peer {} {} for block {:?}: {}",
+                                    peer, peer_hostname, block_ref, reason
                                 );
                             }
                             _ => {
-                                info!("Invalid block received from peer {}: {}", peer, e,);
+                                info!(
+                                    "Invalid block received from peer {} {}: {}",
+                                    peer, peer_hostname, e
+                                );
                             }
                         }
                     }
                     // Reset retries when a block is received.
                     retries = 0;
                 }
                 None => {
-                    debug!("Subscription to blocks from peer {} ended", peer);
+                    debug!(
+                        "Subscription to blocks from peer {} {} ended",
+                        peer, peer_hostname
+                    );
                     retries += 1;
                     break 'stream;
                 }
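The subscriber hunks reuse a single `peer_hostname` binding across every log site for a peer, instead of looking it up only after a successful subscription. A tiny sketch of that hoisting pattern with simplified stand-in types (the real lookup is `context.committee.authority(peer).hostname`):

```rust
// Simplified stand-ins for the committee types used in the crate.
struct Authority {
    hostname: String,
}

struct Committee {
    authorities: Vec<Authority>,
}

impl Committee {
    fn authority(&self, index: usize) -> &Authority {
        &self.authorities[index]
    }
}

fn main() {
    let committee = Committee {
        authorities: vec![
            Authority { hostname: "val-0.example".to_string() },
            Authority { hostname: "val-1.example".to_string() },
        ],
    };
    let peer = 1;
    // Bind once, up front, then reuse in every log line for this peer,
    // whether the subscription succeeds, fails, or ends.
    let peer_hostname = &committee.authority(peer).hostname;
    println!("Subscribed to peer {} {} after {} attempts", peer, peer_hostname, 0);
    println!("Subscription to blocks from peer {} {} ended", peer, peer_hostname);
}
```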
16 changes: 8 additions & 8 deletions consensus/core/src/synchronizer.rs

@@ -440,7 +440,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
         commands_sender: Sender<Command>,
     ) {
         const MAX_RETRIES: u32 = 5;
-
+        let peer_hostname = &context.committee.authority(peer_index).hostname;
         let mut requests = FuturesUnordered::new();
 
         loop {
@@ -464,14 +464,14 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
                             commands_sender.clone(),
                             "live"
                         ).await {
-                            warn!("Error while processing fetched blocks from peer {peer_index}: {err}");
+                            warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                         }
                     },
                     Err(_) => {
                         if retries <= MAX_RETRIES {
                             requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                         } else {
-                            warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index}.");
+                            warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                             // we don't necessarily need to do, but dropping the guard here to unlock the blocks
                             drop(blocks_guard);
                         }
@@ -558,8 +558,9 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
         }
 
         debug!(
-            "Synced missing ancestor blocks {} from peer {peer_index}",
-            blocks.iter().map(|b| b.reference().to_string()).join(","),
+            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
+            blocks.len(),
+            blocks.iter().map(|b| b.reference().to_string()).join(", "),
         );
 
         // Now send them to core for processing. Ignore the returned missing blocks as we don't want
@@ -640,8 +641,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
         let now = context.clock.timestamp_utc_ms();
         if now < verified_block.timestamp_ms() {
             warn!(
-                "Fetched block {} timestamp {} is in the future (now={}). Ignoring.",
-                verified_block,
+                "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
+                verified_block.reference(),
                 verified_block.timestamp_ms(),
                 now
             );
@@ -1025,7 +1026,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
         let peer_hostname = &context.committee.authority(peer_index).hostname;
         match response {
             Ok(fetched_blocks) => {
-                info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
Contributor commented on the removed info! line above:

    why remove this log?

Member (author) replied:

    This duplicates the log line "Synced {} missing blocks from peer ...", without the block refs.
                 results.push((blocks_guard, fetched_blocks, peer_index));
 
                 // no more pending requests are left, just break the loop
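Following up on that thread: the surviving log line carries the count, the peer (index plus hostname), and the individual refs, which is what made the count-only info! redundant. A sketch of the consolidated message with placeholder ref strings (the real code joins `b.reference().to_string()` over the fetched VerifiedBlocks):

```rust
fn main() {
    let peer_index = 2;
    let peer_hostname = "node-c.example"; // hypothetical hostname
    // Placeholder ref strings; the real values come from BlockRef's Display.
    let refs = vec!["ref-1".to_string(), "ref-2".to_string()];
    println!(
        "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
        refs.len(),
        refs.join(", "),
    );
}
```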