Skip to content

Commit

Permalink
feat(consensus): add tracing instrumentation and logging to consensus
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Jun 24, 2024
1 parent 77789e7 commit 99ad6c4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use tracing::metadata::LevelFilter;
use tracing::{debug_span, error, info, warn, Instrument};
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

Expand Down Expand Up @@ -98,7 +98,7 @@ fn run_consensus(
consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
let Ok(validator_id) = env::var("CONSENSUS_VALIDATOR_ID") else {
info!("CONSENSUS_VALIDATOR_ID is not set. Not run consensus.");
debug!("CONSENSUS_VALIDATOR_ID is not set. Not run consensus.");
return Ok(tokio::spawn(pending()));
};
info!("Running consensus as validator {validator_id}");
Expand Down
9 changes: 6 additions & 3 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use papyrus_network::network_manager::SubscriberReceiver;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use single_height_consensus::SingleHeightConsensus;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::info;
use tracing::{debug, info, instrument};
use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

// TODO(matan): Remove dead code allowance at the end of milestone 1.
Expand All @@ -20,6 +20,10 @@ pub mod types;
use futures::StreamExt;

// TODO(dvir): add test for this.
#[instrument(
skip(context, start_height, network_receiver),
fields(validator_id = %validator_id)
)]
pub async fn run_consensus<BlockT: ConsensusBlock>(
context: Arc<dyn ConsensusContext<Block = BlockT>>,
start_height: BlockNumber,
Expand All @@ -32,12 +36,11 @@ where
{
let mut current_height = start_height;
loop {
info!("Starting consensus for height {current_height}");
debug!("Starting consensus for height {current_height}");
let mut shc =
SingleHeightConsensus::new(current_height, context.clone(), validator_id).await;

let block = if let Some(block) = shc.start().await? {
info!("Proposer flow height {current_height}");
block
} else {
info!("Validator flow height {current_height}");
Expand Down
14 changes: 14 additions & 0 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

Expand Down Expand Up @@ -36,11 +37,18 @@ where
Self { height, context, validators, id }
}

#[instrument(skip(self), fields(height=self.height.0), level = "info")]
pub(crate) async fn start(&mut self) -> Result<Option<BlockT>, ConsensusError> {
info!(
"Starting consensus with validators {:?}",
self.validators.iter().map(|id| format!("{}", id)).collect::<Vec<_>>()
);

let proposer_id = self.context.proposer(&self.validators, self.height);
if proposer_id != self.id {
return Ok(None);
}
debug!("Proposer flow");

let (content_receiver, block_receiver) = self.context.build_proposal(self.height).await;
let (fin_sender, fin_receiver) = oneshot::channel();
Expand All @@ -62,12 +70,18 @@ where

/// Receive a proposal from a peer node. Returns only once the proposal has been fully received
/// and processed.
#[instrument(
skip(self, init, content_receiver, fin_receiver),
fields(height = %self.height),
level = "info"
)]
pub(crate) async fn handle_proposal(
&mut self,
init: ProposalInit,
content_receiver: mpsc::Receiver<<BlockT as ConsensusBlock>::ProposalChunk>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<Option<BlockT>, ConsensusError> {
debug!("Received proposal: proposal_height={}, proposer={}", init.height.0, init.proposer);
let proposer_id = self.context.proposer(&self.validators, self.height);
if init.height != self.height {
let msg = format!("invalid height: expected {:?}, got {:?}", self.height, init.height);
Expand Down

0 comments on commit 99ad6c4

Please sign in to comment.