Skip to content

Commit

Permalink
BFT-496: Add AttestationStatus and related structs
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Jul 29, 2024
1 parent ec9a937 commit a0a7da0
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 82 deletions.
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/actors/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ zksync_consensus_utils.workspace = true
zksync_protobuf.workspace = true

anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
tracing.workspace = true
vise.workspace = true
Expand Down
53 changes: 23 additions & 30 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use crate::Attester;
use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{AttestationStatusReceiver, BatchVotesPublisher};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

Expand All @@ -16,6 +16,7 @@ pub(super) struct AttesterRunner {
batch_store: Arc<BatchStore>,
attester: Attester,
publisher: BatchVotesPublisher,
status: AttestationStatusReceiver,
}

impl AttesterRunner {
Expand All @@ -25,16 +26,18 @@ impl AttesterRunner {
batch_store: Arc<BatchStore>,
attester: Attester,
publisher: BatchVotesPublisher,
status: AttestationStatusReceiver,
) -> Self {
Self {
block_store,
batch_store,
attester,
publisher,
status,
}
}
/// Poll the database for new L1 batches and publish our signature over the batch.
pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let public_key = self.attester.key.public();
// TODO: In the future when we have attester rotation these checks will have to be checked inside the loop.
let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
Expand All @@ -48,28 +51,26 @@ impl AttesterRunner {

let genesis = self.block_store.genesis().hash();

// Find the initial range of batches that we want to (re)sign after a (re)start.
let last_batch_number = self
.batch_store
.wait_until_persisted(ctx, attester::BatchNumber(0))
.await
.context("wait_until_persisted")?
.last
.unwrap_or_default();
let mut prev = None;

// Determine the batch to start signing from.
let earliest_batch_number = self
.batch_store
.earliest_batch_number_to_sign(ctx)
.await
.context("earliest_batch_number_to_sign")?
.unwrap_or(last_batch_number);
loop {
let batch_number =
sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
next if next == prev => None,
next => next,
})
.await?;

tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
tracing::info!(%batch_number, "attestation status");

let mut batch_number = earliest_batch_number;
// We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
// to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;

loop {
// Try to get the next batch to sign; the commitment might not be available just yet.
let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;

Expand All @@ -85,15 +86,7 @@ impl AttesterRunner {
.await
.context("publish")?;

batch_number = batch_number.next();

// We can avoid actively polling the database by waiting for the next persisted batch to appear
// in the memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;
prev = Some(batch_number);
}
}

Expand Down
6 changes: 5 additions & 1 deletion node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -97,6 +97,8 @@ pub struct Executor {
pub validator: Option<Validator>,
/// Validator-specific node data.
pub attester: Option<Attester>,
/// Client to use to poll attestation status: either through the main node API or the DB.
pub attestation_status_client: Box<dyn AttestationStatusClient>,
}

impl Executor {
Expand Down Expand Up @@ -138,6 +140,7 @@ impl Executor {
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
self.attestation_status_client,
);
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });
Expand All @@ -149,6 +152,7 @@ impl Executor {
self.batch_store.clone(),
attester,
net.batch_vote_publisher(),
net.attestation_status_receiver(),
);
s.spawn(async {
runner.run(ctx).await?;
Expand Down
17 changes: 16 additions & 1 deletion node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::testonly::abort_on_panic;
use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_network::{
gossip::LocalAttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
Expand All @@ -26,12 +29,21 @@ fn config(cfg: &network::Config) -> Config {
}
}

/// The test executors below are not running with attesters, so it doesn't matter if the clients
/// are returning views based on the store of main node or each to their own. For simplicity this
/// returns an implementation that queries the local store of each instance. Alternatively we
/// could implement an instance that never queries anything.
fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
LocalAttestationStatus::new(batch_store.clone())
}

fn validator(
cfg: &network::Config,
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
replica_store: impl ReplicaStore,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
Expand All @@ -42,6 +54,7 @@ fn validator(
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status_client,
}
}

Expand All @@ -50,12 +63,14 @@ fn fullnode(
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
batch_store,
validator: None,
attester: None,
attestation_status_client,
}
}

Expand Down
82 changes: 82 additions & 0 deletions node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::sync::Arc;

use zksync_concurrency::{ctx, sync};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::BatchStore;

use crate::watch::Watch;

/// An interface which is used by attesters and nodes collecting votes over gossip to determine
/// which is the next batch they are all supposed to be voting on, according to the main node.
#[async_trait::async_trait]
pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
/// Get the next batch number for which the main node expects a batch QC to be formed.
///
/// The API might return an error while genesis is being created, which we represent with `None`
/// here and mean that we'll have to try again later.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
}

/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
#[derive(Debug, Clone)]
pub struct LocalAttestationStatus(Arc<BatchStore>);

impl LocalAttestationStatus {
/// Create local attestation client form a [BatchStore].
pub fn new(store: Arc<BatchStore>) -> Self {
Self(store)
}
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatus {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
}
}

/// Coordinate the attestation by showing the status as seen by the main node.
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// Its value is `None` until the background process polling the main node
/// can establish a value to start from.
pub next_batch_to_attest: Option<attester::BatchNumber>,
}

/// The subscription over the attestation status which votes can monitor for change.
pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;

pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);

impl Default for AttestationStatusWatch {
fn default() -> Self {
Self(Watch::new(AttestationStatus {
next_batch_to_attest: None,
}))
}
}

impl AttestationStatusWatch {
/// Subscribes to AttestationStatus updates.
pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
self.0.subscribe()
}

pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_if_modified(|status| match status.next_batch_to_attest {
Some(n) if n == next_batch_to_attest => false,
_ => {
status.next_batch_to_attest = Some(next_batch_to_attest);
true
}
});
}
}
Loading

0 comments on commit a0a7da0

Please sign in to comment.