From 8f79c126f632958f76d00ca04f3d1e16f2c1f525 Mon Sep 17 00:00:00 2001
From: Matan Markind
Date: Wed, 19 Jun 2024 13:58:04 +0300
Subject: [PATCH] feat(consensus): add milestone 2 state machine

---
 Cargo.lock                                    |   1 +
 .../sequencing/papyrus_consensus/Cargo.toml   |   1 +
 .../sequencing/papyrus_consensus/src/lib.rs   |   2 +
 .../papyrus_consensus/src/state_machine.rs    | 285 ++++++++++++++++++
 .../src/state_machine_test.rs                 |  91 ++++++
 5 files changed, 380 insertions(+)
 create mode 100644 crates/sequencing/papyrus_consensus/src/state_machine.rs
 create mode 100644 crates/sequencing/papyrus_consensus/src/state_machine_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index e7bddcdb95..a9759b10fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5949,6 +5949,7 @@ dependencies = [
  "papyrus_storage",
  "starknet-types-core",
  "starknet_api",
+ "test-case",
  "test_utils",
  "thiserror",
  "tokio",
diff --git a/crates/sequencing/papyrus_consensus/Cargo.toml b/crates/sequencing/papyrus_consensus/Cargo.toml
index bf144d1d0f..4686a61fc5 100644
--- a/crates/sequencing/papyrus_consensus/Cargo.toml
+++ b/crates/sequencing/papyrus_consensus/Cargo.toml
@@ -23,3 +23,4 @@ mockall.workspace = true
 papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2", features = ["testing"] }
 papyrus_storage = { path = "../../papyrus_storage", features = ["testing"] }
 test_utils = { path = "../../test_utils" }
+test-case.workspace = true
diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs
index b07a18eb33..209c8342e7 100644
--- a/crates/sequencing/papyrus_consensus/src/lib.rs
+++ b/crates/sequencing/papyrus_consensus/src/lib.rs
@@ -19,6 +19,8 @@ pub mod papyrus_consensus_context;
 #[allow(dead_code)]
 #[allow(missing_docs)]
 pub mod single_height_consensus;
+#[allow(missing_docs)]
+pub mod state_machine;
 #[cfg(test)]
 pub(crate) mod test_utils;
 #[allow(dead_code)]
diff --git a/crates/sequencing/papyrus_consensus/src/state_machine.rs b/crates/sequencing/papyrus_consensus/src/state_machine.rs
new file mode 100644
index 0000000000..b3c85f05d4
--- /dev/null
+++ b/crates/sequencing/papyrus_consensus/src/state_machine.rs
@@ -0,0 +1,285 @@
+#[cfg(test)]
+#[path = "state_machine_test.rs"]
+mod state_machine_test;
+
+use std::collections::{HashMap, VecDeque};
+use std::vec;
+
+use starknet_api::block::BlockHash;
+
+use crate::types::ValidatorId;
+
+/// Events consumed and produced by the [`StateMachine`].
+///
+/// In every variant the last field is the round the event refers to.
+#[derive(Debug, Clone, PartialEq)]
+pub enum StateMachineEvent {
+    /// `(block_hash, round)`. Emitted with `None` to ask the caller to build a proposal; the
+    /// caller passes it back with `Some(block_hash)` once the proposal is ready.
+    GetProposal(Option<BlockHash>, u32),
+    /// `(block_hash, round)`. A proposal from a peer or from this node.
+    Propose(BlockHash, u32),
+    /// `(block_hash, round)`. A single prevote.
+    Prevote(BlockHash, u32),
+    /// `(block_hash, round)`. A single precommit.
+    Precommit(BlockHash, u32),
+    /// `(block_hash, round)`. SingleHeightConsensus can figure out the relevant precommits, as
+    /// the StateMachine only records aggregated votes.
+    Decision(BlockHash, u32),
+}
+
+/// The step of a round that the state machine is currently in.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Step {
+    Propose,
+    Prevote,
+    Precommit,
+}
+
+/// State Machine for Milestone 2. Major assumptions:
+/// 1. SHC handles replays and conflicts.
+/// 2. SM must handle "out of order" messages (E.g. vote arrives before proposal).
+/// 3. Only valid proposals (e.g. no NIL)
+/// 4. No network failures - together with 3 this means we only support round 0.
+pub struct StateMachine {
+    round: u32,
+    step: Step,
+    id: ValidatorId,
+    validators: Vec<ValidatorId>,
+    // {round: block_hash}
+    proposals: HashMap<u32, BlockHash>,
+    // {round: {block_hash: vote_count}}
+    prevotes: HashMap<u32, HashMap<BlockHash, u32>>,
+    precommits: HashMap<u32, HashMap<BlockHash, u32>>,
+    // When true, the state machine will wait for a GetProposal event, caching all other input
+    // events in `events_queue`.
+    awaiting_get_proposal: bool,
+    events_queue: VecDeque<StateMachineEvent>,
+    // Returns the proposer of a given round.
+    leader_fn: Box<dyn Fn(u32) -> ValidatorId>,
+}
+
+impl StateMachine {
+    /// Creates a state machine at round 0, in the propose step.
+    ///
+    /// * `id` - the validator running this state machine.
+    /// * `validators` - all the validators; their number determines the quorum.
+    /// * `leader_fn` - returns the proposer of a given round.
+    pub fn new(
+        id: ValidatorId,
+        validators: Vec<ValidatorId>,
+        leader_fn: Box<dyn Fn(u32) -> ValidatorId>,
+    ) -> Self {
+        Self {
+            round: 0,
+            step: Step::Propose,
+            id,
+            validators,
+            proposals: HashMap::new(),
+            prevotes: HashMap::new(),
+            precommits: HashMap::new(),
+            awaiting_get_proposal: false,
+            events_queue: VecDeque::new(),
+            leader_fn,
+        }
+    }
+
+    /// Starts the current round.
+    ///
+    /// Returns a `GetProposal` request if this node is the proposer of the round, and nothing
+    /// otherwise. Until the request is answered, all other events are cached.
+    pub fn start(&mut self) -> Vec<StateMachineEvent> {
+        if self.id != (self.leader_fn)(self.round) {
+            return Vec::new();
+        }
+        self.awaiting_get_proposal = true;
+        // TODO(matan): Initiate timeout proposal which can lead to round skipping.
+        vec![StateMachineEvent::GetProposal(None, self.round)]
+    }
+
+    /// Feeds `event` to the state machine and returns the events it produced, in order.
+    ///
+    /// Proposals and votes that the state machine produces are also processed internally, as
+    /// if they had been received.
+    ///
+    /// # Panics
+    ///
+    /// Panics when processing a `Decision` event, a vote for a round other than 0, a second
+    /// proposal for the same round, or an awaited `GetProposal` without a block hash.
+    pub fn handle_event(&mut self, event: StateMachineEvent) -> Vec<StateMachineEvent> {
+        // Mimic LOC 18 in the paper; the state machine doesn't handle any events until `getValue`
+        // completes.
+        if self.awaiting_get_proposal {
+            match event {
+                StateMachineEvent::GetProposal(_, round) if round == self.round => {
+                    self.events_queue.push_front(event);
+                }
+                _ => {
+                    self.events_queue.push_back(event);
+                    return Vec::new();
+                }
+            }
+        } else {
+            self.events_queue.push_back(event);
+        }
+
+        // The events queue only maintains state while we are waiting for a proposal.
+        let events_queue = std::mem::take(&mut self.events_queue);
+        self.handle_enqueued_events(events_queue)
+    }
+
+    /// Processes `events_queue` until it is empty and returns all the events produced.
+    ///
+    /// Proposals and votes produced along the way are appended to the queue, so the state
+    /// machine also handles its own messages.
+    fn handle_enqueued_events(
+        &mut self,
+        mut events_queue: VecDeque<StateMachineEvent>,
+    ) -> Vec<StateMachineEvent> {
+        let mut output_events = Vec::new();
+        while let Some(event) = events_queue.pop_front() {
+            for e in self.handle_event_internal(event) {
+                match e {
+                    StateMachineEvent::Propose(_, _)
+                    | StateMachineEvent::Prevote(_, _)
+                    | StateMachineEvent::Precommit(_, _) => {
+                        events_queue.push_back(e.clone());
+                    }
+                    // Meant for the caller only; nothing to process internally.
+                    StateMachineEvent::GetProposal(_, _) | StateMachineEvent::Decision(_, _) => {}
+                }
+                output_events.push(e);
+            }
+        }
+        output_events
+    }
+
+    /// Dispatches `event` to the handler of its variant.
+    fn handle_event_internal(&mut self, event: StateMachineEvent) -> Vec<StateMachineEvent> {
+        match event {
+            StateMachineEvent::GetProposal(block_hash, round) => {
+                self.handle_get_proposal(block_hash, round)
+            }
+            StateMachineEvent::Propose(block_hash, round) => {
+                self.handle_proposal(block_hash, round)
+            }
+            StateMachineEvent::Prevote(block_hash, round) => self.handle_prevote(block_hash, round),
+            StateMachineEvent::Precommit(block_hash, round) => {
+                self.handle_precommit(block_hash, round)
+            }
+            StateMachineEvent::Decision(_, _) => {
+                unimplemented!(
+                    "If the caller knows of a decision, it can just drop the state machine."
+                )
+            }
+        }
+    }
+
+    // The node finishes building a proposal in response to the state machine sending out
+    // GetProposal.
+    fn handle_get_proposal(
+        &mut self,
+        block_hash: Option<BlockHash>,
+        round: u32,
+    ) -> Vec<StateMachineEvent> {
+        if !self.awaiting_get_proposal || self.round != round {
+            return Vec::new();
+        }
+        self.awaiting_get_proposal = false;
+        let block_hash = block_hash.expect("GetProposal event must have a block_hash");
+        let mut output = vec![StateMachineEvent::Propose(block_hash, round)];
+        output.extend(self.advance_step(Step::Prevote));
+        output
+    }
+
+    // A proposal from a peer (or self) node.
+    fn handle_proposal(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
+        let old = self.proposals.insert(round, block_hash);
+        assert!(old.is_none(), "SHC should handle conflicts & replays");
+        let mut output = vec![StateMachineEvent::Prevote(block_hash, round)];
+        output.extend(self.advance_step(Step::Prevote));
+        output
+    }
+
+    // A prevote from a peer (or self) node. Sends a precommit once a block reaches a quorum.
+    fn handle_prevote(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
+        assert_eq!(round, 0, "Only round 0 is supported in this milestone.");
+        let prevote_count = self.prevotes.entry(round).or_default().entry(block_hash).or_insert(0);
+        *prevote_count += 1;
+        if *prevote_count < self.quorum() {
+            return Vec::new();
+        }
+        self.send_precommit(block_hash, round)
+    }
+
+    // A precommit from a peer (or self) node. Decides once a block reaches a quorum.
+    fn handle_precommit(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
+        assert_eq!(round, 0, "Only round 0 is supported in this milestone.");
+        let precommit_count =
+            self.precommits.entry(round).or_default().entry(block_hash).or_insert(0);
+        *precommit_count += 1;
+        if *precommit_count < self.quorum() {
+            return Vec::new();
+        }
+        vec![StateMachineEvent::Decision(block_hash, round)]
+    }
+
+    // Moves to `step` and returns the events that it triggers.
+    fn advance_step(&mut self, step: Step) -> Vec<StateMachineEvent> {
+        self.step = step;
+        // Check for an existing quorum in case messages arrived out of order.
+        match self.step {
+            Step::Propose => {
+                unimplemented!("Handled by `advance_round`")
+            }
+            Step::Prevote => self.check_prevote_quorum(self.round),
+            Step::Precommit => self.check_precommit_quorum(self.round),
+        }
+    }
+
+    // Sends a precommit if the leading prevote of `round` already has a quorum.
+    fn check_prevote_quorum(&mut self, round: u32) -> Vec<StateMachineEvent> {
+        let Some((block_hash, count)) = leading_vote(&self.prevotes, round) else {
+            return Vec::new();
+        };
+        if *count < self.quorum() {
+            return Vec::new();
+        }
+        self.send_precommit(*block_hash, round)
+    }
+
+    // Decides if the leading precommit of `round` already has a quorum.
+    fn check_precommit_quorum(&mut self, round: u32) -> Vec<StateMachineEvent> {
+        let Some((block_hash, count)) = leading_vote(&self.precommits, round) else {
+            return Vec::new();
+        };
+        if *count < self.quorum() {
+            return Vec::new();
+        }
+        vec![StateMachineEvent::Decision(*block_hash, round)]
+    }
+
+    // Emits a precommit for `block_hash` and moves to the precommit step.
+    fn send_precommit(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
+        let mut output = vec![StateMachineEvent::Precommit(block_hash, round)];
+        output.extend(self.advance_step(Step::Precommit));
+        output
+    }
+
+    // The number of matching votes required to act: more than two thirds of the validators.
+    fn quorum(&self) -> u32 {
+        let q = (2 * self.validators.len() / 3) + 1;
+        // Assumes the number of validators fits in `u32`; otherwise this cast truncates.
+        q as u32
+    }
+}
+
+/// Returns the block with the most votes in `round` and its count, or `None` if the round
+/// has no votes.
+fn leading_vote(
+    votes: &HashMap<u32, HashMap<BlockHash, u32>>,
+    round: u32,
+) -> Option<(&BlockHash, &u32)> {
+    // We don't care which value is chosen in the case of a tie, since consensus requires 2/3+1.
+    votes.get(&round)?.iter().max_by(|a, b| a.1.cmp(b.1))
+}
diff --git a/crates/sequencing/papyrus_consensus/src/state_machine_test.rs b/crates/sequencing/papyrus_consensus/src/state_machine_test.rs
new file mode 100644
index 0000000000..f4d66edfc9
--- /dev/null
+++ b/crates/sequencing/papyrus_consensus/src/state_machine_test.rs
@@ -0,0 +1,91 @@
+use starknet_api::block::BlockHash;
+use starknet_types_core::felt::Felt;
+use test_case::test_case;
+
+use crate::state_machine::{StateMachine, StateMachineEvent};
+
+fn create_state_machine(is_proposer: bool) -> StateMachine {
+    StateMachine::new(
+        if is_proposer { 1_u32.into() } else { 2_u32.into() },
+        vec![1_u32.into(), 2_u32.into(), 3_u32.into(), 4_u32.into()],
+        Box::new(|_| 1_u32.into()),
+    )
+}
+
+#[test_case(true; "proposer")]
+#[test_case(false; "validator")]
+fn in_order(is_proposer: bool) {
+    let mut sm = create_state_machine(is_proposer);
+
+    let mut events = sm.start();
+    if is_proposer {
+        assert_eq!(events.remove(0), StateMachineEvent::GetProposal(None, 0));
+        assert!(events.is_empty());
+
+        events = sm.handle_event(StateMachineEvent::GetProposal(Some(BlockHash(Felt::ONE)), 0));
+        assert_eq!(events.remove(0), StateMachineEvent::Propose(BlockHash(Felt::ONE), 0));
+    } else {
+        assert!(events.is_empty());
+        events = sm.handle_event(StateMachineEvent::Propose(BlockHash(Felt::ONE), 0));
+    }
+    assert_eq!(events.remove(0), StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+
+    events = sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+
+    events = sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+
+    events = sm.handle_event(StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+
+    events = sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Decision(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+}
+
+#[test]
+fn validator_receives_votes_first() {
+    let mut sm = create_state_machine(false);
+
+    let mut events = sm.start();
+    assert!(events.is_empty());
+
+    // Send votes first.
+    events.append(&mut sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0)));
+    events.append(&mut sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0)));
+    events.append(&mut sm.handle_event(StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0)));
+    events.append(&mut sm.handle_event(StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0)));
+    assert!(events.is_empty());
+
+    // Finally the proposal arrives.
+    events = sm.handle_event(StateMachineEvent::Propose(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Decision(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+}
+
+#[test]
+fn cache_events_during_get_proposal() {
+    let mut sm = create_state_machine(true);
+    let mut events = sm.start();
+    assert_eq!(events.remove(0), StateMachineEvent::GetProposal(None, 0));
+    assert!(events.is_empty());
+
+    // TODO(matan): When we support NIL votes, we should send them. Real votes without the proposal
+    // don't make sense.
+    events.append(&mut sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0)));
+    events.append(&mut sm.handle_event(StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0)));
+    assert!(events.is_empty());
+
+    // Node finishes building the proposal.
+    events = sm.handle_event(StateMachineEvent::GetProposal(Some(BlockHash(Felt::ONE)), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Propose(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Prevote(BlockHash(Felt::ONE), 0));
+    assert_eq!(events.remove(0), StateMachineEvent::Precommit(BlockHash(Felt::ONE), 0));
+    assert!(events.is_empty());
+}