From 973590408bf72a5e19a4bd1cc309937ba664e9f0 Mon Sep 17 00:00:00 2001
From: pompon0
Date: Thu, 26 Oct 2023 18:31:32 +0200
Subject: [PATCH] feat: implemented BlockHeader (BFT-357) (#12)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implemented
https://www.notion.so/matterlabs/5d238e08fbed476b8550f697d9c39df6?v=51a4bffaba05499e8ca11bd625fff54a&p=16c93154366c4a3994457b44043dea71&pm=s
with some differences.

Summary of changes:
* Replaced Block with BlockHeader, which carries just a hash of the payload.
* Made consensus messages include the whole BlockHeader rather than just its
  hash (the header is small now that the payload has been moved out).
* Introduced a strongly typed Payload.
* Added protocol_version to the consensus messages (and the relevant checks
  to the consensus) - currently hardcoded to 0.

Minor fixes:
* Split leader::Error and replica::Error into per-method errors (the error
  types returned by each method are disjoint).
* Relaxed the "stringly typed" assertions in the sync_blocks tests (if
  needed, they should use strongly typed errors instead).
* Added "size" to the protobuf encoding of BitVec, so that the encoded
  BitVec value is context-free.

---------

Co-authored-by: Bruno França
---
 .github/workflows/protobuf_conformance.yaml | 6 +-
 node/Cargo.lock | 1 +
 node/Cranky.toml | 1 +
 node/actors/consensus/src/leader/error.rs | 55 -----
 node/actors/consensus/src/leader/mod.rs | 1 -
 .../consensus/src/leader/replica_commit.rs | 50 ++++-
 .../consensus/src/leader/replica_prepare.rs | 119 +++++-----
 .../consensus/src/leader/state_machine.rs | 31 ++-
 node/actors/consensus/src/leader/tests.rs | 26 +--
 node/actors/consensus/src/lib.rs | 8 +
 node/actors/consensus/src/replica/block.rs | 34 +--
 node/actors/consensus/src/replica/error.rs | 70 ------
 .../consensus/src/replica/leader_commit.rs | 37 +++-
 .../consensus/src/replica/leader_prepare.rs | 209 ++++++++++--------
 node/actors/consensus/src/replica/mod.rs | 1 -
 node/actors/consensus/src/replica/new_view.rs | 10 +-
 .../consensus/src/replica/state_machine.rs | 95 +++++---
 node/actors/consensus/src/replica/tests.rs | 3 +-
 node/actors/consensus/src/testonly/fuzz.rs | 48 ++--
 node/actors/consensus/src/testonly/make.rs | 11 +-
 node/actors/consensus/src/testonly/run.rs | 12 +-
 node/actors/executor/src/lib/io.rs | 2 +-
 node/actors/executor/src/main.rs | 2 +-
 node/actors/network/src/gossip/tests.rs | 7 +-
 node/actors/network/src/io.rs | 9 +-
 node/actors/network/src/testonly.rs | 2 +-
 node/actors/sync_blocks/src/peers/mod.rs | 12 +-
 node/actors/sync_blocks/src/peers/tests.rs | 71 ++----
 node/actors/sync_blocks/src/tests/mod.rs | 64 ++----
 node/libs/roles/src/validator/conv.rs | 145 ++++++------
 .../roles/src/validator/messages/block.rs | 85 +++++--
 .../roles/src/validator/messages/consensus.rs | 98 ++++----
 node/libs/roles/src/validator/mod.rs | 3 +-
 node/libs/roles/src/validator/testonly.rs | 110 ++++++---
 node/libs/roles/src/validator/tests.rs | 39 ++--
 node/libs/schema/Cargo.toml | 1 +
 node/libs/schema/proto/roles/validator.proto | 90 ++++----
 node/libs/schema/proto/std.proto | 16 ++
 node/libs/schema/proto/storage.proto | 7 +-
 node/libs/schema/src/std_conv.rs | 21 ++
 node/libs/storage/src/lib.rs | 2 +-
 node/libs/storage/src/rocksdb.rs | 8 +-
 node/libs/storage/src/testonly.rs | 34 +--
 node/libs/storage/src/tests.rs | 49 +--
 node/libs/storage/src/types.rs | 75 ++++---
 node/tools/src/bin/localnet_config.rs | 3 +-
 46 files changed, 933 insertions(+), 850 deletions(-)
 delete mode 100644 node/actors/consensus/src/leader/error.rs
 delete mode 100644
node/actors/consensus/src/replica/error.rs diff --git a/.github/workflows/protobuf_conformance.yaml b/.github/workflows/protobuf_conformance.yaml index d6a29b5d..6027b84f 100644 --- a/.github/workflows/protobuf_conformance.yaml +++ b/.github/workflows/protobuf_conformance.yaml @@ -18,13 +18,13 @@ jobs: conformance: runs-on: [ubuntu-22.04-github-hosted-16core] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: path: "this" - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: repository: "protocolbuffers/protobuf" - ref: "main" + ref: "v24.4" path: "protobuf" - uses: mozilla-actions/sccache-action@v0.0.3 - name: build test diff --git a/node/Cargo.lock b/node/Cargo.lock index 945ac86c..f81a3c58 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -1700,6 +1700,7 @@ name = "schema" version = "0.1.0" dependencies = [ "anyhow", + "bit-vec", "concurrency", "once_cell", "prettyplease 0.2.12", diff --git a/node/Cranky.toml b/node/Cranky.toml index ffe11cbe..fdc9a236 100644 --- a/node/Cranky.toml +++ b/node/Cranky.toml @@ -41,4 +41,5 @@ warn = [ allow = [ # Produces too many false positives. "clippy::redundant_locals", + "clippy::needless_pass_by_ref_mut", ] diff --git a/node/actors/consensus/src/leader/error.rs b/node/actors/consensus/src/leader/error.rs deleted file mode 100644 index 32b05713..00000000 --- a/node/actors/consensus/src/leader/error.rs +++ /dev/null @@ -1,55 +0,0 @@ -use roles::validator; -use thiserror::Error; - -#[derive(Error, Debug)] -#[allow(clippy::missing_docs_in_private_items)] -pub(crate) enum Error { - #[error("received replica commit message for a missing proposal")] - ReplicaCommitMissingProposal, - #[error("received replica commit message for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] - ReplicaCommitOld { - current_view: validator::ViewNumber, - current_phase: validator::Phase, - }, - #[error("received replica prepare message for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] - ReplicaPrepareOld { - current_view: validator::ViewNumber, - current_phase: validator::Phase, - }, - #[error("received replica commit message for a view when we are not a leader")] - ReplicaCommitWhenNotLeaderInView, - #[error("received replica prepare message for a view when we are not a leader")] - ReplicaPrepareWhenNotLeaderInView, - #[error("received replica commit message that already exists (existing message: {existing_message:?}")] - ReplicaCommitExists { existing_message: String }, - #[error("received replica prepare message that already exists (existing message: {existing_message:?}")] - ReplicaPrepareExists { existing_message: String }, - #[error("received replica commit message while number of received messages is below threshold. waiting for more (received: {num_messages:?}, threshold: {threshold:?}")] - ReplicaCommitNumReceivedBelowThreshold { - num_messages: usize, - threshold: usize, - }, - #[error("received replica prepare message while number of received messages below threshold. 
waiting for more (received: {num_messages:?}, threshold: {threshold:?}")] - ReplicaPrepareNumReceivedBelowThreshold { - num_messages: usize, - threshold: usize, - }, - #[error("received replica prepare message with high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}")] - ReplicaPrepareHighQCOfFutureView { - high_qc_view: validator::ViewNumber, - current_view: validator::ViewNumber, - }, - #[error("received replica commit message with invalid signature")] - ReplicaCommitInvalidSignature(#[source] crypto::bls12_381::Error), - #[error("received replica prepare message with invalid signature")] - ReplicaPrepareInvalidSignature(#[source] crypto::bls12_381::Error), - #[error("received replica prepare message with invalid high QC")] - ReplicaPrepareInvalidHighQC(#[source] anyhow::Error), -} - -/// Needed due to source errors. -impl PartialEq for Error { - fn eq(&self, other: &Self) -> bool { - self.to_string() == other.to_string() - } -} diff --git a/node/actors/consensus/src/leader/mod.rs b/node/actors/consensus/src/leader/mod.rs index 8c4d99af..55a609a6 100644 --- a/node/actors/consensus/src/leader/mod.rs +++ b/node/actors/consensus/src/leader/mod.rs @@ -2,7 +2,6 @@ //! and aggregates replica messages. It mainly acts as a central point of communication for the replicas. Note that //! our consensus node will perform both the replica and leader roles simultaneously. -mod error; mod replica_commit; mod replica_prepare; mod state_machine; diff --git a/node/actors/consensus/src/leader/replica_commit.rs b/node/actors/consensus/src/leader/replica_commit.rs index 9a1634d8..82ab32b2 100644 --- a/node/actors/consensus/src/leader/replica_commit.rs +++ b/node/actors/consensus/src/leader/replica_commit.rs @@ -1,10 +1,35 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, leader::error::Error, metrics}; +use crate::{inner::ConsensusInner, metrics}; use concurrency::{ctx, metrics::LatencyHistogramExt as _}; use network::io::{ConsensusInputMessage, Target}; use roles::validator; use tracing::instrument; +#[derive(thiserror::Error, Debug)] +#[allow(clippy::missing_docs_in_private_items)] +pub(crate) enum Error { + #[error("unexpected proposal")] + UnexpectedProposal, + #[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] + Old { + current_view: validator::ViewNumber, + current_phase: validator::Phase, + }, + #[error("we are not a leader for this message's view")] + NotLeaderInView, + #[error("duplicate message from a replica (existing message: {existing_message:?}")] + DuplicateMessage { + existing_message: validator::ReplicaCommit, + }, + #[error("number of received messages is below threshold. waiting for more (received: {num_messages:?}, threshold: {threshold:?}")] + NumReceivedBelowThreshold { + num_messages: usize, + threshold: usize, + }, + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] crypto::bls12_381::Error), +} + impl StateMachine { #[instrument(level = "trace", ret)] pub(crate) fn process_replica_commit( @@ -21,7 +46,7 @@ impl StateMachine { // If the message is from the "past", we discard it. if (message.view, validator::Phase::Commit) < (self.view, self.phase) { - return Err(Error::ReplicaCommitOld { + return Err(Error::Old { current_view: self.view, current_phase: self.phase, }); @@ -29,7 +54,7 @@ impl StateMachine { // If the message is for a view when we are not a leader, we discard it. 
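The check just below only needs view_leader to be a deterministic function of the view, so that every replica resolves the same leader for it. A minimal sketch of one such schedule, round-robin over the validator set (the crate's actual schedule may differ):

    use roles::validator;

    // Sketch only: any deterministic map view -> validator works here;
    // round-robin over the validator set is the simplest choice.
    fn view_leader(
        validators: &[validator::PublicKey],
        view: validator::ViewNumber,
    ) -> validator::PublicKey {
        validators[view.0 as usize % validators.len()].clone()
    }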
if consensus.view_leader(message.view) != consensus.secret_key.public() { - return Err(Error::ReplicaCommitWhenNotLeaderInView); + return Err(Error::NotLeaderInView); } // If we already have a message from the same validator and for the same view, we discard it. @@ -38,24 +63,22 @@ impl StateMachine { .get(&message.view) .and_then(|x| x.get(author)) { - return Err(Error::ReplicaCommitExists { - existing_message: format!("{:?}", existing_message), + return Err(Error::DuplicateMessage { + existing_message: existing_message.msg, }); } // ----------- Checking the signed part of the message -------------- // Check the signature on the message. - signed_message - .verify() - .map_err(Error::ReplicaCommitInvalidSignature)?; + signed_message.verify().map_err(Error::InvalidSignature)?; // ----------- Checking the contents of the message -------------- // We only accept replica commit messages for proposals that we have cached. That's so // we don't need to store replica commit messages for different proposals. - if self.block_proposal_cache != Some(message) { - return Err(Error::ReplicaCommitMissingProposal); + if self.block_proposal_cache != Some(message.proposal) { + return Err(Error::UnexpectedProposal); } // ----------- All checks finished. Now we process the message. -------------- @@ -70,7 +93,7 @@ impl StateMachine { let num_messages = self.commit_message_cache.get(&message.view).unwrap().len(); if num_messages < consensus.threshold() { - return Err(Error::ReplicaCommitNumReceivedBelowThreshold { + return Err(Error::NumReceivedBelowThreshold { num_messages, threshold: consensus.threshold(), }); @@ -110,7 +133,10 @@ impl StateMachine { message: consensus .secret_key .sign_msg(validator::ConsensusMsg::LeaderCommit( - validator::LeaderCommit { justification }, + validator::LeaderCommit { + protocol_version: validator::CURRENT_VERSION, + justification, + }, )), recipient: Target::Broadcast, }; diff --git a/node/actors/consensus/src/leader/replica_prepare.rs b/node/actors/consensus/src/leader/replica_prepare.rs index 5a55be53..800bd331 100644 --- a/node/actors/consensus/src/leader/replica_prepare.rs +++ b/node/actors/consensus/src/leader/replica_prepare.rs @@ -1,5 +1,5 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, leader::error::Error, metrics}; +use crate::{inner::ConsensusInner, metrics}; use concurrency::ctx; use network::io::{ConsensusInputMessage, Target}; use rand::Rng; @@ -7,6 +7,38 @@ use roles::validator; use std::collections::HashMap; use tracing::instrument; +#[derive(thiserror::Error, Debug)] +#[allow(clippy::missing_docs_in_private_items)] +pub(crate) enum Error { + #[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] + Old { + current_view: validator::ViewNumber, + current_phase: validator::Phase, + }, + #[error("we are not a leader for this message's view")] + NotLeaderInView, + #[error("duplicate message from a replica (existing message: {existing_message:?}")] + Exists { + existing_message: validator::ReplicaPrepare, + }, + #[error("number of received messages below threshold. 
waiting for more (received: {num_messages:?}, threshold: {threshold:?}")] + NumReceivedBelowThreshold { + num_messages: usize, + threshold: usize, + }, + #[error( + "high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}" + )] + HighQCOfFutureView { + high_qc_view: validator::ViewNumber, + current_view: validator::ViewNumber, + }, + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] crypto::bls12_381::Error), + #[error("invalid high QC: {0:#}")] + InvalidHighQC(#[source] anyhow::Error), +} + impl StateMachine { #[instrument(level = "trace", ret)] pub(crate) fn process_replica_prepare( @@ -23,7 +55,7 @@ impl StateMachine { // If the message is from the "past", we discard it. if (message.view, validator::Phase::Prepare) < (self.view, self.phase) { - return Err(Error::ReplicaPrepareOld { + return Err(Error::Old { current_view: self.view, current_phase: self.phase, }); @@ -31,7 +63,7 @@ impl StateMachine { // If the message is for a view when we are not a leader, we discard it. if consensus.view_leader(message.view) != consensus.secret_key.public() { - return Err(Error::ReplicaPrepareWhenNotLeaderInView); + return Err(Error::NotLeaderInView); } // If we already have a message from the same validator and for the same view, we discard it. @@ -40,17 +72,15 @@ impl StateMachine { .get(&message.view) .and_then(|x| x.get(author)) { - return Err(Error::ReplicaPrepareExists { - existing_message: format!("{:?}", existing_message), + return Err(Error::Exists { + existing_message: existing_message.msg.clone(), }); } // ----------- Checking the signed part of the message -------------- // Check the signature on the message. - signed_message - .verify() - .map_err(Error::ReplicaPrepareInvalidSignature)?; + signed_message.verify().map_err(Error::InvalidSignature)?; // ----------- Checking the contents of the message -------------- @@ -58,13 +88,13 @@ impl StateMachine { message .high_qc .verify(&consensus.validator_set, consensus.threshold()) - .map_err(Error::ReplicaPrepareInvalidHighQC)?; + .map_err(Error::InvalidHighQC)?; // If the high QC is for a future view, we discard the message. // This check is not necessary for correctness, but it's useful to // guarantee that our proposals don't contain QCs from the future. if message.high_qc.message.view >= message.view { - return Err(Error::ReplicaPrepareHighQCOfFutureView { + return Err(Error::HighQCOfFutureView { high_qc_view: message.high_qc.message.view, current_view: message.view, }); @@ -82,7 +112,7 @@ impl StateMachine { let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len(); if num_messages < consensus.threshold() { - return Err(Error::ReplicaPrepareNumReceivedBelowThreshold { + return Err(Error::NumReceivedBelowThreshold { num_messages, threshold: consensus.threshold(), }); @@ -93,13 +123,12 @@ impl StateMachine { // Get all the replica prepare messages for this view. Note that we consume the // messages here. That's purposeful, so that we don't create a new block proposal // for this same view if we receive another replica prepare message after this. - let replica_messages = self + let replica_messages: Vec<_> = self .prepare_message_cache .remove(&message.view) .unwrap() - .values() - .cloned() - .collect::>(); + .into_values() + .collect(); debug_assert!(num_messages == consensus.threshold()); @@ -107,13 +136,11 @@ impl StateMachine { // in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas. 
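The tally that follows can be read on its own: each cached ReplicaPrepare carries its sender's high_vote, and a proposal is carried over only when strictly more than 2*f validators voted for it. A self-contained sketch of the same tally (illustrative names, not the crate's API):

    use std::collections::HashMap;

    // Tally the votes and return the proposal backed by a sub-quorum of
    // more than 2*f votes, if one exists. At most one entry can clear the
    // bar among the threshold-many messages collected above.
    fn subquorum<P: Copy + Eq + std::hash::Hash>(votes: &[P], f: usize) -> Option<P> {
        let mut count: HashMap<P, usize> = HashMap::new();
        for p in votes {
            *count.entry(*p).or_default() += 1;
        }
        count.into_iter().find(|(_, n)| *n > 2 * f).map(|(p, _)| p)
    }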
let mut count: HashMap<_, usize> = HashMap::new(); - for vote in replica_messages.iter().map(|s| &s.msg.high_vote) { - *count - .entry((vote.proposal_block_number, vote.proposal_block_hash)) - .or_default() += 1; + for vote in replica_messages.iter() { + *count.entry(vote.msg.high_vote.proposal).or_default() += 1; } - let highest_vote = count + let highest_vote: Option<validator::BlockHeader> = count .iter() // We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes. .find(|(_, v)| **v > 2 * consensus.faulty_replicas()) .map(|(h, _)| h) .cloned(); // Get the highest CommitQC. - let highest_qc = &replica_messages + let highest_qc: &validator::CommitQC = replica_messages .iter() .map(|s| &s.msg.high_qc) .max_by_key(|qc| qc.message.view) .unwrap(); // Create the block proposal to send to the replicas, // and the commit vote to store in our block proposal cache. - let (proposal, commit_vote) = match highest_vote { + let (proposal, payload) = match highest_vote { // The previous block was not finalized, so we need to propose it again. - // For this we only need the hash, since we are guaranteed that at least + // For this we only need the header, since we are guaranteed that at least // f+1 honest replicas have the block and can broadcast it when finalized // (2f+1 have stated that they voted for the block, at most f are malicious). - Some((block_number, block_hash)) - if block_number != highest_qc.message.proposal_block_number - || block_hash != highest_qc.message.proposal_block_hash => - { - let vote = validator::ReplicaCommit { - view: message.view, - proposal_block_hash: block_hash, - proposal_block_number: block_number, - }; - - let proposal = validator::Proposal::Retry(vote); - - (proposal, vote) - } + Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None), // The previous block was finalized, so we can propose a new block. _ => { // TODO(bruno): For now we just create a block with a random payload. After we integrate with // the execution layer we should have a call here to the mempool to get a real payload.
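For orientation, this is roughly the shape of the types the proposal just below is built from, reconstructed from their usage in this patch (a sketch, not the definitions in roles/src/validator/messages/block.rs):

    // Payload is an opaque byte blob; BlockHeader commits to it by hash.
    // BlockHeader::new(parent, payload_hash), as used below, derives the
    // child header: number = parent.number.next(), parent = parent.hash().
    pub struct Payload(pub Vec<u8>);
    pub struct PayloadHash(pub [u8; 32]);     // hash widths are assumptions
    pub struct BlockHeaderHash(pub [u8; 32]);
    pub struct BlockNumber(pub u64);
    pub struct BlockHeader {
        pub parent: BlockHeaderHash, // hash of the parent header
        pub number: BlockNumber,     // sequential block number
        pub payload: PayloadHash,    // hash of the payload being proposed
    }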
- let mut payload = vec![0; ConsensusInner::PAYLOAD_MAX_SIZE]; - ctx.rng().fill(&mut payload[..]); - - let block = validator::Block { - parent: highest_qc.message.proposal_block_hash, - number: highest_qc.message.proposal_block_number.next(), - payload, - }; + let mut payload = validator::Payload(vec![0; ConsensusInner::PAYLOAD_MAX_SIZE]); + ctx.rng().fill(&mut payload.0[..]); + metrics::METRICS .leader_proposal_payload_size - .observe(block.payload.len()); - - let vote = validator::ReplicaCommit { - view: message.view, - proposal_block_hash: block.hash(), - proposal_block_number: block.number, - }; - - let proposal = validator::Proposal::New(block); - - (proposal, vote) + .observe(payload.0.len()); + let proposal = + validator::BlockHeader::new(&highest_qc.message.proposal, payload.hash()); + (proposal, Some(payload)) } }; @@ -181,7 +183,7 @@ impl StateMachine { self.view = message.view; self.phase = validator::Phase::Commit; self.phase_start = ctx.now(); - self.block_proposal_cache = Some(commit_vote); + self.block_proposal_cache = Some(proposal); // ----------- Prepare our message and send it -------------- @@ -195,7 +197,10 @@ impl StateMachine { .secret_key .sign_msg(validator::ConsensusMsg::LeaderPrepare( validator::LeaderPrepare { + protocol_version: validator::CURRENT_VERSION, + view: self.view, proposal, + proposal_payload: payload, justification, }, )), diff --git a/node/actors/consensus/src/leader/state_machine.rs b/node/actors/consensus/src/leader/state_machine.rs index 8a3b081e..a92ed26f 100644 --- a/node/actors/consensus/src/leader/state_machine.rs +++ b/node/actors/consensus/src/leader/state_machine.rs @@ -21,7 +21,7 @@ pub(crate) struct StateMachine { /// Time when the current phase has started. pub(crate) phase_start: time::Instant, /// A cache of our latest block proposal. We use it to determine if we accept a replica commit message. - pub(crate) block_proposal_cache: Option, + pub(crate) block_proposal_cache: Option, /// A cache of replica prepare messages indexed by view number and validator. 
pub(crate) prepare_message_cache: BTreeMap< validator::ViewNumber, @@ -59,18 +59,25 @@ impl StateMachine { input: validator::Signed, ) { let now = ctx.now(); - let (label, result) = match &input.msg { - validator::ConsensusMsg::ReplicaPrepare(_) => ( - metrics::ConsensusMsgLabel::ReplicaPrepare, - self.process_replica_prepare(ctx, consensus, input.cast().unwrap()), - ), - validator::ConsensusMsg::ReplicaCommit(_) => ( - metrics::ConsensusMsgLabel::ReplicaCommit, - self.process_replica_commit(ctx, consensus, input.cast().unwrap()), - ), + let label = match &input.msg { + validator::ConsensusMsg::ReplicaPrepare(_) => { + let res = self + .process_replica_prepare(ctx, consensus, input.cast().unwrap()) + .map_err(|err| { + tracing::warn!("process_replica_prepare: {err:#}"); + }); + metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) + } + validator::ConsensusMsg::ReplicaCommit(_) => { + let res = self + .process_replica_commit(ctx, consensus, input.cast().unwrap()) + .map_err(|err| { + tracing::warn!("process_replica_commit: {err:#}"); + }); + metrics::ConsensusMsgLabel::ReplicaCommit.with_result(&res) + } _ => unreachable!(), }; - metrics::METRICS.leader_processing_latency[&label.with_result(&result)] - .observe_latency(ctx.now() - now); + metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now); } } diff --git a/node/actors/consensus/src/leader/tests.rs b/node/actors/consensus/src/leader/tests.rs index 51fe66ee..2010b08f 100644 --- a/node/actors/consensus/src/leader/tests.rs +++ b/node/actors/consensus/src/leader/tests.rs @@ -1,4 +1,4 @@ -use crate::{leader::error::Error, testonly}; +use crate::testonly; use concurrency::ctx; use rand::{rngs::StdRng, Rng, SeedableRng}; use roles::validator; @@ -10,11 +10,9 @@ async fn replica_commit() { let rng = &mut StdRng::seed_from_u64(6516565651); let keys: Vec<_> = (0..1).map(|_| rng.gen()).collect(); - let (genesis, val_set) = testonly::make_genesis(&keys, vec![]); + let (genesis, val_set) = testonly::make_genesis(&keys, validator::Payload(vec![])); let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await; - let proposal_block_hash = rng.gen(); - consensus.leader.view = validator::ViewNumber(3); consensus.leader.phase = validator::Phase::Commit; @@ -24,18 +22,18 @@ async fn replica_commit() { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaCommit( validator::ReplicaCommit { + protocol_version: validator::CURRENT_VERSION, view: consensus.leader.view, - proposal_block_hash, - proposal_block_number: validator::BlockNumber(42), + proposal: rng.gen(), }, )); - assert_eq!( - consensus.leader.process_replica_commit( - ctx, - &consensus.inner, - test_replica_msg.cast().unwrap() - ), - Err(Error::ReplicaCommitMissingProposal) - ); + match consensus.leader.process_replica_commit( + ctx, + &consensus.inner, + test_replica_msg.cast().unwrap(), + ) { + Err(super::replica_commit::Error::UnexpectedProposal) => {} + res => panic!("unexpected result {res:?}"), + } } diff --git a/node/actors/consensus/src/lib.rs b/node/actors/consensus/src/lib.rs index 0857233e..6d278e05 100644 --- a/node/actors/consensus/src/lib.rs +++ b/node/actors/consensus/src/lib.rs @@ -99,6 +99,14 @@ impl Consensus { match input { Some(InputMessage::Network(req)) => { + if req.msg.msg.protocol_version() != validator::CURRENT_VERSION { + tracing::warn!( + "bad protocol version (expected: {:?}, received: {:?})", + validator::CURRENT_VERSION, + req.msg.msg.protocol_version() + ); + continue; + } match &req.msg.msg { 
validator::ConsensusMsg::ReplicaPrepare(_) | validator::ConsensusMsg::ReplicaCommit(_) => { diff --git a/node/actors/consensus/src/replica/block.rs b/node/actors/consensus/src/replica/block.rs index 1526d5c4..5591bf4b 100644 --- a/node/actors/consensus/src/replica/block.rs +++ b/node/actors/consensus/src/replica/block.rs @@ -16,24 +16,26 @@ impl StateMachine { // TODO(gprusak): for availability of finalized blocks, // replicas should be able to broadcast highest quorums without // the corresponding block (same goes for synchronization). - if let Some(block) = self + let Some(cache) = self .block_proposal_cache - .get(&commit_qc.message.proposal_block_number) - .and_then(|m| m.get(&commit_qc.message.proposal_block_hash)) - { - let final_block = validator::FinalBlock { - block: block.clone(), - justification: commit_qc.clone(), - }; + .get(&commit_qc.message.proposal.number) + else { + return; + }; + let Some(payload) = cache.get(&commit_qc.message.proposal.payload) else { + return; + }; + let block = validator::FinalBlock { + header: commit_qc.message.proposal, + payload: payload.clone(), + justification: commit_qc.clone(), + }; - info!( - "Finalized a block!\nFinal block: {:#?}", - final_block.block.hash() - ); + info!( + "Finalized a block!\nFinal block: {:#?}", + block.header.hash() + ); - consensus - .pipe - .send(OutputMessage::FinalizedBlock(final_block)); - } + consensus.pipe.send(OutputMessage::FinalizedBlock(block)); } } diff --git a/node/actors/consensus/src/replica/error.rs b/node/actors/consensus/src/replica/error.rs deleted file mode 100644 index 5d6c57c6..00000000 --- a/node/actors/consensus/src/replica/error.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::validator; -use roles::validator::BlockHash; -use thiserror::Error; - -#[derive(Error, Debug)] -#[allow(clippy::missing_docs_in_private_items)] -pub(crate) enum Error { - #[error("received leader commit message with invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})]")] - LeaderCommitInvalidLeader { - correct_leader: validator::PublicKey, - received_leader: validator::PublicKey, - }, - #[error("received leader prepare message with invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?}])")] - LeaderPrepareInvalidLeader { - correct_leader: validator::PublicKey, - received_leader: validator::PublicKey, - }, - #[error("received leader commit message for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] - LeaderCommitOld { - current_view: validator::ViewNumber, - current_phase: validator::Phase, - }, - #[error("received leader commit message for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] - LeaderPrepareOld { - current_view: validator::ViewNumber, - current_phase: validator::Phase, - }, - #[error("received leader commit message with invalid signature")] - LeaderCommitInvalidSignature(#[source] crypto::bls12_381::Error), - #[error("received leader prepare message with invalid signature")] - LeaderPrepareInvalidSignature(#[source] crypto::bls12_381::Error), - #[error("received leader commit message with invalid justification")] - LeaderCommitInvalidJustification(#[source] anyhow::Error), - #[error("received leader prepare message with empty map in the justification")] - LeaderPrepareJustificationWithEmptyMap, - #[error("received leader prepare message with invalid PrepareQC")] - LeaderPrepareInvalidPrepareQC(#[source] anyhow::Error), - #[error("received leader prepare message with 
invalid high QC")] - LeaderPrepareInvalidHighQC(#[source] anyhow::Error), - #[error("received leader prepare message with high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}")] - LeaderPrepareHighQCOfFutureView { - high_qc_view: validator::ViewNumber, - current_view: validator::ViewNumber, - }, - #[error("received leader prepare message with new block proposal when the previous proposal was not finalized")] - LeaderPrepareProposalWhenPreviousNotFinalized, - #[error("received leader prepare message with new block proposal with invalid parent hash (correct parent hash: {correct_parent_hash:#?}, received parent hash: {received_parent_hash:#?}, block: {block:?})")] - LeaderPrepareProposalInvalidParentHash { - correct_parent_hash: BlockHash, - received_parent_hash: BlockHash, - block: validator::Block, - }, - #[error("received leader prepare message with block proposal with non-sequential number (correct proposal number: {correct_number}, received proposal number: {received_number}, block: {block:?})")] - LeaderPrepareProposalNonSequentialNumber { - correct_number: u64, - received_number: u64, - block: validator::Block, - }, - #[error("received leader prepare message with block proposal with an oversized payload (payload size: {payload_size}, block: {block:?}")] - LeaderPrepareProposalOversizedPayload { - payload_size: usize, - block: validator::Block, - }, - #[error("received leader prepare message with block re-proposal when the previous proposal was finalized")] - LeaderPrepareReproposalWhenFinalized, - #[error("received leader prepare message with block re-proposal of invalid block")] - LeaderPrepareReproposalInvalidBlock, - #[error("failed saving replica state to DB: {_0}")] - ReplicaStateSave(#[source] anyhow::Error), -} diff --git a/node/actors/consensus/src/replica/leader_commit.rs b/node/actors/consensus/src/replica/leader_commit.rs index 932a8e29..32cf70ce 100644 --- a/node/actors/consensus/src/replica/leader_commit.rs +++ b/node/actors/consensus/src/replica/leader_commit.rs @@ -1,9 +1,31 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, replica::error::Error}; +use crate::inner::ConsensusInner; +use anyhow::Context as _; use concurrency::ctx; use roles::validator; use tracing::instrument; +#[derive(thiserror::Error, Debug)] +#[allow(clippy::missing_docs_in_private_items)] +pub(crate) enum Error { + #[error("invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})]")] + InvalidLeader { + correct_leader: validator::PublicKey, + received_leader: validator::PublicKey, + }, + #[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] + Old { + current_view: validator::ViewNumber, + current_phase: validator::Phase, + }, + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] crypto::bls12_381::Error), + #[error("invalid justification: {0:#}")] + InvalidJustification(#[source] anyhow::Error), + #[error("internal error: {0:#}")] + Internal(#[from] anyhow::Error), +} + impl StateMachine { /// Processes a leader commit message. We can approve this leader message even if we /// don't have the block proposal stored. It is enough to see the justification. @@ -23,7 +45,7 @@ impl StateMachine { // Check that it comes from the correct leader. 
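A note on the two checks that follow: the leader check needs only a deterministic view_leader, and the staleness check right after it leans on Rust's lexicographic tuple ordering, so a message is discarded exactly when its (view, phase) pair sorts before the replica's current one. A sketch of the ordering, assuming Phase derives Ord with Prepare < Commit:

    // Tuples compare field by field: the view dominates, and the phase
    // only breaks ties within the same view.
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    enum Phase { Prepare, Commit }

    fn is_stale(msg: (u64, Phase), current: (u64, Phase)) -> bool {
        msg < current
    }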
if author != &consensus.view_leader(view) { - return Err(Error::LeaderCommitInvalidLeader { + return Err(Error::InvalidLeader { correct_leader: consensus.view_leader(view), received_leader: author.clone(), }); @@ -31,7 +53,7 @@ impl StateMachine { // If the message is from the "past", we discard it. if (view, validator::Phase::Commit) < (self.view, self.phase) { - return Err(Error::LeaderCommitOld { + return Err(Error::Old { current_view: self.view, current_phase: self.phase, }); @@ -40,9 +62,7 @@ impl StateMachine { // ----------- Checking the signed part of the message -------------- // Check the signature on the message. - signed_message - .verify() - .map_err(Error::LeaderCommitInvalidSignature)?; + signed_message.verify().map_err(Error::InvalidSignature)?; // ----------- Checking the justification of the message -------------- @@ -50,7 +70,7 @@ impl StateMachine { message .justification .verify(&consensus.validator_set, consensus.threshold()) - .map_err(Error::LeaderCommitInvalidJustification)?; + .map_err(Error::InvalidJustification)?; // ----------- All checks finished. Now we process the message. -------------- @@ -65,7 +85,8 @@ impl StateMachine { // Start a new view. But first we skip to the view of this message. self.view = view; - self.start_new_view(ctx, consensus)?; + self.start_new_view(ctx, consensus) + .context("start_new_view()")?; Ok(()) } diff --git a/node/actors/consensus/src/replica/leader_prepare.rs b/node/actors/consensus/src/replica/leader_prepare.rs index 4ddfa33f..7eee6ac9 100644 --- a/node/actors/consensus/src/replica/leader_prepare.rs +++ b/node/actors/consensus/src/replica/leader_prepare.rs @@ -1,11 +1,71 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, replica::error::Error}; +use crate::inner::ConsensusInner; +use anyhow::Context as _; use concurrency::ctx; use network::io::{ConsensusInputMessage, Target}; use roles::validator; use std::collections::HashMap; use tracing::instrument; +#[derive(thiserror::Error, Debug)] +#[allow(clippy::missing_docs_in_private_items)] +pub(crate) enum Error { + #[error("invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?}])")] + InvalidLeader { + correct_leader: validator::PublicKey, + received_leader: validator::PublicKey, + }, + #[error("message for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")] + Old { + current_view: validator::ViewNumber, + current_phase: validator::Phase, + }, + #[error("invalid signature: {0:#}")] + InvalidSignature(#[source] crypto::bls12_381::Error), + #[error("invalid PrepareQC: {0:#}")] + InvalidPrepareQC(#[source] anyhow::Error), + #[error("invalid high QC: {0:#}")] + InvalidHighQC(#[source] anyhow::Error), + #[error( + "high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}" + )] + HighQCOfFutureView { + high_qc_view: validator::ViewNumber, + current_view: validator::ViewNumber, + }, + #[error("new block proposal when the previous proposal was not finalized")] + ProposalWhenPreviousNotFinalized, + #[error("block proposal with invalid parent hash (correct parent hash: {correct_parent_hash:#?}, received parent hash: {received_parent_hash:#?}, block: {header:?})")] + ProposalInvalidParentHash { + correct_parent_hash: validator::BlockHeaderHash, + received_parent_hash: validator::BlockHeaderHash, + header: validator::BlockHeader, + }, + #[error("block proposal with non-sequential number (correct proposal number: {correct_number}, received proposal number: {received_number}, 
block: {header:?})")] + ProposalNonSequentialNumber { + correct_number: validator::BlockNumber, + received_number: validator::BlockNumber, + header: validator::BlockHeader, + }, + #[error("block proposal with mismatched payload")] + ProposalMismatchedPayload, + #[error( + "block proposal with an oversized payload (payload size: {payload_size}, block: {header:?}" + )] + ProposalOversizedPayload { + payload_size: usize, + header: validator::BlockHeader, + }, + #[error("block re-proposal without quorum for the re-proposal")] + ReproposalWithoutQuorum, + #[error("block re-proposal when the previous proposal was finalized")] + ReproposalWhenFinalized, + #[error("block re-proposal of invalid block")] + ReproposalInvalidBlock, + #[error("internal error: {0:#}")] + Internal(#[from] anyhow::Error), +} + impl StateMachine { /// Processes a leader prepare message. #[instrument(level = "trace", ret)] @@ -20,17 +80,11 @@ impl StateMachine { // Unwrap message. let message = &signed_message.msg; let author = &signed_message.key; - let view = message - .justification - .map - .first_key_value() - .ok_or(Error::LeaderPrepareJustificationWithEmptyMap)? - .0 - .view; + let view = message.view; // Check that it comes from the correct leader. if author != &consensus.view_leader(view) { - return Err(Error::LeaderPrepareInvalidLeader { + return Err(Error::InvalidLeader { correct_leader: consensus.view_leader(view), received_leader: author.clone(), }); @@ -38,7 +92,7 @@ impl StateMachine { // If the message is from the "past", we discard it. if (view, validator::Phase::Prepare) < (self.view, self.phase) { - return Err(Error::LeaderPrepareOld { + return Err(Error::Old { current_view: self.view, current_phase: self.phase, }); @@ -46,39 +100,32 @@ impl StateMachine { // ----------- Checking the signed part of the message -------------- - signed_message - .verify() - .map_err(Error::LeaderPrepareInvalidSignature)?; + signed_message.verify().map_err(Error::InvalidSignature)?; // ----------- Checking the justification of the message -------------- // Verify the PrepareQC. message .justification - .verify(&consensus.validator_set, consensus.threshold()) - .map_err(Error::LeaderPrepareInvalidPrepareQC)?; + .verify(view, &consensus.validator_set, consensus.threshold()) + .map_err(Error::InvalidPrepareQC)?; // Get the highest block voted and check if there's a quorum of votes for it. To have a quorum // in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas. let mut vote_count: HashMap<_, usize> = HashMap::new(); for (msg, signers) in &message.justification.map { - *vote_count - .entry(( - msg.high_vote.proposal_block_number, - msg.high_vote.proposal_block_hash, - )) - .or_default() += signers.len(); + *vote_count.entry(msg.high_vote.proposal).or_default() += signers.len(); } - let highest_vote = vote_count - .iter() + let highest_vote: Option = vote_count + .into_iter() // We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes. - .find(|(_, v)| **v > 2 * consensus.faulty_replicas()) + .find(|(_, v)| *v > 2 * consensus.faulty_replicas()) .map(|(h, _)| h); // Get the highest CommitQC and verify it. 
- let highest_qc = message + let highest_qc: validator::CommitQC = message .justification .map .keys() @@ -89,13 +136,13 @@ impl StateMachine { highest_qc .verify(&consensus.validator_set, consensus.threshold()) - .map_err(Error::LeaderPrepareInvalidHighQC)?; + .map_err(Error::InvalidHighQC)?; // If the high QC is for a future view, we discard the message. // This check is not necessary for correctness, but it's useful to // guarantee that our messages don't contain QCs from the future. if highest_qc.message.view >= view { - return Err(Error::LeaderPrepareHighQCOfFutureView { + return Err(Error::HighQCOfFutureView { high_qc_view: highest_qc.message.view, current_view: view, }); @@ -108,78 +155,68 @@ impl StateMachine { // ----------- Checking the block proposal -------------- // Check that the proposal is valid. - let (proposal_block_number, proposal_block_hash, proposal_block) = match &message.proposal { + match &message.proposal_payload { // The leader proposed a new block. - validator::Proposal::New(block) => { + Some(payload) => { + // Check that the payload doesn't exceed the maximum size. + if payload.0.len() > ConsensusInner::PAYLOAD_MAX_SIZE { + return Err(Error::ProposalOversizedPayload { + payload_size: payload.0.len(), + header: message.proposal, + }); + } + + // Check that payload matches the header + if message.proposal.payload != payload.hash() { + return Err(Error::ProposalMismatchedPayload); + } + // Check that we finalized the previous block. if highest_vote.is_some() - && highest_vote - != Some(&( - highest_qc.message.proposal_block_number, - highest_qc.message.proposal_block_hash, - )) + && highest_vote.as_ref() != Some(&highest_qc.message.proposal) { - return Err(Error::LeaderPrepareProposalWhenPreviousNotFinalized); + return Err(Error::ProposalWhenPreviousNotFinalized); } - if highest_qc.message.proposal_block_hash != block.parent { - return Err(Error::LeaderPrepareProposalInvalidParentHash { - correct_parent_hash: highest_qc.message.proposal_block_hash, - received_parent_hash: block.parent, - block: block.clone(), + // Parent hash should match. + if highest_qc.message.proposal.hash() != message.proposal.parent { + return Err(Error::ProposalInvalidParentHash { + correct_parent_hash: highest_qc.message.proposal.hash(), + received_parent_hash: message.proposal.parent, + header: message.proposal, }); } - if highest_qc.message.proposal_block_number.next() != block.number { - return Err(Error::LeaderPrepareProposalNonSequentialNumber { - correct_number: highest_qc.message.proposal_block_number.next().0, - received_number: block.number.0, - block: block.clone(), + // Block number should match. + if highest_qc.message.proposal.number.next() != message.proposal.number { + return Err(Error::ProposalNonSequentialNumber { + correct_number: highest_qc.message.proposal.number.next(), + received_number: message.proposal.number, + header: message.proposal, }); } - - // Check that the payload doesn't exceed the maximum size. - if block.payload.len() > ConsensusInner::PAYLOAD_MAX_SIZE { - return Err(Error::LeaderPrepareProposalOversizedPayload { - payload_size: block.payload.len(), - block: block.clone(), - }); - } - - (block.number, block.hash(), Some(block)) } // The leader is re-proposing a past block. 
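The rewritten arm below reduces the re-proposal rule to three checks; isolated for readability (same logic as the diff, with an illustrative signature and the Error variants from the enum above):

    // Re-proposal is valid only when some header has a sub-quorum of more
    // than 2*f high votes, that header is not already finalized by the
    // highest QC, and it is exactly the header the leader re-proposed.
    fn check_reproposal(
        highest_vote: Option<validator::BlockHeader>, // sub-quorum winner, if any
        finalized: &validator::BlockHeader,           // highest_qc.message.proposal
        proposed: &validator::BlockHeader,            // message.proposal
    ) -> Result<(), Error> {
        let Some(vote) = highest_vote else {
            return Err(Error::ReproposalWithoutQuorum);
        };
        if &vote == finalized {
            return Err(Error::ReproposalWhenFinalized);
        }
        if &vote != proposed {
            return Err(Error::ReproposalInvalidBlock);
        }
        Ok(())
    }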
- validator::Proposal::Retry(commit_vote) => { - if highest_vote.is_none() - || highest_vote - == Some(&( - highest_qc.message.proposal_block_number, - highest_qc.message.proposal_block_hash, - )) - { - return Err(Error::LeaderPrepareReproposalWhenFinalized); + None => { + let Some(highest_vote) = highest_vote else { + return Err(Error::ReproposalWithoutQuorum); + }; + if highest_vote == highest_qc.message.proposal { + return Err(Error::ReproposalWhenFinalized); } - - if highest_vote.unwrap() - != &( - commit_vote.proposal_block_number, - commit_vote.proposal_block_hash, - ) - { - return Err(Error::LeaderPrepareReproposalInvalidBlock); + if highest_vote != message.proposal { + return Err(Error::ReproposalInvalidBlock); } - - (highest_vote.unwrap().0, highest_vote.unwrap().1, None) } - }; + } // ----------- All checks finished. Now we process the message. -------------- // Create our commit vote. let commit_vote = validator::ReplicaCommit { + protocol_version: validator::CURRENT_VERSION, view, - proposal_block_hash, - proposal_block_number, + proposal: message.proposal, }; // Update the state machine. @@ -192,21 +229,15 @@ impl StateMachine { } // If we received a new block proposal, store it in our cache. - if let Some(block) = proposal_block { - match self.block_proposal_cache.get_mut(&proposal_block_number) { - Some(map) => { - map.insert(proposal_block_hash, block.clone()); - } - None => { - let mut map = HashMap::new(); - map.insert(proposal_block_hash, block.clone()); - self.block_proposal_cache.insert(proposal_block_number, map); - } - } + if let Some(payload) = &message.proposal_payload { + self.block_proposal_cache + .entry(message.proposal.number) + .or_default() + .insert(payload.hash(), payload.clone()); } // Backup our state. - self.backup_state(ctx).map_err(Error::ReplicaStateSave)?; + self.backup_state(ctx).context("backup_state()")?; // Send the replica message to the leader. let output_message = ConsensusInputMessage { diff --git a/node/actors/consensus/src/replica/mod.rs b/node/actors/consensus/src/replica/mod.rs index 556aab63..e76e045f 100644 --- a/node/actors/consensus/src/replica/mod.rs +++ b/node/actors/consensus/src/replica/mod.rs @@ -3,7 +3,6 @@ //! node will perform both the replica and leader roles simultaneously. mod block; -mod error; mod leader_commit; mod leader_prepare; mod new_view; diff --git a/node/actors/consensus/src/replica/new_view.rs b/node/actors/consensus/src/replica/new_view.rs index e0f84663..078fe488 100644 --- a/node/actors/consensus/src/replica/new_view.rs +++ b/node/actors/consensus/src/replica/new_view.rs @@ -1,5 +1,6 @@ -use super::{error::Error, StateMachine}; +use super::StateMachine; use crate::ConsensusInner; +use anyhow::Context as _; use concurrency::ctx; use network::io::{ConsensusInputMessage, Target}; use roles::validator; @@ -12,7 +13,7 @@ impl StateMachine { &mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { tracing::info!("Starting view {}", self.view.next().0); // Update the state machine. @@ -23,10 +24,10 @@ impl StateMachine { // Clear the block cache. self.block_proposal_cache - .retain(|k, _| k > &self.high_qc.message.proposal_block_number); + .retain(|k, _| k > &self.high_qc.message.proposal.number); // Backup our state. - self.backup_state(ctx).map_err(Error::ReplicaStateSave)?; + self.backup_state(ctx).context("backup_state")?; // Send the replica message to the next leader. 
let output_message = ConsensusInputMessage { @@ -34,6 +35,7 @@ impl StateMachine { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaPrepare( validator::ReplicaPrepare { + protocol_version: validator::CURRENT_VERSION, view: next_view, high_vote: self.high_vote, high_qc: self.high_qc.clone(), diff --git a/node/actors/consensus/src/replica/state_machine.rs b/node/actors/consensus/src/replica/state_machine.rs index c3d373c8..9f252d6e 100644 --- a/node/actors/consensus/src/replica/state_machine.rs +++ b/node/actors/consensus/src/replica/state_machine.rs @@ -1,4 +1,5 @@ -use crate::{metrics, replica::error::Error, ConsensusInner}; +use crate::{metrics, ConsensusInner}; +use anyhow::Context as _; use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time}; use roles::validator; use std::{ @@ -22,7 +23,7 @@ pub(crate) struct StateMachine { pub(crate) high_qc: validator::CommitQC, /// A cache of the received block proposals. pub(crate) block_proposal_cache: - BTreeMap>, + BTreeMap>, /// The deadline to receive an input message. pub(crate) timeout_deadline: time::Deadline, /// A reference to the storage module. We use it to backup the replica state. @@ -37,15 +38,24 @@ impl StateMachine { storage: Arc, ) -> anyhow::Result { Ok(match storage.replica_state(ctx).await? { - Some(backup) => Self { - view: backup.view, - phase: backup.phase, - high_vote: backup.high_vote, - high_qc: backup.high_qc, - block_proposal_cache: backup.block_proposal_cache, - timeout_deadline: time::Deadline::Infinite, - storage, - }, + Some(backup) => { + let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); + for p in backup.proposals { + block_proposal_cache + .entry(p.number) + .or_default() + .insert(p.payload.hash(), p.payload); + } + Self { + view: backup.view, + phase: backup.phase, + high_vote: backup.high_vote, + high_qc: backup.high_qc, + block_proposal_cache, + timeout_deadline: time::Deadline::Infinite, + storage, + } + } None => { let head = storage.head_block(ctx).await?; Self { @@ -69,9 +79,10 @@ impl StateMachine { &mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { if self.view == validator::ViewNumber(0) { self.start_new_view(ctx, consensus) + .context("start_new_view") } else { self.reset_timer(ctx); Ok(()) @@ -96,38 +107,56 @@ impl StateMachine { }; let now = ctx.now(); - let (label, result) = match &signed_msg.msg { - validator::ConsensusMsg::LeaderPrepare(_) => ( - metrics::ConsensusMsgLabel::LeaderPrepare, - self.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap()), - ), - validator::ConsensusMsg::LeaderCommit(_) => ( - metrics::ConsensusMsgLabel::LeaderCommit, - self.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap()), - ), + let label = match &signed_msg.msg { + validator::ConsensusMsg::LeaderPrepare(_) => { + let res = + match self.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap()) { + Err(super::leader_prepare::Error::Internal(err)) => { + return Err(err).context("process_leader_prepare()") + } + Err(err) => { + tracing::warn!("process_leader_prepare(): {err:#}"); + Err(()) + } + Ok(()) => Ok(()), + }; + metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res) + } + validator::ConsensusMsg::LeaderCommit(_) => { + let res = + match self.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap()) { + Err(super::leader_commit::Error::Internal(err)) => { + return Err(err).context("process_leader_commit()") + } + Err(err) => { + 
tracing::warn!("process_leader_commit(): {err:#}"); + Err(()) + } + Ok(()) => Ok(()), + }; + metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res) + } _ => unreachable!(), }; - metrics::METRICS.replica_processing_latency[&label.with_result(&result)] - .observe_latency(ctx.now() - now); - match result { - Ok(()) => Ok(()), - Err(err @ Error::ReplicaStateSave(_)) => Err(err.into()), - Err(err) => { - // Other errors from processing inputs are recoverable, so we just log them. - tracing::warn!("{err}"); - Ok(()) - } - } + metrics::METRICS.replica_processing_latency[&label].observe_latency(ctx.now() - now); + Ok(()) } /// Backups the replica state to disk. pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let mut proposals = vec![]; + for (number, payloads) in &self.block_proposal_cache { + proposals.extend(payloads.values().map(|p| storage::Proposal { + number: *number, + payload: p.clone(), + })); + } let backup = storage::ReplicaState { view: self.view, phase: self.phase, high_vote: self.high_vote, high_qc: self.high_qc.clone(), - block_proposal_cache: self.block_proposal_cache.clone(), + proposals, }; let store_result = scope::run_blocking!(ctx, |ctx, s| { diff --git a/node/actors/consensus/src/replica/tests.rs b/node/actors/consensus/src/replica/tests.rs index 8789adb7..775929e8 100644 --- a/node/actors/consensus/src/replica/tests.rs +++ b/node/actors/consensus/src/replica/tests.rs @@ -10,7 +10,7 @@ async fn start_new_view_not_leader() { let rng = &mut ctx.rng(); let keys: Vec<_> = (0..4).map(|_| rng.gen()).collect(); - let (genesis, val_set) = testonly::make_genesis(&keys, vec![]); + let (genesis, val_set) = testonly::make_genesis(&keys, validator::Payload(vec![])); let (mut consensus, mut pipe) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await; // TODO: this test assumes a specific implementation of the leader schedule. 
@@ -38,6 +38,7 @@ async fn start_new_view_not_leader() { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaPrepare( validator::ReplicaPrepare { + protocol_version: validator::CURRENT_VERSION, view: consensus.replica.view, high_vote: consensus.replica.high_vote, high_qc: consensus.replica.high_qc.clone(), diff --git a/node/actors/consensus/src/testonly/fuzz.rs b/node/actors/consensus/src/testonly/fuzz.rs index 1b321fda..e4b95c78 100644 --- a/node/actors/consensus/src/testonly/fuzz.rs +++ b/node/actors/consensus/src/testonly/fuzz.rs @@ -35,10 +35,11 @@ impl Fuzz for validator::ConsensusMsg { impl Fuzz for validator::ReplicaPrepare { fn mutate(&mut self, rng: &mut impl Rng) { - match rng.gen_range(0..2) { + match rng.gen_range(0..4) { 0 => self.view = rng.gen(), 1 => self.high_vote.mutate(rng), 2 => self.high_qc.mutate(rng), + 3 => self.protocol_version = rng.gen(), _ => unreachable!(), } } @@ -48,8 +49,8 @@ impl Fuzz for validator::ReplicaCommit { fn mutate(&mut self, rng: &mut impl Rng) { match rng.gen_range(0..3) { 0 => self.view = rng.gen(), - 1 => self.proposal_block_hash = rng.gen(), - 2 => self.proposal_block_number = rng.gen(), + 1 => self.proposal.mutate(rng), + 2 => self.protocol_version = rng.gen(), _ => unreachable!(), } } @@ -57,26 +58,22 @@ impl Fuzz for validator::ReplicaCommit { impl Fuzz for validator::LeaderPrepare { fn mutate(&mut self, rng: &mut impl Rng) { - match rng.gen_range(0..2) { + match rng.gen_range(0..3) { 0 => self.proposal.mutate(rng), 1 => self.justification.mutate(rng), + 2 => self.protocol_version = rng.gen(), _ => unreachable!(), } } } -impl Fuzz for validator::Proposal { - fn mutate(&mut self, rng: &mut impl Rng) { - match self { - validator::Proposal::New(x) => x.mutate(rng), - validator::Proposal::Retry(x) => x.mutate(rng), - }; - } -} - impl Fuzz for validator::LeaderCommit { fn mutate(&mut self, rng: &mut impl Rng) { - self.justification.mutate(rng); + match rng.gen_range(0..2) { + 0 => self.justification.mutate(rng), + 1 => self.protocol_version = rng.gen(), + _ => unreachable!(), + } } } @@ -155,21 +152,22 @@ impl Fuzz for validator::Signers { } } -impl Fuzz for validator::Block { +impl Fuzz for validator::Payload { + fn mutate(&mut self, rng: &mut impl Rng) { + // Push bytes into the payload until it exceeds the limit. + let num_bytes = crate::ConsensusInner::PAYLOAD_MAX_SIZE + 1 - self.0.len(); + let bytes: Vec = (0..num_bytes).map(|_| rng.gen()).collect(); + self.0.extend_from_slice(&bytes); + assert!(self.0.len() > crate::ConsensusInner::PAYLOAD_MAX_SIZE); + } +} + +impl Fuzz for validator::BlockHeader { fn mutate(&mut self, rng: &mut impl Rng) { match rng.gen_range(0..3) { 0 => self.parent = rng.gen(), 1 => self.number = rng.gen(), - 2 => { - // Push bytes into the payload until it exceeds the limit. - let num_bytes = crate::ConsensusInner::PAYLOAD_MAX_SIZE + 1 - self.payload.len(); - - let bytes: Vec = (0..num_bytes).map(|_| rng.gen()).collect(); - - self.payload.extend_from_slice(&bytes); - - assert!(self.payload.len() > crate::ConsensusInner::PAYLOAD_MAX_SIZE); - } + 2 => self.payload = rng.gen(), _ => unreachable!(), } } diff --git a/node/actors/consensus/src/testonly/make.rs b/node/actors/consensus/src/testonly/make.rs index 00c187c0..9af2b6d4 100644 --- a/node/actors/consensus/src/testonly/make.rs +++ b/node/actors/consensus/src/testonly/make.rs @@ -45,23 +45,24 @@ pub async fn make_consensus( /// and a validator set for the chain. 
pub fn make_genesis( keys: &[validator::SecretKey], - payload: Vec, + payload: validator::Payload, ) -> (validator::FinalBlock, validator::ValidatorSet) { - let block = validator::Block::genesis(payload); + let header = validator::BlockHeader::genesis(payload.hash()); let validator_set = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(); let signed_messages: Vec<_> = keys .iter() .map(|sk| { sk.sign_msg(validator::ReplicaCommit { + protocol_version: validator::CURRENT_VERSION, view: validator::ViewNumber(0), - proposal_block_hash: block.hash(), - proposal_block_number: validator::BlockNumber(0), + proposal: header, }) }) .collect(); let final_block = validator::FinalBlock { + header, + payload, justification: validator::CommitQC::from(&signed_messages, &validator_set).unwrap(), - block, }; (final_block, validator_set) } diff --git a/node/actors/consensus/src/testonly/run.rs b/node/actors/consensus/src/testonly/run.rs index 5e146217..8735d0b6 100644 --- a/node/actors/consensus/src/testonly/run.rs +++ b/node/actors/consensus/src/testonly/run.rs @@ -49,7 +49,7 @@ impl Test { .collect(); assert!(!honest.is_empty()); - let mut finalized: HashMap = + let mut finalized: HashMap = HashMap::new(); let mut observers: HashMap> = HashMap::new(); @@ -60,10 +60,10 @@ impl Test { if !honest.contains(&metric.validator) { continue; } - let block = metric.finalized_block.block; - let hash = block.hash(); - assert_eq!(*finalized.entry(block.number).or_insert(hash), hash); - let observers = observers.entry(block.number).or_default(); + let block = metric.finalized_block; + let hash = block.header.hash(); + assert_eq!(*finalized.entry(block.header.number).or_insert(hash), hash); + let observers = observers.entry(block.header.number).or_default(); if observers.insert(metric.validator.clone()) && observers.len() == honest.len() { fully_observed += 1; } @@ -85,7 +85,7 @@ async fn run_nodes( .iter() .map(|r| r.net.state().cfg().consensus.key.clone()) .collect(); - let (genesis_block, _) = testonly::make_genesis(&keys, vec![]); + let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![])); let network_ready = signal::Once::new(); let mut network_pipes = HashMap::new(); let mut network_send = HashMap::new(); diff --git a/node/actors/executor/src/lib/io.rs b/node/actors/executor/src/lib/io.rs index cfe684b8..c0cc6bc5 100644 --- a/node/actors/executor/src/lib/io.rs +++ b/node/actors/executor/src/lib/io.rs @@ -56,7 +56,7 @@ impl Dispatcher { ConsensusOutputMessage::FinalizedBlock(b) => { let number_metric = &metrics::METRICS.finalized_block_number; let current_number = number_metric.get(); - number_metric.set(current_number.max(b.block.number.0)); + number_metric.set(current_number.max(b.header.number.0)); // This works because this is the only place where `finalized_block_number` // is modified, and there should be a single running `Dispatcher`. 
} diff --git a/node/actors/executor/src/main.rs b/node/actors/executor/src/main.rs index 9fbff851..afb8e180 100644 --- a/node/actors/executor/src/main.rs +++ b/node/actors/executor/src/main.rs @@ -143,7 +143,7 @@ async fn main() -> anyhow::Result<()> { let storage = storage.clone(); loop { let block_finalized = storage.head_block(ctx).await.context("head_block")?; - let block_finalized = block_finalized.block.number.0; + let block_finalized = block_finalized.header.number.0; tracing::info!("current finalized block {}", block_finalized); if block_finalized > 100 { diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 1180c74a..5256548f 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -346,8 +346,7 @@ impl NetworkState { state, response, } => { - let last_block_number = - state.last_stored_block.message.proposal_block_number; + let last_block_number = state.last_stored_block.message.proposal.number; if last_block_number == expected_latest_block_number { // We might receive outdated states, hence this check received_states_by_peer.insert(peer.clone(), *state.clone()); @@ -406,7 +405,7 @@ async fn uncoordinated_block_syncing( s.spawn(async { for state in states { ctx.sleep(state_generation_interval).await?; - let last_block_number = state.last_stored_block.message.proposal_block_number; + let last_block_number = state.last_stored_block.message.proposal.number; tracing::debug!("Generated `SyncState` with last block number {last_block_number}"); state_sender.send_replace(state); } @@ -445,7 +444,7 @@ async fn run_mock_uncoordinated_dispatcher( state, response, } => { - let last_block_number = state.last_stored_block.message.proposal_block_number; + let last_block_number = state.last_stored_block.message.proposal.number; tracing::debug!( "Node {node_key:?} received update with block number {last_block_number} from {peer:?}" ); diff --git a/node/actors/network/src/io.rs b/node/actors/network/src/io.rs index 6cc3f0b1..8a3ff6ef 100644 --- a/node/actors/network/src/io.rs +++ b/node/actors/network/src/io.rs @@ -74,12 +74,9 @@ impl SyncState { /// Returns numbers for block QCs contained in this state. 
pub fn numbers(&self) -> SyncStateNumbers { SyncStateNumbers { - first_stored_block: self.first_stored_block.message.proposal_block_number, - last_contiguous_stored_block: self - .last_contiguous_stored_block - .message - .proposal_block_number, - last_stored_block: self.last_stored_block.message.proposal_block_number, + first_stored_block: self.first_stored_block.message.proposal.number, + last_contiguous_stored_block: self.last_contiguous_stored_block.message.proposal.number, + last_stored_block: self.last_stored_block.message.proposal.number, } } } diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 19d3bbe1..cbc92fab 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -212,7 +212,7 @@ impl SyncState { last_contiguous_stored_block: rng.gen(), last_stored_block: rng.gen(), }; - this.last_stored_block.message.proposal_block_number = number; + this.last_stored_block.message.proposal.number = number; this } } diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 44817d6c..7eb70009 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -86,7 +86,7 @@ impl PeerStates { scope::run!(ctx, |ctx, s| async { let start_number = storage.last_contiguous_block_number(ctx).await?; - let mut last_block_number = storage.head_block(ctx).await?.block.number; + let mut last_block_number = storage.head_block(ctx).await?.header.number; let missing_blocks = storage .missing_block_numbers(ctx, start_number..last_block_number) .await?; @@ -334,16 +334,16 @@ impl PeerStates { fn validate_block(&self, block_number: BlockNumber, block: &FinalBlock) -> anyhow::Result<()> { anyhow::ensure!( - block.block.number == block_number, + block.header.number == block_number, "Block does not have requested number" ); anyhow::ensure!( - block.block.number == block.justification.message.proposal_block_number, - "Block numbers in `block` and quorum certificate don't match" + block.payload.hash() == block.header.payload, + "Block payload doesn't match the block header", ); anyhow::ensure!( - block.block.hash() == block.justification.message.proposal_block_hash, - "Block hashes in `block` and quorum certificate don't match" + block.header == block.justification.message.proposal, + "Quorum certificate proposal doesn't match the block header", ); block diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 5175a5c6..3b588169 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -4,6 +4,7 @@ use assert_matches::assert_matches; use async_trait::async_trait; use concurrency::time; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; +use roles::validator; use std::{collections::HashSet, fmt}; use storage::RocksdbStorage; use test_casing::{test_casing, Product}; @@ -825,49 +826,25 @@ async fn processing_invalid_sync_states() { let mut invalid_sync_state = test_validators.sync_state(1); invalid_sync_state.first_stored_block = test_validators.final_blocks[2].justification.clone(); - let err = peer_states - .validate_sync_state(invalid_sync_state) - .unwrap_err(); - let err = format!("{err:?}"); - assert!(err.contains("first_stored_block"), "{err}"); + assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); let mut invalid_sync_state = test_validators.sync_state(1); invalid_sync_state.last_contiguous_stored_block = 
test_validators.final_blocks[2].justification.clone(); - let err = peer_states - .validate_sync_state(invalid_sync_state) - .unwrap_err(); - let err = format!("{err:?}"); - assert!(err.contains("last_contiguous_stored_block"), "{err}"); + assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); let mut invalid_sync_state = test_validators.sync_state(1); invalid_sync_state .last_contiguous_stored_block .message - .proposal_block_number = BlockNumber(5); - invalid_sync_state - .last_stored_block - .message - .proposal_block_number = BlockNumber(5); - let err = peer_states - .validate_sync_state(invalid_sync_state) - .unwrap_err(); - let err = format!("{err:?}"); - assert!( - err.contains("Failed verifying `last_contiguous_stored_block`"), - "{err}" - ); + .proposal + .number = BlockNumber(5); + invalid_sync_state.last_stored_block.message.proposal.number = BlockNumber(5); + assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); let other_network = TestValidators::new(4, 2, rng); let invalid_sync_state = other_network.sync_state(1); - let err = peer_states - .validate_sync_state(invalid_sync_state) - .unwrap_err(); - let err = format!("{err:?}"); - assert!( - err.contains("Failed verifying `last_contiguous_stored_block`"), - "{err}" - ); + assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); } #[tokio::test] @@ -883,39 +860,25 @@ async fn processing_invalid_blocks() { let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); let invalid_block = &test_validators.final_blocks[0]; - let err = peer_states + assert!(peer_states .validate_block(BlockNumber(1), invalid_block) - .unwrap_err(); - let err = format!("{err:?}"); - assert!(err.contains("does not have requested number"), "{err}"); + .is_err()); let mut invalid_block = test_validators.final_blocks[1].clone(); invalid_block.justification = test_validators.final_blocks[0].justification.clone(); - let err = peer_states + assert!(peer_states .validate_block(BlockNumber(1), &invalid_block) - .unwrap_err(); - let err = format!("{err:?}"); - assert!( - err.contains("numbers in `block` and quorum certificate"), - "{err}" - ); + .is_err()); let mut invalid_block = test_validators.final_blocks[1].clone(); - invalid_block.block.payload = b"invalid".to_vec(); - let err = peer_states + invalid_block.payload = validator::Payload(b"invalid".to_vec()); + assert!(peer_states .validate_block(BlockNumber(1), &invalid_block) - .unwrap_err(); - let err = format!("{err:?}"); - assert!( - err.contains("hashes in `block` and quorum certificate"), - "{err}" - ); + .is_err()); let other_network = TestValidators::new(4, 2, rng); let invalid_block = &other_network.final_blocks[1]; - let err = peer_states + assert!(peer_states .validate_block(BlockNumber(1), invalid_block) - .unwrap_err(); - let err = format!("{err:?}"); - assert!(err.contains("verifying quorum certificate"), "{err}"); + .is_err()); } diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 35061374..dd6619dd 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -7,7 +7,11 @@ use rand::{ distributions::{Distribution, Standard}, Rng, }; -use roles::validator::{self, Block, BlockNumber, CommitQC, FinalBlock, ValidatorSet}; +use roles::validator::{ + self, + testonly::{make_block, make_genesis_block}, + BlockHeader, BlockNumber, CommitQC, FinalBlock, Payload, ValidatorSet, +}; use std::iter; use storage::RocksdbStorage; use 
utils::pipe; @@ -44,17 +48,15 @@ impl TestValidators { final_blocks: vec![], }; - let mut latest_block = Block::genesis(vec![]); + let payload = Payload(vec![]); + let mut latest_block = BlockHeader::genesis(payload.hash()); let final_blocks = (0..block_count).map(|_| { let final_block = FinalBlock { - block: latest_block.clone(), + header: latest_block, + payload: payload.clone(), justification: this.certify_block(&latest_block), }; - latest_block = Block { - parent: latest_block.hash(), - number: latest_block.number.next(), - payload: vec![], - }; + latest_block = BlockHeader::new(&latest_block, payload.hash()); final_block }); this.final_blocks = final_blocks.collect(); @@ -65,11 +67,11 @@ impl TestValidators { Config::new(self.validator_set.clone(), self.validator_secret_keys.len()).unwrap() } - fn certify_block(&self, block: &Block) -> CommitQC { + fn certify_block(&self, proposal: &BlockHeader) -> CommitQC { let message_to_sign = validator::ReplicaCommit { - view: validator::ViewNumber(block.number.0), - proposal_block_hash: block.hash(), - proposal_block_number: block.number, + protocol_version: validator::CURRENT_VERSION, + view: validator::ViewNumber(proposal.number.0), + proposal: *proposal, }; let signed_messages: Vec<_> = self .validator_secret_keys @@ -107,13 +109,15 @@ async fn subscribing_to_state_updates() { let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let genesis_block = create_block_from_base(Block::genesis(vec![]), rng); - let block_1 = create_block(&genesis_block, rng); - let block_2 = create_block(&block_1, rng); - let block_3 = create_block(&block_2, rng); + let genesis_block = make_genesis_block(rng); + let block_1 = make_block(rng, &genesis_block.header); + let block_2 = make_block(rng, &block_1.header); + let block_3 = make_block(rng, &block_2.header); - let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()).await; - let storage = &Arc::new(storage.unwrap()); + let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()) + .await + .unwrap(); + let storage = &Arc::new(storage); let (actor_pipe, _dispatcher_pipe) = pipe::new(); let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen()) .await @@ -174,26 +178,6 @@ async fn subscribing_to_state_updates() { .unwrap(); } -fn create_block(parent: &FinalBlock, rng: &mut impl Rng) -> FinalBlock { - let block = Block { - parent: parent.block.hash(), - number: parent.block.number.next(), - payload: Vec::new(), - }; - create_block_from_base(block, rng) -} - -fn create_block_from_base(block: Block, rng: &mut impl Rng) -> FinalBlock { - let mut justification: CommitQC = rng.gen(); - justification.message.proposal_block_number = block.number; - justification.message.proposal_block_hash = block.hash(); - - FinalBlock { - block, - justification, - } -} - #[tokio::test] async fn getting_blocks() { concurrency::testonly::abort_on_panic(); @@ -201,12 +185,12 @@ async fn getting_blocks() { let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let genesis_block = create_block_from_base(Block::genesis(vec![]), rng); + let genesis_block = make_genesis_block(rng); let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()); let storage = Arc::new(storage.await.unwrap()); let blocks = iter::successors(Some(genesis_block), |parent| { - Some(create_block(parent, rng)) + Some(make_block(rng, &parent.header)) }); let blocks: Vec<_> = 
blocks.take(5).collect(); for block in &blocks { diff --git a/node/libs/roles/src/validator/conv.rs b/node/libs/roles/src/validator/conv.rs index 74753aae..0b65e63d 100644 --- a/node/libs/roles/src/validator/conv.rs +++ b/node/libs/roles/src/validator/conv.rs @@ -1,17 +1,19 @@ use super::{ - AggregateSignature, Block, BlockHash, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, - LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, Phase, PrepareQC, Proposal, PublicKey, - ReplicaCommit, ReplicaPrepare, Signature, Signed, Signers, ViewNumber, + AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, + FinalBlock, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, Payload, PayloadHash, Phase, + PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, Signature, Signed, + Signers, ViewNumber, }; -use crate::{node::SessionId, validator}; +use crate::node::SessionId; use ::schema::{read_required, required, ProtoFmt}; use anyhow::Context as _; use crypto::ByteFmt; +use schema::proto::roles::validator as proto; use std::collections::BTreeMap; use utils::enum_util::Variant; -impl ProtoFmt for BlockHash { - type Proto = validator::schema::BlockHash; +impl ProtoFmt for BlockHeaderHash { + type Proto = proto::BlockHeaderHash; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self(ByteFmt::decode(required(&r.sha256)?)?)) } @@ -22,46 +24,60 @@ impl ProtoFmt for BlockHash { } } -impl ProtoFmt for Block { - type Proto = validator::schema::Block; +impl ProtoFmt for PayloadHash { + type Proto = proto::PayloadHash; + fn read(r: &Self::Proto) -> anyhow::Result { + Ok(Self(ByteFmt::decode(required(&r.sha256)?)?)) + } + fn build(&self) -> Self::Proto { + Self::Proto { + sha256: Some(self.0.encode()), + } + } +} + +impl ProtoFmt for BlockHeader { + type Proto = proto::BlockHeader; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { parent: read_required(&r.parent).context("parent")?, number: BlockNumber(r.number.context("number")?), - payload: required(&r.payload).context("payload")?.clone(), + payload: read_required(&r.payload).context("payload")?, }) } fn build(&self) -> Self::Proto { Self::Proto { parent: Some(self.parent.build()), number: Some(self.number.0), - payload: Some(self.payload.clone()), + payload: Some(self.payload.build()), } } } impl ProtoFmt for FinalBlock { - type Proto = validator::schema::FinalBlock; + type Proto = proto::FinalBlock; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - block: read_required(&r.block).context("block")?, + header: read_required(&r.header).context("header")?, + payload: Payload(required(&r.payload).context("payload")?.clone()), justification: read_required(&r.justification).context("justification")?, }) } fn build(&self) -> Self::Proto { Self::Proto { - block: Some(self.block.build()), + header: Some(self.header.build()), + payload: Some(self.payload.0.clone()), justification: Some(self.justification.build()), } } } impl ProtoFmt for ConsensusMsg { - type Proto = validator::schema::ConsensusMsg; + type Proto = proto::ConsensusMsg; fn read(r: &Self::Proto) -> anyhow::Result { - use validator::schema::consensus_msg::T; + use proto::consensus_msg::T; Ok(match r.t.as_ref().context("missing")? { T::ReplicaPrepare(r) => { Self::ReplicaPrepare(ProtoFmt::read(r).context("ReplicaPrepare")?) 
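// A minimal round-trip sketch for the ProtoFmt impls above (hypothetical test,
// following the test_encode_random pattern used later in this diff):
//
//     let hash: BlockHeaderHash = rng.gen();
//     assert_eq!(hash, BlockHeaderHash::read(&hash.build()).unwrap());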
@@ -73,7 +89,7 @@ impl ProtoFmt for ConsensusMsg { } fn build(&self) -> Self::Proto { - use validator::schema::consensus_msg::T; + use proto::consensus_msg::T; let t = match self { Self::ReplicaPrepare(x) => T::ReplicaPrepare(x.build()), @@ -87,11 +103,12 @@ impl ProtoFmt for ConsensusMsg { } impl ProtoFmt for ReplicaPrepare { - type Proto = validator::schema::ReplicaPrepare; + type Proto = proto::ReplicaPrepare; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - view: ViewNumber(r.view.context("view_number")?), + protocol_version: ProtocolVersion(r.protocol_version.context("protocol_version")?), + view: ViewNumber(*required(&r.view).context("view")?), high_vote: read_required(&r.high_vote).context("high_vote")?, high_qc: read_required(&r.high_qc).context("high_qc")?, }) @@ -99,6 +116,7 @@ impl ProtoFmt for ReplicaPrepare { fn build(&self) -> Self::Proto { Self::Proto { + protocol_version: Some(self.protocol_version.0), view: Some(self.view.0), high_vote: Some(self.high_vote.build()), high_qc: Some(self.high_qc.build()), @@ -107,90 +125,89 @@ impl ProtoFmt for ReplicaPrepare { } impl ProtoFmt for ReplicaCommit { - type Proto = validator::schema::ReplicaCommit; + type Proto = proto::ReplicaCommit; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - view: ViewNumber(r.view.context("view_number")?), - proposal_block_hash: read_required(&r.hash).context("hash")?, - proposal_block_number: BlockNumber(r.number.context("number")?), + protocol_version: ProtocolVersion(r.protocol_version.context("protocol_version")?), + view: ViewNumber(*required(&r.view).context("view")?), + proposal: read_required(&r.proposal).context("proposal")?, }) } fn build(&self) -> Self::Proto { Self::Proto { + protocol_version: Some(self.protocol_version.0), view: Some(self.view.0), - hash: Some(self.proposal_block_hash.build()), - number: Some(self.proposal_block_number.0), + proposal: Some(self.proposal.build()), } } } impl ProtoFmt for LeaderPrepare { - type Proto = validator::schema::LeaderPrepare; + type Proto = proto::LeaderPrepare; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { + protocol_version: ProtocolVersion(r.protocol_version.context("protocol_version")?), + view: ViewNumber(*required(&r.view).context("view")?), proposal: read_required(&r.proposal).context("proposal")?, + proposal_payload: r.proposal_payload.as_ref().map(|p| Payload(p.clone())), justification: read_required(&r.justification).context("justification")?, }) } fn build(&self) -> Self::Proto { Self::Proto { + protocol_version: Some(self.protocol_version.0), + view: Some(self.view.0), proposal: Some(self.proposal.build()), + proposal_payload: self.proposal_payload.as_ref().map(|p| p.0.clone()), justification: Some(self.justification.build()), } } } -impl ProtoFmt for Proposal { - type Proto = validator::schema::Proposal; +impl ProtoFmt for LeaderCommit { + type Proto = proto::LeaderCommit; fn read(r: &Self::Proto) -> anyhow::Result { - use validator::schema::proposal::T; - Ok(match required(&r.t)? 
{ - T::New(r) => Self::New(ProtoFmt::read(r).context("Block")?), - T::Retry(r) => Self::Retry(ProtoFmt::read(r).context("ReplicaCommit")?), + Ok(Self { + protocol_version: ProtocolVersion(r.protocol_version.context("protocol_version")?), + justification: read_required(&r.justification).context("justification")?, }) } fn build(&self) -> Self::Proto { - use validator::schema::proposal::T; - let t = match self { - Self::New(x) => T::New(x.build()), - Self::Retry(x) => T::Retry(x.build()), - }; - Self::Proto { t: Some(t) } + Self::Proto { + protocol_version: Some(self.protocol_version.0), + justification: Some(self.justification.build()), + } } } -impl ProtoFmt for LeaderCommit { - type Proto = validator::schema::LeaderCommit; +impl ProtoFmt for Signers { + type Proto = schema::proto::std::BitVector; fn read(r: &Self::Proto) -> anyhow::Result { - Ok(Self { - justification: read_required(&r.justification).context("justification")?, - }) + Ok(Self(ProtoFmt::read(r)?)) } fn build(&self) -> Self::Proto { - Self::Proto { - justification: Some(self.justification.build()), - } + self.0.build() } } impl ProtoFmt for PrepareQC { - type Proto = validator::schema::PrepareQc; + type Proto = proto::PrepareQc; fn read(r: &Self::Proto) -> anyhow::Result { let mut map = BTreeMap::new(); for (msg, signers) in r.msgs.iter().zip(r.signers.iter()) { map.insert( - read_required::(&Some(msg).cloned()).context("msg")?, - Signers::decode(signers).context("signers")?, + ReplicaPrepare::read(msg).context("msg")?, + Signers::read(signers).context("signers")?, ); } @@ -204,7 +221,7 @@ impl ProtoFmt for PrepareQC { let (msgs, signers) = self .map .iter() - .map(|(msg, signers)| (msg.clone().build(), signers.encode())) + .map(|(msg, signers)| (msg.build(), signers.build())) .unzip(); Self::Proto { @@ -216,12 +233,12 @@ impl ProtoFmt for PrepareQC { } impl ProtoFmt for CommitQC { - type Proto = validator::schema::CommitQc; + type Proto = proto::CommitQc; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { message: read_required(&r.msg).context("msg")?, - signers: ByteFmt::decode(required(&r.signers).context("signers")?)?, + signers: read_required(&r.signers).context("signers")?, signature: read_required(&r.sig).context("sig")?, }) } @@ -229,17 +246,17 @@ impl ProtoFmt for CommitQC { fn build(&self) -> Self::Proto { Self::Proto { msg: Some(self.message.build()), - signers: Some(self.signers.encode()), + signers: Some(self.signers.build()), sig: Some(self.signature.build()), } } } impl ProtoFmt for Phase { - type Proto = validator::schema::Phase; + type Proto = proto::Phase; fn read(r: &Self::Proto) -> anyhow::Result { - use validator::schema::phase::T; + use proto::phase::T; Ok(match required(&r.t)? { T::Prepare(_) => Self::Prepare, T::Commit(_) => Self::Commit, @@ -247,7 +264,7 @@ impl ProtoFmt for Phase { } fn build(&self) -> Self::Proto { - use validator::schema::phase::T; + use proto::phase::T; let t = match self { Self::Prepare => T::Prepare(schema::proto::std::Void {}), Self::Commit => T::Commit(schema::proto::std::Void {}), @@ -257,7 +274,7 @@ impl ProtoFmt for Phase { } impl ProtoFmt for NetAddress { - type Proto = validator::schema::NetAddress; + type Proto = proto::NetAddress; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { @@ -277,10 +294,10 @@ impl ProtoFmt for NetAddress { } impl ProtoFmt for Msg { - type Proto = validator::schema::Msg; + type Proto = proto::Msg; fn read(r: &Self::Proto) -> anyhow::Result { - use validator::schema::msg::T; + use proto::msg::T; Ok(match r.t.as_ref().context("missing")? 
{ T::Consensus(r) => Self::Consensus(ProtoFmt::read(r).context("Consensus")?), T::SessionId(r) => Self::SessionId(SessionId(r.clone())), @@ -289,7 +306,7 @@ impl ProtoFmt for Msg { } fn build(&self) -> Self::Proto { - use validator::schema::msg::T; + use proto::msg::T; let t = match self { Self::Consensus(x) => T::Consensus(x.build()), @@ -302,7 +319,7 @@ impl ProtoFmt for Msg { } impl ProtoFmt for MsgHash { - type Proto = validator::schema::MsgHash; + type Proto = proto::MsgHash; fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self(ByteFmt::decode(required(&r.sha256)?)?)) @@ -316,7 +333,7 @@ impl ProtoFmt for MsgHash { } impl<V: Variant<Msg> + Clone> ProtoFmt for Signed<V> { - type Proto = validator::schema::Signed; + type Proto = proto::Signed; fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self { msg: V::extract(read_required::<Msg>(&r.msg).context("msg")?)?, @@ -334,7 +351,7 @@ impl<V: Variant<Msg> + Clone> ProtoFmt for Signed<V> { } impl ProtoFmt for PublicKey { - type Proto = validator::schema::PublicKey; + type Proto = proto::PublicKey; fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self(ByteFmt::decode(required(&r.bls12381)?)?)) @@ -348,7 +365,7 @@ impl ProtoFmt for PublicKey { } impl ProtoFmt for Signature { - type Proto = validator::schema::Signature; + type Proto = proto::Signature; fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self(ByteFmt::decode(required(&r.bls12381)?)?)) @@ -362,7 +379,7 @@ impl ProtoFmt for Signature { } impl ProtoFmt for AggregateSignature { - type Proto = validator::schema::AggregateSignature; + type Proto = proto::AggregateSignature; fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self(ByteFmt::decode(required(&r.bls12381)?)?)) diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 314efa08..770361d3 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -4,6 +4,39 @@ use super::CommitQC; use crypto::{sha256, ByteFmt, Text, TextFmt}; use std::fmt; +/// Payload of the block. Consensus algorithm does not interpret the payload +/// (except for imposing a size limit for the payload). Proposing a payload +/// for a new block and interpreting the payload of the finalized blocks +/// should be implemented for the specific application of the consensus algorithm. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Payload(pub Vec<u8>); + +/// Hash of the Payload. +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct PayloadHash(pub(crate) sha256::Sha256); + +impl TextFmt for PayloadHash { + fn encode(&self) -> String { + format!("payload:sha256:{}", hex::encode(ByteFmt::encode(&self.0))) + } + fn decode(text: Text) -> anyhow::Result<Self> { + text.strip("payload:sha256:")?.decode_hex().map(Self) + } +} + +impl fmt::Debug for PayloadHash { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str(&TextFmt::encode(self)) + } +} + +impl Payload { + /// Hash of the payload. + pub fn hash(&self) -> PayloadHash { + PayloadHash(sha256::Sha256::new(&self.0)) + } +} + /// Sequential number of the block. /// Genesis block has number 0. /// For other blocks: block.number = block.parent.number + 1. @@ -30,9 +63,9 @@ impl fmt::Display for BlockNumber { /// Hash of the block.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct BlockHash(pub(crate) sha256::Sha256); +pub struct BlockHeaderHash(pub(crate) sha256::Sha256); -impl TextFmt for BlockHash { +impl TextFmt for BlockHeaderHash { fn encode(&self) -> String { format!( "block_hash:sha256:{}", @@ -44,55 +77,67 @@ impl TextFmt for BlockHash { } } -impl fmt::Debug for BlockHash { +impl fmt::Debug for BlockHeaderHash { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.write_str(&TextFmt::encode(self)) } } -/// A block. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Block { +/// A block header. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct BlockHeader { /// Hash of the parent block. - pub parent: BlockHash, + pub parent: BlockHeaderHash, /// Number of the block. pub number: BlockNumber, /// Payload of the block. - pub payload: Vec, + pub payload: PayloadHash, } -impl Block { +impl BlockHeader { /// Returns the hash of the block. - pub fn hash(&self) -> BlockHash { - BlockHash(sha256::Sha256::new(&schema::canonical(self))) + pub fn hash(&self) -> BlockHeaderHash { + BlockHeaderHash(sha256::Sha256::new(&schema::canonical(self))) } /// Creates a genesis block. - pub fn genesis(payload: Vec) -> Block { - Block { - parent: BlockHash(sha256::Sha256::default()), + pub fn genesis(payload: PayloadHash) -> Self { + Self { + parent: BlockHeaderHash(sha256::Sha256::default()), number: BlockNumber(0), payload, } } + + /// Creates a child block for the given parent. + pub fn new(parent: &BlockHeader, payload: PayloadHash) -> Self { + Self { + parent: parent.hash(), + number: parent.number.next(), + payload, + } + } } /// A block that has been finalized by the consensus protocol. #[derive(Clone, Debug, PartialEq, Eq)] pub struct FinalBlock { - /// The block. - pub block: Block, + /// Header of the block. + pub header: BlockHeader, + /// Payload of the block. Should match `header.payload` hash. + pub payload: Payload, /// Justification for the block. What guarantees that the block is final. pub justification: CommitQC, } impl FinalBlock { /// Creates a new finalized block. - pub fn new(block: Block, justification: CommitQC) -> Self { - assert_eq!(block.hash(), justification.message.proposal_block_hash); - + pub fn new(header: BlockHeader, payload: Payload, justification: CommitQC) -> Self { + assert_eq!(header.payload, payload.hash()); + assert_eq!(header, justification.message.proposal); Self { - block, + header, + payload, justification, } } diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index e08b299b..1af7147f 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -1,13 +1,26 @@ //! Messages related to the consensus protocol. -use super::{Block, BlockHash, BlockNumber, Msg, Signed}; +use super::{BlockHeader, Msg, Payload, Signed}; use crate::validator; use anyhow::{bail, Context}; use bit_vec::BitVec; -use crypto::ByteFmt; use std::collections::{BTreeMap, BTreeSet, HashMap}; use utils::enum_util::{ErrBadVariant, Variant}; +/// Version of the consensus algorithm that the validator is using. +/// It allows to prevent misinterpretation of messages signed by validators +/// using different versions of the binaries. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ProtocolVersion(pub(crate) u32); + +/// We use a hardcoded protocol version for now. 
+/// Eventually validators should determine which version to use for which block by observing the relevant L1 contract. +/// +/// The validator binary has to support the current and next protocol version whenever +/// a protocol version update is needed (so that it can dynamically switch from producing +/// blocks for version X to version X+1). +pub const CURRENT_VERSION: ProtocolVersion = ProtocolVersion(0); + /// Consensus messages. #[allow(missing_docs)] #[derive(Clone, Debug, PartialEq, Eq)] @@ -28,6 +41,16 @@ impl ConsensusMsg { Self::LeaderCommit(_) => "LeaderCommit", } } + + /// Protocol version of this message. + pub fn protocol_version(&self) -> ProtocolVersion { + match self { + Self::ReplicaPrepare(m) => m.protocol_version, + Self::ReplicaCommit(m) => m.protocol_version, + Self::LeaderPrepare(m) => m.protocol_version, + Self::LeaderCommit(m) => m.protocol_version, + } + } } impl Variant for ReplicaPrepare { @@ -81,6 +104,8 @@ impl Variant for LeaderCommit { /// A Prepare message from a replica. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct ReplicaPrepare { + /// Protocol version. + pub protocol_version: ProtocolVersion, /// The number of the current view. pub view: ViewNumber, /// The highest block that the replica has committed to. @@ -90,40 +115,37 @@ pub struct ReplicaPrepare { } /// A Commit message from a replica. -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ReplicaCommit { + /// Protocol version. + pub protocol_version: ProtocolVersion, /// The number of the current view. pub view: ViewNumber, - /// The hash of the block that the replica is committing to. - pub proposal_block_hash: BlockHash, - /// The number of the block that the replica is committing to. - pub proposal_block_number: BlockNumber, + /// The header of the block that the replica is committing to. + pub proposal: BlockHeader, } /// A Prepare message from a leader. #[derive(Clone, Debug, PartialEq, Eq)] pub struct LeaderPrepare { - /// The proposal from the leader. - pub proposal: Proposal, + /// Protocol version. + pub protocol_version: ProtocolVersion, + /// The number of the current view. + pub view: ViewNumber, + /// The header of the block that the leader is proposing. + pub proposal: BlockHeader, + /// Payload of the block that the leader is proposing. + /// `None` iff this is a reproposal. + pub proposal_payload: Option, /// The PrepareQC that justifies this proposal from the leader. pub justification: PrepareQC, } -/// A proposal from a leader. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum Proposal { - /// This is a new proposal, i.e. a new block built by this leader. - /// This happens whenever the previous block was finalized. - New(Block), - /// This a reproposal, i.e. a block that was built and proposed by - /// a previous leader but that wasn't finalized. We propose it again - /// to try to finalize it. - Retry(ReplicaCommit), -} - /// A Commit message from a leader. #[derive(Clone, Debug, PartialEq, Eq)] pub struct LeaderCommit { + /// Protocol version. + pub protocol_version: ProtocolVersion, /// The CommitQC that justifies the message from the leader. pub justification: CommitQC, } @@ -183,15 +205,13 @@ impl PrepareQC { } /// Verifies the integrity of the PrepareQC. 
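// Caller-side sketch for the new signature below: the expected view is now an
// explicit argument instead of being taken from the QC's own first map entry,
// so a replica verifies against the view it derived independently (names here
// are illustrative):
//
//     prepare_qc.verify(expected_view, &validator_set, threshold)?;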
- pub fn verify(&self, validators: &ValidatorSet, threshold: usize) -> anyhow::Result<()> { + pub fn verify( + &self, + view: ViewNumber, + validators: &ValidatorSet, + threshold: usize, + ) -> anyhow::Result<()> { // First we check that all messages are for the same view number. - let view = self - .map - .first_key_value() - .context("Empty map in PrepareQC!")? - .0 - .view; - for msg in self.map.keys() { if msg.view != view { bail!("PrepareQC contains messages for different views!"); @@ -203,10 +223,7 @@ impl PrepareQC { let mut num_signers = 0; for signer_bitmap in self.map.values() { - // When we serialize a QC, the signers bitmap may get padded with zeros at the end. - // We need to remove those zeros before we can verify the signature. - let mut signers = signer_bitmap.0.clone(); - signers.truncate(validators.len()); + let signers = signer_bitmap.0.clone(); if signers.len() != validators.len() { bail!("Bit vector in PrepareQC has wrong length!"); @@ -307,10 +324,7 @@ impl CommitQC { /// Verifies the signature of the CommitQC. pub fn verify(&self, validators: &ValidatorSet, threshold: usize) -> anyhow::Result<()> { - // When we serialize a QC, the signers bitmap may get padded with zeros at the end. - // We need to remove those zeros before we can verify the signature. - let mut signers = self.signers.0.clone(); - signers.truncate(validators.len()); + let signers = self.signers.0.clone(); // First we to do some checks on the signers bit map. if signers.len() != validators.len() { @@ -361,16 +375,6 @@ impl Signers { } } -impl ByteFmt for Signers { - fn decode(bytes: &[u8]) -> anyhow::Result { - Ok(Signers(BitVec::from_bytes(bytes))) - } - - fn encode(&self) -> Vec { - self.0.to_bytes() - } -} - /// A struct that represents a set of validators. It is used to store the current validator set. /// We represent each validator by its validator public key. #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/node/libs/roles/src/validator/mod.rs b/node/libs/roles/src/validator/mod.rs index 03394a34..eb77831c 100644 --- a/node/libs/roles/src/validator/mod.rs +++ b/node/libs/roles/src/validator/mod.rs @@ -6,8 +6,7 @@ mod tests; mod conv; mod keys; mod messages; -mod testonly; +pub mod testonly; pub use keys::*; pub use messages::*; -pub use schema::proto::roles::validator as schema; diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index b4835a9b..dad8d8e8 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -1,7 +1,9 @@ +//! Test-only utilities. use super::{ - AggregateSignature, Block, BlockHash, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, - LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, Phase, PrepareQC, Proposal, PublicKey, - ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, ValidatorSet, ViewNumber, + AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, + FinalBlock, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, Payload, PayloadHash, Phase, + PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature, + Signed, Signers, ValidatorSet, ViewNumber, CURRENT_VERSION, }; use bit_vec::BitVec; use concurrency::time; @@ -12,6 +14,46 @@ use rand::{ use std::sync::Arc; use utils::enum_util::Variant; +/// Constructs a CommitQC with `CommitQC.message.proposal` matching header. +/// WARNING: it is not a fully correct CommitQC. 
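// Sketch of how these helpers chain (test-style, mirroring the sync_blocks and
// storage tests elsewhere in this diff):
//
//     let genesis = make_genesis_block(rng);
//     let block_1 = make_block(rng, &genesis.header);
//     assert_eq!(block_1.header.parent, genesis.header.hash());
//     assert_eq!(block_1.header.number, genesis.header.number.next());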
+pub fn make_justification<R: Rng>(rng: &mut R, header: &BlockHeader) -> CommitQC { + CommitQC { + message: ReplicaCommit { + protocol_version: CURRENT_VERSION, + view: ViewNumber(header.number.0), + proposal: *header, + }, + signers: rng.gen(), + signature: rng.gen(), + } +} + +/// Constructs a genesis block with random payload. +/// WARNING: it is not a fully correct FinalBlock. +pub fn make_genesis_block<R: Rng>(rng: &mut R) -> FinalBlock { + let payload: Payload = rng.gen(); + let header = BlockHeader::genesis(payload.hash()); + let justification = make_justification(rng, &header); + FinalBlock { + header, + payload, + justification, + } +} + +/// Constructs a random block with a given parent. +/// WARNING: this is not a fully correct FinalBlock. +pub fn make_block<R: Rng>(rng: &mut R, parent: &BlockHeader) -> FinalBlock { + let payload: Payload = rng.gen(); + let header = BlockHeader::new(parent, payload.hash()); + let justification = make_justification(rng, &header); + FinalBlock { + header, + payload, + justification, + } +} + impl Distribution<AggregateSignature> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> AggregateSignature { AggregateSignature(rng.gen()) @@ -42,27 +84,46 @@ impl Distribution for Standard { } } -impl Distribution<BlockHash> for Standard { - fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> BlockHash { - BlockHash(rng.gen()) +impl Distribution<ProtocolVersion> for Standard { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> ProtocolVersion { + ProtocolVersion(rng.gen()) + } +} + +impl Distribution<PayloadHash> for Standard { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> PayloadHash { + PayloadHash(rng.gen()) + } +} + +impl Distribution<BlockHeaderHash> for Standard { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> BlockHeaderHash { + BlockHeaderHash(rng.gen()) } } -impl Distribution<Block> for Standard { - fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Block { - let arr_size: usize = rng.gen_range(0..11); - Block { +impl Distribution<BlockHeader> for Standard { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> BlockHeader { + BlockHeader { parent: rng.gen(), number: rng.gen(), - payload: (0..arr_size).map(|_| rng.gen()).collect(), + payload: rng.gen(), } } } +impl Distribution<Payload> for Standard { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Payload { + let size: usize = rng.gen_range(0..11); + Payload((0..size).map(|_| rng.gen()).collect()) + } +} + impl Distribution<FinalBlock> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> FinalBlock { FinalBlock { - block: rng.gen(), + header: rng.gen(), + payload: rng.gen(), justification: rng.gen(), } } @@ -71,6 +132,7 @@ impl Distribution for Standard { impl Distribution<ReplicaPrepare> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> ReplicaPrepare { ReplicaPrepare { + protocol_version: rng.gen(), view: rng.gen(), high_vote: rng.gen(), high_qc: rng.gen(), @@ -81,9 +143,9 @@ impl Distribution for Standard { impl Distribution<ReplicaCommit> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> ReplicaCommit { ReplicaCommit { + protocol_version: rng.gen(), view: rng.gen(), - proposal_block_hash: rng.gen(), - proposal_block_number: rng.gen(), + proposal: rng.gen(), } } } @@ -91,27 +153,19 @@ impl Distribution for Standard { impl Distribution<LeaderPrepare> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> LeaderPrepare { LeaderPrepare { + protocol_version: rng.gen(), + view: rng.gen(), proposal: rng.gen(), + proposal_payload: rng.gen(), justification: rng.gen(), } } } -impl Distribution<Proposal> for Standard { - fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Proposal { - let i = rng.gen_range(0..2); - - match i { - 0 => Proposal::New(rng.gen()), - 1 => Proposal::Retry(rng.gen()), - _ => unreachable!(), - } - } -} - impl Distribution<LeaderCommit> for Standard { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> LeaderCommit { LeaderCommit { + protocol_version:
rng.gen(), justification: rng.gen(), } } @@ -133,8 +187,8 @@ impl Distribution for Standard { fn sample(&self, rng: &mut R) -> CommitQC { CommitQC { message: rng.gen(), - signers: rng.gen::(), - signature: rng.gen::(), + signers: rng.gen(), + signature: rng.gen(), } } } diff --git a/node/libs/roles/src/validator/tests.rs b/node/libs/roles/src/validator/tests.rs index 7bc23605..b1ceb948 100644 --- a/node/libs/roles/src/validator/tests.rs +++ b/node/libs/roles/src/validator/tests.rs @@ -36,12 +36,6 @@ fn test_byte_encoding() { ByteFmt::decode(&ByteFmt::encode(&final_block)).unwrap() ); - let signers: Signers = rng.gen(); - assert_eq!( - signers, - ByteFmt::decode(&ByteFmt::encode(&signers)).unwrap() - ); - let msg_hash: MsgHash = rng.gen(); assert_eq!( msg_hash, @@ -76,9 +70,12 @@ fn test_text_encoding() { Text::new(&t).decode::().unwrap() ); - let block_hash: BlockHash = rng.gen(); - let t = TextFmt::encode(&block_hash); - assert_eq!(block_hash, Text::new(&t).decode::().unwrap()); + let block_header_hash: BlockHeaderHash = rng.gen(); + let t = TextFmt::encode(&block_header_hash); + assert_eq!( + block_header_hash, + Text::new(&t).decode::().unwrap() + ); let final_block: FinalBlock = rng.gen(); let t = TextFmt::encode(&final_block); @@ -93,14 +90,16 @@ fn test_text_encoding() { fn test_schema_encoding() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - test_encode_random::<_, BlockHash>(rng); - test_encode_random::<_, Block>(rng); + test_encode_random::<_, PayloadHash>(rng); + test_encode_random::<_, BlockHeader>(rng); + test_encode_random::<_, BlockHeaderHash>(rng); test_encode_random::<_, FinalBlock>(rng); test_encode_random::<_, Signed>(rng); test_encode_random::<_, PrepareQC>(rng); test_encode_random::<_, CommitQC>(rng); test_encode_random::<_, Msg>(rng); test_encode_random::<_, MsgHash>(rng); + test_encode_random::<_, Signers>(rng); test_encode_random::<_, PublicKey>(rng); test_encode_random::<_, Signature>(rng); test_encode_random::<_, AggregateSignature>(rng); @@ -212,9 +211,11 @@ fn test_prepare_qc() { let sk2: SecretKey = rng.gen(); let sk3: SecretKey = rng.gen(); + let view: ViewNumber = rng.gen(); let mut msg1: ReplicaPrepare = rng.gen(); - let msg2: ReplicaPrepare = rng.gen(); - msg1.view = msg2.view; + let mut msg2: ReplicaPrepare = rng.gen(); + msg1.view = view; + msg2.view = view; let validator_set1 = ValidatorSet::new(vec![ sk1.public(), @@ -239,14 +240,14 @@ fn test_prepare_qc() { .unwrap(); // Matching validator set and enough signers. - assert!(agg_qc.verify(&validator_set1, 1).is_ok()); - assert!(agg_qc.verify(&validator_set1, 2).is_ok()); - assert!(agg_qc.verify(&validator_set1, 3).is_ok()); + assert!(agg_qc.verify(view, &validator_set1, 1).is_ok()); + assert!(agg_qc.verify(view, &validator_set1, 2).is_ok()); + assert!(agg_qc.verify(view, &validator_set1, 3).is_ok()); // Not enough signers. - assert!(agg_qc.verify(&validator_set1, 4).is_err()); + assert!(agg_qc.verify(view, &validator_set1, 4).is_err()); // Mismatching validator sets. 
- assert!(agg_qc.verify(&validator_set2, 3).is_err()); - assert!(agg_qc.verify(&validator_set3, 3).is_err()); + assert!(agg_qc.verify(view, &validator_set2, 3).is_err()); + assert!(agg_qc.verify(view, &validator_set3, 3).is_err()); } diff --git a/node/libs/schema/Cargo.toml b/node/libs/schema/Cargo.toml index 69961751..ea9c6941 100644 --- a/node/libs/schema/Cargo.toml +++ b/node/libs/schema/Cargo.toml @@ -11,6 +11,7 @@ name = "conformance_test" [dependencies] anyhow.workspace = true +bit-vec.workspace = true serde.workspace = true once_cell.workspace = true quick-protobuf.workspace = true diff --git a/node/libs/schema/proto/roles/validator.proto b/node/libs/schema/proto/roles/validator.proto index fa7ce905..8d02e1a7 100644 --- a/node/libs/schema/proto/roles/validator.proto +++ b/node/libs/schema/proto/roles/validator.proto @@ -4,23 +4,31 @@ package roles.validator; import "std.proto"; -message BlockHash { - optional bytes sha256 = 1; +message PayloadHash { + optional bytes sha256 = 1; // required } -message Block { - optional BlockHash parent = 1; - optional uint64 number = 2; - optional bytes payload = 3; +message BlockHeaderHash { + optional bytes sha256 = 1; // required +} + +message BlockHeader { + // Hash of the parent Block. + optional BlockHeaderHash parent = 2; // required + // Sequential number of the block = parent.number + 1. + optional uint64 number = 3; // required + // Hash of the block payload. + optional PayloadHash payload = 4; // required } message FinalBlock { - optional Block block = 1; - optional CommitQC justification = 2; + optional BlockHeader header = 1; // required + optional bytes payload = 2; // required + optional CommitQC justification = 3; // required } message ConsensusMsg { - oneof t { + oneof t { // required ReplicaPrepare replica_prepare = 1; ReplicaCommit replica_commit = 2; LeaderPrepare leader_prepare = 3; @@ -29,43 +37,41 @@ message ConsensusMsg { } message ReplicaPrepare { - optional uint64 view = 1; - optional ReplicaCommit high_vote = 2; - optional CommitQC high_qc = 3; + optional uint32 protocol_version = 4; // required + optional uint64 view = 1; // required + optional ReplicaCommit high_vote = 2; // required + optional CommitQC high_qc = 3; // required } message ReplicaCommit { - optional uint64 view = 1; - optional BlockHash hash = 2; - optional uint64 number = 3; + optional uint32 protocol_version = 3; // required + optional uint64 view = 1; // required + optional BlockHeader proposal = 2; // required } message LeaderPrepare { - optional Proposal proposal = 1; - optional PrepareQC justification = 2; -} - -message Proposal { - oneof t { - Block new = 1; - ReplicaCommit retry = 2; - } + optional uint32 protocol_version = 5; // required + optional uint64 view = 1; // required + optional BlockHeader proposal = 2; // required + optional bytes proposal_payload = 3; // optional (depending on justification) + optional PrepareQC justification = 4; // required } message LeaderCommit { - optional CommitQC justification = 1; + optional uint32 protocol_version = 2; // required + optional CommitQC justification = 1; // required } message PrepareQC { - repeated ReplicaPrepare msgs = 1; - repeated bytes signers = 2; - optional AggregateSignature sig = 3; + repeated ReplicaPrepare msgs = 1; // required + repeated std.BitVector signers = 2; // required + optional AggregateSignature sig = 3; // required } message CommitQC { - optional ReplicaCommit msg = 1; - optional bytes signers = 2; - optional AggregateSignature sig = 3; + optional ReplicaCommit msg = 1; // 
required + optional std.BitVector signers = 2; // required + optional AggregateSignature sig = 3; // required } message Phase { @@ -82,7 +88,7 @@ message Phase { // network connection to this address. message NetAddress { // Address of the validator. - optional std.SocketAddr addr = 1; + optional std.SocketAddr addr = 1; // required // Version of the discovery announcement. // Newer (higher) version overrides the older version. // When a validator gets restarted it broadcasts @@ -98,7 +104,7 @@ message NetAddress { // broadcast a new discovery message), or (mutli)proxy support (a validator // may maintain a dynamic set of trusted proxy servers which forward traffic // to it - this way validator wouldn't have to have a public IP at all). - optional uint64 version = 2; + optional uint64 version = 2; // required // Time at which this message has been signed. // Mostly an informational field: // we cannot use it instead of version field, @@ -119,11 +125,11 @@ message NetAddress { // we assume here that it is unlikely for timestamps to collide. // To make this reasoning more strict, we should rather use a random "tie breaker" // instead (replace timestamp with a random nonce, or use a hash of the entire message). - optional std.Timestamp timestamp = 3; + optional std.Timestamp timestamp = 3; // required } message Msg { - oneof t { + oneof t { // required ConsensusMsg consensus = 1; bytes session_id = 2; NetAddress net_address = 3; @@ -131,23 +137,23 @@ message Msg { } message MsgHash { - optional bytes sha256 = 1; + optional bytes sha256 = 1; // required } message Signed { - optional Msg msg = 1; - optional PublicKey key = 2; - optional Signature sig = 3; + optional Msg msg = 1; // required + optional PublicKey key = 2; // required + optional Signature sig = 3; // required } message PublicKey { - optional bytes bls12381 = 1; + optional bytes bls12381 = 1; // required } message Signature { - optional bytes bls12381 = 1; + optional bytes bls12381 = 1; // required } message AggregateSignature { - optional bytes bls12381 = 1; + optional bytes bls12381 = 1; // required } diff --git a/node/libs/schema/proto/std.proto b/node/libs/schema/proto/std.proto index eef05336..f5ee5efe 100644 --- a/node/libs/schema/proto/std.proto +++ b/node/libs/schema/proto/std.proto @@ -32,3 +32,19 @@ message SocketAddr { // TCP port (actually uint16, however uint32 is smallest supported protobuf type). optional uint32 port = 2; } + +// Compressed representation of a vector of bits. +// It occupies n/8 + O(1) bytes. +// Note: this is not a highly optimized data structure: +// - since it is a proto message it requires its length to be stored. +// - it has 2 fields, which require storing their own tag +// - the `bytes_` field requires storing its length, which theoretically +// could be derived from `size` field value. +// If we eventually start caring about the size, we could encode BitVector +// directly as a field of type `bytes` with delimiter of the form "01...1". +message BitVector { + // Number of bits in the vector. + optional uint64 size = 1; + // Vector of bits encoded as bytes in big endian order. 
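// Example (matches the BitVec conversion in std_conv.rs below): the 12-bit
// vector 1011_0010_1001 is encoded as size = 12, bytes_ = [0xB2, 0x90];
// the 4 trailing padding bits of the last byte are dropped on decode
// because `size` says the vector is only 12 bits long.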
+ optional bytes bytes_ = 2; +} diff --git a/node/libs/schema/proto/storage.proto b/node/libs/schema/proto/storage.proto index d97b4708..dbcfa8e2 100644 --- a/node/libs/schema/proto/storage.proto +++ b/node/libs/schema/proto/storage.proto @@ -4,12 +4,15 @@ package storage; import "roles/validator.proto"; -message Void {} +message Proposal { + optional uint64 number = 1; + optional bytes payload = 2; +} message ReplicaState { optional uint64 view = 1; optional roles.validator.Phase phase = 2; optional roles.validator.ReplicaCommit high_vote = 3; optional roles.validator.CommitQC high_qc = 4; - repeated roles.validator.Block blocks = 5; + repeated Proposal proposals = 5; } diff --git a/node/libs/schema/src/std_conv.rs b/node/libs/schema/src/std_conv.rs index 2a909d6d..2f2fa89b 100644 --- a/node/libs/schema/src/std_conv.rs +++ b/node/libs/schema/src/std_conv.rs @@ -80,3 +80,24 @@ impl ProtoFmt for time::Duration { } } } + +impl ProtoFmt for bit_vec::BitVec { + type Proto = proto::BitVector; + + fn read(r: &Self::Proto) -> anyhow::Result { + let size = *required(&r.size).context("size")? as usize; + let mut this = Self::from_bytes(required(&r.bytes).context("bytes_")?); + if this.len() < size { + anyhow::bail!("'vector' has less than 'size' bits"); + } + this.truncate(size); + Ok(this) + } + + fn build(&self) -> Self::Proto { + Self::Proto { + size: Some(self.len() as u64), + bytes: Some(self.to_bytes()), + } + } +} diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index cfcb4247..e40b7aa9 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -11,5 +11,5 @@ mod types; pub use crate::{ rocksdb::RocksdbStorage, traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, - types::{ReplicaState, StorageError, StorageResult}, + types::{Proposal, ReplicaState, StorageError, StorageResult}, }; diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index cb62418f..d2f1d43d 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -63,10 +63,10 @@ impl RocksdbStorage { let this = Self { inner: RwLock::new(db), cached_last_contiguous_block_number: AtomicU64::new(0), - block_writes_sender: watch::channel(genesis_block.block.number).0, + block_writes_sender: watch::channel(genesis_block.header.number).0, }; - if let Some(stored_genesis_block) = this.block(ctx, genesis_block.block.number).await? { - if stored_genesis_block.block != genesis_block.block { + if let Some(stored_genesis_block) = this.block(ctx, genesis_block.header.number).await? { + if stored_genesis_block.header != genesis_block.header { let err = anyhow::anyhow!("Mismatch between stored and expected genesis block"); return Err(StorageError::Database(err)); } @@ -207,7 +207,7 @@ impl RocksdbStorage { /// Insert a new block into the database. fn put_block_blocking(&self, finalized_block: &FinalBlock) -> anyhow::Result<()> { let db = self.write(); - let block_number = finalized_block.block.number; + let block_number = finalized_block.header.number; tracing::debug!("Inserting new block #{block_number} into the database."); let mut write_batch = rocksdb::WriteBatch::default(); diff --git a/node/libs/storage/src/testonly.rs b/node/libs/storage/src/testonly.rs index 293dc76c..cd079d81 100644 --- a/node/libs/storage/src/testonly.rs +++ b/node/libs/storage/src/testonly.rs @@ -1,37 +1,25 @@ //! Test-only utilities. 
-use crate::types::ReplicaState; +use crate::types::{Proposal, ReplicaState}; use rand::{distributions::Standard, prelude::Distribution, Rng}; -use roles::validator; -use std::collections::{BTreeMap, HashMap}; -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> ReplicaState { - let n = rng.gen_range(1..11); - - let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); - - for _ in 0..n { - let block: validator::Block = rng.gen(); - - match block_proposal_cache.get_mut(&block.number) { - Some(blocks) => { - blocks.insert(block.hash(), block.clone()); - } - None => { - let mut blocks = HashMap::new(); - blocks.insert(block.hash(), block.clone()); - block_proposal_cache.insert(block.number, blocks); - } - } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> Proposal { + Proposal { + number: rng.gen(), + payload: rng.gen(), } + } +} +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> ReplicaState { ReplicaState { view: rng.gen(), phase: rng.gen(), high_vote: rng.gen(), high_qc: rng.gen(), - block_proposal_cache, + proposals: (0..rng.gen_range(1..11)).map(|_| rng.gen()).collect(), } } } diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index b9248331..fd104cb0 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -2,13 +2,15 @@ use super::*; use crate::types::ReplicaState; use concurrency::ctx; use rand::{seq::SliceRandom, Rng}; -use roles::validator::{Block, BlockNumber, FinalBlock}; +use roles::validator::{testonly::make_block, BlockHeader, BlockNumber, FinalBlock, Payload}; use std::iter; use tempfile::TempDir; async fn init_store(ctx: &ctx::Ctx, rng: &mut R) -> (FinalBlock, RocksdbStorage, TempDir) { + let payload = Payload(vec![]); let genesis_block = FinalBlock { - block: Block::genesis(vec![]), + header: BlockHeader::genesis(payload.hash()), + payload, justification: rng.gen(), }; let temp_dir = TempDir::new().unwrap(); @@ -24,14 +26,7 @@ async fn init_store_twice() { let rng = &mut ctx.rng(); let (genesis_block, block_store, temp_dir) = init_store(&ctx, rng).await; - let block_1 = FinalBlock { - block: Block { - parent: genesis_block.block.hash(), - number: genesis_block.block.number.next(), - payload: Vec::new(), - }, - justification: rng.gen(), - }; + let block_1 = make_block(rng, &genesis_block.header); block_store.put_block(&ctx, &block_1).await.unwrap(); assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); @@ -59,47 +54,25 @@ async fn test_put_block() { assert_eq!(*block_subscriber.borrow_and_update(), BlockNumber(0)); // Test inserting a block with a valid parent. - let block_1 = FinalBlock { - block: Block { - parent: genesis_block.block.hash(), - number: genesis_block.block.number.next(), - payload: Vec::new(), - }, - justification: rng.gen(), - }; + let block_1 = make_block(rng, &genesis_block.header); block_store.put_block(&ctx, &block_1).await.unwrap(); assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); - assert_eq!(*block_subscriber.borrow_and_update(), block_1.block.number); + assert_eq!(*block_subscriber.borrow_and_update(), block_1.header.number); // Test inserting a block with a valid parent that is not the genesis. 
- let block_2 = FinalBlock { - block: Block { - parent: block_1.block.hash(), - number: block_1.block.number.next(), - payload: Vec::new(), - }, - justification: rng.gen(), - }; + let block_2 = make_block(rng, &block_1.header); block_store.put_block(&ctx, &block_2).await.unwrap(); assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_2); - assert_eq!(*block_subscriber.borrow_and_update(), block_2.block.number); + assert_eq!(*block_subscriber.borrow_and_update(), block_2.header.number); } fn gen_blocks(rng: &mut impl Rng, genesis_block: FinalBlock, count: usize) -> Vec { let blocks = iter::successors(Some(genesis_block), |parent| { - let block = Block { - parent: parent.block.hash(), - number: parent.block.number.next(), - payload: Vec::new(), - }; - Some(FinalBlock { - block, - justification: rng.gen(), - }) + Some(make_block(rng, &parent.header)) }); blocks.skip(1).take(count).collect() } @@ -132,7 +105,7 @@ async fn test_get_missing_block_numbers() { .unwrap(); let mut expected_block_numbers: Vec<_> = - blocks[(i + 1)..].iter().map(|b| b.block.number).collect(); + blocks[(i + 1)..].iter().map(|b| b.header.number).collect(); expected_block_numbers.sort_unstable(); assert_eq!(missing_block_numbers, expected_block_numbers); diff --git a/node/libs/storage/src/types.rs b/node/libs/storage/src/types.rs index de6dc99f..fd39b362 100644 --- a/node/libs/storage/src/types.rs +++ b/node/libs/storage/src/types.rs @@ -4,11 +4,8 @@ use anyhow::Context as _; use concurrency::ctx; use rocksdb::{Direction, IteratorMode}; use roles::validator::{self, BlockNumber}; -use schema::{proto::storage as proto, read_required, ProtoFmt}; -use std::{ - collections::{BTreeMap, HashMap}, - iter, ops, -}; +use schema::{proto::storage as proto, read_required, required, ProtoFmt}; +use std::{iter, ops}; use thiserror::Error; /// Enum used to represent a key in the database. It also acts as a separator between different stores. @@ -56,6 +53,19 @@ impl DatabaseKey { } } +/// A payload of a proposed block which is not known to be finalized yet. +/// Replicas have to persist such proposed payloads for liveness: +/// consensus may finalize a block without knowing a payload in case of reproposals. +/// Currently we do not store the BlockHeader, because it is always +/// available in the LeaderPrepare message. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Proposal { + /// Number of a block for which this payload has been proposed. + pub number: BlockNumber, + /// Proposed payload. + pub payload: validator::Payload, +} + /// The struct that contains the replica state to be persisted. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ReplicaState { @@ -68,55 +78,52 @@ pub struct ReplicaState { /// The highest commit quorum certificate known to the replica. pub high_qc: validator::CommitQC, /// A cache of the received block proposals. 
- pub block_proposal_cache: - BTreeMap<BlockNumber, HashMap<validator::BlockHash, validator::Block>>, + pub proposals: Vec<Proposal>, } -impl ProtoFmt for ReplicaState { - type Proto = proto::ReplicaState; +impl ProtoFmt for Proposal { + type Proto = proto::Proposal; fn read(r: &Self::Proto) -> anyhow::Result<Self> { - let mut map: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); - - for schema_block in r.blocks.iter() { - let block: validator::Block = - read_required(&Some(schema_block).cloned()).context("block")?; - - match map.get_mut(&block.number) { - Some(blocks) => { - blocks.insert(block.hash(), block.clone()); - } - None => { - let mut blocks = HashMap::new(); - blocks.insert(block.hash(), block.clone()); - map.insert(block.number, blocks); - } - } + Ok(Self { + number: BlockNumber(*required(&r.number).context("number")?), + payload: validator::Payload(required(&r.payload).context("payload")?.clone()), + }) + } + + fn build(&self) -> Self::Proto { + Self::Proto { + number: Some(self.number.0), + payload: Some(self.payload.0.clone()), } + } +} +impl ProtoFmt for ReplicaState { + type Proto = proto::ReplicaState; + + fn read(r: &Self::Proto) -> anyhow::Result<Self> { Ok(Self { view: validator::ViewNumber(r.view.context("view_number")?), phase: read_required(&r.phase).context("phase")?, high_vote: read_required(&r.high_vote).context("high_vote")?, high_qc: read_required(&r.high_qc).context("high_qc")?, - block_proposal_cache: map, + proposals: r .proposals .iter() .map(ProtoFmt::read) .collect::<anyhow::Result<_>>() .context("proposals")?, }) } fn build(&self) -> Self::Proto { - let blocks = self - .block_proposal_cache - .values() - .flat_map(|x| x.values()) - .map(|block| block.build()) - .collect(); - Self::Proto { view: Some(self.view.0), phase: Some(self.phase.build()), high_vote: Some(self.high_vote.build()), high_qc: Some(self.high_qc.build()), - blocks, + proposals: self.proposals.iter().map(|p| p.build()).collect(), } } } diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 3295d2c7..cc461fbf 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -60,7 +60,8 @@ fn main() -> anyhow::Result<()> { // Generate the genesis block. // TODO: generating genesis block shouldn't require knowing the private keys. - let (genesis, validator_set) = testonly::make_genesis(&validator_keys, vec![]); + let (genesis, validator_set) = + testonly::make_genesis(&validator_keys, validator::Payload(vec![])); // Each node will have `gossip_peers` outbound peers. let nodes = addrs.len();
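A round-trip sketch for the new storage types (test-style, not part of this patch; assumes the rand Distribution impls from node/libs/storage/src/testonly.rs above):

    let state = ReplicaState {
        view: rng.gen(),
        phase: rng.gen(),
        high_vote: rng.gen(),
        high_qc: rng.gen(),
        proposals: vec![Proposal { number: rng.gen(), payload: rng.gen() }],
    };
    // `read` and `build` are the ProtoFmt methods defined in types.rs above.
    assert_eq!(state, ReplicaState::read(&state.build()).unwrap());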