Skip to content

Commit

Permalink
Support for dynamic attester committee (#175)
Browse files Browse the repository at this point in the history
It will support collecting votes for a single batch for now. I've
extended the rpc to support both pushing and pulling the votes (with
dynamic committee we cannot really collect future votes). I've reworked
the API to concentrate the api surface in one place.

I've also moved the rpc registry from rust code to protobuf enum to make
enforcing compatibility easier.

I'll prepare a corresponding PR in zksync-era before merging to make
sure that the integration is smooth e2e.

---------

Co-authored-by: Akosh Farkash <[email protected]>
  • Loading branch information
pompon0 and aakoshh authored Aug 16, 2024
1 parent 9a255d2 commit b9c5fa7
Show file tree
Hide file tree
Showing 39 changed files with 998 additions and 1,643 deletions.
256 changes: 0 additions & 256 deletions node/actors/executor/src/attestation.rs

This file was deleted.

6 changes: 3 additions & 3 deletions node/actors/executor/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ impl Dispatcher {
}

/// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them.
pub(super) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
scope::run!(ctx, |ctx, s| async {
pub(super) async fn run(mut self, ctx: &ctx::Ctx) {
let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
// Start a task to handle the messages from the consensus actor.
s.spawn(async {
while let Ok(msg) = self.consensus_output.recv(ctx).await {
Expand Down Expand Up @@ -65,6 +65,6 @@ impl Dispatcher {

Ok(())
})
.await
.await;
}
}
47 changes: 11 additions & 36 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
use crate::io::Dispatcher;
use anyhow::Context as _;
use network::http;
pub use network::RpcConfig;
pub use network::{gossip::attestation, RpcConfig};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{self as network, gossip::AttestationStatusWatch};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_network as network;
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;

pub mod attestation;
mod io;
#[cfg(test)]
mod tests;
Expand All @@ -31,13 +30,6 @@ pub struct Validator {
pub payload_manager: Box<dyn bft::PayloadManager>,
}

/// Validator-related part of [`Executor`].
#[derive(Debug)]
pub struct Attester {
/// Consensus network configuration.
pub key: attester::SecretKey,
}

/// Config of the node executor.
#[derive(Debug)]
pub struct Config {
Expand Down Expand Up @@ -98,10 +90,9 @@ pub struct Executor {
pub batch_store: Arc<BatchStore>,
/// Validator-specific node data.
pub validator: Option<Validator>,
/// Validator-specific node data.
pub attester: Option<Attester>,
/// Status showing where the main node wants attester to cast their votes.
pub attestation_status: Arc<AttestationStatusWatch>,
/// Attestation controller. Caller should actively configure the batch
/// for which the attestation votes should be collected.
pub attestation: Arc<attestation::Controller>,
}

impl Executor {
Expand All @@ -112,7 +103,6 @@ impl Executor {
public_addr: self.config.public_addr.clone(),
gossip: self.config.gossip(),
validator_key: self.validator.as_ref().map(|v| v.key.clone()),
attester_key: self.attester.as_ref().map(|a| a.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
max_batch_size: self.config.max_batch_size.saturating_add(kB),
Expand All @@ -137,35 +127,20 @@ impl Executor {

tracing::debug!("Starting actors in separate threads.");
scope::run!(ctx, |ctx, s| async {
s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });
s.spawn(async {
dispatcher.run(ctx).await;
Ok(())
});
let (net, runner) = network::Network::new(
network_config,
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
self.attestation_status.clone(),
self.attestation,
);
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });

if let Some(attester) = self.attester {
tracing::info!("Running the node in attester mode.");
let runner = attestation::AttesterRunner::new(
self.block_store.clone(),
self.batch_store.clone(),
attester,
net.batch_vote_publisher(),
self.attestation_status.subscribe(),
self.config.batch_poll_interval,
);
s.spawn(async {
runner.run(ctx).await?;
Ok(())
});
} else {
tracing::info!("Running the node in non-attester mode.");
}

if let Some(debug_config) = self.config.debug_page {
s.spawn(async {
http::DebugPageServer::new(debug_config, net)
Expand Down
Loading

0 comments on commit b9c5fa7

Please sign in to comment.