Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): add tracing instrumentation and logging to consensus #2138

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn run_consensus(
consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
let Ok(validator_id) = env::var("CONSENSUS_VALIDATOR_ID") else {
info!("CONSENSUS_VALIDATOR_ID is not set. Not run consensus.");
info!("CONSENSUS_VALIDATOR_ID is not set. Not running consensus.");
return Ok(tokio::spawn(pending()));
};
info!("Running consensus as validator {validator_id}");
Expand Down
10 changes: 5 additions & 5 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use papyrus_network::network_manager::SubscriberReceiver;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use single_height_consensus::SingleHeightConsensus;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::info;
use tracing::{debug, info, instrument};
use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

// TODO(matan): Remove dead code allowance at the end of milestone 1.
Expand All @@ -37,6 +37,7 @@ pub mod types;
use futures::StreamExt;

// TODO(dvir): add test for this.
#[instrument(skip(context, start_height, network_receiver), level = "info")]
#[allow(missing_docs)]
pub async fn run_consensus<BlockT: ConsensusBlock>(
context: Arc<dyn ConsensusContext<Block = BlockT>>,
Expand All @@ -50,12 +51,11 @@ where
{
let mut current_height = start_height;
loop {
info!("Starting consensus for height {current_height}");
debug!("Starting consensus for height {current_height}");
let mut shc =
SingleHeightConsensus::new(current_height, context.clone(), validator_id).await;

let block = if let Some(block) = shc.start().await? {
info!("Proposer flow height {current_height}");
block
} else {
info!("Validator flow height {current_height}");
Expand All @@ -73,8 +73,8 @@ where
};

info!(
"Finished consensus for height: {start_height}. Agreed on block with id: {}",
block.id()
"Finished consensus for height: {current_height}. Agreed on block with id: {:x}",
block.id().0
);
current_height = current_height.unchecked_next();
}
Expand Down
14 changes: 14 additions & 0 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

Expand Down Expand Up @@ -36,11 +37,15 @@ where
Self { height, context, validators, id }
}

#[instrument(skip(self), fields(height=self.height.0), level = "debug")]
pub(crate) async fn start(&mut self) -> Result<Option<BlockT>, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);

let proposer_id = self.context.proposer(&self.validators, self.height);
if proposer_id != self.id {
return Ok(None);
}
debug!("Proposer flow");

let (content_receiver, block_receiver) = self.context.build_proposal(self.height).await;
let (fin_sender, fin_receiver) = oneshot::channel();
Expand All @@ -62,12 +67,21 @@ where

/// Receive a proposal from a peer node. Returns only once the proposal has been fully received
/// and processed.
#[instrument(
skip(self, init, content_receiver, fin_receiver),
fields(height = %self.height),
level = "debug",
)]
pub(crate) async fn handle_proposal(
&mut self,
init: ProposalInit,
content_receiver: mpsc::Receiver<<BlockT as ConsensusBlock>::ProposalChunk>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<Option<BlockT>, ConsensusError> {
debug!(
"Received proposal: proposal_height={}, proposer={:?}",
init.height.0, init.proposer
);
let proposer_id = self.context.proposer(&self.validators, self.height);
if init.height != self.height {
let msg = format!("invalid height: expected {:?}, got {:?}", self.height, init.height);
Expand Down
Loading