Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Poll the main node for batch to vote on (BFT-496) #161

Merged
merged 9 commits into from
Jul 31, 2024
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion node/actors/bft/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use zksync_concurrency::{
},
oneshot, scope,
};
use zksync_consensus_network as network;
use zksync_consensus_network::{self as network};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -135,9 +135,11 @@ impl Test {
for (i, net) in nets.into_iter().enumerate() {
let store = TestMemoryStorage::new(ctx, genesis).await;
s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });

if self.nodes[i].0 == Behavior::Honest {
honest.push(store.blocks.clone());
}

nodes.push(Node {
net,
behavior: self.nodes[i].0,
Expand Down
1 change: 1 addition & 0 deletions node/actors/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ zksync_consensus_utils.workspace = true
zksync_protobuf.workspace = true

anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
tracing.workspace = true
vise.workspace = true
Expand Down
148 changes: 114 additions & 34 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
use crate::Attester;
use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{
AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);

/// Polls the database for new batches to be signed and publishes them to the gossip channel.
pub(super) struct AttesterRunner {
    /// Block store; used to read the genesis configuration (attester committee).
    block_store: Arc<BlockStore>,
    /// Batch store; polled for batches and their commitments to sign.
    batch_store: Arc<BatchStore>,
    /// The attester identity whose key signs the batch votes.
    attester: Attester,
    /// Gossip channel over which signed batch votes are published.
    publisher: BatchVotesPublisher,
    /// Receiver watching the next batch number the main node expects votes on.
    status: AttestationStatusReceiver,
    /// How long to sleep between polls while waiting for a batch commitment.
    poll_interval: time::Duration,
}

impl AttesterRunner {
Expand All @@ -25,16 +27,20 @@ impl AttesterRunner {
batch_store: Arc<BatchStore>,
attester: Attester,
publisher: BatchVotesPublisher,
status: AttestationStatusReceiver,
poll_interval: time::Duration,
) -> Self {
Self {
block_store,
batch_store,
attester,
publisher,
status,
poll_interval,
}
}
/// Poll the database for new L1 batches and publish our signature over the batch.
pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let public_key = self.attester.key.public();
// TODO: In the future when we have attester rotation these checks will have to be checked inside the loop.
let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
Expand All @@ -48,28 +54,27 @@ impl AttesterRunner {

let genesis = self.block_store.genesis().hash();

// Find the initial range of batches that we want to (re)sign after a (re)start.
let last_batch_number = self
.batch_store
.wait_until_persisted(ctx, attester::BatchNumber(0))
.await
.context("wait_until_persisted")?
.last
.unwrap_or_default();
// Subscribe starts as seen but we don't want to miss the first item.
self.status.mark_changed();

// Determine the batch to start signing from.
let earliest_batch_number = self
.batch_store
.earliest_batch_number_to_sign(ctx)
.await
.context("earliest_batch_number_to_sign")?
.unwrap_or(last_batch_number);
loop {
let Some(batch_number) = sync::changed(ctx, &mut self.status)
.await?
.next_batch_to_attest
else {
continue;
};

tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");
tracing::info!(%batch_number, "attestation status");

let mut batch_number = earliest_batch_number;
// We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
// to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;

loop {
// Try to get the next batch to sign; the commitment might not be available just yet.
let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;

Expand All @@ -84,16 +89,6 @@ impl AttesterRunner {
.publish(attesters, &genesis, &self.attester.key, batch)
.await
.context("publish")?;

batch_number = batch_number.next();

// We can avoid actively polling the database by waiting for the next persisted batch to appear
// in the memory (which itself relies on polling). This happens once we have the commitment,
// which for nodes that get the blocks through BFT should happen after execution. Nodes which
// rely on batch sync don't participate in attestations as they need the batch on L1 first.
self.batch_store
.wait_until_persisted(ctx, batch_number)
.await?;
}
}

Expand All @@ -112,8 +107,93 @@ impl AttesterRunner {
{
return Ok(batch);
} else {
ctx.sleep(POLL_INTERVAL).await?;
ctx.sleep(self.poll_interval).await?;
}
}
}
}

/// An interface which is used by attesters and nodes collecting votes over gossip to determine
/// which is the next batch they are all supposed to be voting on, according to the main node.
///
/// This is a convenience interface to be used with the [AttestationStatusRunner], which polls
/// this client periodically; transient `Err` results are logged and retried by the runner.
#[async_trait::async_trait]
pub trait AttestationStatusClient: 'static + Send + Sync {
    /// Get the next batch number for which the main node expects a batch QC to be formed.
    ///
    /// The API might return an error while genesis is being created, which we represent with `None`
    /// here and mean that we'll have to try again later.
    async fn next_batch_to_attest(
        &self,
        ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
///
/// This is provided for convenience.
pub struct AttestationStatusRunner {
    /// Shared watch that subscribers (e.g. the attester) observe for changes.
    status: Arc<AttestationStatusWatch>,
    /// Client used to query the main node for the next batch to attest.
    client: Box<dyn AttestationStatusClient>,
    /// How long to sleep between polls of the client.
    poll_interval: time::Duration,
}

impl AttestationStatusRunner {
    /// Create a new runner to poll the main node.
    pub fn new(
        status: Arc<AttestationStatusWatch>,
        client: Box<dyn AttestationStatusClient>,
        poll_interval: time::Duration,
    ) -> Self {
        Self {
            client,
            status,
            poll_interval,
        }
    }

    /// Runner based on a [BatchStore].
    pub fn new_from_store(
        status: Arc<AttestationStatusWatch>,
        store: Arc<BatchStore>,
        poll_interval: time::Duration,
    ) -> Self {
        // Wrap the local store in the client interface and delegate to `new`.
        let client = Box::new(LocalAttestationStatusClient(store));
        Self::new(status, client, poll_interval)
    }

    /// Run the poll loop.
    ///
    /// Returns `Ok(())` once the context is canceled; polling errors are
    /// logged and retried rather than propagated.
    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
        loop {
            let polled = self.client.next_batch_to_attest(ctx).await;
            match polled {
                Ok(Some(batch_number)) => self.status.update(batch_number).await,
                Ok(None) => tracing::debug!("waiting for attestation status..."),
                Err(error) => tracing::error!(
                    ?error,
                    "failed to poll attestation status, retrying later..."
                ),
            }
            // `ctx.sleep` only fails on cancellation, which is a clean shutdown.
            if ctx.sleep(self.poll_interval).await.is_err() {
                return Ok(());
            }
        }
    }
}

/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
struct LocalAttestationStatusClient(Arc<BatchStore>);

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
    /// Delegate straight to the local batch store.
    async fn next_batch_to_attest(
        &self,
        ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        let store = &self.0;
        store.next_batch_to_attest(ctx).await
    }
}
12 changes: 10 additions & 2 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;

mod attestation;
pub mod attestation;
mod io;
#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -70,6 +70,9 @@ pub struct Config {
/// Http debug page configuration.
/// If None, debug page is disabled
pub debug_page: Option<http::DebugPageConfig>,

/// How often to poll the database looking for the batch commitment.
pub batch_poll_interval: time::Duration,
}

impl Config {
Expand Down Expand Up @@ -97,6 +100,8 @@ pub struct Executor {
pub validator: Option<Validator>,
/// Validator-specific node data.
pub attester: Option<Attester>,
/// Status showing where the main node wants attester to cast their votes.
pub attestation_status: Arc<AttestationStatusWatch>,
}

impl Executor {
Expand Down Expand Up @@ -138,6 +143,7 @@ impl Executor {
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
self.attestation_status.clone(),
);
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });
Expand All @@ -149,6 +155,8 @@ impl Executor {
self.batch_store.clone(),
attester,
net.batch_vote_publisher(),
self.attestation_status.subscribe(),
self.config.batch_poll_interval,
);
s.spawn(async {
runner.run(ctx).await?;
Expand Down
9 changes: 9 additions & 0 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ fn config(cfg: &network::Config) -> Config {
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
rpc: cfg.rpc.clone(),
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
}
}

/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest() -> Arc<AttestationStatusWatch> {
    // Default watch starts with no batch to attest and nobody updates it.
    let status = AttestationStatusWatch::default();
    Arc::new(status)
}

fn validator(
cfg: &network::Config,
block_store: Arc<BlockStore>,
Expand All @@ -42,6 +49,7 @@ fn validator(
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status: never_attest(),
}
}

Expand All @@ -56,6 +64,7 @@ fn fullnode(
batch_store,
validator: None,
attester: None,
attestation_status: never_attest(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,11 @@ async fn test_address_change() {
// should get reconstructed.
cfgs[0].server_addr = net::tcp::testonly::reserve_listener();
cfgs[0].public_addr = (*cfgs[0].server_addr).into();

let (node0, runner) =
testonly::Instance::new(cfgs[0].clone(), store.blocks.clone(), store.batches.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0")));

nodes[0] = node0;
for n in &nodes {
n.wait_for_consensus_connections().await;
Expand Down
57 changes: 57 additions & 0 deletions node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::fmt;

use zksync_concurrency::sync;
use zksync_consensus_roles::attester;

use crate::watch::Watch;

/// Coordinate the attestation by showing the status as seen by the main node.
#[derive(Debug, Clone)]
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// Its value is `None` until the background process polling the main node
/// can establish a value to start from.
pub next_batch_to_attest: Option<attester::BatchNumber>,
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
}

/// The subscription over the attestation status which voters can monitor for change.
///
/// Obtained via [AttestationStatusWatch::subscribe].
pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;

/// A [Watch] over an [AttestationStatus] which we can use to notify components about
/// changes in the batch number the main node expects attesters to vote on.
pub struct AttestationStatusWatch(Watch<AttestationStatus>);

impl fmt::Debug for AttestationStatusWatch {
    /// Opaque representation: prints only the type name, eliding the inner state.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = fmt.debug_struct("AttestationStatusWatch");
        builder.finish_non_exhaustive()
    }
}

impl Default for AttestationStatusWatch {
    /// Start with no batch to attest; the poller fills it in later via `update`.
    fn default() -> Self {
        let initial = AttestationStatus {
            next_batch_to_attest: None,
        };
        Self(Watch::new(initial))
    }
}

impl AttestationStatusWatch {
    /// Subscribes to AttestationStatus updates.
    pub fn subscribe(&self) -> AttestationStatusReceiver {
        self.0.subscribe()
    }

    /// Set the next batch number to attest on and notify subscribers it changed.
    ///
    /// Subscribers are only woken when the value actually differs from the
    /// current one.
    pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
        let guard = self.0.lock().await;
        guard.send_if_modified(|status| {
            let next = Some(next_batch_to_attest);
            if status.next_batch_to_attest == next {
                // Unchanged: suppress the notification.
                false
            } else {
                status.next_batch_to_attest = next;
                true
            }
        });
    }
}
Loading
Loading