Skip to content

Commit

Permalink
improve: AttestationStatusWatch::next_batch_to_attest non-optional (BFT-496) (#165)
Browse files Browse the repository at this point in the history

## What ❔

`AttestationStatusWatch` must now be initialised with a `BatchNumber`; its
value can no longer be `None`. `AttestationStatusRunner::new` was replaced
with the `AttestationStatusRunner::init` method, which asynchronously
polls the API until the first value is returned, and then returns itself
along with the `AttestationStatusWatch` it created. The watch can then be
passed to the `Executor`, while `AttestationStatusRunner::run` will
keep the status up to date in the background.

## Why ❔

In the review of #161
it was observed that the `Executor` can simply wait until this data is
available. In theory the data is only unavailable in two situations. First,
the main node API may be down; in that case an external node could not pull
the Genesis either and would most likely fail during startup anyway. Second,
the Genesis itself may still be under construction in the database; this is a
transient state in which an external node, as mentioned, would not start, and
which the main node apparently gets past without needing the `Executor`.
Removing `None` as a possible value of `next_batch_to_attest` therefore
makes the state easier to reason about.
  • Loading branch information
aakoshh authored Jul 31, 2024
1 parent bce5e3e commit 638b23e
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 55 deletions.
77 changes: 48 additions & 29 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ impl AttesterRunner {
self.status.mark_changed();

loop {
let Some(batch_number) = sync::changed(ctx, &mut self.status)
let batch_number = sync::changed(ctx, &mut self.status)
.await?
.next_batch_to_attest
else {
continue;
};
.next_batch_to_attest;

tracing::info!(%batch_number, "attestation status");

Expand Down Expand Up @@ -139,48 +136,70 @@ pub struct AttestationStatusRunner {
}

impl AttestationStatusRunner {
/// Create a new runner to poll the main node.
pub fn new(
status: Arc<AttestationStatusWatch>,
/// Create a new [AttestationStatusWatch] and an [AttestationStatusRunner] to poll the main node.
///
/// It polls the [AttestationStatusClient] until it returns a value to initialize the status with.
pub async fn init(
ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> Self {
Self {
status,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)));
let mut runner = Self {
status: status.clone(),
client,
poll_interval,
}
};
runner.poll_until_some(ctx).await?;
Ok((status, runner))
}

/// Runner based on a [BatchStore].
pub fn new_from_store(
status: Arc<AttestationStatusWatch>,
/// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner].
pub async fn init_from_store(
ctx: &ctx::Ctx,
store: Arc<BatchStore>,
poll_interval: time::Duration,
) -> Self {
Self::new(
status,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
Self::init(
ctx,
Box::new(LocalAttestationStatusClient(store)),
poll_interval,
)
.await
}

/// Run the poll loop.
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let _ = self.poll_forever(ctx).await;
Ok(())
}

/// Poll the client forever in a loop or until canceled.
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
loop {
self.poll_until_some(ctx).await?;
ctx.sleep(self.poll_interval).await?;
}
}

/// Poll the client until some data is returned and write it into the status.
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
loop {
match self.client.next_batch_to_attest(ctx).await {
Ok(Some(batch_number)) => {
self.status.update(batch_number).await;
Ok(Some(next_batch_to_attest)) => {
self.status.update(next_batch_to_attest).await;
}
Ok(None) => {
tracing::debug!("waiting for attestation status...")
}
Err(error) => {
tracing::error!(
?error,
"failed to poll attestation status, retrying later..."
)
}
Ok(None) => tracing::debug!("waiting for attestation status..."),
Err(error) => tracing::error!(
?error,
"failed to poll attestation status, retrying later..."
),
}
if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
return Ok(());
}
ctx.sleep(self.poll_interval).await?;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn config(cfg: &network::Config) -> Config {
/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest() -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::default())
Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)))
}

fn validator(
Expand Down
19 changes: 9 additions & 10 deletions node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::watch::Watch;
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// Its value is `None` until the background process polling the main node
/// can establish a value to start from.
pub next_batch_to_attest: Option<attester::BatchNumber>,
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
}

/// The subscription over the attestation status which voters can monitor for change.
Expand All @@ -29,15 +29,14 @@ impl fmt::Debug for AttestationStatusWatch {
}
}

impl Default for AttestationStatusWatch {
fn default() -> Self {
impl AttestationStatusWatch {
/// Create a new watch going from a specific batch number.
pub fn new(next_batch_to_attest: attester::BatchNumber) -> Self {
Self(Watch::new(AttestationStatus {
next_batch_to_attest: None,
next_batch_to_attest,
}))
}
}

impl AttestationStatusWatch {
/// Subscribes to AttestationStatus updates.
pub fn subscribe(&self) -> AttestationStatusReceiver {
self.0.subscribe()
Expand All @@ -47,10 +46,10 @@ impl AttestationStatusWatch {
pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_if_modified(|status| {
if status.next_batch_to_attest == Some(next_batch_to_attest) {
if status.next_batch_to_attest == next_batch_to_attest {
return false;
}
status.next_batch_to_attest = Some(next_batch_to_attest);
status.next_batch_to_attest = next_batch_to_attest;
true
});
}
Expand Down
7 changes: 2 additions & 5 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,9 @@ impl Network {

loop {
// Wait until the status indicates that we're ready to sign the next batch.
let Some(next_batch_number) = sync::changed(ctx, &mut recv_status)
let next_batch_number = sync::changed(ctx, &mut recv_status)
.await?
.next_batch_to_attest
else {
continue;
};
.next_batch_to_attest;

// Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
self.batch_votes
Expand Down
1 change: 1 addition & 0 deletions node/actors/network/src/gossip/tests/fetch_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn test_simple() {
scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
s.spawn_bg(store.runner.run(ctx));

let (_node, runner) = crate::testonly::Instance::new(
cfg.clone(),
store.blocks.clone(),
Expand Down
5 changes: 3 additions & 2 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zksync_concurrency::{
ctx::{self, channel},
io, limiter, net, scope, sync, time,
};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -199,7 +199,8 @@ impl Instance {
) -> (Self, InstanceRunner) {
// Semantically we'd want this to be created at the same level as the stores,
// but doing so would introduce a lot of extra cruft in setting up tests.
let attestation_status = Arc::new(AttestationStatusWatch::default());
let attestation_status =
Arc::new(AttestationStatusWatch::new(attester::BatchNumber::default()));

let (actor_pipe, dispatcher_pipe) = pipe::new();
let (net, net_runner) = Network::new(
Expand Down
16 changes: 8 additions & 8 deletions node/tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use std::{
fs, io,
net::SocketAddr,
path::PathBuf,
sync::Arc,
};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
use zksync_concurrency::{ctx, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner};
use zksync_consensus_network::{gossip::AttestationStatusWatch, http};
use zksync_consensus_network::http;
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRunner};
use zksync_consensus_utils::debug_page;
Expand Down Expand Up @@ -265,12 +264,13 @@ impl Configs {
let store = TestMemoryStorage::new(ctx, &self.app.genesis).await;

// We don't have an API to poll in this setup, we can only create a local store based attestation client.
let attestation_status = Arc::new(AttestationStatusWatch::default());
let attestation_status_runner = AttestationStatusRunner::new_from_store(
attestation_status.clone(),
store.batches.clone(),
time::Duration::seconds(1),
);
let (attestation_status, attestation_status_runner) =
AttestationStatusRunner::init_from_store(
ctx,
store.batches.clone(),
time::Duration::seconds(1),
)
.await?;

let runner = TestExecutorRunner {
storage_runner: store.runner,
Expand Down

0 comments on commit 638b23e

Please sign in to comment.