Skip to content

Commit

Permalink
feat(consensus): add an error type for consensus and utlize in Single…
Browse files Browse the repository at this point in the history
…HeightConsensus
  • Loading branch information
matan-starkware committed May 28, 2024
1 parent 00fa36f commit ea805e8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-trait.workspace = true
futures.workspace = true
starknet_api.workspace = true
tokio = { workspace = true, features = ["full"] }
thiserror.workspace = true

[dev-dependencies]
mockall.workspace = true
30 changes: 18 additions & 12 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use starknet_api::block::BlockNumber;
use crate::types::{
ConsensusBlock,
ConsensusContext,
ConsensusError,
NodeId,
PeeringConsensusMessage,
ProposalInit,
Expand Down Expand Up @@ -43,30 +44,33 @@ where
Self { height, context, validators, id, to_peering_sender, from_peering_receiver }
}

pub(crate) async fn run(mut self) -> BlockT {
pub(crate) async fn run(mut self) -> Result<BlockT, ConsensusError> {
// TODO(matan): In the future this logic will be encapsulated in the state machine, and SHC
// will await a signal from SHC to propose.
let proposer_id = self.context.proposer(&self.validators, self.height);
if proposer_id == self.id { self.propose().await } else { self.validate(proposer_id).await }
}

async fn propose(&mut self) -> BlockT {
async fn propose(&mut self) -> Result<BlockT, ConsensusError> {
let (content_receiver, block_receiver) = self.context.build_proposal(self.height).await;
let (fin_sender, fin_receiver) = oneshot::channel();
let init = ProposalInit { height: self.height, proposer: self.id };
self.to_peering_sender
.send(PeeringConsensusMessage::Proposal((init, content_receiver, fin_receiver)))
.await
.expect("failed to send proposal to peering");
let block = block_receiver.await.expect("failed to build block");
.await?;
let block = block_receiver.await?;
// TODO: Switch this to the Proposal signature.
fin_sender.send(block.id()).expect("failed to send block hash");
block
fin_sender.send(block.id()).map_err(|_| {
ConsensusError::Other("Failed to send block hash to Peering".to_owned())
})?;
Ok(block)
}

async fn validate(&mut self, proposer_id: NodeId) -> BlockT {
async fn validate(&mut self, proposer_id: NodeId) -> Result<BlockT, ConsensusError> {
let Some(msg) = self.from_peering_receiver.next().await else {
panic!("Peering component disconnected from SingleHeightConsensus");
return Err(ConsensusError::Other(
"Peering component disconnected from SingleHeightConsensus".to_owned(),
));
};

let (init, content_receiver, fin_receiver) = match msg {
Expand All @@ -77,10 +81,12 @@ where
assert_eq!(init.height, self.height);
assert_eq!(init.proposer, proposer_id);
let block_receiver = self.context.validate_proposal(self.height, content_receiver).await;
let block = block_receiver.await.expect("failed to build block");
let fin = fin_receiver.await.expect("failed to receive block hash");
let block = block_receiver.await?;
let fin = fin_receiver.await.map_err(|_| {
ConsensusError::Other("Failed to receiver block hash to Peering".to_owned())
})?;
// TODO Switch to signature validation.
assert_eq!(block.id(), fin);
block
Ok(block)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async fn propose() {
let (shc, mut shc_to_peering_receiver, _) = test_fields.new_shc(BlockNumber(0), node_id).await;

// This calls to `context.proposer` and `context.build_proposal`.
let block = shc.run().await;
let block = shc.run().await.unwrap();
assert_eq!(block.id(), block_id);

// Check what was sent to peering. We don't check the content stream as that is filled by
Expand Down Expand Up @@ -136,5 +136,5 @@ async fn validate() {
fin_sender.send(block_id).unwrap();

// This calls to `context.proposer` and `context.build_proposal`.
assert_eq!(shc.run().await.id(), block_id);
assert_eq!(shc.run().await.unwrap().id(), block_id);
}
10 changes: 10 additions & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,13 @@ pub(crate) enum PeeringConsensusMessage<ProposalChunkT> {
// TODO: Switch the oneshot channel to be a Signature when we add.
Proposal((ProposalInit, mpsc::Receiver<ProposalChunkT>, oneshot::Receiver<BlockHash>)),
}

#[derive(thiserror::Error, Debug)]
pub enum ConsensusError {
#[error(transparent)]
Send(#[from] mpsc::SendError),
#[error(transparent)]
Canceled(#[from] oneshot::Canceled),
#[error("{0}")]
Other(String),
}

0 comments on commit ea805e8

Please sign in to comment.