From 43b75429d8a87a5f7618892ca4a07c2852584482 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Thu, 22 Feb 2024 13:44:41 +0200 Subject: [PATCH] sc-consensus-beefy: pump gossip engine while waiting for initialization conditions (#3435) As part of BEEFY worker/voter initialization the task waits for certain chain and backend conditions to be fulfilled: - BEEFY consensus enabled on-chain & GRANDPA best finalized higher than on-chain BEEFY genesis block, - backend has synced headers for BEEFY mandatory blocks between best BEEFY and best GRANDPA. During this waiting time, any messages gossiped on the BEEFY topic for current chain get enqueued in the gossip engine, leading to RAM bloating and output warning/error messages when the wait time is non-negligible (like during a clean sync). This PR adds logic to pump the gossip engine while waiting for other things to make sure gossiped messages get consumed (practically discarded until worker is fully initialized). Also raises the warning threshold for enqueued messages from 10k to 100k. This is in line with the other gossip protocols on the node. Fixes https://github.com/paritytech/polkadot-sdk/issues/3390 --------- Signed-off-by: Adrian Catangiu --- prdoc/pr_3435.prdoc | 16 + .../beefy/src/communication/gossip.rs | 35 +- .../consensus/beefy/src/communication/mod.rs | 2 - .../client/consensus/beefy/src/import.rs | 2 +- substrate/client/consensus/beefy/src/lib.rs | 414 +++++++++++++---- substrate/client/consensus/beefy/src/tests.rs | 116 +---- .../client/consensus/beefy/src/worker.rs | 419 +++++------------- .../consensus/beefy/src/commitment.rs | 21 + 8 files changed, 520 insertions(+), 505 deletions(-) create mode 100644 prdoc/pr_3435.prdoc diff --git a/prdoc/pr_3435.prdoc b/prdoc/pr_3435.prdoc new file mode 100644 index 000000000000..7d7896bff7d4 --- /dev/null +++ b/prdoc/pr_3435.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Fix BEEFY-related gossip messages error logs + +doc: + - audience: Node Operator + description: | + Added logic to pump the gossip engine while waiting for other things + to make sure gossiped messages get consumed (practically discarded + until worker is fully initialized). + This fixes an issue where node operators saw error logs, and fixes + potential RAM bloat when BEEFY initialization takes a long time + (i.e. during clean sync). +crates: + - name: sc-consensus-beefy diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index 8a0b0a74308f..eb43c9173d75 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -56,6 +56,8 @@ pub(super) enum Action { Keep(H, ReputationChange), // discard, applying cost/benefit to originator. Discard(ReputationChange), + // ignore, no cost/benefit applied to originator. + DiscardNoReport, } /// An outcome of examining a message. @@ -68,7 +70,7 @@ enum Consider { /// Message is from the future. Reject. RejectFuture, /// Message cannot be evaluated. Reject. - RejectOutOfScope, + CannotEvaluate, } /// BEEFY gossip message type that gets encoded and sent on the network. @@ -168,18 +170,14 @@ impl Filter { .as_ref() .map(|f| // only from current set and only [filter.start, filter.end] - if set_id < f.validator_set.id() { + if set_id < f.validator_set.id() || round < f.start { Consider::RejectPast - } else if set_id > f.validator_set.id() { - Consider::RejectFuture - } else if round < f.start { - Consider::RejectPast - } else if round > f.end { + } else if set_id > f.validator_set.id() || round > f.end { Consider::RejectFuture } else { Consider::Accept }) - .unwrap_or(Consider::RejectOutOfScope) + .unwrap_or(Consider::CannotEvaluate) } /// Return true if `round` is >= than `max(session_start, best_beefy)`, @@ -199,7 +197,7 @@ impl Filter { Consider::Accept } ) - .unwrap_or(Consider::RejectOutOfScope) + .unwrap_or(Consider::CannotEvaluate) } /// Add new _known_ `round` to the set of seen valid justifications. @@ -244,7 +242,7 @@ where pub(crate) fn new( known_peers: Arc>>, ) -> (GossipValidator, TracingUnboundedReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000); + let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000); let val = GossipValidator { votes_topic: votes_topic::(), justifs_topic: proofs_topic::(), @@ -289,7 +287,9 @@ where match filter.consider_vote(round, set_id) { Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), - Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the vote without punishing or rewarding the sending peer. + Consider::CannotEvaluate => return Action::DiscardNoReport, Consider::Accept => {}, } @@ -330,7 +330,9 @@ where match guard.consider_finality_proof(round, set_id) { Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), - Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the proof without punishing or rewarding the sending peer. + Consider::CannotEvaluate => return Action::DiscardNoReport, Consider::Accept => {}, } @@ -357,7 +359,9 @@ where Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF) } }) - .unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)) + // When we can't evaluate, it's our fault (e.g. filter not initialized yet), we + // discard the proof without punishing or rewarding the sending peer. + .unwrap_or(Action::DiscardNoReport) }; if matches!(action, Action::Keep(_, _)) { self.gossip_filter.write().mark_round_as_proven(round); @@ -404,6 +408,7 @@ where self.report(*sender, cb); ValidationResult::Discard }, + Action::DiscardNoReport => ValidationResult::Discard, } } @@ -579,8 +584,8 @@ pub(crate) mod tests { // filter not initialized let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); - expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + // nothing reported + assert!(report_stream.try_recv().is_err()); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); // nothing in cache first time diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs index 3827559057dd..6fda63688e69 100644 --- a/substrate/client/consensus/beefy/src/communication/mod.rs +++ b/substrate/client/consensus/beefy/src/communication/mod.rs @@ -90,8 +90,6 @@ mod cost { pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature"); // Message received with vote from voter not in validator set. pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter"); - // A message received that cannot be evaluated relative to our current state. - pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message"); // Message containing invalid proof. pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit"); // Reputation cost per signature checked for invalid proof. diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index fc19ecc30142..ed8ed68c4e8d 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -159,7 +159,7 @@ where // The proof is valid and the block is imported and final, we can import. debug!( target: LOG_TARGET, - "🥩 import justif {:?} for block number {:?}.", proof, number + "🥩 import justif {} for block number {:?}.", proof, number ); // Send the justification to the BEEFY voter for processing. self.justification_sender diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 11a105561e47..323af1bc8305 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -31,7 +31,7 @@ use crate::{ import::BeefyBlockImport, metrics::register_metrics, }; -use futures::{stream::Fuse, StreamExt}; +use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, warn}; use parking_lot::Mutex; use prometheus::Registry; @@ -40,17 +40,21 @@ use sc_consensus::BlockImport; use sc_network::{NetworkRequest, NotificationService, ProtocolName}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing}; use sp_api::ProvideRuntimeApi; -use sp_blockchain::{ - Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult, -}; +use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_consensus_beefy::{ - ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, + ecdsa_crypto::AuthorityId, BeefyApi, ConsensusLog, MmrRootHash, PayloadProvider, ValidatorSet, + BEEFY_ENGINE_ID, }; use sp_keystore::KeystorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; -use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, VecDeque}, + marker::PhantomData, + sync::Arc, + time::Duration, +}; mod aux_schema; mod error; @@ -63,9 +67,19 @@ pub mod communication; pub mod import; pub mod justification; +use crate::{ + communication::{gossip::GossipValidator, peers::PeerReport}, + justification::BeefyVersionedFinalityProof, + keystore::BeefyKeystore, + metrics::VoterMetrics, + round::Rounds, + worker::{BeefyWorker, PersistedState}, +}; pub use communication::beefy_protocol_name::{ gossip_protocol_name, justifications_protocol_name as justifs_protocol_name, }; +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_runtime::generic::OpaqueDigestItemId; #[cfg(test)] mod tests; @@ -209,6 +223,247 @@ pub struct BeefyParams { /// Handler for incoming BEEFY justifications requests from a remote peer. pub on_demand_justifications_handler: BeefyJustifsRequestHandler, } +/// Helper object holding BEEFY worker communication/gossip components. +/// +/// These are created once, but will be reused if worker is restarted/reinitialized. +pub(crate) struct BeefyComms { + pub gossip_engine: GossipEngine, + pub gossip_validator: Arc>, + pub gossip_report_stream: TracingUnboundedReceiver, + pub on_demand_justifications: OnDemandJustificationsEngine, +} + +/// Helper builder object for building [worker::BeefyWorker]. +/// +/// It has to do it in two steps: initialization and build, because the first step can sleep waiting +/// for certain chain and backend conditions, and while sleeping we still need to pump the +/// GossipEngine. Once initialization is done, the GossipEngine (and other pieces) are added to get +/// the complete [worker::BeefyWorker] object. +pub(crate) struct BeefyWorkerBuilder { + // utilities + backend: Arc, + runtime: Arc, + key_store: BeefyKeystore, + // voter metrics + metrics: Option, + persisted_state: PersistedState, +} + +impl BeefyWorkerBuilder +where + B: Block + codec::Codec, + BE: Backend, + R: ProvideRuntimeApi, + R::Api: BeefyApi, +{ + /// This will wait for the chain to enable BEEFY (if not yet enabled) and also wait for the + /// backend to sync all headers required by the voter to build a contiguous chain of mandatory + /// justifications. Then it builds the initial voter state using a combination of previously + /// persisted state in AUX DB and latest chain information/progress. + /// + /// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`. + pub async fn async_initialize( + backend: Arc, + runtime: Arc, + key_store: BeefyKeystore, + metrics: Option, + min_block_delta: u32, + gossip_validator: Arc>, + finality_notifications: &mut Fuse>, + ) -> Result { + // Wait for BEEFY pallet to be active before starting voter. + let (beefy_genesis, best_grandpa) = + wait_for_runtime_pallet(&*runtime, finality_notifications).await?; + + let persisted_state = Self::load_or_init_state( + beefy_genesis, + best_grandpa, + min_block_delta, + backend.clone(), + runtime.clone(), + &key_store, + &metrics, + ) + .await?; + // Update the gossip validator with the right starting round and set id. + persisted_state + .gossip_filter_config() + .map(|f| gossip_validator.update_filter(f))?; + + Ok(BeefyWorkerBuilder { backend, runtime, key_store, metrics, persisted_state }) + } + + /// Takes rest of missing pieces as params and builds the `BeefyWorker`. + pub fn build( + self, + payload_provider: P, + sync: Arc, + comms: BeefyComms, + links: BeefyVoterLinks, + pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, + ) -> BeefyWorker { + BeefyWorker { + backend: self.backend, + runtime: self.runtime, + key_store: self.key_store, + metrics: self.metrics, + persisted_state: self.persisted_state, + payload_provider, + sync, + comms, + links, + pending_justifications, + } + } + + // If no persisted state present, walk back the chain from first GRANDPA notification to either: + // - latest BEEFY finalized block, or if none found on the way, + // - BEEFY pallet genesis; + // Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to + // finalize. + async fn init_state( + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + backend: Arc, + runtime: Arc, + ) -> Result, Error> { + let blockchain = backend.blockchain(); + + let beefy_genesis = runtime + .runtime_api() + .beefy_genesis(best_grandpa.hash()) + .ok() + .flatten() + .filter(|genesis| *genesis == beefy_genesis) + .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?; + // Walk back the imported blocks and initialize voter either, at the last block with + // a BEEFY justification, or at pallet genesis block; voter will resume from there. + let mut sessions = VecDeque::new(); + let mut header = best_grandpa.clone(); + let state = loop { + if let Some(true) = blockchain + .justifications(header.hash()) + .ok() + .flatten() + .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) + { + debug!( + target: LOG_TARGET, + "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", + *header.number() + ); + let best_beefy = *header.number(); + // If no session boundaries detected so far, just initialize new rounds here. + if sessions.is_empty() { + let active_set = + expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?; + let mut rounds = Rounds::new(best_beefy, active_set); + // Mark the round as already finalized. + rounds.conclude(best_beefy); + sessions.push_front(rounds); + } + let state = PersistedState::checked_new( + best_grandpa, + best_beefy, + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?; + break state + } + + if *header.number() == beefy_genesis { + // We've reached BEEFY genesis, initialize voter here. + let genesis_set = + expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?; + info!( + target: LOG_TARGET, + "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ + Starting voting rounds at block {:?}, genesis validator set {:?}.", + beefy_genesis, + genesis_set, + ); + + sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); + break PersistedState::checked_new( + best_grandpa, + Zero::zero(), + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))? + } + + if let Some(active) = find_authorities_change::(&header) { + debug!( + target: LOG_TARGET, + "🥩 Marking block {:?} as BEEFY Mandatory.", + *header.number() + ); + sessions.push_front(Rounds::new(*header.number(), active)); + } + + // Move up the chain. + header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; + }; + + aux_schema::write_current_version(backend.as_ref())?; + aux_schema::write_voter_state(backend.as_ref(), &state)?; + Ok(state) + } + + async fn load_or_init_state( + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + backend: Arc, + runtime: Arc, + key_store: &BeefyKeystore, + metrics: &Option, + ) -> Result, Error> { + // Initialize voter state from AUX DB if compatible. + if let Some(mut state) = crate::aux_schema::load_persistent(backend.as_ref())? + // Verify state pallet genesis matches runtime. + .filter(|state| state.pallet_genesis() == beefy_genesis) + { + // Overwrite persisted state with current best GRANDPA block. + state.set_best_grandpa(best_grandpa.clone()); + // Overwrite persisted data with newly provided `min_block_delta`. + state.set_min_block_delta(min_block_delta); + debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); + + // Make sure that all the headers that we need have been synced. + let mut new_sessions = vec![]; + let mut header = best_grandpa.clone(); + while *header.number() > state.best_beefy() { + if state.voting_oracle().can_add_session(*header.number()) { + if let Some(active) = find_authorities_change::(&header) { + new_sessions.push((active, *header.number())); + } + } + header = + wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?; + } + + // Make sure we didn't miss any sessions during node restart. + for (validator_set, new_session_start) in new_sessions.drain(..).rev() { + debug!( + target: LOG_TARGET, + "🥩 Handling missed BEEFY session after node restart: {:?}.", + new_session_start + ); + state.init_session_at(new_session_start, validator_set, key_store, metrics); + } + return Ok(state) + } + + // No valid voter-state persisted, re-initialize from pallet genesis. + Self::init_state(beefy_genesis, best_grandpa, min_block_delta, backend, runtime).await + } +} /// Start the BEEFY gadget. /// @@ -277,7 +532,7 @@ pub async fn start_beefy_gadget( known_peers, prometheus_registry.clone(), ); - let mut beefy_comms = worker::BeefyComms { + let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, gossip_report_stream, @@ -287,57 +542,45 @@ pub async fn start_beefy_gadget( // We re-create and re-run the worker in this loop in order to quickly reinit and resume after // select recoverable errors. loop { - // Wait for BEEFY pallet to be active before starting voter. - let (beefy_genesis, best_grandpa) = match wait_for_runtime_pallet( - &*runtime, - &mut beefy_comms.gossip_engine, - &mut finality_notifications, - ) - .await - { - Ok(res) => res, - Err(e) => { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - }, - }; - - let mut worker_base = worker::BeefyWorkerBase { - backend: backend.clone(), - runtime: runtime.clone(), - key_store: key_store.clone().into(), - metrics: metrics.clone(), - _phantom: Default::default(), - }; - - let persisted_state = match worker_base - .load_or_init_state(beefy_genesis, best_grandpa, min_block_delta) - .await - { - Ok(state) => state, - Err(e) => { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - }, + // Make sure to pump gossip engine while waiting for initialization conditions. + let worker_builder = loop { + futures::select! { + builder_init_result = BeefyWorkerBuilder::async_initialize( + backend.clone(), + runtime.clone(), + key_store.clone().into(), + metrics.clone(), + min_block_delta, + beefy_comms.gossip_validator.clone(), + &mut finality_notifications, + ).fuse() => { + match builder_init_result { + Ok(builder) => break builder, + Err(e) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); + return + }, + } + }, + // Pump peer reports + _ = &mut beefy_comms.gossip_report_stream.next() => { + continue + }, + // Pump gossip engine. + _ = &mut beefy_comms.gossip_engine => { + error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); + return + } + } }; - // Update the gossip validator with the right starting round and set id. - if let Err(e) = persisted_state - .gossip_filter_config() - .map(|f| beefy_comms.gossip_validator.update_filter(f)) - { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - } - let worker = worker::BeefyWorker { - base: worker_base, - payload_provider: payload_provider.clone(), - sync: sync.clone(), - comms: beefy_comms, - links: links.clone(), - pending_justifications: BTreeMap::new(), - persisted_state, - }; + let worker = worker_builder.build( + payload_provider.clone(), + sync.clone(), + beefy_comms, + links.clone(), + BTreeMap::new(), + ); match futures::future::select( Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), @@ -404,9 +647,8 @@ where /// Should be called only once during worker initialization. async fn wait_for_runtime_pallet( runtime: &R, - mut gossip_engine: &mut GossipEngine, finality: &mut Fuse>, -) -> ClientResult<(NumberFor, ::Header)> +) -> Result<(NumberFor, ::Header), Error> where B: Block, R: ProvideRuntimeApi, @@ -414,33 +656,24 @@ where { info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available..."); loop { - futures::select! { - notif = finality.next() => { - let notif = match notif { - Some(notif) => notif, - None => break - }; - let at = notif.header.hash(); - if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() { - if *notif.header.number() >= start { - // Beefy pallet available, return header for best grandpa at the time. - info!( - target: LOG_TARGET, - "🥩 BEEFY pallet available: block {:?} beefy genesis {:?}", - notif.header.number(), start - ); - return Ok((start, notif.header)) - } - } - }, - _ = gossip_engine => { - break + let notif = finality.next().await.ok_or_else(|| { + let err_msg = "🥩 Finality stream has unexpectedly terminated.".into(); + error!(target: LOG_TARGET, "{}", err_msg); + Error::Backend(err_msg) + })?; + let at = notif.header.hash(); + if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() { + if *notif.header.number() >= start { + // Beefy pallet available, return header for best grandpa at the time. + info!( + target: LOG_TARGET, + "🥩 BEEFY pallet available: block {:?} beefy genesis {:?}", + notif.header.number(), start + ); + return Ok((start, notif.header)) } } } - let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into(); - error!(target: LOG_TARGET, "{}", err_msg); - Err(ClientError::Backend(err_msg)) } /// Provides validator set active `at_header`. It tries to get it from state, otherwise falls @@ -474,7 +707,7 @@ where if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) { return Ok(active) } else { - match worker::find_authorities_change::(&header) { + match find_authorities_change::(&header) { Some(active) => return Ok(active), // Move up the chain. Ultimately we'll get it from chain genesis state, or error out // there. @@ -486,3 +719,18 @@ where } } } + +/// Scan the `header` digest log for a BEEFY validator set change. Return either the new +/// validator set or `None` in case no validator set change has been signaled. +pub(crate) fn find_authorities_change(header: &B::Header) -> Option> +where + B: Block, +{ + let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); + + let filter = |log: ConsensusLog| match log { + ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), + _ => None, + }; + header.digest().convert_first(|l| l.try_to(id).and_then(filter)) +} diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 0e6ff210b158..d106c9dcd881 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -32,8 +32,8 @@ use crate::{ gossip_protocol_name, justification::*, wait_for_runtime_pallet, - worker::{BeefyWorkerBase, PersistedState}, - BeefyRPCLinks, BeefyVoterLinks, KnownPeers, + worker::PersistedState, + BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, }; use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -368,27 +368,19 @@ async fn voter_init_setup( api: &TestApi, ) -> Result, Error> { let backend = net.peer(0).client().as_backend(); - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { + let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(api, finality).await.unwrap(); + let key_store = None.into(); + let metrics = None; + BeefyWorkerBuilder::load_or_init_state( + beefy_genesis, + best_grandpa, + 1, backend, - runtime: Arc::new(api.clone()), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await + Arc::new(api.clone()), + &key_store, + &metrics, + ) + .await } // Spawns beefy voters. Returns a future to spawn on the runtime. @@ -1065,32 +1057,7 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - // - // NOTE: code from `voter_init_setup()` is moved here because the new network event system - // doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the - // first `GossipEngine` - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with single session starting at block `custom_pallet_genesis` (7) @@ -1120,18 +1087,7 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap(); // load persistent state - state preset in DB, but with different pallet genesis - // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let new_persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // verify voter initialized with single session starting at block `new_pallet_genesis` (10) let sessions = new_persisted_state.voting_oracle().sessions(); @@ -1322,32 +1278,7 @@ async fn should_catch_up_when_loading_saved_voter_state() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - // - // NOTE: code from `voter_init_setup()` is moved here because the new network event system - // doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the - // first `GossipEngine` - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, _) = GossipValidator::new(known_peers); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - net.peer(0).sync_service().clone(), - net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), - "/beefy/whatever", - gossip_validator, - None, - ); - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api.clone()), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1374,18 +1305,7 @@ async fn should_catch_up_when_loading_saved_voter_state() { // finalize 25 without justifications net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap(); // load persistent state - state preset in DB - // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` - let (beefy_genesis, best_grandpa) = - wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let mut worker_base = BeefyWorkerBase { - backend: backend.clone(), - runtime: Arc::new(api), - key_store: None.into(), - metrics: None, - _phantom: Default::default(), - }; - let persisted_state = - worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); // Verify voter initialized with old sessions plus a new one starting at block 20. // There shouldn't be any duplicates. diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index d7a229003cc4..c8eb19621ba5 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -17,46 +17,42 @@ // along with this program. If not, see . use crate::{ - aux_schema, communication::{ - gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator}, + gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage}, peers::PeerReport, - request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo}, + request_response::outgoing_requests_engine::ResponseInfo, }, error::Error, - expect_validator_set, + find_authorities_change, justification::BeefyVersionedFinalityProof, keystore::BeefyKeystore, metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, - wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET, + BeefyComms, BeefyVoterLinks, LOG_TARGET, }; use codec::{Codec, Decode, DecodeAll, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; -use sc_network_gossip::GossipEngine; -use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; +use sc_utils::notification::NotificationReceiver; use sp_api::ProvideRuntimeApi; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; -use sp_blockchain::Backend as BlockchainBackend; use sp_consensus::SyncOracle; use sp_consensus_beefy::{ check_equivocation_proof, ecdsa_crypto::{AuthorityId, Signature}, - BeefyApi, BeefySignatureHasher, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, - ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + BeefyApi, BeefySignatureHasher, Commitment, EquivocationProof, PayloadProvider, ValidatorSet, + VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use sp_runtime::{ - generic::{BlockId, OpaqueDigestItemId}, + generic::BlockId, traits::{Block, Header, NumberFor, Zero}, SaturatedConversion, }; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, - marker::PhantomData, sync::Arc, }; @@ -180,8 +176,8 @@ impl VoterOracle { } } - // Check if an observed session can be added to the Oracle. - fn can_add_session(&self, session_start: NumberFor) -> bool { + /// Check if an observed session can be added to the Oracle. + pub fn can_add_session(&self, session_start: NumberFor) -> bool { let latest_known_session_start = self.sessions.back().map(|session| session.session_start()); Some(session_start) > latest_known_session_start @@ -319,229 +315,28 @@ impl PersistedState { self.voting_oracle.best_grandpa_block_header = best_grandpa; } + pub fn voting_oracle(&self) -> &VoterOracle { + &self.voting_oracle + } + pub(crate) fn gossip_filter_config(&self) -> Result, Error> { let (start, end) = self.voting_oracle.accepted_interval()?; let validator_set = self.voting_oracle.current_validator_set()?; Ok(GossipFilterCfg { start, end, validator_set }) } -} - -/// Helper object holding BEEFY worker communication/gossip components. -/// -/// These are created once, but will be reused if worker is restarted/reinitialized. -pub(crate) struct BeefyComms { - pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub gossip_report_stream: TracingUnboundedReceiver, - pub on_demand_justifications: OnDemandJustificationsEngine, -} - -pub(crate) struct BeefyWorkerBase { - // utilities - pub backend: Arc, - pub runtime: Arc, - pub key_store: BeefyKeystore, - - /// BEEFY client metrics. - pub metrics: Option, - - pub _phantom: PhantomData, -} - -impl BeefyWorkerBase -where - B: Block + Codec, - BE: Backend, - R: ProvideRuntimeApi, - R::Api: BeefyApi, -{ - // If no persisted state present, walk back the chain from first GRANDPA notification to either: - // - latest BEEFY finalized block, or if none found on the way, - // - BEEFY pallet genesis; - // Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to - // finalize. - async fn init_state( - &self, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, - ) -> Result, Error> { - let blockchain = self.backend.blockchain(); - - let beefy_genesis = self - .runtime - .runtime_api() - .beefy_genesis(best_grandpa.hash()) - .ok() - .flatten() - .filter(|genesis| *genesis == beefy_genesis) - .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?; - // Walk back the imported blocks and initialize voter either, at the last block with - // a BEEFY justification, or at pallet genesis block; voter will resume from there. - let mut sessions = VecDeque::new(); - let mut header = best_grandpa.clone(); - let state = loop { - if let Some(true) = blockchain - .justifications(header.hash()) - .ok() - .flatten() - .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) - { - debug!( - target: LOG_TARGET, - "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", - *header.number() - ); - let best_beefy = *header.number(); - // If no session boundaries detected so far, just initialize new rounds here. - if sessions.is_empty() { - let active_set = - expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) - .await?; - let mut rounds = Rounds::new(best_beefy, active_set); - // Mark the round as already finalized. - rounds.conclude(best_beefy); - sessions.push_front(rounds); - } - let state = PersistedState::checked_new( - best_grandpa, - best_beefy, - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?; - break state - } - - if *header.number() == beefy_genesis { - // We've reached BEEFY genesis, initialize voter here. - let genesis_set = - expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) - .await?; - info!( - target: LOG_TARGET, - "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ - Starting voting rounds at block {:?}, genesis validator set {:?}.", - beefy_genesis, - genesis_set, - ); - - sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); - break PersistedState::checked_new( - best_grandpa, - Zero::zero(), - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))? - } - - if let Some(active) = find_authorities_change::(&header) { - debug!( - target: LOG_TARGET, - "🥩 Marking block {:?} as BEEFY Mandatory.", - *header.number() - ); - sessions.push_front(Rounds::new(*header.number(), active)); - } - - // Move up the chain. - header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; - }; - - aux_schema::write_current_version(self.backend.as_ref())?; - aux_schema::write_voter_state(self.backend.as_ref(), &state)?; - Ok(state) - } - - pub async fn load_or_init_state( - &mut self, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, - ) -> Result, Error> { - // Initialize voter state from AUX DB if compatible. - if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())? - // Verify state pallet genesis matches runtime. - .filter(|state| state.pallet_genesis() == beefy_genesis) - { - // Overwrite persisted state with current best GRANDPA block. - state.set_best_grandpa(best_grandpa.clone()); - // Overwrite persisted data with newly provided `min_block_delta`. - state.set_min_block_delta(min_block_delta); - debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); - - // Make sure that all the headers that we need have been synced. - let mut new_sessions = vec![]; - let mut header = best_grandpa.clone(); - while *header.number() > state.best_beefy() { - if state.voting_oracle.can_add_session(*header.number()) { - if let Some(active) = find_authorities_change::(&header) { - new_sessions.push((active, *header.number())); - } - } - header = - wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY) - .await?; - } - - // Make sure we didn't miss any sessions during node restart. - for (validator_set, new_session_start) in new_sessions.drain(..).rev() { - debug!( - target: LOG_TARGET, - "🥩 Handling missed BEEFY session after node restart: {:?}.", - new_session_start - ); - self.init_session_at(&mut state, validator_set, new_session_start); - } - return Ok(state) - } - - // No valid voter-state persisted, re-initialize from pallet genesis. - self.init_state(beefy_genesis, best_grandpa, min_block_delta).await - } - - /// Verify `active` validator set for `block` against the key store - /// - /// We want to make sure that we have _at least one_ key in our keystore that - /// is part of the validator set, that's because if there are no local keys - /// then we can't perform our job as a validator. - /// - /// Note that for a non-authority node there will be no keystore, and we will - /// return an error and don't check. The error can usually be ignored. - fn verify_validator_set( - &self, - block: &NumberFor, - active: &ValidatorSet, - ) -> Result<(), Error> { - let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); - - let public_keys = self.key_store.public_keys()?; - let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); - - if store.intersection(&active).count() == 0 { - let msg = "no authority public key found in store".to_string(); - debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg); - metric_inc!(self.metrics, beefy_no_authority_found_in_store); - Err(Error::Keystore(msg)) - } else { - Ok(()) - } - } /// Handle session changes by starting new voting round for mandatory blocks. - fn init_session_at( + pub fn init_session_at( &mut self, - persisted_state: &mut PersistedState, - validator_set: ValidatorSet, new_session_start: NumberFor, + validator_set: ValidatorSet, + key_store: &BeefyKeystore, + metrics: &Option, ) { debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set); // BEEFY should finalize a mandatory block during each session. - if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() { + if let Ok(active_session) = self.voting_oracle.active_rounds() { if !active_session.mandatory_done() { debug!( target: LOG_TARGET, @@ -549,20 +344,20 @@ where validator_set.id(), active_session.validator_set_id(), ); - metric_inc!(self.metrics, beefy_lagging_sessions); + metric_inc!(metrics, beefy_lagging_sessions); } } if log_enabled!(target: LOG_TARGET, log::Level::Debug) { // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(&new_session_start, &validator_set); + if verify_validator_set::(&new_session_start, &validator_set, key_store).is_err() { + metric_inc!(metrics, beefy_no_authority_found_in_store); + } } let id = validator_set.id(); - persisted_state - .voting_oracle - .add_session(Rounds::new(new_session_start, validator_set)); - metric_set!(self.metrics, beefy_validator_set_id, id); + self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set)); + metric_set!(metrics, beefy_validator_set_id, id); info!( target: LOG_TARGET, "🥩 New Rounds for validator set id: {:?} with session_start {:?}", @@ -574,9 +369,10 @@ where /// A BEEFY worker/voter that follows the BEEFY protocol pub(crate) struct BeefyWorker { - pub base: BeefyWorkerBase, - - // utils + // utilities + pub backend: Arc, + pub runtime: Arc, + pub key_store: BeefyKeystore, pub payload_provider: P, pub sync: Arc, @@ -592,6 +388,8 @@ pub(crate) struct BeefyWorker { pub pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, /// Persisted voter state. pub persisted_state: PersistedState, + /// BEEFY voter metrics + pub metrics: Option, } impl BeefyWorker @@ -622,8 +420,12 @@ where validator_set: ValidatorSet, new_session_start: NumberFor, ) { - self.base - .init_session_at(&mut self.persisted_state, validator_set, new_session_start); + self.persisted_state.init_session_at( + new_session_start, + validator_set, + &self.key_store, + &self.metrics, + ); } fn handle_finality_notification( @@ -639,8 +441,7 @@ where notification.tree_route, ); - self.base - .runtime + self.runtime .runtime_api() .beefy_genesis(header.hash()) .ok() @@ -654,7 +455,7 @@ where self.persisted_state.set_best_grandpa(header.clone()); // Check all (newly) finalized blocks for new session(s). - let backend = self.base.backend.clone(); + let backend = self.backend.clone(); for header in notification .tree_route .iter() @@ -673,7 +474,7 @@ where } if new_session_added { - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; } @@ -707,7 +508,7 @@ where true, ); }, - RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes), + RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes), RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; Ok(()) @@ -727,23 +528,23 @@ where match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => { debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num); - metric_inc!(self.base.metrics, beefy_imported_justifications); + metric_inc!(self.metrics, beefy_imported_justifications); self.finalize(justification)? }, RoundAction::Enqueue => { debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num); if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS { self.pending_justifications.entry(block_num).or_insert(justification); - metric_inc!(self.base.metrics, beefy_buffered_justifications); + metric_inc!(self.metrics, beefy_buffered_justifications); } else { - metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped); + metric_inc!(self.metrics, beefy_buffered_justifications_dropped); warn!( target: LOG_TARGET, "🥩 Buffer justification dropped for round: {:?}.", block_num ); } }, - RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications), + RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications), }; Ok(()) } @@ -765,7 +566,7 @@ where // We created the `finality_proof` and know to be valid. // New state is persisted after finalization. self.finalize(finality_proof.clone())?; - metric_inc!(self.base.metrics, beefy_good_votes_processed); + metric_inc!(self.metrics, beefy_good_votes_processed); return Ok(Some(finality_proof)) }, VoteImportResult::Ok => { @@ -776,20 +577,17 @@ where .map(|(mandatory_num, _)| mandatory_num == block_number) .unwrap_or(false) { - crate::aux_schema::write_voter_state( - &*self.base.backend, - &self.persisted_state, - ) - .map_err(|e| Error::Backend(e.to_string()))?; + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + .map_err(|e| Error::Backend(e.to_string()))?; } - metric_inc!(self.base.metrics, beefy_good_votes_processed); + metric_inc!(self.metrics, beefy_good_votes_processed); }, VoteImportResult::Equivocation(proof) => { - metric_inc!(self.base.metrics, beefy_equivocation_votes); + metric_inc!(self.metrics, beefy_equivocation_votes); self.report_equivocation(proof)?; }, - VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes), - VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes), + VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes), + VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes), }; Ok(None) } @@ -816,15 +614,14 @@ where // Set new best BEEFY block number. self.persisted_state.set_best_beefy(block_num); - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; - metric_set!(self.base.metrics, beefy_best_block, block_num); + metric_set!(self.metrics, beefy_best_block, block_num); self.comms.on_demand_justifications.cancel_requests_older_than(block_num); if let Err(e) = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(block_num)) @@ -834,8 +631,7 @@ where .notify(|| Ok::<_, ()>(hash)) .expect("forwards closure result; the closure always returns Ok; qed."); - self.base - .backend + self.backend .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) }) { debug!( @@ -872,13 +668,13 @@ where for (num, justification) in justifs_to_process.into_iter() { debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num); - metric_inc!(self.base.metrics, beefy_imported_justifications); + metric_inc!(self.metrics, beefy_imported_justifications); if let Err(err) = self.finalize(justification) { error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err); } } metric_set!( - self.base.metrics, + self.metrics, beefy_buffered_justifications, self.pending_justifications.len() ); @@ -890,7 +686,7 @@ where fn try_to_vote(&mut self) -> Result<(), Error> { // Vote if there's now a new vote target. if let Some(target) = self.voting_oracle().voting_target() { - metric_set!(self.base.metrics, beefy_should_vote_on, target); + metric_set!(self.metrics, beefy_should_vote_on, target); if target > self.persisted_state.best_voted { self.do_vote(target)?; } @@ -910,7 +706,6 @@ where self.persisted_state.voting_oracle.best_grandpa_block_header.clone() } else { let hash = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(target_number)) @@ -922,7 +717,7 @@ where Error::Backend(err_msg) })?; - self.base.backend.blockchain().expect_header(hash).map_err(|err| { + self.backend.blockchain().expect_header(hash).map_err(|err| { let err_msg = format!( "Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..", target_number, hash, err @@ -942,7 +737,7 @@ where let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); - let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) { + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id); id } else { @@ -956,7 +751,7 @@ where let commitment = Commitment { payload, block_number: target_number, validator_set_id }; let encoded_commitment = commitment.encode(); - let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) { + let signature = match self.key_store.sign(&authority_id, &encoded_commitment) { Ok(sig) => sig, Err(err) => { warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err); @@ -981,7 +776,7 @@ where .gossip_engine .gossip_message(proofs_topic::(), encoded_proof, true); } else { - metric_inc!(self.base.metrics, beefy_votes_sent); + metric_inc!(self.metrics, beefy_votes_sent); debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote); let encoded_vote = GossipMessage::::Vote(vote).encode(); self.comms.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); @@ -989,8 +784,8 @@ where // Persist state after vote to avoid double voting in case of voter restarts. self.persisted_state.best_voted = target_number; - metric_set!(self.base.metrics, beefy_best_voted, target_number); - crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) + metric_set!(self.metrics, beefy_best_voted, target_number); + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string())) } @@ -1164,7 +959,7 @@ where if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) { debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof); return Ok(()) - } else if let Some(local_id) = self.base.key_store.authority_id(validators) { + } else if let Some(local_id) = self.key_store.authority_id(validators) { if offender_id == local_id { warn!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation"); return Ok(()) @@ -1173,7 +968,6 @@ where let number = *proof.round_number(); let hash = self - .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(number)) @@ -1184,7 +978,7 @@ where ); Error::Backend(err_msg) })?; - let runtime_api = self.base.runtime.runtime_api(); + let runtime_api = self.runtime.runtime_api(); // generate key ownership proof at that block let key_owner_proof = match runtime_api .generate_key_ownership_proof(hash, validator_set_id, offender_id) @@ -1201,7 +995,7 @@ where }; // submit equivocation report at **best** block - let best_block_hash = self.base.backend.blockchain().info().best_hash; + let best_block_hash = self.backend.blockchain().info().best_hash; runtime_api .submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof) .map_err(Error::RuntimeApi)?; @@ -1210,21 +1004,6 @@ where } } -/// Scan the `header` digest log for a BEEFY validator set change. Return either the new -/// validator set or `None` in case no validator set change has been signaled. -pub(crate) fn find_authorities_change(header: &B::Header) -> Option> -where - B: Block, -{ - let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); - - let filter = |log: ConsensusLog| match log { - ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), - _ => None, - }; - header.digest().convert_first(|l| l.try_to(id).and_then(filter)) -} - /// Calculate next block number to vote on. /// /// Return `None` if there is no votable target yet. @@ -1261,11 +1040,42 @@ where } } +/// Verify `active` validator set for `block` against the key store +/// +/// We want to make sure that we have _at least one_ key in our keystore that +/// is part of the validator set, that's because if there are no local keys +/// then we can't perform our job as a validator. +/// +/// Note that for a non-authority node there will be no keystore, and we will +/// return an error and don't check. The error can usually be ignored. +fn verify_validator_set( + block: &NumberFor, + active: &ValidatorSet, + key_store: &BeefyKeystore, +) -> Result<(), Error> { + let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); + + let public_keys = key_store.public_keys()?; + let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); + + if store.intersection(&active).count() == 0 { + let msg = "no authority public key found in store".to_string(); + debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg); + Err(Error::Keystore(msg)) + } else { + Ok(()) + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; use crate::{ - communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, + communication::{ + gossip::GossipValidator, + notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, + request_response::outgoing_requests_engine::OnDemandJustificationsEngine, + }, tests::{ create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet, TestApi, @@ -1275,6 +1085,7 @@ pub(crate) mod tests { use futures::{future::poll_fn, task::Poll}; use parking_lot::Mutex; use sc_client_api::{Backend as BackendT, HeaderBackend}; + use sc_network_gossip::GossipEngine; use sc_network_sync::SyncingService; use sc_network_test::TestNetFactory; use sp_blockchain::Backend as BlockchainBackendT; @@ -1283,7 +1094,7 @@ pub(crate) mod tests { known_payloads::MMR_ROOT_ID, mmr::MmrRootProvider, test_utils::{generate_equivocation_proof, Keyring}, - Payload, SignedCommitment, + ConsensusLog, Payload, SignedCommitment, }; use sp_runtime::traits::{Header as HeaderT, One}; use substrate_test_runtime_client::{ @@ -1292,10 +1103,6 @@ pub(crate) mod tests { }; impl PersistedState { - pub fn voting_oracle(&self) -> &VoterOracle { - &self.voting_oracle - } - pub fn active_round(&self) -> Result<&Rounds, Error> { self.voting_oracle.active_rounds() } @@ -1391,13 +1198,10 @@ pub(crate) mod tests { on_demand_justifications, }; BeefyWorker { - base: BeefyWorkerBase { - backend, - runtime: api, - key_store: Some(keystore).into(), - metrics, - _phantom: Default::default(), - }, + backend, + runtime: api, + key_store: Some(keystore).into(), + metrics, payload_provider, sync: Arc::new(sync), links, @@ -1675,19 +1479,22 @@ pub(crate) mod tests { let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // keystore doesn't contain other keys than validators' - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(())); + assert_eq!(verify_validator_set::(&1, &validator_set, &worker.key_store), Ok(())); // unknown `Bob` key let keys = &[Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let err_msg = "no authority public key found in store".to_string(); let expected = Err(Error::Keystore(err_msg)); - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected); + assert_eq!(verify_validator_set::(&1, &validator_set, &worker.key_store), expected); // worker has no keystore - worker.base.key_store = None.into(); + worker.key_store = None.into(); let expected_err = Err(Error::Keystore("no Keystore".into())); - assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err); + assert_eq!( + verify_validator_set::(&1, &validator_set, &worker.key_store), + expected_err + ); } #[tokio::test] @@ -1839,7 +1646,7 @@ pub(crate) mod tests { let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); - worker.base.runtime = api_alice.clone(); + worker.runtime = api_alice.clone(); // let there be a block with num = 1: let _ = net.peer(0).push_blocks(1, false); diff --git a/substrate/primitives/consensus/beefy/src/commitment.rs b/substrate/primitives/consensus/beefy/src/commitment.rs index 1f0fb34ebf10..335c6b604f04 100644 --- a/substrate/primitives/consensus/beefy/src/commitment.rs +++ b/substrate/primitives/consensus/beefy/src/commitment.rs @@ -97,6 +97,19 @@ pub struct SignedCommitment { pub signatures: Vec>, } +impl sp_std::fmt::Display + for SignedCommitment +{ + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + let signatures_count = self.signatures.iter().filter(|s| s.is_some()).count(); + write!( + f, + "SignedCommitment(commitment: {:?}, signatures_count: {})", + self.commitment, signatures_count + ) + } +} + impl SignedCommitment { /// Return the number of collected signatures. pub fn no_of_signatures(&self) -> usize { @@ -241,6 +254,14 @@ pub enum VersionedFinalityProof { V1(SignedCommitment), } +impl sp_std::fmt::Display for VersionedFinalityProof { + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + match self { + VersionedFinalityProof::V1(sc) => write!(f, "VersionedFinalityProof::V1({})", sc), + } + } +} + impl From> for VersionedFinalityProof { fn from(commitment: SignedCommitment) -> Self { VersionedFinalityProof::V1(commitment)