[consensus] improve logging when receiving blocks #20825

Merged (2 commits, Jan 9, 2025)
8 changes: 1 addition & 7 deletions consensus/config/src/committee.rs
@@ -175,15 +175,9 @@ impl AuthorityIndex {
}
}

// TODO: re-evaluate formats for production debugging.
impl Display for AuthorityIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.value() < 26 {
let c = (b'A' + self.value() as u8) as char;
f.write_str(c.to_string().as_str())
} else {
write!(f, "[{:02}]", self.value())
}
write!(f, "[{}]", self.value())
}
}

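The effect of the simplified `Display` impl is easiest to see side by side. Below is a standalone sketch using a hypothetical stand-in type (the real `AuthorityIndex` lives in consensus-config): `old_format` reproduces the removed letter-based rendering, while the new impl always prints `[index]`. The uniform bracket form is also what the updated `should_panic` strings in the dag_state tests further down reflect.

```rust
use std::fmt::{self, Display, Formatter};

// Hypothetical stand-in for consensus_config::AuthorityIndex, for illustration only.
struct Index(u32);

impl Index {
    fn value(&self) -> u32 {
        self.0
    }
}

// Removed behavior: indices below 26 rendered as letters, larger ones as zero-padded brackets.
fn old_format(i: &Index) -> String {
    if i.value() < 26 {
        ((b'A' + i.value() as u8) as char).to_string()
    } else {
        format!("[{:02}]", i.value())
    }
}

// New behavior: every index renders uniformly as "[N]".
impl Display for Index {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "[{}]", self.value())
    }
}

fn main() {
    for v in [0, 7, 25, 26, 100] {
        let i = Index(v);
        println!("old: {:<5} new: {}", old_format(&i), i);
    }
    // old: A     new: [0]
    // old: Z     new: [25]
    // old: [26]  new: [26]
    // old: [100] new: [100]
}
```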
21 changes: 18 additions & 3 deletions consensus/core/src/authority_node.rs
@@ -4,6 +4,7 @@
use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
@@ -176,11 +177,25 @@ where
registry: Registry,
boot_counter: u64,
) -> Self {
assert!(
committee.is_valid_index(own_index),
"Invalid own index {}",
own_index
);
let own_hostname = &committee.authority(own_index).hostname;
info!(
"Starting consensus authority {} {}, {:?}, boot counter {}",
own_index, own_hostname, protocol_config.version, boot_counter
);
info!(
"Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}",
own_index, committee, parameters, protocol_config.version, boot_counter
"Consensus authorities: {}",
committee
.authorities()
.map(|(i, a)| format!("{}: {}", i, a.hostname))
.join(", ")
);
assert!(committee.is_valid_index(own_index));
info!("Consensus parameters: {:?}", parameters);
info!("Consensus committee: {:?}", committee);
let context = Arc::new(Context::new(
own_index,
committee,
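As a rough sketch of what the new startup logging produces, the committee is summarized as one compact `index: hostname` line instead of a multi-line `{:#?}` dump. The `Committee`/`Authority` types below are hypothetical stand-ins, the `[{}]` wrapping stands in for the `AuthorityIndex` Display format, and `itertools` is the crate the PR adds to the imports.

```rust
use itertools::Itertools;

// Hypothetical stand-ins for the consensus-config types, for illustration only.
struct Authority {
    hostname: String,
}

struct Committee {
    authorities: Vec<Authority>,
}

impl Committee {
    // Mirrors the (index, authority) iteration used in the new log line.
    fn authorities(&self) -> impl Iterator<Item = (usize, &Authority)> {
        self.authorities.iter().enumerate()
    }
}

fn main() {
    let committee = Committee {
        authorities: vec![
            Authority { hostname: "val-0".to_string() },
            Authority { hostname: "val-1".to_string() },
            Authority { hostname: "val-2".to_string() },
        ],
    };

    // One compact line at startup instead of dumping the whole committee with {:#?}.
    let summary = committee
        .authorities()
        .map(|(i, a)| format!("[{}]: {}", i, a.hostname))
        .join(", ");
    println!("Consensus authorities: {}", summary);
    // Consensus authorities: [0]: val-0, [1]: val-1, [2]: val-2
}
```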
16 changes: 8 additions & 8 deletions consensus/core/src/authority_service.rs
@@ -11,7 +11,7 @@ use parking_lot::RwLock;
use sui_macros::fail_point_async;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, warn};

use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
@@ -114,8 +114,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

trace!("Received block {verified_block} via send block.");
let block_ref = verified_block.reference();
debug!("Received block {} via send block.", block_ref);

// Reject block with timestamp too far in the future.
let now = self.context.clock.timestamp_utc_ms();
@@ -130,12 +130,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Block timestamp is too far in the future: {} > {}",
verified_block.timestamp_ms(),
@@ -154,7 +154,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc_by(forward_time_drift.as_millis() as u64);
debug!(
"Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
forward_time_drift.as_millis(),
@@ -189,12 +189,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
verified_block.reference(),
block_ref,
last_commit_index,
quorum_commit_index,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Last commit index is lagging quorum commit index too much ({} < {})",
last_commit_index, quorum_commit_index,
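The refactor in `authority_service.rs` computes the block reference once and then reuses it in every log and error path instead of calling `verified_block.reference()` repeatedly. A minimal, dependency-free sketch of that shape follows; the types are hypothetical simplifications and the drift threshold is an assumed constant, not the real configured value.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical simplified types, for illustration; the real ones live in consensus-core.
#[derive(Clone, Copy, Debug)]
struct BlockRef {
    author: u32,
    round: u32,
}

struct VerifiedBlock {
    reference: BlockRef,
    timestamp_ms: u64,
}

#[derive(Debug)]
enum ConsensusError {
    BlockRejected { block_ref: BlockRef, reason: String },
}

// Assumed threshold for the sketch only; the real limit comes from configuration.
const MAX_FORWARD_DRIFT_MS: u64 = 30_000;

fn check_timestamp(block: &VerifiedBlock) -> Result<(), ConsensusError> {
    // Compute the reference once; reuse it for the log line and the error value.
    let block_ref = block.reference;
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis() as u64;

    if block.timestamp_ms > now + MAX_FORWARD_DRIFT_MS {
        // The PR logs this at debug! level; println! keeps the sketch dependency-free.
        println!(
            "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
            block_ref, block.timestamp_ms, now
        );
        return Err(ConsensusError::BlockRejected {
            block_ref,
            reason: format!(
                "Block timestamp is too far in the future: {} > {}",
                block.timestamp_ms, now
            ),
        });
    }
    Ok(())
}

fn main() {
    let block = VerifiedBlock {
        reference: BlockRef { author: 1, round: 10 },
        timestamp_ms: u64::MAX,
    };
    println!("{:?}", check_timestamp(&block));
}
```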
4 changes: 2 additions & 2 deletions consensus/core/src/block_manager.rs
@@ -107,8 +107,8 @@ impl BlockManager {
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
trace!(
"Missing ancestors for block {block_ref}: {}",
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
5 changes: 4 additions & 1 deletion consensus/core/src/core.rs
@@ -279,7 +279,10 @@ impl Core {
};

if !missing_block_refs.is_empty() {
debug!("Missing block refs: {:?}", missing_block_refs);
trace!(
"Missing block refs: {}",
missing_block_refs.iter().map(|b| b.to_string()).join(", ")
);
}

Ok(missing_block_refs)
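Between `block_manager.rs` and `core.rs` the PR swaps the two log levels: the "missing ancestors to fetch" message for a received block moves up to `debug!` (the actionable one), while the noisier per-call "missing block refs" listing in `Core` drops to `trace!`. A small sketch of how a level cap filters them, assuming the `tracing` and `tracing-subscriber` crates:

```rust
use tracing::{debug, trace, Level};

fn main() {
    // Cap the subscriber at DEBUG: the debug! line below is emitted,
    // while the trace! line is filtered out unless the cap is raised to TRACE.
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();

    let missing = ["[1]42", "[3]42"];
    debug!(
        "Missing ancestors to fetch for block [0]43: {}",
        missing.join(",")
    );
    trace!("Missing block refs: {}", missing.join(", "));
}
```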
10 changes: 6 additions & 4 deletions consensus/core/src/dag_state.rs
@@ -1378,7 +1378,9 @@
}

#[tokio::test]
#[should_panic(expected = "Attempted to check for slot A8 that is <= the last evicted round 8")]
#[should_panic(
expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
)]
async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
/// Only keep elements up to 2 rounds before the last committed round
const CACHED_ROUNDS: Round = 2;
@@ -1420,7 +1422,7 @@

#[tokio::test]
#[should_panic(
expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3"
expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
)]
async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
/// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
@@ -2036,7 +2038,7 @@

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
// GIVEN
@@ -2083,7 +2085,7 @@

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
// GIVEN
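The only change to `dag_state.rs` is in the expected panic strings, and it follows directly from the new `AuthorityIndex` format: a slot is rendered as the authority index followed by the round, so what used to print as `A8` or `B3` now prints as `[0]8` or `[1]3`. A tiny sketch with hypothetical stand-in types:

```rust
use std::fmt::{self, Display, Formatter};

// Hypothetical stand-ins, for illustration only.
struct AuthorityIndex(u32);

impl Display for AuthorityIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "[{}]", self.0)
    }
}

struct Slot {
    authority: AuthorityIndex,
    round: u32,
}

impl Display for Slot {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Authority index first, then the round, with no separator.
        write!(f, "{}{}", self.authority, self.round)
    }
}

fn main() {
    let slot = Slot { authority: AuthorityIndex(0), round: 8 };
    assert_eq!(slot.to_string(), "[0]8"); // previously rendered as "A8"
    println!("slot renders as {}", slot);
}
```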
25 changes: 18 additions & 7 deletions consensus/core/src/subscriber.rs
@@ -160,7 +160,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
.await
{
Ok(blocks) => {
debug!("Subscribed to peer {} after {} attempts", peer, retries);
debug!(
"Subscribed to peer {} {} after {} attempts",
peer, peer_hostname, retries
);
context
.metrics
.node_metrics
@@ -170,7 +173,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
blocks
}
Err(e) => {
debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
debug!(
"Failed to subscribe to blocks from peer {} {}: {}",
peer, peer_hostname, e
);
context
.metrics
.node_metrics
@@ -182,7 +188,6 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
};

// Now can consider the subscription successful
let peer_hostname = &context.committee.authority(peer).hostname;
context
.metrics
.node_metrics
@@ -206,20 +211,26 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
match e {
ConsensusError::BlockRejected { block_ref, reason } => {
debug!(
"Failed to process block from peer {} for block {:?}: {}",
peer, block_ref, reason
"Failed to process block from peer {} {} for block {:?}: {}",
peer, peer_hostname, block_ref, reason
);
}
_ => {
info!("Invalid block received from peer {}: {}", peer, e,);
info!(
"Invalid block received from peer {} {}: {}",
peer, peer_hostname, e
);
}
}
}
// Reset retries when a block is received.
retries = 0;
}
None => {
debug!("Subscription to blocks from peer {} ended", peer);
debug!(
"Subscription to blocks from peer {} {} ended",
peer, peer_hostname
);
retries += 1;
break 'stream;
}
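The subscriber change is purely about hoisting the `peer_hostname` lookup ahead of the subscription attempt so that the failure, success, and end-of-stream branches can all log it next to the index. A stripped-down sketch of that shape (hypothetical types; the real lookup goes through the committee held in `Context`):

```rust
// Hypothetical simplified types, for illustration only.
struct Authority {
    hostname: String,
}

struct Committee {
    authorities: Vec<Authority>,
}

impl Committee {
    fn authority(&self, index: usize) -> &Authority {
        &self.authorities[index]
    }
}

fn try_subscribe(peer: usize) -> Result<(), String> {
    // Stand-in for the network call; fails for odd-numbered peers.
    if peer % 2 == 0 {
        Ok(())
    } else {
        Err("connection refused".to_string())
    }
}

fn subscribe(committee: &Committee, peer: usize) {
    // Resolve the hostname once, up front, so every branch can include it.
    let peer_hostname = &committee.authority(peer).hostname;
    match try_subscribe(peer) {
        Ok(()) => println!("Subscribed to peer {} {}", peer, peer_hostname),
        Err(e) => println!(
            "Failed to subscribe to blocks from peer {} {}: {}",
            peer, peer_hostname, e
        ),
    }
}

fn main() {
    let committee = Committee {
        authorities: vec![
            Authority { hostname: "val-0".to_string() },
            Authority { hostname: "val-1".to_string() },
        ],
    };
    subscribe(&committee, 0);
    subscribe(&committee, 1);
}
```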
21 changes: 11 additions & 10 deletions consensus/core/src/synchronizer.rs
@@ -440,7 +440,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commands_sender: Sender<Command>,
) {
const MAX_RETRIES: u32 = 5;

let peer_hostname = &context.committee.authority(peer_index).hostname;
let mut requests = FuturesUnordered::new();

loop {
Expand All @@ -464,14 +464,14 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commands_sender.clone(),
"live"
).await {
warn!("Error while processing fetched blocks from peer {peer_index}: {err}");
warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
}
},
Err(_) => {
if retries <= MAX_RETRIES {
requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
} else {
warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index}.");
warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
// we don't necessarily need to do, but dropping the guard here to unlock the blocks
drop(blocks_guard);
}
@@ -558,8 +558,9 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}

debug!(
"Synced missing ancestor blocks {} from peer {peer_index}",
blocks.iter().map(|b| b.reference().to_string()).join(","),
"Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
blocks.len(),
blocks.iter().map(|b| b.reference().to_string()).join(", "),
);

// Now send them to core for processing. Ignore the returned missing blocks as we don't want
@@ -640,8 +641,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let now = context.clock.timestamp_utc_ms();
if now < verified_block.timestamp_ms() {
warn!(
"Fetched block {} timestamp {} is in the future (now={}). Ignoring.",
verified_block,
"Synced block {} timestamp {} is in the future (now={}). Ignoring.",
verified_block.reference(),
verified_block.timestamp_ms(),
now
);
@@ -994,8 +995,9 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
// lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
info!(
"Fetching {} missing blocks from peer {}: {}",
"Periodic sync of {} missing blocks from peer {} {}: {}",
block_refs.len(),
peer,
peer_hostname,
block_refs
.iter()
@@ -1025,7 +1027,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
let peer_hostname = &context.committee.authority(peer_index).hostname;
match response {
Ok(fetched_blocks) => {
info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);

Contributor: why remove this log?

Member Author: This duplicates the log line `Synced {} missing blocks from peer ...`, without the block refs.

results.push((blocks_guard, fetched_blocks, peer_index));

// no more pending requests are left, just break the loop
@@ -1039,7 +1040,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
// do best effort to lock guards. If we can't lock then don't bother at this run.
if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
info!(
"Retrying fetching {} missing blocks from peer {}: {}",
"Retrying syncing {} missing blocks from peer {}: {}",
blocks_guard.block_refs.len(),
peer_hostname,
blocks_guard.block_refs
2 changes: 1 addition & 1 deletion crates/sui-core/src/consensus_manager/mysticeti_manager.rs
@@ -167,7 +167,7 @@ impl ConsensusManagerTrait for MysticetiManager {
*boot_counter += 1;
} else {
info!(
"Node has not participated in previous run. Boot counter will not increment {}",
"Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
*boot_counter
);
}