
feat: Poll the main node for batch to vote on (BFT-496) #161

Merged
merged 9 commits
Jul 31, 2024
1 change: 1 addition & 0 deletions node/Cargo.lock


1 change: 1 addition & 0 deletions node/actors/executor/Cargo.toml
@@ -20,6 +20,7 @@ zksync_consensus_utils.workspace = true
zksync_protobuf.workspace = true

anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
tracing.workspace = true
vise.workspace = true
53 changes: 23 additions & 30 deletions node/actors/executor/src/attestation.rs
@@ -3,8 +3,8 @@
use crate::Attester;
use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{AttestationStatusReceiver, BatchVotesPublisher};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

@@ -16,6 +16,7 @@ pub(super) struct AttesterRunner {
batch_store: Arc<BatchStore>,
attester: Attester,
publisher: BatchVotesPublisher,
status: AttestationStatusReceiver,
}

impl AttesterRunner {
@@ -25,16 +25,18 @@ impl AttesterRunner {
batch_store: Arc<BatchStore>,
attester: Attester,
publisher: BatchVotesPublisher,
status: AttestationStatusReceiver,
) -> Self {
Self {
block_store,
batch_store,
attester,
publisher,
status,
}
}
/// Poll the database for new L1 batches and publish our signature over the batch.
pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let public_key = self.attester.key.public();
// TODO: In the future when we have attester rotation these checks will have to be done inside the loop.
let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
@@ -48,28 +51,26 @@ impl AttesterRunner {

let genesis = self.block_store.genesis().hash();

// Find the initial range of batches that we want to (re)sign after a (re)start.
let last_batch_number = self
.batch_store
.wait_until_persisted(ctx, attester::BatchNumber(0))
.await
.context("wait_until_persisted")?
.last
.unwrap_or_default();
let mut prev = None;

// Determine the batch to start signing from.
let earliest_batch_number = self
.batch_store
.earliest_batch_number_to_sign(ctx)
.await
.context("earliest_batch_number_to_sign")?
.unwrap_or(last_batch_number);
loop {
let batch_number =
sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
next if next == prev => None,
next => next,
})
.await?;

tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
tracing::info!(%batch_number, "attestation status");

let mut batch_number = earliest_batch_number;
// We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting for its existence
// to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;

loop {
// Try to get the next batch to sign; the commitment might not be available just yet.
let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;

@@ -85,15 +86,7 @@
.await
.context("publish")?;

batch_number = batch_number.next();

// We can avoid actively polling the database by waiting for the next persisted batch to appear
// in the memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;
prev = Some(batch_number);
}
}

6 changes: 5 additions & 1 deletion node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
@@ -97,6 +97,8 @@ pub struct Executor {
pub validator: Option<Validator>,
/// Validator-specific node data.
pub attester: Option<Attester>,
/// Client to use to poll attestation status: either through the main node API or the DB.
pub attestation_status_client: Box<dyn AttestationStatusClient>,
}

impl Executor {
@@ -138,6 +140,7 @@ impl Executor {
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
self.attestation_status_client,
);
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });
@@ -149,6 +152,7 @@
self.batch_store.clone(),
attester,
net.batch_vote_publisher(),
net.attestation_status_receiver(),
);
s.spawn(async {
runner.run(ctx).await?;
17 changes: 16 additions & 1 deletion node/actors/executor/src/tests.rs
@@ -4,7 +4,10 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::testonly::abort_on_panic;
use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_network::{
gossip::LocalAttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
@@ -26,12 +29,21 @@ fn config(cfg: &network::Config) -> Config {
}
}

/// The test executors below are not running with attesters, so it doesn't matter whether the
/// clients report a status based on the main node's store or each on their own. For simplicity
/// this returns an implementation that queries the local store of each instance. Alternatively
/// we could implement an instance that never queries anything.
fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
LocalAttestationStatus::new(batch_store.clone())
}

fn validator(
cfg: &network::Config,
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
replica_store: impl ReplicaStore,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
@@ -42,6 +54,7 @@ fn validator(
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status_client,
}
}

@@ -50,12 +63,14 @@ fn fullnode(
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
batch_store,
validator: None,
attester: None,
attestation_status_client,
}
}

85 changes: 85 additions & 0 deletions node/actors/network/src/gossip/attestation_status.rs
@@ -0,0 +1,85 @@
use std::sync::Arc;

use zksync_concurrency::{ctx, sync};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::BatchStore;

use crate::watch::Watch;

/// An interface used by attesters and by nodes collecting votes over gossip to determine
/// the next batch they are all supposed to be voting on, according to the main node.
#[async_trait::async_trait]
pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
/// Get the next batch number for which the main node expects a batch QC to be formed.
///
/// The API might return an error while genesis is being created, which we represent with `None`
/// here, meaning that we'll have to try again later.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
}

/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
#[derive(Debug, Clone)]
pub struct LocalAttestationStatus(Arc<BatchStore>);

impl LocalAttestationStatus {
/// Create a local attestation client from a [BatchStore].
pub fn new(store: Arc<BatchStore>) -> Self {
Self(store)
}
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatus {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
}
}
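For an external node, the intent behind this trait (per the PR title) is to poll the main node's API instead of the local store. Below is a minimal sketch of what such an implementation could look like, assuming the module's existing imports plus `anyhow` and `tracing`; the `MainNodeApi` trait, its method, and `MainNodeAttestationStatus` are hypothetical names, not part of this PR.

/// Hypothetical transport to the main node; stands in for whatever RPC/API client
/// the external node actually uses.
#[async_trait::async_trait]
pub trait MainNodeApi: 'static + std::fmt::Debug + Send + Sync {
    /// Returns the next batch number to attest, or `None` if it is not known yet.
    async fn fetch_next_batch_to_attest(&self) -> anyhow::Result<Option<u64>>;
}

/// Sketch of an external-node client that asks the main node instead of the local store.
#[derive(Debug)]
pub struct MainNodeAttestationStatus<T>(pub T);

#[async_trait::async_trait]
impl<T: MainNodeApi> AttestationStatusClient for MainNodeAttestationStatus<T> {
    async fn next_batch_to_attest(
        &self,
        _ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        // Per the trait contract, map transient API errors (e.g. while genesis is
        // still being created) to `None` so that the caller simply retries later.
        match self.0.fetch_next_batch_to_attest().await {
            Ok(next) => Ok(next.map(attester::BatchNumber)),
            Err(err) => {
                tracing::debug!("attestation status not available yet: {:#}", err);
                Ok(None)
            }
        }
    }
}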

/// Coordinate the attestation by showing the status as seen by the main node.
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// Its value is `None` until the background process polling the main node
/// can establish a value to start from.
pub next_batch_to_attest: Option<attester::BatchNumber>,
}

/// The subscription over the attestation status which voters can monitor for change.
pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;

/// A [Watch] over an [AttestationStatus] which we can use to notify components about
/// changes in the batch number the main node expects attesters to vote on.
pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);

impl Default for AttestationStatusWatch {
fn default() -> Self {
Self(Watch::new(AttestationStatus {
next_batch_to_attest: None,
}))
}
}

impl AttestationStatusWatch {
/// Subscribes to AttestationStatus updates.
pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
self.0.subscribe()
}

/// Set the next batch number to attest on and notify subscribers it changed.
pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_if_modified(|status| {
if status.next_batch_to_attest == Some(next_batch_to_attest) {
return false;
}
status.next_batch_to_attest = Some(next_batch_to_attest);
true
});
}
}
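The component that drives this watch is not shown in the hunks above, but conceptually something owning the `Box<dyn AttestationStatusClient>` has to poll it and push changes into the watch. A rough sketch under that assumption; the function name, poll interval, and error handling are illustrative, not taken from the PR.

use zksync_concurrency::time;

/// Illustrative poller: periodically asks the client for the next batch to attest
/// and feeds the watch, which notifies every `AttestationStatusReceiver` subscriber.
pub(crate) async fn run_attestation_status_poller(
    ctx: &ctx::Ctx,
    client: &dyn AttestationStatusClient,
    status: &AttestationStatusWatch,
) -> ctx::Result<()> {
    // Assumed interval; the real implementation may differ or make it configurable.
    let poll_interval = time::Duration::seconds(1);
    loop {
        if let Some(next) = client.next_batch_to_attest(ctx).await? {
            // `update` only wakes subscribers if the value actually changed.
            status.update(next).await;
        }
        ctx.sleep(poll_interval).await?;
    }
}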
22 changes: 13 additions & 9 deletions node/actors/network/src/gossip/batch_votes.rs
@@ -38,7 +38,7 @@ impl BatchUpdateStats {
pub(crate) struct BatchVotes {
/// The latest vote received from each attester. We only keep the last one
/// for now, hoping that with 1 minute batches there's plenty of time for
/// the quorum to be reached, but eventually we'll have to allow multiple
/// the quorum to be reached, but eventually we might have to allow multiple
/// votes across different heights.
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,

@@ -51,6 +51,11 @@ pub(crate) struct BatchVotes {
im::OrdMap<attester::BatchNumber, im::HashMap<attester::BatchHash, attester::Weight>>,

/// The minimum batch number for which we are still interested in votes.
///
/// Because we only store 1 vote per attester, memory is naturally bounded,
/// but this extra pruning mechanism can be used to clear votes of attesters
/// who have been removed from the committee, as well as to get rid of the
/// last quorum we found and stored, and look for a new one in the next round.
pub(crate) min_batch_number: attester::BatchNumber,
}

@@ -131,20 +136,19 @@ impl BatchVotes {

/// Check if we have achieved quorum for any of the batch hashes.
///
/// The return value is a vector because eventually we will be potentially waiting for
/// quorums on multiple heights simultaneously.
/// Returns the first quorum it finds, after which we expect the state of the main node or L1
/// to indicate that attestation on the next height can happen, which either moves the quorum
/// forward naturally, or we can do so explicitly by increasing the `min_batch_number`.
///
/// For repeated queries we can supply a skip list of heights for which we already saved the QC.
pub(super) fn find_quorums(
/// While we only store 1 vote per attester, we'll have at most one quorum anyway.
pub(super) fn find_quorum(
&self,
attesters: &attester::Committee,
genesis: &attester::GenesisHash,
skip: impl Fn(attester::BatchNumber) -> bool,
) -> Vec<attester::BatchQC> {
) -> Option<attester::BatchQC> {
let threshold = attesters.threshold();
self.support
.iter()
.filter(|(number, _)| !skip(**number))
.flat_map(|(number, candidates)| {
candidates
.iter()
@@ -170,7 +174,7 @@
}
})
})
.collect()
.next()
}
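To make the round-advancing behaviour concrete, here is a hedged sketch of the caller side, combining `find_quorum` with the pruning setter documented just below (whose body is truncated in this diff). `set_min_batch_number` and the `qc.message.number` field access are assumed names, not confirmed by the lines shown here.

// Sketch only: once a quorum is found, hand it off and prune everything up to and
// including that batch, so the same quorum is not returned again and votes from
// attesters that left the committee get cleared as well.
fn take_quorum(
    votes: &mut BatchVotes,
    attesters: &attester::Committee,
    genesis: &attester::GenesisHash,
) -> Option<attester::BatchQC> {
    let qc = votes.find_quorum(attesters, genesis)?;
    votes.set_min_batch_number(qc.message.number.next());
    Some(qc)
}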

/// Set the minimum batch number for which we admit votes.