
refactor: Use earliest batch number to sign (BFT-474) #150

Merged (9 commits, Jul 9, 2024)
88 changes: 52 additions & 36 deletions node/actors/executor/src/attestation.rs
@@ -6,7 +6,6 @@ use anyhow::Context;
 use zksync_concurrency::ctx;
 use zksync_concurrency::time;
 use zksync_consensus_network::gossip::BatchVotesPublisher;
-use zksync_consensus_roles::attester::BatchNumber;
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
 use crate::Attester;
@@ -39,51 +38,68 @@ impl AttesterRunner {
     /// Poll the database for new L1 batches and publish our signature over the batch.
     pub(super) async fn run(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         let public_key = self.attester.key.public();
-        // The first batch number we want to publish our vote for. We don't have to re-publish a vote
-        // because once it enters the vote register even future peers can pull it from there.
-        let mut min_batch_number = BatchNumber(0);
-        loop {
-            // Pretend that the attesters can evolve.
-            let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
-                continue;
-            };
-            if !attesters.contains(&public_key) {
-                continue;
-            }
-
-            let mut unsigned_batch_numbers = self
-                .batch_store
-                .unsigned_batch_numbers(ctx)
-                .await
-                .context("unsigned_batch_numbers")?;
-
-            // Just to be sure we go from smaller to higher batches.
-            unsigned_batch_numbers.sort();
-
-            for bn in unsigned_batch_numbers {
-                // If we have already voted on this one we can move on; no need to fetch the payload again.
-                // Batches appear in the store in order, even if we might have a QC for a newer but not for an older batch,
-                // so once we have published our vote for a certain height, we can expect to only vote on newer batches.
-                if bn < min_batch_number {
-                    continue;
-                }
-
-                if let Some(batch) = self
-                    .batch_store
-                    .batch_to_sign(ctx, bn)
-                    .await
-                    .context("batch_to_sign")?
-                {
-                    min_batch_number = batch.number.next();
-
-                    self.publisher
-                        .publish(attesters, &self.attester.key, batch)
-                        .await
-                        .context("publish")?;
-                }
-            }
+        // TODO: In the future, when we have attester rotation, these checks will have to be done inside the loop.
+        let Some(attesters) = self.block_store.genesis().attesters.as_ref() else {
+            tracing::warn!("Attester key is set, but the attester committee is empty.");
+            return Ok(());
+        };
+        if !attesters.contains(&public_key) {
+            tracing::warn!("Attester key is set, but not part of the attester committee.");
+            return Ok(());
+        }
+
+        // Find the initial range of batches that we want to (re)sign after a (re)start.
+        let mut last_batch_number = self
+            .batch_store
+            .last_batch_number(ctx)
+            .await
+            .context("last_batch_number")?
+            .unwrap_or_default();
+
+        let mut earliest_batch_number = self
+            .batch_store
+            .earliest_batch_number_to_sign(ctx)
+            .await
+            .context("earliest_batch_number_to_sign")?
+            .unwrap_or(last_batch_number);
+
+        loop {
+            while earliest_batch_number <= last_batch_number {
+                // Try to get the next batch to sign; the commitment might not be available just yet.
+                let Some(batch) = self
+                    .batch_store
+                    .batch_to_sign(ctx, earliest_batch_number)
+                    .await
+                    .context("batch_to_sign")?
+                else {
+                    break;
+                };
+
+                // The certificates might be collected out of order because of how gossip works;
+                // we could query the DB to see if we already have a QC, or we can just go ahead
+                // and publish our vote, and let others ignore it.
+                // We only have to publish a vote once; future peers can pull it from the register.
+                self.publisher
+                    .publish(attesters, &self.attester.key, batch)
+                    .await
+                    .context("publish")?;
+
+                earliest_batch_number = earliest_batch_number.next();
+            }
 
             // Wait some time before we poll the database again to see if there is a new batch to sign.
             ctx.sleep(POLL_INTERVAL).await?;
+
+            // Refresh the upper end of the range.
+            if earliest_batch_number > last_batch_number {
+                last_batch_number = self
+                    .batch_store
+                    .last_batch_number(ctx)
+                    .await
+                    .context("last_batch_number")?
+                    .unwrap_or(last_batch_number);
+            }
         }
     }
 }
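In summary: instead of fetching every unsigned batch number, the runner now tracks an `(earliest, last)` range, signs forward through it, and only refreshes the upper end after sleeping. Below is a minimal, self-contained sketch of one iteration of that loop; `BatchNumber`, `commitment_ready`, and `publish` are hypothetical stand-ins for the real store and publisher APIs, not the crate's actual types:

```rust
/// Hypothetical stand-in for `attester::BatchNumber`.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct BatchNumber(u64);

impl BatchNumber {
    fn next(self) -> Self {
        BatchNumber(self.0 + 1)
    }
}

/// One round of the loop body: sign everything in `earliest..=last` whose
/// commitment is ready, stopping at the first batch that is not ready yet.
fn sign_available(
    earliest: &mut BatchNumber,
    last: BatchNumber,
    commitment_ready: impl Fn(BatchNumber) -> bool, // stands in for batch_to_sign
    publish: &mut impl FnMut(BatchNumber),          // stands in for publisher.publish
) {
    while *earliest <= last {
        if !commitment_ready(*earliest) {
            break; // retried after POLL_INTERVAL
        }
        publish(*earliest);
        *earliest = earliest.next();
    }
}

fn main() {
    let mut earliest = BatchNumber(3); // earliest_batch_number_to_sign
    let last = BatchNumber(5);         // last_batch_number
    let mut published = Vec::new();
    // Commitments are ready for batches up to 4 only.
    sign_available(&mut earliest, last, |b| b.0 <= 4, &mut |b| published.push(b));
    assert_eq!(published, vec![BatchNumber(3), BatchNumber(4)]);
    assert_eq!(earliest, BatchNumber(5)); // batch 5 is retried on the next poll
}
```

The `break` on a missing commitment is what keeps signing strictly in order: batch 5 stays unsigned until its commitment is computed, and the next poll resumes from it.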
5 changes: 5 additions & 0 deletions node/actors/network/src/gossip/runner.rs
@@ -113,6 +113,9 @@ impl rpc::Handler<rpc::push_block_store_state::Rpc> for &PushBlockStoreStateServer
 #[async_trait]
 impl rpc::Handler<rpc::push_batch_store_state::Rpc> for &PushBatchStoreStateServer {
     fn max_req_size(&self) -> usize {
+        // XXX: The request will actually contain a `SyncBatch`, which has all the blocks in the batch,
+        // so a constant 10 kB cannot be the right limit. There is a `max_block_size` in the config which
+        // should come into play, along with some other limit on the batch size.
         10 * kB
     }
     async fn handle(
@@ -353,6 +356,8 @@ impl Network {
             let ctx_with_timeout =
                 self.cfg.rpc.get_batch_timeout.map(|t| ctx.with_timeout(t));
             let ctx = ctx_with_timeout.as_ref().unwrap_or(ctx);
+            // XXX: `max_block_size` isn't the right limit here, as the response
+            // will contain all the blocks of a batch.
             let batch = call
                 .call(ctx, &req, self.cfg.max_block_size.saturating_add(kB))
                 .await?
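Both XXX comments point at the same gap: a `SyncBatch` message scales with the number of blocks in the batch, so neither a constant 10 kB nor `max_block_size` alone bounds it. A sketch of what a batch-aware limit could look like, assuming a hypothetical `max_blocks_per_batch` knob that is not a field in the current config:

```rust
/// Hypothetical config; only `max_block_size` exists in the real config,
/// `max_blocks_per_batch` is an assumed new knob for illustration.
struct RpcConfig {
    max_block_size: usize,
    max_blocks_per_batch: usize,
}

const KB: usize = 1024;

/// A batch-aware message size limit: room for every block in the batch,
/// plus a small allowance for the proof and protobuf framing.
fn max_batch_msg_size(cfg: &RpcConfig) -> usize {
    cfg.max_block_size
        .saturating_mul(cfg.max_blocks_per_batch)
        .saturating_add(10 * KB)
}

fn main() {
    let cfg = RpcConfig {
        max_block_size: 1 << 20,
        max_blocks_per_batch: 100,
    };
    assert_eq!(max_batch_msg_size(&cfg), 100 * (1 << 20) + 10 * KB);
}
```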
65 changes: 56 additions & 9 deletions node/libs/storage/src/batch_store.rs
@@ -56,7 +56,7 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// Returns `None` if no batches have been created yet.
     async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<attester::BatchNumber>>;
 
-    /// Get the numbers of L1 batches which are missing the corresponding L1 batch quorum certificates
+    /// Get the earliest of the L1 batches which are missing the corresponding L1 batch quorum certificates
     /// and potentially need to be signed by attesters.
     ///
     /// A replica might never have a complete history of L1 batch QCs; once the L1 batch is included on L1,
@@ -65,10 +65,10 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
     /// where it's still necessary to gossip votes; for example the main node will want to have a QC on
     /// every batch while it's the one submitting them to L1, while replicas can ask the L1 what is considered
     /// final.
-    async fn unsigned_batch_numbers(
+    async fn earliest_batch_number_to_sign(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>>;
+    ) -> ctx::Result<Option<attester::BatchNumber>>;
 
     /// Get the L1 batch QC from storage with the highest number.
     ///
@@ -258,23 +258,67 @@ impl BatchStore {
         Ok(batch)
     }
 
-    /// Retrieve the number of all batches that don't have a QC yet and potentially need to be signed.
-    ///
-    /// It returns only the numbers which follow the last finalized batch; that is, there might be batches
-    /// before the earliest of these numbers that aren't signed, but it would be futile to sign them any more.
-    pub async fn unsigned_batch_numbers(
+    /// Retrieve the maximum persisted batch number.
+    pub async fn last_batch_number(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
+        {
+            // let inner = self.inner.borrow();
+
+            // For now we ignore `queued` here, because it's not clear how it's updated,
+            // validation is missing, and it seems to depend entirely on gossip. We don't
+            // want it to somehow get us stuck and prevent voting. At least the persisted
+            // cache is maintained by two background processes copying the data from the DB.
+
+            // if let Some(ref batch) = inner.queued.last {
+            //     return Ok(Some(batch.number));
+            // }
+
+            // We also have to ignore `persisted`, because `last` is an instance of `SyncBatch`,
+            // which is conceptually only available once we have a proof that it's been included
+            // on L1, which requires a signature in the first place.
+
+            // if let Some(ref batch) = inner.persisted.last {
+            //     return Ok(Some(batch.number));
+            // }
+        }
+
+        // Get the last L1 batch that exists in the DB, regardless of its status.
+        let batch = self
+            .persistent
+            .last_batch(ctx)
+            .await
+            .context("persistent.last_batch()")?;
+
+        Ok(batch)
+    }
+
+    /// Retrieve the minimum batch number that doesn't have a QC yet and potentially needs to be signed.
+    ///
+    /// There might be unsigned batches before this one in the database; however, we don't consider it
+    /// necessary to sign them any more, because, for example, they have already been submitted to L1.
+    ///
+    /// There might also be signed batches *after* this one, due to the way gossiping works, but we
+    /// might still have to fill the gaps by (re)submitting our signature to allow them to be submitted.
+    ///
+    /// Returns `None` if all existing batches are signed, or if there are no batches to be signed at all.
+    pub async fn earliest_batch_number_to_sign(
         &self,
         ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>> {
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
         let unsigned = self
             .persistent
-            .unsigned_batch_numbers(ctx)
+            .earliest_batch_number_to_sign(ctx)
             .await
-            .context("persistent.get_batch_to_sign()")?;
+            .context("persistent.earliest_batch_number_to_sign()")?;
         Ok(unsigned)
     }
 
+    /// Retrieve a batch to be signed.
+    ///
+    /// This might be `None` even if the L1 batch already exists, because the commitment
+    /// in it is populated asynchronously.
     pub async fn batch_to_sign(
         &self,
         ctx: &ctx::Ctx,
@@ -300,6 +344,9 @@ impl BatchStore {
         batch: attester::SyncBatch,
         _genesis: validator::Genesis,
     ) -> ctx::Result<()> {
+        // XXX: Once we can validate `SyncBatch::proof`, we should do it before adding the
+        // batch to the cache, otherwise a malicious peer could serve us data that prevents
+        // other inputs from entering the queue. For the moment it will also cause the batch to be gossiped.
         sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| {
             inner.queued.next() >= batch.number
         })
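The doc comment above defines the contract in prose; a toy model with plain collections makes it concrete (`BTreeSet` stands in for the batch and certificate tables, not the real storage schema):

```rust
use std::collections::BTreeSet;

/// Toy model of the contract: the earliest batch missing a QC is returned,
/// even when some later batch already has one.
fn earliest_batch_number_to_sign(batches: &BTreeSet<u64>, certs: &BTreeSet<u64>) -> Option<u64> {
    batches.iter().copied().find(|n| !certs.contains(n))
}

fn main() {
    // Batches 1..=5 exist; QCs were gossiped out of order, so 1, 2 and 4 are signed.
    let batches: BTreeSet<u64> = (1..=5).collect();
    let certs: BTreeSet<u64> = [1, 2, 4].into_iter().collect();
    assert_eq!(earliest_batch_number_to_sign(&batches, &certs), Some(3));

    // Once every batch has a QC, the method yields None and the attester
    // simply waits for the next batch to appear.
    assert_eq!(earliest_batch_number_to_sign(&batches, &batches), None);
}
```

Gossip can deliver QCs out of order, which is exactly why the earliest gap (3) matters more than the highest signed number (4).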
7 changes: 3 additions & 4 deletions node/libs/storage/src/testonly/in_memory.rs
@@ -141,18 +141,17 @@ impl PersistentBatchStore for BatchStore {
         Ok(certs.get(last_batch_number).cloned())
     }
 
-    async fn unsigned_batch_numbers(
+    async fn earliest_batch_number_to_sign(
         &self,
         _ctx: &ctx::Ctx,
-    ) -> ctx::Result<Vec<attester::BatchNumber>> {
+    ) -> ctx::Result<Option<attester::BatchNumber>> {
         let batches = self.0.batches.lock().unwrap();
         let certs = self.0.certs.lock().unwrap();
 
         Ok(batches
             .iter()
             .map(|b| b.number)
-            .filter(|n| !certs.contains_key(n))
-            .collect())
+            .find(|n| !certs.contains_key(n)))
     }
 
     async fn get_batch_to_sign(