Commit 40197f1

BFT-496: Move the polling of the API into a runner created where the storages are

aakoshh committed Jul 30, 2024
1 parent 99ebe33 commit 40197f1
Showing 9 changed files with 174 additions and 116 deletions.
4 changes: 3 additions & 1 deletion node/actors/bft/src/testonly/run.rs
@@ -14,7 +14,7 @@ use zksync_concurrency::{
},
oneshot, scope,
};
use zksync_consensus_network as network;
use zksync_consensus_network::{self as network};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
use zksync_consensus_utils::pipe;
@@ -135,9 +135,11 @@ impl Test {
for (i, net) in nets.into_iter().enumerate() {
let store = TestMemoryStorage::new(ctx, genesis).await;
s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });

if self.nodes[i].0 == Behavior::Honest {
honest.push(store.blocks.clone());
}

nodes.push(Node {
net,
behavior: self.nodes[i].0,
8 changes: 4 additions & 4 deletions node/actors/executor/src/lib.rs
@@ -9,7 +9,7 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{self as network, gossip::AttestationStatusClient};
use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
@@ -97,8 +97,8 @@ pub struct Executor {
pub validator: Option<Validator>,
/// Validator-specific node data.
pub attester: Option<Attester>,
/// Client to use to poll attestation status: either through the main node API or the DB.
pub attestation_status_client: Box<dyn AttestationStatusClient>,
/// Status showing where the main node wants attesters to cast their votes.
pub attestation_status: Arc<AttestationStatusWatch>,
}

impl Executor {
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Executor {
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
self.attestation_status_client,
self.attestation_status.clone(),
);
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });
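
For context on how the new field is meant to be used: a rough caller-side sketch (not part of this commit) of building the watch and its runner next to the storages and handing the watch to the Executor. The function shape, the crate path of the executor, and the `config`/`block_store`/`batch_store` arguments are illustrative assumptions; only the `AttestationStatusWatch`, `AttestationStatusRunner`, `LocalAttestationStatusClient` and `Executor` APIs come from this diff.

use std::sync::Arc;

use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_executor::{Config, Executor}; // crate path assumed
use zksync_consensus_network::gossip::{AttestationStatusWatch, LocalAttestationStatusClient};
use zksync_consensus_storage::{BatchStore, BlockStore};

/// Hypothetical caller-side wiring; the signature is illustrative only.
async fn run_node(
    ctx: &ctx::Ctx,
    config: Config,
    block_store: Arc<BlockStore>,
    batch_store: Arc<BatchStore>,
) -> anyhow::Result<()> {
    scope::run!(ctx, |ctx, s| async {
        // Build the watch and its runner right next to the storages; the runner
        // polls the local batch store every 5 seconds and updates the watch.
        let client = Box::new(LocalAttestationStatusClient::new(batch_store.clone()));
        let (attestation_status, status_runner) =
            AttestationStatusWatch::new(client, time::Duration::seconds(5));
        s.spawn_bg(async { Ok(status_runner.run(ctx).await?) });

        // The executor only receives the shared watch; it no longer polls anything itself.
        let executor = Executor {
            config,
            block_store,
            batch_store,
            validator: None,
            attester: None,
            attestation_status,
        };
        executor.run(ctx).await
    })
    .await
}
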
21 changes: 7 additions & 14 deletions node/actors/executor/src/tests.rs
@@ -4,10 +4,7 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::testonly::abort_on_panic;
use zksync_consensus_bft as bft;
use zksync_consensus_network::{
gossip::LocalAttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
@@ -29,12 +26,10 @@ fn config(cfg: &network::Config) -> Config {
}
}

/// The test executors below are not running with attesters, so it doesn't matter if the clients
/// are returning views based on the store of main node or each to their own. For simplicity this
/// returns an implementation that queries the local store of each instance. Alternatively we
/// could implement an instance that never queries anything.
fn mk_attestation_status_client(batch_store: &Arc<BatchStore>) -> impl AttestationStatusClient {
LocalAttestationStatus::new(batch_store.clone())
/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest() -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::default())
}

fn validator(
@@ -43,7 +38,6 @@ fn validator(
batch_store: Arc<BatchStore>,
replica_store: impl ReplicaStore,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
@@ -54,7 +48,7 @@
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status_client,
attestation_status: never_attest(),
}
}

@@ -63,14 +57,13 @@ fn fullnode(
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
) -> Executor {
let attestation_status_client = Box::new(mk_attestation_status_client(&batch_store));
Executor {
config: config(cfg),
block_store,
batch_store,
validator: None,
attester: None,
attestation_status_client,
attestation_status: never_attest(),
}
}

2 changes: 2 additions & 0 deletions node/actors/network/src/consensus/tests.rs
@@ -256,9 +256,11 @@ async fn test_address_change() {
// should get reconstructed.
cfgs[0].server_addr = net::tcp::testonly::reserve_listener();
cfgs[0].public_addr = (*cfgs[0].server_addr).into();

let (node0, runner) =
testonly::Instance::new(cfgs[0].clone(), store.blocks.clone(), store.batches.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0")));

nodes[0] = node0;
for n in &nodes {
n.wait_for_consensus_connections().await;
114 changes: 87 additions & 27 deletions node/actors/network/src/gossip/attestation_status.rs
@@ -1,6 +1,7 @@
use std::fmt;
use std::sync::Arc;

use zksync_concurrency::{ctx, sync};
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::BatchStore;

@@ -9,7 +10,7 @@ use crate::watch::Watch;
/// An interface which is used by attesters and nodes collecting votes over gossip to determine
/// which is the next batch they are all supposed to be voting on, according to the main node.
#[async_trait::async_trait]
pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
pub trait AttestationStatusClient: 'static + fmt::Debug + Send + Sync {
/// Get the next batch number for which the main node expects a batch QC to be formed.
///
/// The API might return an error while genesis is being created, which we represent with `None`
@@ -20,28 +21,8 @@ pub trait AttestationStatusClient: 'static + std::fmt::Debug + Send + Sync {
) -> ctx::Result<Option<attester::BatchNumber>>;
}

/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
#[derive(Debug, Clone)]
pub struct LocalAttestationStatus(Arc<BatchStore>);

impl LocalAttestationStatus {
/// Create local attestation client form a [BatchStore].
pub fn new(store: Arc<BatchStore>) -> Self {
Self(store)
}
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatus {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
}
}

/// Coordinate the attestation by showing the status as seen by the main node.
#[derive(Debug, Clone)]
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
@@ -50,12 +31,19 @@ pub struct AttestationStatus {
pub next_batch_to_attest: Option<attester::BatchNumber>,
}

/// The subscription over the attestation status which votes can monitor for change.
/// The subscription over the attestation status which voters can monitor for change.
pub type AttestationStatusReceiver = sync::watch::Receiver<AttestationStatus>;

/// A [Watch] over an [AttestationStatus] which we can use to notify components about
/// changes in the batch number the main node expects attesters to vote on.
pub(crate) struct AttestationStatusWatch(Watch<AttestationStatus>);
pub struct AttestationStatusWatch(Watch<AttestationStatus>);

impl fmt::Debug for AttestationStatusWatch {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AttestationStatusWatch")
.finish_non_exhaustive()
}
}

impl Default for AttestationStatusWatch {
fn default() -> Self {
@@ -66,13 +54,23 @@ impl Default for AttestationStatusWatch {
}

impl AttestationStatusWatch {
/// Create a new [AttestationStatusWatch] paired up with an [AttestationStatusRunner] to keep it up to date.
pub fn new(
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> (Arc<Self>, AttestationStatusRunner) {
let status = Arc::new(AttestationStatusWatch::default());
let runner = AttestationStatusRunner::new(status.clone(), client, poll_interval);
(status, runner)
}

/// Subscribes to AttestationStatus updates.
pub(crate) fn subscribe(&self) -> AttestationStatusReceiver {
pub fn subscribe(&self) -> AttestationStatusReceiver {
self.0.subscribe()
}

/// Set the next batch number to attest on and notify subscribers it changed.
pub(crate) async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_if_modified(|status| {
if status.next_batch_to_attest == Some(next_batch_to_attest) {
Expand All @@ -83,3 +81,65 @@ impl AttestationStatusWatch {
});
}
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
pub struct AttestationStatusRunner {
status: Arc<AttestationStatusWatch>,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
}

impl AttestationStatusRunner {
/// Create a new runner to poll the main node.
fn new(
status: Arc<AttestationStatusWatch>,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> Self {
Self {
status,
client,
poll_interval,
}
}

/// Run the poll loop.
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
loop {
match self.client.next_batch_to_attest(ctx).await {
Ok(Some(batch_number)) => {
self.status.update(batch_number).await;
}
Ok(None) => tracing::debug!("waiting for attestation status..."),
Err(error) => tracing::error!(
?error,
"failed to poll attestation status, retrying later..."
),
}
if let Err(ctx::Canceled) = ctx.sleep(self.poll_interval).await {
return Ok(());
}
}
}
}

/// Implement the attestation status for the main node by returning the next batch to vote on from the [BatchStore].
#[derive(Debug, Clone)]
pub struct LocalAttestationStatusClient(Arc<BatchStore>);

impl LocalAttestationStatusClient {
/// Create local attestation client from a [BatchStore].
pub fn new(store: Arc<BatchStore>) -> Self {
Self(store)
}
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
}
}
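
The runner is deliberately client-agnostic: the main node can keep feeding the watch from its `BatchStore` via `LocalAttestationStatusClient`, while an external node can plug in a client that polls the main node's API instead. A hedged sketch of such a client (not part of this commit; `ApiAttestationStatusClient`, `MainNodeApi` and `api_attestation_status` are illustrative stand-ins):

use std::sync::Arc;

use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::{
    AttestationStatusClient, AttestationStatusRunner, AttestationStatusWatch,
};
use zksync_consensus_roles::attester;

/// Stand-in for whatever RPC client an external node uses to reach the main node.
#[derive(Debug)]
struct MainNodeApi;

impl MainNodeApi {
    async fn next_batch_to_attest(
        &self,
        _ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        // Placeholder: a real client would issue an RPC here.
        Ok(None)
    }
}

/// Hypothetical client that asks the main node API instead of the local batch store.
#[derive(Debug)]
struct ApiAttestationStatusClient {
    api: MainNodeApi,
}

#[async_trait::async_trait]
impl AttestationStatusClient for ApiAttestationStatusClient {
    async fn next_batch_to_attest(
        &self,
        ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<attester::BatchNumber>> {
        // Returning `None` keeps the runner polling without touching the watch,
        // e.g. while the main node genesis is still being created.
        self.api.next_batch_to_attest(ctx).await
    }
}

/// Pairing the client with a watch and runner works exactly like the local wiring above.
fn api_attestation_status(
    api: MainNodeApi,
) -> (Arc<AttestationStatusWatch>, AttestationStatusRunner) {
    AttestationStatusWatch::new(
        Box::new(ApiAttestationStatusClient { api }),
        time::Duration::seconds(5),
    )
}
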
44 changes: 4 additions & 40 deletions node/actors/network/src/gossip/mod.rs
@@ -13,16 +13,15 @@
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
pub use self::attestation_status::{
AttestationStatusClient, AttestationStatusReceiver, LocalAttestationStatus,
AttestationStatusClient, AttestationStatusReceiver, AttestationStatusRunner,
AttestationStatusWatch, LocalAttestationStatusClient,
};
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use attestation_status::AttestationStatusWatch;
use fetch::RequestItem;
use std::sync::{atomic::AtomicUsize, Arc};
pub(crate) use validator_addrs::*;
use zksync_concurrency::time;
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -66,8 +65,6 @@ pub(crate) struct Network {
pub(crate) push_validator_addrs_calls: AtomicUsize,
/// Shared watch over the current attestation status as indicated by the main node.
pub(crate) attestation_status: Arc<AttestationStatusWatch>,
/// Client to use to check the current attestation status on the main node.
pub(crate) attestation_status_client: Box<dyn AttestationStatusClient>,
}

impl Network {
@@ -77,7 +74,7 @@ impl Network {
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
sender: channel::UnboundedSender<io::OutputMessage>,
attestation_status_client: Box<dyn AttestationStatusClient>,
attestation_status: Arc<AttestationStatusWatch>,
) -> Arc<Self> {
Arc::new(Self {
sender,
@@ -93,8 +90,7 @@
block_store,
batch_store,
push_validator_addrs_calls: 0.into(),
attestation_status: Arc::new(AttestationStatusWatch::default()),
attestation_status_client,
attestation_status,
})
}

@@ -210,36 +206,4 @@ impl Network {
.wrap("persist_batch_qc")?;
}
}

/// Poll the attestation status and update the watch.
pub(crate) async fn run_attestation_client(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
if self.genesis().attesters.is_none() {
tracing::info!("no attesters in genesis, not polling the attestation status");
return Ok(());
};

const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);

loop {
match self
.attestation_status_client
.next_batch_to_attest(ctx)
.await
{
Ok(Some(batch_number)) => {
self.attestation_status.update(batch_number).await;
// We could also update the minimum batch number here, which might
// help mitigate the problem of missing a vote if the batch number
// happened to decrease. But we decided to fix it at the source,
// so the only place that is adjusted is before looking for a QC.
}
Ok(None) => tracing::debug!("waiting for attestation status..."),
Err(error) => tracing::error!(
?error,
"failed to poll attestation status, retrying later..."
),
}
ctx.sleep(POLL_INTERVAL).await?;
}
}
}
(Diffs for the remaining 3 changed files were not loaded.)