refactor: Use earliest batch number to sign (BFT-474) #150

Merged · 9 commits · Jul 9, 2024
104 changes: 63 additions & 41 deletions node/actors/executor/src/attestation.rs
@@ -6,7 +6,7 @@ use anyhow::Context;
use zksync_concurrency::ctx;
use zksync_concurrency::time;
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_consensus_roles::attester::BatchNumber;
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::Attester;
@@ -39,51 +39,73 @@ impl AttesterRunner {
/// Poll the database for new L1 batches and publish our signature over the batch.
pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let public_key = self.attester.key.public();
// The first batch number we want to publish our vote for. We don't have to re-publish a vote
// because once it enters the vote register even future peers can pull it from there.
let mut min_batch_number = BatchNumber(0);
// TODO: In the future, when we have attester rotation, these checks will have to be performed inside the loop.
let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
tracing::warn!("Attester key is set, but the attester committee is empty.");
return Ok(());
};
if !attesters.contains(&public_key) {
tracing::warn!("Attester key is set, but not part of the attester committee.");
return Ok(());
}

// Find the initial range of batches that we want to (re)sign after a (re)start.
let last_batch_number = self
.batch_store
.last_batch_number(ctx)
.await
.context("last_batch_number")?
.unwrap_or_default();

// Determine the batch to start signing from.
let earliest_batch_number = self
.batch_store
.earliest_batch_number_to_sign(ctx)
.await
.context("earliest_batch_number_to_sign")?
.unwrap_or(last_batch_number);

tracing::info!(%earliest_batch_number, %last_batch_number, "attesting batches");

let mut batch_number = earliest_batch_number;

loop {
// Pretend that the attesters can evolve.
let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
continue;
};
if !attesters.contains(&public_key) {
continue;
}
// Try to get the next batch to sign; the commitment might not be available just yet.
let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;

// The certificates might be collected out of order because of how gossip works;
// we could query the DB to see if we already have a QC, or we can just go ahead
// and publish our vote, and let others ignore it.

let mut unsigned_batch_numbers = self
tracing::info!(%batch_number, "publishing attestation");

// We only have to publish a vote once; future peers can pull it from the register.
self.publisher
.publish(attesters, &self.attester.key, batch)
.await
.context("publish")?;

batch_number = batch_number.next();
}
}

/// Wait for the batch commitment to become available.
async fn wait_for_batch_to_sign(
&self,
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<attester::Batch> {
loop {
if let Some(batch) = self
.batch_store
.unsigned_batch_numbers(ctx)
.batch_to_sign(ctx, number)
.await
.context("unsigned_batch_numbers")?;

// Just to be sure we go from lower to higher batch numbers.
unsigned_batch_numbers.sort();

for bn in unsigned_batch_numbers {
// If we have already voted on this one we can move on; no need to fetch the payload again.
// Batches appear in the store in order, even if we might have a QC for a newer but not for an older batch,
// so once we have published our vote for a certain height, we can expect to only have to vote on newer batches.
if bn < min_batch_number {
continue;
}

if let Some(batch) = self
.batch_store
.batch_to_sign(ctx, bn)
.await
.context("batch_to_sign")?
{
min_batch_number = batch.number.next();

self.publisher
.publish(attesters, &self.attester.key, batch)
.await
.context("publish")?;
}
.context("batch_to_sign")?
{
return Ok(batch);
} else {
ctx.sleep(POLL_INTERVAL).await?;
}

ctx.sleep(POLL_INTERVAL).await?;
}
}
}
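To see the new control flow at a glance, here is a minimal, synchronous model of the loop above. It is a sketch only: the `Store` trait, the `publish` callback, and plain `u64` batch numbers are stand-ins for the crate's `ctx`-aware async API (`BatchStore`, `BatchVotesPublisher`, `attester::BatchNumber`).

```rust
use std::{thread, time::Duration};

const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Stand-in for the parts of `BatchStore` the loop relies on.
trait Store {
    fn last_batch_number(&self) -> Option<u64>;
    fn earliest_batch_number_to_sign(&self) -> Option<u64>;
    /// `None` while the batch commitment is not yet available.
    fn batch_to_sign(&self, number: u64) -> Option<Vec<u8>>;
}

fn run(store: &dyn Store, publish: &mut dyn FnMut(u64, &[u8])) {
    let last = store.last_batch_number().unwrap_or(0);
    // Resume from the earliest unsigned batch, or from the latest batch if everything is signed.
    let mut number = store.earliest_batch_number_to_sign().unwrap_or(last);
    loop {
        // Poll until the commitment for `number` becomes available.
        let batch = loop {
            match store.batch_to_sign(number) {
                Some(batch) => break batch,
                None => thread::sleep(POLL_INTERVAL),
            }
        };
        // Publishing once is enough: the vote register retains the vote for future peers.
        publish(number, &batch);
        number += 1;
    }
}
```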
4 changes: 4 additions & 0 deletions node/actors/executor/src/lib.rs
@@ -49,6 +49,9 @@ pub struct Config {
pub public_addr: net::Host,
/// Maximal size of the block payload.
pub max_payload_size: usize,
/// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
/// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB).
pub max_batch_size: usize,
/// Key of this node. It uniquely identifies the node.
/// It should match the secret key provided in the `node_key` file.
pub node_key: node::SecretKey,
@@ -107,6 +110,7 @@ impl Executor {
attester_key: self.attester.as_ref().map(|a| a.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
max_batch_size: self.config.max_batch_size.saturating_add(kB),
max_block_queue_size: 20,
tcp_accept_rate: limiter::Rate {
burst: 10,
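The doc comment above spells out the arithmetic behind the new knob: one `max_payload_size` per block in the batch, plus roughly 1kB for the Merkle proof. A hedged sketch of how an operator could derive a value; `derive_max_batch_size`, `blocks_per_batch`, and `MERKLE_PROOF_SIZE` are illustrative names, not part of the crate:

```rust
// "should be ~1kB" per the doc comment; an assumption, not a crate constant.
const MERKLE_PROOF_SIZE: usize = 1024;

fn derive_max_batch_size(max_payload_size: usize, blocks_per_batch: usize) -> usize {
    max_payload_size
        .saturating_mul(blocks_per_batch)
        .saturating_add(MERKLE_PROOF_SIZE)
}

fn main() {
    // E.g. with the 1 MB payload limit from the tools' configs and a hypothetical 100 blocks per batch:
    assert_eq!(derive_max_batch_size(1_000_000, 100), 100_001_024);
}
```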
1 change: 1 addition & 0 deletions node/actors/executor/src/tests.rs
@@ -16,6 +16,7 @@ fn config(cfg: &network::Config) -> Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr.clone(),
max_payload_size: usize::MAX,
max_batch_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
2 changes: 2 additions & 0 deletions node/actors/network/src/config.rs
@@ -103,6 +103,8 @@ pub struct Config {
pub attester_key: Option<attester::SecretKey>,
/// Maximal size of the proto-encoded `validator::FinalBlock` in bytes.
pub max_block_size: usize,
/// Maximal size of the proto-encoded `attester::SyncBatch` in bytes.
pub max_batch_size: usize,
/// If a peer doesn't respond to a ping message within `ping_timeout`,
/// the connection is dropped.
/// `None` disables sending ping messages (useful for tests).
10 changes: 6 additions & 4 deletions node/actors/network/src/gossip/runner.rs
@@ -79,17 +79,19 @@ impl<'a> PushBlockStoreStateServer<'a> {
/// Represents what we know about the state of available batches on the remote peer.
struct PushBatchStoreStateServer {
state: sync::watch::Sender<BatchStoreState>,
max_batch_size: usize,
}

impl PushBatchStoreStateServer {
/// Start out not knowing anything about the remote peer.
fn new() -> Self {
fn new(max_batch_size: usize) -> Self {
Self {
state: sync::watch::channel(BatchStoreState {
first: BatchNumber(0),
last: None,
})
.0,
max_batch_size,
}
}
}
@@ -113,7 +115,7 @@ impl rpc::Handler<rpc::push_block_store_state::Rpc> for &PushBlockStoreStateServer
#[async_trait]
impl rpc::Handler<rpc::push_batch_store_state::Rpc> for &PushBatchStoreStateServer {
fn max_req_size(&self) -> usize {
10 * kB
self.max_batch_size.saturating_add(kB)
}
async fn handle(
&self,
@@ -175,7 +177,7 @@ impl Network {
ctx,
self.cfg.rpc.push_batch_store_state_rate,
);
let push_batch_store_state_server = PushBatchStoreStateServer::new();
let push_batch_store_state_server = PushBatchStoreStateServer::new(self.cfg.max_batch_size);
scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_client(&push_validator_addrs_client)
@@ -354,7 +356,7 @@ impl Network {
self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t));
let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx);
let batch = call
.call(ctx, &req, self.cfg.max_block_size.saturating_add(kB))
.call(ctx, &req, self.cfg.max_batch_size.saturating_add(kB))
.await?
.0
.context("empty response")?;
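A note on `saturating_add` here and in the executor: the test configs set these limits to `usize::MAX`, so a plain `+ kB` would overflow (and panic in debug builds). A standalone check, assuming `kB` is 1024 bytes:

```rust
fn main() {
    let k_b: usize = 1024; // stand-in for the crate's `kB` constant
    assert_eq!(usize::MAX.saturating_add(k_b), usize::MAX); // clamps instead of overflowing
    assert_eq!(10_000usize.saturating_add(k_b), 11_024);
}
```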
2 changes: 2 additions & 0 deletions node/actors/network/src/testonly.rs
@@ -108,6 +108,7 @@ where
static_outbound: HashMap::default(),
},
max_block_size: usize::MAX,
max_batch_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
max_block_queue_size: 10,
@@ -146,6 +147,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(),
},
max_block_size: usize::MAX,
max_batch_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
max_block_queue_size: 10,
82 changes: 73 additions & 9 deletions node/libs/storage/src/batch_store.rs
@@ -56,7 +56,7 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
/// Returns `None` if no batches have been created yet.
async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<attester::BatchNumber>>;

/// Get the numbers of L1 batches which are missing the corresponding L1 batch quorum certificates
/// Get the earliest of the L1 batch numbers which are missing the corresponding L1 batch quorum certificates
/// and potentially need to be signed by attesters.
///
/// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1,
@@ -65,10 +65,10 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
/// where it's still necessary to gossip votes; for example the main node will want to have a QC on
/// every batch as long as it's the one submitting them to L1, while replicas can ask L1 what is considered
/// final.
async fn unsigned_batch_numbers(
async fn earliest_batch_number_to_sign(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Vec<attester::BatchNumber>>;
) -> ctx::Result<Option<attester::BatchNumber>>;

/// Get the L1 batch QC from storage with the highest number.
///
@@ -115,8 +115,25 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
#[derive(Debug)]
struct Inner {
/// Batches that are queued to be persisted.
///
/// This reflects the state of the `cache`. Its source is mainly the gossip layer (the RPC protocols started in `Network::run_stream`):
/// * the node pushes `SyncBatch` records which appear in `queued` to its gossip peers
/// * the node pulls `SyncBatch` records that it needs from gossip peers that report having them, and adds them to `queued`
/// * the `BatchStoreRunner` looks for new items in `queued` and pushes them into the `PersistentBatchStore`
///
/// XXX: There doesn't seem to be anything that currently actively pushes into `queued` from outside gossip,
/// as happens with `BlockStore::queue_block` being called from BFT.
queued: BatchStoreState,
/// Batches that are already persisted.
///
/// This reflects the state of the database. Its source is mainly the `PersistentBatchStore`:
/// * the `BatchStoreRunner` subscribes to `PersistentBatchStore::persisted()` and copies its contents here;
/// it also uses the opportunity to clear items from the `cache`, but notably doesn't update `queued`
/// (doing so would cause the data to be gossiped)
///
/// Be aware that `BatchStoreState` works with `SyncBatch`, which requires a `proof` of inclusion on L1,
/// so this persisted state lags well behind the latest batch physically available in the database:
/// the batch also has to be signed by attesters, submitted to L1, and finalised there to appear here.
persisted: BatchStoreState,
cache: VecDeque<attester::SyncBatch>,
}
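The copying described in the `persisted` comment can be pictured with tokio-style watch channels (which the crate's `sync::watch` resembles). The sketch below is illustrative only; `BatchStoreState` is simplified to plain numbers and `run_copier` is not the crate's `BatchStoreRunner` API:

```rust
use tokio::sync::watch;

#[derive(Clone, Debug, Default)]
struct BatchStoreState {
    first: u64,
    last: Option<u64>,
}

/// Mirror the persistent store's progress into the in-memory state,
/// leaving `queued` to be driven by the gossip layer.
async fn run_copier(
    mut from_db: watch::Receiver<BatchStoreState>,
    to_inner: watch::Sender<BatchStoreState>,
) {
    // Wake up whenever the persistent store reports progress and copy it over.
    while from_db.changed().await.is_ok() {
        let state = from_db.borrow_and_update().clone();
        to_inner.send_replace(state);
    }
}
```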
@@ -258,23 +275,67 @@ impl BatchStore {
Ok(batch)
}

/// Retrieve the number of all batches that don't have a QC yet and potentially need to be signed.
/// Retrieve the maximum batch number that exists in the database, regardless of its status.
pub async fn last_batch_number(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
{
// let inner = self.inner.borrow();

// For now we ignore `queued` here because it's not clear how it's updated:
// validation is missing and it seems to depend entirely on gossip. We don't
// want it to somehow get us stuck and prevent voting. At least the `persisted`
// state is maintained by two background processes copying the data from the DB.

// if let Some(ref batch) = inner.queued.last {
// return Ok(Some(batch.number));
// }

// We also have to ignore `persisted` because `last` is an instance of `SyncBatch`
// which is conceptually only available once we have a proof that it's been included
// on L1, which requires a signature in the first place.

// if let Some(ref batch) = inner.persisted.last {
// return Ok(Some(batch.number));
// }
}

// Get the last L1 batch that exists in the DB regardless of its status.
let batch = self
.persistent
.last_batch(ctx)
.await
.context("persistent.last_batch()")?;

Ok(batch)
}

/// Retrieve the minimum batch number that doesn't have a QC yet and potentially needs to be signed.
///
/// There might be unsigned batches before this one in the database; however, we don't consider it
/// necessary to sign them any more because, for example, they have already been submitted to L1.
///
/// It returns only the numbers which follow the last finalized batch, that is, there might be batches
/// before the earliest of these numbers that aren't signed, but it would be futile to sign them any more.
pub async fn unsigned_batch_numbers(
/// There might also be signed batches *after* this one, due to the way gossiping works, but we
/// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted.
///
/// Returns `None` if all existing batches are signed, or if there are no batches to be signed at all.
pub async fn earliest_batch_number_to_sign(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Vec<attester::BatchNumber>> {
) -> ctx::Result<Option<attester::BatchNumber>> {
let unsigned = self
.persistent
.unsigned_batch_numbers(ctx)
.earliest_batch_number_to_sign(ctx)
.await
.context("persistent.get_batch_to_sign()")?;
Ok(unsigned)
}

/// Retrieve a batch to be signed.
///
/// This might be `None` even if the L1 batch already exists, because the commitment
/// in it is populated asynchronously.
pub async fn batch_to_sign(
&self,
ctx: &ctx::Ctx,
Expand All @@ -300,6 +361,9 @@ impl BatchStore {
batch: attester::SyncBatch,
_genesis: validator::Genesis,
) -> ctx::Result<()> {
// XXX: Once we can validate `SyncBatch::proof` we should do it before adding the
// batch to the cache, otherwise a malicious peer could serve us data that prevents
// other inputs from entering the queue. At the moment adding it also causes it to be gossiped.
sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
inner.queued.next() >= batch.number
})
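To make the gap semantics concrete, here is a self-contained sketch of what `earliest_batch_number_to_sign` returns when certificates arrive out of order, mirroring the in-memory store below; plain `u64`s stand in for `attester::BatchNumber`:

```rust
use std::collections::BTreeMap;

fn earliest_batch_number_to_sign(batches: &[u64], certs: &BTreeMap<u64, &str>) -> Option<u64> {
    // The first batch, in order, that has no QC yet, even if later ones are already signed.
    batches.iter().copied().find(|n| !certs.contains_key(n))
}

fn main() {
    let batches = [0, 1, 2, 3, 4];
    // Gossip delivered QCs out of order: batches 1 and 4 are still unsigned.
    let certs: BTreeMap<u64, &str> = BTreeMap::from([(0, "qc0"), (2, "qc2"), (3, "qc3")]);
    assert_eq!(earliest_batch_number_to_sign(&batches, &certs), Some(1));
    // Once everything is signed, there is nothing left to do:
    let all: BTreeMap<u64, &str> = (0..5).map(|n| (n, "qc")).collect();
    assert_eq!(earliest_batch_number_to_sign(&batches, &all), None);
}
```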
7 changes: 3 additions & 4 deletions node/libs/storage/src/testonly/in_memory.rs
@@ -141,18 +141,17 @@ impl PersistentBatchStore for BatchStore {
Ok(certs.get(last_batch_number).cloned())
}

async fn unsigned_batch_numbers(
async fn earliest_batch_number_to_sign(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Vec<attester::BatchNumber>> {
) -> ctx::Result<Option<attester::BatchNumber>> {
let batches = self.0.batches.lock().unwrap();
let certs = self.0.certs.lock().unwrap();

Ok(batches
.iter()
.map(|b| b.number)
.filter(|n| !certs.contains_key(n))
.collect())
.find(|n| !certs.contains_key(n)))
}

async fn get_batch_to_sign(
1 change: 1 addition & 0 deletions node/tools/src/bin/deployer.rs
@@ -47,6 +47,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option<usize>) -> V
metrics_server_addr: None,
genesis: setup.genesis.clone(),
max_payload_size: 1000000,
max_batch_size: 100000000,
validator_key: Some(validator_keys[i].clone()),
attester_key: Some(attester_keys[i].clone()),
node_key: node_keys[i].clone(),
1 change: 1 addition & 0 deletions node/tools/src/bin/localnet_config.rs
@@ -76,6 +76,7 @@ fn main() -> anyhow::Result<()> {
.map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)),
genesis: setup.genesis.clone(),
max_payload_size: 1000000,
max_batch_size: 100000000,
node_key: node_keys[i].clone(),
validator_key: validator_keys.get(i).cloned(),
attester_key: attester_keys.get(i).cloned(),