[BFT-445] L1 batch gossip sync (#118)
## What ❔

Enable syncing of L1 batches over the gossip network.

## Why ❔

Just like blocks, nodes need to fetch batches of blocks (L1 batches) from other peers.

---------

Co-authored-by: Grzegorz Prusak <[email protected]>
Co-authored-by: Ignacio Avecilla <[email protected]>
Co-authored-by: Bruno França <[email protected]>
4 people authored Jun 25, 2024
1 parent 2dbb9a6 commit af168bf
Showing 42 changed files with 1,852 additions and 236 deletions.
1 change: 1 addition & 0 deletions node/actors/bft/src/testonly/node.rs
@@ -43,6 +43,7 @@ pub(super) struct Node {
pub(crate) net: network::Config,
pub(crate) behavior: Behavior,
pub(crate) block_store: Arc<storage::BlockStore>,
+ pub(crate) batch_store: Arc<storage::BatchStore>,
}

impl Node {
18 changes: 11 additions & 7 deletions node/actors/bft/src/testonly/run.rs
@@ -16,7 +16,7 @@ use zksync_concurrency::{
};
use zksync_consensus_network as network;
use zksync_consensus_roles::validator;
- use zksync_consensus_storage::{testonly::new_store, BlockStore};
+ use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
use zksync_consensus_utils::pipe;

pub(crate) enum Network {
@@ -134,15 +134,16 @@ impl Test {
let mut honest = vec![];
scope::run!(ctx, |ctx, s| async {
for (i, net) in nets.into_iter().enumerate() {
- let (store, runner) = new_store(ctx, genesis).await;
- s.spawn_bg(async { Ok(runner.run(ctx).await?) });
+ let store = TestMemoryStorage::new(ctx, genesis).await;
+ s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });
if self.nodes[i].0 == Behavior::Honest {
- honest.push(store.clone());
+ honest.push(store.blocks.clone());
}
nodes.push(Node {
net,
behavior: self.nodes[i].0,
- block_store: store,
+ block_store: store.blocks,
+ batch_store: store.batches,
});
}
assert!(!honest.is_empty());
@@ -188,8 +189,11 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> {
scope::run!(ctx, |ctx, s| async {
let mut nodes = vec![];
for (i, spec) in specs.iter().enumerate() {
- let (node, runner) =
-     network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone());
+ let (node, runner) = network::testonly::Instance::new(
+     spec.net.clone(),
+     spec.block_store.clone(),
+     spec.batch_store.clone(),
+ );
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
nodes.push(node);
}
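
For orientation, the tests above now obtain both stores from a single helper. Below is a sketch of the shape `TestMemoryStorage` appears to have, inferred only from the call sites in this diff (`blocks`, `batches`, `runner`); the real definition lives in `zksync_consensus_storage::testonly` and may differ in detail.

```rust
use std::sync::Arc;
use zksync_consensus_storage::{testonly::TestMemoryStorageRunner, BatchStore, BlockStore};

/// Sketch only, inferred from the call sites above; not the actual definition.
pub struct TestMemoryStorage {
    /// Handle to the in-memory block store.
    pub blocks: Arc<BlockStore>,
    /// Handle to the in-memory L1 batch store (new in this PR).
    pub batches: Arc<BatchStore>,
    /// Background task that keeps both stores running.
    pub runner: TestMemoryStorageRunner,
}
```
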
17 changes: 7 additions & 10 deletions node/actors/bft/src/testonly/ut_harness.rs
@@ -14,10 +14,7 @@ use zksync_consensus_roles::validator::{
self, CommitQC, LeaderCommit, LeaderPrepare, Phase, PrepareQC, ReplicaCommit, ReplicaPrepare,
SecretKey, Signed, ViewNumber,
};
- use zksync_consensus_storage::{
-     testonly::{in_memory, new_store},
-     BlockStoreRunner,
- };
+ use zksync_consensus_storage::testonly::{in_memory, TestMemoryStorage, TestMemoryStorageRunner};
use zksync_consensus_utils::enum_util::Variant;

pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000;
@@ -41,7 +38,7 @@ impl UTHarness {
pub(crate) async fn new(
ctx: &ctx::Ctx,
num_validators: usize,
- ) -> (UTHarness, BlockStoreRunner) {
+ ) -> (UTHarness, TestMemoryStorageRunner) {
Self::new_with_payload(
ctx,
num_validators,
@@ -54,15 +51,15 @@
ctx: &ctx::Ctx,
num_validators: usize,
payload_manager: Box<dyn PayloadManager>,
- ) -> (UTHarness, BlockStoreRunner) {
+ ) -> (UTHarness, TestMemoryStorageRunner) {
let rng = &mut ctx.rng();
let setup = validator::testonly::Setup::new(rng, num_validators);
- let (block_store, runner) = new_store(ctx, &setup.genesis).await;
+ let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let (send, recv) = ctx::channel::unbounded();

let cfg = Arc::new(Config {
secret_key: setup.validator_keys[0].clone(),
- block_store: block_store.clone(),
+ block_store: store.blocks.clone(),
replica_store: Box::new(in_memory::ReplicaStore::default()),
payload_manager,
max_payload_size: MAX_PAYLOAD_SIZE,
@@ -79,11 +76,11 @@
leader_send,
};
let _: Signed<ReplicaPrepare> = this.try_recv().unwrap();
- (this, runner)
+ (this, store.runner)
}

/// Creates a new `UTHarness` with minimally-significant validator set size.
- pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness, BlockStoreRunner) {
+ pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness, TestMemoryStorageRunner) {
let num_validators = 6;
let (util, runner) = UTHarness::new(ctx, num_validators).await;
assert!(util.genesis().validators.max_faulty_weight() > 0);
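
The harness now returns a `TestMemoryStorageRunner` that drives both in-memory stores, so callers keep the same spawn-the-runner pattern. A minimal test sketch follows; the scaffolding is assumed and modelled on the existing bft unit tests, and `batch_aware_harness_smoke` is a hypothetical name.

```rust
use zksync_concurrency::{ctx, scope};

#[tokio::test]
async fn batch_aware_harness_smoke() {
    // Assumed test scaffolding, mirroring the spawn pattern used in this diff.
    let ctx = &ctx::test_root(&ctx::RealClock);
    scope::run!(ctx, |ctx, s| async {
        let (util, runner) = UTHarness::new(ctx, 1).await;
        // One background runner now drives both the block and the batch store.
        s.spawn_bg(async { Ok(runner.run(ctx).await?) });
        // ... exercise the replica/leader state machines via `util` ...
        drop(util);
        Ok(())
    })
    .await
    .unwrap();
}
```
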
25 changes: 20 additions & 5 deletions node/actors/executor/src/lib.rs
@@ -9,8 +9,8 @@ use std::{
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
- use zksync_consensus_roles::{node, validator};
- use zksync_consensus_storage::{BlockStore, ReplicaStore};
+ use zksync_consensus_roles::{attester, node, validator};
+ use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;

@@ -29,6 +29,13 @@ pub struct Validator {
pub payload_manager: Box<dyn bft::PayloadManager>,
}

+ /// Attester-related part of [`Executor`].
+ #[derive(Debug)]
+ pub struct Attester {
+     /// Attester key.
+     pub key: attester::SecretKey,
+ }

/// Config of the node executor.
#[derive(Debug)]
pub struct Config {
@@ -75,8 +82,12 @@ pub struct Executor {
pub config: Config,
/// Block storage used by the node.
pub block_store: Arc<BlockStore>,
+ /// Batch storage used by the node.
+ pub batch_store: Arc<BatchStore>,
/// Validator-specific node data.
pub validator: Option<Validator>,
+ /// Attester-specific node data.
+ pub attester: Option<Attester>,
}

impl Executor {
@@ -87,6 +98,7 @@ impl Executor {
public_addr: self.config.public_addr.clone(),
gossip: self.config.gossip(),
validator_key: self.validator.as_ref().map(|v| v.key.clone()),
+ attester_key: self.attester.as_ref().map(|a| a.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
max_block_queue_size: 20,
@@ -111,9 +123,12 @@
tracing::debug!("Starting actors in separate threads.");
scope::run!(ctx, |ctx, s| async {
s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });

- let (net, runner) =
-     network::Network::new(network_config, self.block_store.clone(), network_actor_pipe);
+ let (net, runner) = network::Network::new(
+     network_config,
+     self.block_store.clone(),
+     self.batch_store.clone(),
+     network_actor_pipe,
+ );
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });

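
At the node level, embedders now hand the executor a batch store and can opt in to attesting. A rough wiring sketch follows, assuming the executor crate is `zksync_consensus_executor` and that `Executor::run(ctx)` keeps its existing signature; only the `batch_store` and `attester` fields are new, and the `attester_key` plumbing into the network config is shown in the hunk above.

```rust
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_consensus_executor as executor;
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

/// Rough sketch: run a node that persists L1 batches and serves them over gossip,
/// optionally attesting to them with the given key.
async fn run_node(
    ctx: &ctx::Ctx,
    config: executor::Config,
    block_store: Arc<BlockStore>,
    batch_store: Arc<BatchStore>,
    attester_key: Option<attester::SecretKey>,
) -> anyhow::Result<()> {
    executor::Executor {
        config,
        block_store,
        // New in this PR: batches are stored locally and synced from gossip peers.
        batch_store,
        validator: None,
        attester: attester_key.map(|key| executor::Attester { key }),
    }
    .run(ctx)
    .await
}
```
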
