From 015b3641e00da0d159c7276eca87066672c9d61c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruno=20Fran=C3=A7a?=
Date: Tue, 22 Oct 2024 01:31:38 +0100
Subject: [PATCH] First pass on the bft actor.

---
 node/actors/bft/src/lib.rs                    |   7 +-
 node/actors/bft/src/replica/block.rs          |   4 +-
 node/actors/bft/src/replica/mod.rs            |   3 +-
 node/actors/bft/src/replica/new_view.rs       |   4 +-
 .../{leader_prepare.rs => proposal.rs}        | 180 +++++++++++-------
 node/actors/bft/src/replica/state_machine.rs  |  19 +-
 node/actors/bft/src/replica/tests.rs          |  30 +--
 node/actors/bft/src/replica/timer.rs          |  35 ----
 node/actors/bft/src/testonly/ut_harness.rs    |  18 +-
 9 files changed, 155 insertions(+), 145 deletions(-)
 rename node/actors/bft/src/replica/{leader_prepare.rs => proposal.rs} (52%)
 delete mode 100644 node/actors/bft/src/replica/timer.rs

diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs
index 89061564..c4bb716f 100644
--- a/node/actors/bft/src/lib.rs
+++ b/node/actors/bft/src/lib.rs
@@ -70,8 +70,9 @@ impl Config {
         anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT);
         genesis.verify().context("genesis().verify()")?;
 
+        // TODO: What about pruning???
         if let Some(prev) = genesis.first_block.prev() {
-            tracing::info!("Waiting for the pre-genesis blocks to be persisted");
+            tracing::info!("Waiting for the pre-fork blocks to be persisted");
             if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await {
                 return Ok(());
             }
@@ -95,8 +96,8 @@ impl Config {
 
         tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public());
 
-        // This is the infinite loop where the consensus actually runs. The validator waits for either
-        // a message from the network or for a timeout, and processes each accordingly.
+        // This is the infinite loop where the consensus actually runs. The validator waits for
+        // a message from the network and processes it accordingly.
         loop {
             async {
                 let InputMessage::Network(req) = pipe
diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs
index 2d6dc0a8..eed18837 100644
--- a/node/actors/bft/src/replica/block.rs
+++ b/node/actors/bft/src/replica/block.rs
@@ -16,12 +16,12 @@ impl StateMachine {
     ) -> ctx::Result<()> {
         // Update high_qc.
         if self
-            .high_qc
+            .high_commit_qc
             .as_ref()
             .map(|qc| qc.view().number < commit_qc.view().number)
             .unwrap_or(true)
         {
-            self.high_qc = Some(commit_qc.clone());
+            self.high_commit_qc = Some(commit_qc.clone());
         }
         // TODO(gprusak): for availability of finalized blocks,
         // replicas should be able to broadcast highest quorums without
diff --git a/node/actors/bft/src/replica/mod.rs b/node/actors/bft/src/replica/mod.rs
index 640f044b..43b07b99 100644
--- a/node/actors/bft/src/replica/mod.rs
+++ b/node/actors/bft/src/replica/mod.rs
@@ -4,12 +4,11 @@
 mod block;
 pub(crate) mod leader_commit;
-pub(crate) mod leader_prepare;
 mod new_view;
+pub(crate) mod proposal;
 pub(crate) mod replica_prepare;
 mod state_machine;
 #[cfg(test)]
 mod tests;
-mod timer;
 
 pub(crate) use self::state_machine::StateMachine;
diff --git a/node/actors/bft/src/replica/new_view.rs b/node/actors/bft/src/replica/new_view.rs
index 16403136..610ad284 100644
--- a/node/actors/bft/src/replica/new_view.rs
+++ b/node/actors/bft/src/replica/new_view.rs
@@ -13,7 +13,7 @@ impl StateMachine {
         metrics::METRICS.replica_view_number.set(self.view.0);
         self.phase = validator::Phase::Prepare;
 
-        if let Some(qc) = self.high_qc.as_ref() {
+        if let Some(qc) = self.high_commit_qc.as_ref() {
             // Clear the block cache.
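+            // Proposals for blocks up to and including the committed one can never be
+            // needed again, so it is safe to drop them from the cache.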
             self.block_proposal_cache
                 .retain(|k, _| k > &qc.header().number);
@@ -34,7 +34,7 @@ impl StateMachine {
                     number: self.view,
                 },
                 high_vote: self.high_vote.clone(),
-                high_qc: self.high_qc.clone(),
+                high_qc: self.high_commit_qc.clone(),
             },
         )),
         recipient: Target::Broadcast,
diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/proposal.rs
similarity index 52%
rename from node/actors/bft/src/replica/leader_prepare.rs
rename to node/actors/bft/src/replica/proposal.rs
index 55e43cf0..001ee891 100644
--- a/node/actors/bft/src/replica/leader_prepare.rs
+++ b/node/actors/bft/src/replica/proposal.rs
@@ -2,21 +2,11 @@
 use super::StateMachine;
 use zksync_concurrency::{ctx, error::Wrap};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
-use zksync_consensus_roles::validator::{self, BlockNumber};
+use zksync_consensus_roles::validator::{self, BlockHeader, BlockNumber};
 
-/// Errors that can occur when processing a "leader prepare" message.
+/// Errors that can occur when processing a LeaderProposal message.
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum Error {
-    /// Invalid leader.
-    #[error(
-        "invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})"
-    )]
-    InvalidLeader {
-        /// Correct leader.
-        correct_leader: validator::PublicKey,
-        /// Received leader.
-        received_leader: validator::PublicKey,
-    },
     /// Message for a past view or phase.
     #[error(
         "message for a past view / phase (current view: {current_view:?}, current phase: {current_phase:?})"
     )]
     Old {
         /// Current view.
         current_view: validator::ViewNumber,
         /// Current phase.
         current_phase: validator::Phase,
     },
+    /// Invalid leader.
+    #[error(
+        "invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})"
+    )]
+    InvalidLeader {
+        /// Correct leader.
+        correct_leader: validator::PublicKey,
+        /// Received leader.
+        received_leader: validator::PublicKey,
+    },
+    /// Leader proposed a block that was already pruned from the replica's storage.
+    #[error("leader proposed a block that was already pruned from the replica's storage")]
+    ProposalAlreadyPruned,
     /// Invalid message signature.
     #[error("invalid signature: {0:#}")]
     InvalidSignature(#[source] anyhow::Error),
     /// Invalid message.
     #[error("invalid message: {0:#}")]
     InvalidMessage(#[source] validator::LeaderPrepareVerifyError),
-    /// Leader proposed a block that was already pruned from replica's storage.
-    #[error("leader proposed a block that was already pruned from replica's storage")]
-    ProposalAlreadyPruned,
+    /// Missing payload.
+    #[error("new block proposal is missing a payload")]
+    MissingPayload,
     /// Oversized payload.
     #[error("block proposal with an oversized payload (payload size: {payload_size})")]
     ProposalOversizedPayload {
         /// Size of the payload.
         payload_size: usize,
     },
     /// Invalid payload.
     #[error("invalid payload: {0:#}")]
-    ProposalInvalidPayload(#[source] anyhow::Error),
+    InvalidPayload(#[source] anyhow::Error),
     /// Previous payload missing.
     #[error("previous block proposal payload missing from store (block number: {prev_number})")]
     MissingPreviousPayload {
         /// The number of the missing block
         prev_number: BlockNumber,
     },
@@ -69,11 +70,11 @@ impl Wrap for Error {
 }
 
 impl StateMachine {
-    /// Processes a leader prepare message.
-    pub(crate) async fn process_leader_prepare(
+    /// Processes a LeaderProposal message.
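+    /// If all checks pass, the replica caches the proposed payload (when one is
+    /// attached), updates its state, and replies with a ReplicaCommit vote.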
+    pub(crate) async fn on_proposal(
         &mut self,
         ctx: &ctx::Ctx,
-        signed_message: validator::Signed<validator::LeaderPrepare>,
+        signed_message: validator::Signed<validator::LeaderProposal>,
     ) -> Result<(), Error> {
         // ----------- Checking origin of the message --------------
 
@@ -82,6 +83,15 @@
         let author = &signed_message.key;
         let view = message.view().number;
 
+        // Check that the message is for the current view or a future view. We only allow proposals for
+        // the current view if we have not voted or timed out yet.
+        if view < self.view || (view == self.view && self.phase != validator::Phase::Prepare) {
+            return Err(Error::Old {
+                current_view: self.view,
+                current_phase: self.phase,
+            });
+        }
+
         // Check that it comes from the correct leader.
         let leader = self.config.genesis().view_leader(view);
         if author != &leader {
             return Err(Error::InvalidLeader {
                 correct_leader: leader,
                 received_leader: author.clone(),
             });
         }
 
@@ -91,96 +101,118 @@
-        // If the message is from the "past", we discard it.
-        if (view, validator::Phase::Prepare) < (self.view, self.phase) {
-            return Err(Error::Old {
-                current_view: self.view,
-                current_phase: self.phase,
-            });
-        }
-
-        // Replica MUSTN'T vote for blocks which have been already pruned for storage.
-        // (because it won't be able to persist and broadcast them once finalized).
-        // TODO(gprusak): it should never happen, we should add safety checks to prevent
-        // pruning blocks not known to be finalized.
-        if message.proposal.number < self.config.block_store.queued().first {
-            return Err(Error::ProposalAlreadyPruned);
-        }
-
         // ----------- Checking the message --------------
 
         signed_message.verify().map_err(Error::InvalidSignature)?;
+
         message
             .verify(self.config.genesis())
             .map_err(Error::InvalidMessage)?;
 
-        let high_qc = message.justification.high_qc();
+        let (implied_block_number, implied_block_hash) =
+            message.justification.get_implied_block(self.config.genesis());
 
-        if let Some(high_qc) = high_qc {
-            // Try to create a finalized block with this CommitQC and our block proposal cache.
-            // This gives us another chance to finalize a block that we may have missed before.
-            self.save_block(ctx, high_qc).await.wrap("save_block()")?;
+        // Replica MUSTN'T vote for blocks which have been already pruned from storage
+        // (because it won't be able to persist and broadcast them once finalized).
+        // TODO(gprusak): it should never happen, we should add safety checks to prevent
+        // pruning blocks not known to be finalized.
+        if implied_block_number < self.config.block_store.queued().first {
+            return Err(Error::ProposalAlreadyPruned);
         }
 
-        // Check that the payload doesn't exceed the maximum size.
-        if let Some(payload) = &message.proposal_payload {
-            if payload.0.len() > self.config.max_payload_size {
-                return Err(Error::ProposalOversizedPayload {
-                    payload_size: payload.0.len(),
-                });
-            }
+        let block_hash = match implied_block_hash {
+            // This is a reproposal. We let the leader repropose blocks without sending
+            // them in the proposal (it sends only the number + hash). That allows a
+            // leader to repropose a block without having it stored.
+            // It is an optimization that allows us to not wait for a leader that has
+            // the previous proposal stored (which can take 4f views), and to somewhat
+            // speed up reproposals by skipping block broadcast.
+            // This only saves time because we have a gossip network running in parallel,
+            // and any time a replica is able to create a finalized block (by possessing
+            // both the block and the commit QC) it broadcasts the finalized block (this
+            // was meant to propagate the block to full nodes, but of course validators
+            // will end up receiving it as well).
+            Some(hash) => hash,
+            // This is a new proposal, so we need to verify it (i.e. execute it).
+            None => {
+                // Check that the payload is present.
+                let Some(payload) = &message.proposal_payload else {
+                    return Err(Error::MissingPayload);
+                };
+
+                if payload.len() > self.config.max_payload_size {
+                    return Err(Error::ProposalOversizedPayload {
+                        payload_size: payload.len(),
+                    });
+                }
 
-            if let Some(prev) = message.proposal.number.prev() {
-                // Defensively assume that PayloadManager cannot verify proposal until the previous block is stored.
-                self.config
-                    .block_store
-                    .wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
-                    .await
-                    .map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
-            }
-
-            if let Err(err) = self
-                .config
-                .payload_manager
-                .verify(ctx, message.proposal.number, payload)
-                .await
-            {
-                return Err(match err {
-                    err @ ctx::Error::Canceled(_) => Error::Internal(err),
-                    ctx::Error::Internal(err) => Error::ProposalInvalidPayload(err),
-                });
-            }
-        }
+                // Defensively assume that PayloadManager cannot verify proposal until the previous block is stored.
+                // Note that it doesn't mean that the block is actually available, as old blocks might get pruned or
+                // we might just have started from a snapshot state. It just means that we have the state of the chain
+                // up to the previous block.
+                if let Some(prev) = implied_block_number.prev() {
+                    self.config
+                        .block_store
+                        .wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
+                        .await
+                        .map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
+                }
+
+                // Execute the payload.
+                if let Err(err) = self
+                    .config
+                    .payload_manager
+                    .verify(ctx, implied_block_number, payload)
+                    .await
+                {
+                    return Err(match err {
+                        ctx::Error::Internal(err) => Error::InvalidPayload(err),
+                        err @ ctx::Error::Canceled(_) => Error::Internal(err),
+                    });
+                }
+
+                // The proposal is valid. We cache it, waiting for it to be committed.
+                self.block_proposal_cache
+                    .entry(implied_block_number)
+                    .or_default()
+                    .insert(payload.hash(), payload.clone());
+
+                // A new proposal's implied hash is just the hash of its payload.
+                payload.hash()
+            }
+        };
 
         // ----------- All checks finished. Now we process the message. --------------
 
         // Create our commit vote.
         let commit_vote = validator::ReplicaCommit {
             view: message.view().clone(),
-            proposal: message.proposal,
+            proposal: BlockHeader {
+                number: implied_block_number,
+                payload: block_hash,
+            },
         };
 
         // Update the state machine.
         self.view = message.view().number;
         self.phase = validator::Phase::Commit;
         self.high_vote = Some(commit_vote.clone());
 
-        // If we received a new block proposal, store it in our cache.
-        if let Some(payload) = &message.proposal_payload {
-            self.block_proposal_cache
-                .entry(message.proposal.number)
-                .or_default()
-                .insert(payload.hash(), payload.clone());
-        }
+        match &message.justification {
+            validator::ProposalJustification::Commit(qc) => {
+                // This also gives us a chance to finalize the previous block and to
+                // update the high commit QC (save_block does both).
+                self.save_block(ctx, qc).await.wrap("save_block()")?;
+            }
+            validator::ProposalJustification::Timeout(qc) => {
+                if let Some(high_qc) = qc.high_qc() {
+                    self.save_block(ctx, high_qc).await.wrap("save_block()")?;
+                }
+                self.high_timeout_qc = Some(qc.clone());
+            }
+        };
 
         // Backup our state.
         self.backup_state(ctx).await.wrap("backup_state()")?;
 
-        // Send the replica message to the leader.
+        // Broadcast our message.
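+        // Note that, unlike in the previous protocol version, the vote is broadcast
+        // to every validator (as in new_view.rs) instead of being sent only to the
+        // next leader.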
         let output_message = ConsensusInputMessage {
             message: self
                 .config
                 .secret_key
                 .sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)),
-            recipient: Target::Validator(author.clone()),
+            recipient: Target::Broadcast,
         };
         self.outbound_pipe.send(output_message.into());
diff --git a/node/actors/bft/src/replica/state_machine.rs b/node/actors/bft/src/replica/state_machine.rs
index 238cc0e3..1526b23f 100644
--- a/node/actors/bft/src/replica/state_machine.rs
+++ b/node/actors/bft/src/replica/state_machine.rs
@@ -31,7 +31,9 @@ pub(crate) struct StateMachine {
     /// The highest block proposal that the replica has committed to.
     pub(crate) high_vote: Option<validator::ReplicaCommit>,
     /// The highest commit quorum certificate known to the replica.
-    pub(crate) high_qc: Option<validator::CommitQC>,
+    pub(crate) high_commit_qc: Option<validator::CommitQC>,
+    /// The highest timeout quorum certificate known to the replica.
+    pub(crate) high_timeout_qc: Option<validator::TimeoutQC>,
     /// A cache of the received block proposals.
     pub(crate) block_proposal_cache:
         BTreeMap<validator::BlockNumber, HashMap<validator::PayloadHash, validator::Payload>>,
@@ -40,6 +42,9 @@
 }
 
 impl StateMachine {
+    /// The duration of the view timeout.
+    pub(crate) const TIMEOUT_DURATION: time::Duration = time::Duration::milliseconds(2000);
+
     /// Creates a new [`StateMachine`] instance, attempting to recover a past state from the storage module,
     /// otherwise initializes the state machine with the current head block.
     ///
@@ -52,6 +57,7 @@
         outbound_pipe: OutputSender,
     ) -> ctx::Result<(Self, sync::prunable_mpsc::Sender<network::io::ConsensusReq>)> {
         let backup = config.replica_store.state(ctx).await?;
+
         let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
         for proposal in backup.proposals {
             block_proposal_cache
@@ -72,7 +78,8 @@
             view: backup.view,
             phase: backup.phase,
             high_vote: backup.high_vote,
-            high_qc: backup.high_qc,
+            high_commit_qc: backup.high_commit_qc,
+            high_timeout_qc: backup.high_timeout_qc,
             block_proposal_cache,
             timeout_deadline: time::Deadline::Infinite,
         };
@@ -135,20 +142,20 @@
             }
             ConsensusMsg::LeaderPrepare(_) => {
                 let res = match self
-                    .process_leader_prepare(ctx, req.msg.cast().unwrap())
+                    .on_proposal(ctx, req.msg.cast().unwrap())
                     .await
-                    .wrap("process_leader_prepare()")
+                    .wrap("on_proposal()")
                 {
                     Ok(()) => Ok(()),
                     Err(err) => {
                         match err {
-                            super::leader_prepare::Error::Internal(e) => {
+                            super::proposal::Error::Internal(e) => {
                                 tracing::error!(
-                                    "process_leader_prepare: internal error: {e:#}"
+                                    "on_proposal: internal error: {e:#}"
                                 );
                                 return Err(e);
                             }
-                            super::leader_prepare::Error::Old { .. } => {
+                            super::proposal::Error::Old { .. } => {
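+                                // Proposals from past views are routine (e.g. when
+                                // this replica is lagging behind), so we only log
+                                // them at info level.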
-                                tracing::info!("process_leader_prepare: {err:#}");
+                                tracing::info!("on_proposal: {err:#}");
                             }
                             _ => {
@@ -208,7 +215,7 @@
             view: self.view,
             phase: self.phase,
             high_vote: self.high_vote.clone(),
-            high_qc: self.high_qc.clone(),
+            high_qc: self.high_commit_qc.clone(),
             proposals,
         };
         self.config
diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs
index afb2a8aa..36e7d084 100644
--- a/node/actors/bft/src/replica/tests.rs
+++ b/node/actors/bft/src/replica/tests.rs
@@ -1,4 +1,4 @@
-use super::{leader_commit, leader_prepare};
+use super::{leader_commit, proposal};
 use crate::{
     testonly,
     testonly::ut_harness::{UTHarness, MAX_PAYLOAD_SIZE},
@@ -58,7 +58,7 @@ async fn leader_prepare_bad_chain() {
             .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                 validator::LeaderPrepareVerifyError::Justification(
                     validator::PrepareQCVerifyError::View(_)
                 )
@@ -129,7 +129,7 @@ async fn leader_prepare_invalid_leader() {
            .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::InvalidLeader { correct_leader, received_leader }) => {
+            Err(proposal::Error::InvalidLeader { correct_leader, received_leader }) => {
                 assert_eq!(correct_leader, util.keys[1].public());
                 assert_eq!(received_leader, util.keys[0].public());
             }
@@ -155,7 +155,7 @@ async fn leader_prepare_old_view() {
            .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::Old { current_view, current_phase }) => {
+            Err(proposal::Error::Old { current_view, current_phase }) => {
                 assert_eq!(current_view, util.replica.view);
                 assert_eq!(current_phase, util.replica.phase);
             }
@@ -187,7 +187,7 @@ async fn leader_prepare_pruned_block() {
         let res = util
             .process_leader_prepare(ctx, util.sign(leader_prepare))
             .await;
-        assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned));
+        assert_matches!(res, Err(proposal::Error::ProposalAlreadyPruned));
         Ok(())
     })
     .await
@@ -231,7 +231,7 @@ async fn leader_prepare_invalid_payload() {
         let res = util
             .process_leader_prepare(ctx, util.sign(leader_prepare))
             .await;
-        assert_matches!(res, Err(leader_prepare::Error::ProposalInvalidPayload(..)));
+        assert_matches!(res, Err(proposal::Error::InvalidPayload(..)));
         Ok(())
     })
     .await
@@ -249,7 +249,7 @@ async fn leader_prepare_invalid_sig() {
         let mut leader_prepare = util.sign(leader_prepare);
         leader_prepare.sig = ctx.rng().gen();
         let res = util.process_leader_prepare(ctx, leader_prepare).await;
-        assert_matches!(res, Err(leader_prepare::Error::InvalidSignature(..)));
+        assert_matches!(res, Err(proposal::Error::InvalidSignature(..)));
         Ok(())
     })
     .await
@@ -271,7 +271,7 @@ async fn leader_prepare_invalid_prepare_qc() {
             .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                 validator::LeaderPrepareVerifyError::Justification(_)
             ))
         );
@@ -299,7 +299,7 @@ async fn leader_prepare_proposal_oversized_payload() {
             .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::ProposalOversizedPayload{ payload_size }) => {
+            Err(proposal::Error::ProposalOversizedPayload{ payload_size }) => {
                 assert_eq!(payload_size, payload_oversize);
             }
         );
@@ -324,7 +324,7 @@ async fn leader_prepare_proposal_mismatched_payload() {
             .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                 validator::LeaderPrepareVerifyError::ProposalMismatchedPayload
             ))
         );
@@ -357,7 +357,7 @@ async fn leader_prepare_proposal_when_previous_not_finalized() {
             .await;
         assert_matches!(
             res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                validator::LeaderPrepareVerifyError::ProposalWhenPreviousNotFinalized
            ))
        );
@@ -383,7 +383,7 @@ async fn leader_prepare_bad_block_number() {
        tracing::info!("Modify the proposal.number so that it doesn't match the previous block");
        leader_prepare.proposal.number = rng.gen();
        let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await;
-        assert_matches!(res, Err(leader_prepare::Error::InvalidMessage(
+        assert_matches!(res, Err(proposal::Error::InvalidMessage(
            validator::LeaderPrepareVerifyError::BadBlockNumber { got, want }
        )) => {
            assert_eq!(want, leader_prepare.justification.high_qc().unwrap().message.proposal.number.next());
@@ -426,7 +426,7 @@ async fn leader_prepare_reproposal_without_quorum() {
            .await;
        assert_matches!(
            res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                validator::LeaderPrepareVerifyError::ReproposalWithoutQuorum
            ))
        );
@@ -462,7 +462,7 @@ async fn leader_prepare_reproposal_when_finalized() {
            .await;
        assert_matches!(
            res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                validator::LeaderPrepareVerifyError::ReproposalWhenFinalized
            ))
        );
@@ -492,7 +492,7 @@ async fn leader_prepare_reproposal_invalid_block() {
            .await;
        assert_matches!(
            res,
-            Err(leader_prepare::Error::InvalidMessage(
+            Err(proposal::Error::InvalidMessage(
                validator::LeaderPrepareVerifyError::ReproposalBadBlock
            ))
        );
diff --git a/node/actors/bft/src/replica/timer.rs b/node/actors/bft/src/replica/timer.rs
deleted file mode 100644
index 75570d2d..00000000
--- a/node/actors/bft/src/replica/timer.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-use super::StateMachine;
-use crate::metrics;
-use zksync_concurrency::{ctx, metrics::LatencyGaugeExt as _, time};
-use zksync_consensus_roles::validator;
-
-impl StateMachine {
-    /// The base duration of the timeout.
-    pub(crate) const BASE_DURATION: time::Duration = time::Duration::milliseconds(2000);
-    /// Max duration of the timeout.
-    /// Consensus is unusable with this range of timeout anyway,
-    /// however to make debugging easier we bound it to a specific value.
-    pub(crate) const MAX_DURATION: time::Duration = time::Duration::seconds(1000000);
-
-    /// Resets the timer. On every timeout we double the duration, starting from a given base duration.
-    /// This is a simple exponential backoff.
-    pub(crate) fn reset_timer(&mut self, ctx: &ctx::Ctx) {
-        let final_view = match self.high_qc.as_ref() {
-            Some(qc) => qc.view().number.next(),
-            None => validator::ViewNumber(0),
-        };
-        let f = self
-            .view
-            .0
-            .saturating_sub(final_view.0)
-            .try_into()
-            .unwrap_or(u32::MAX);
-        let f = 2u64.saturating_pow(f).try_into().unwrap_or(i32::MAX);
-        let timeout = Self::BASE_DURATION
-            .saturating_mul(f)
-            .min(Self::MAX_DURATION);
-
-        metrics::METRICS.replica_view_timeout.set_latency(timeout);
-        self.timeout_deadline = time::Deadline::Finite(ctx.now() + timeout);
-    }
-}
diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs
index 406ea4df..2d3bb834 100644
--- a/node/actors/bft/src/testonly/ut_harness.rs
+++ b/node/actors/bft/src/testonly/ut_harness.rs
@@ -3,7 +3,7 @@
 use crate::{
     leader,
     leader::{replica_commit, replica_prepare},
     replica,
-    replica::{leader_commit, leader_prepare},
+    replica::{leader_commit, proposal},
     testonly, Config, PayloadManager,
 };
 use assert_matches::assert_matches;
@@ -96,7 +96,7 @@ impl UTHarness {
                 genesis: self.genesis().hash(),
                 number: self.replica.view.next(),
             },
-            high_qc: self.replica.high_qc.clone(),
+            high_qc: self.replica.high_commit_qc.clone(),
             high_vote: self.replica.high_vote.clone(),
         };
         let replica_prepare = self.process_replica_timeout(ctx).await;
@@ -140,14 +140,20 @@ impl UTHarness {
         validator::ReplicaPrepare {
             view: self.replica_view(),
             high_vote: self.replica.high_vote.clone(),
-            high_qc: self.replica.high_qc.clone(),
+            high_qc: self.replica.high_commit_qc.clone(),
         }
     }
 
     pub(crate) fn new_current_replica_commit(&self) -> validator::ReplicaCommit {
         validator::ReplicaCommit {
             view: self.replica_view(),
-            proposal: self.replica.high_qc.as_ref().unwrap().message.proposal,
+            proposal: self
+                .replica
+                .high_commit_qc
+                .as_ref()
+                .unwrap()
+                .message
+                .proposal,
         }
     }
 
@@ -173,8 +179,8 @@ impl UTHarness {
         &mut self,
         ctx: &ctx::Ctx,
-        msg: validator::Signed<validator::LeaderPrepare>,
-    ) -> Result<validator::Signed<validator::ReplicaCommit>, leader_prepare::Error> {
-        self.replica.process_leader_prepare(ctx, msg).await?;
+        msg: validator::Signed<validator::LeaderProposal>,
+    ) -> Result<validator::Signed<validator::ReplicaCommit>, proposal::Error> {
+        self.replica.on_proposal(ctx, msg).await?;
         Ok(self.try_recv().unwrap())
     }