Skip to content

Commit

Permalink
Added support for block store pruning (BFT-473) (#136)
Browse files Browse the repository at this point in the history
With this PR, gossipnet syncing will be able to behave correctly even
under aggressive pruning (i.e. even if we end up with no certified
blocks in storage at all).

---------

Co-authored-by: Bruno França <[email protected]>
  • Loading branch information
pompon0 and brunoffranca authored Jun 27, 2024
1 parent 1d1486f commit be2610e
Show file tree
Hide file tree
Showing 17 changed files with 288 additions and 57 deletions.
6 changes: 2 additions & 4 deletions node/actors/bft/src/testonly/twins/scenario.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::collections::BTreeSet;

use rand::{seq::SliceRandom, Rng};

use super::{splits, HasKey, Split, Twin};
use rand::{seq::SliceRandom, Rng};
use std::collections::BTreeSet;

/// A cluster holds all the nodes in the simulation, some of which are twins.
pub struct Cluster<T> {
Expand Down
9 changes: 3 additions & 6 deletions node/actors/bft/src/testonly/twins/tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::{splits, Cluster, HasKey, ScenarioGenerator, Split, Twin};
use crate::testonly::twins::unique_key_count;
use rand::Rng;
use std::{
collections::{BTreeSet, HashSet},
fmt::Debug,
};

use rand::Rng;
use zksync_concurrency::ctx;

use crate::testonly::twins::unique_key_count;

use super::{splits, Cluster, HasKey, ScenarioGenerator, Split, Twin};

#[test]
fn test_splits() {
let got = splits(&["foo", "bar", "baz"], 2);
Expand Down
2 changes: 1 addition & 1 deletion node/actors/bft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ async fn run_twins(

// A single scenario with 11 replicas took 3-5 seconds.
// Panic on timeout; works with `cargo nextest` and the `abort_on_panic` above.
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60));
let ctx = &ctx::test_root(&ctx::RealClock);

#[derive(PartialEq, Debug)]
Expand Down
5 changes: 1 addition & 4 deletions node/actors/network/src/gossip/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,7 @@ impl Network {
loop {
let req = rpc::push_block_store_state::Req(state.clone());
push_block_store_state_client.call(ctx, &req, kB).await?;
state = self
.block_store
.wait_until_queued(ctx, state.next())
.await?;
state = self.block_store.wait_for_queued_change(ctx, &state).await?;
}
});

Expand Down
76 changes: 75 additions & 1 deletion node/actors/network/src/gossip/tests/fetch_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::{ctx, limiter, scope, testonly::abort_on_panic};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStoreState};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
BlockStore, BlockStoreState, PersistentBlockStore as _,
};

#[tokio::test]
async fn test_simple() {
Expand Down Expand Up @@ -329,3 +332,74 @@ async fn test_retry() {
.await
.unwrap();
}

/// Test checking that if storage is truncated,
/// then the node announces that to peers.
///
/// Flow of the test:
/// 1. A single node is started on top of a custom in-memory persistent block store.
/// 2. Every block of `setup.blocks` is fed to that persistent store via `queue_next_block`.
/// 3. A test peer connects to the node.
/// 4. The persistent store is truncated repeatedly (the truncation point grows by 3
///    each round); after each truncation the test keeps receiving
///    `push_block_store_state` messages until one equals the store's persisted state.
/// 5. The test finishes once the persisted state reports `next() <= first`.
#[tokio::test]
async fn test_announce_truncated_block_range() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let mut setup = validator::testonly::Setup::new(rng, 1);
setup.push_blocks(rng, 10);
// Only the first generated config is used: the test runs a single node.
let mut cfg = crate::testonly::new_configs(rng, &setup, 0)[0].clone();
// Set the RPC rates to `Rate::INF` and drop the `get_block` timeout
// (presumably so that rate limiting/timeouts do not interfere with the test - confirm).
cfg.rpc.push_block_store_state_rate = limiter::Rate::INF;
cfg.rpc.get_block_rate = limiter::Rate::INF;
cfg.rpc.get_block_timeout = None;
// NOTE(review): clearing the key presumably makes the node run as a non-validator - confirm.
cfg.validator_key = None;

scope::run!(ctx, |ctx, s| async {
// Build a custom persistent store, so that we can tweak it later.
let mut persistent =
in_memory::BlockStore::new(setup.genesis.clone(), setup.genesis.first_block);
// The `BlockStore` gets a clone of `persistent`; the test keeps the original handle
// so that it can queue blocks into it and truncate it directly below.
let (block_store, runner) = BlockStore::new(ctx, Box::new(persistent.clone())).await?;
s.spawn_bg(runner.run(ctx));
// Use the standard batch store since it doesn't matter.
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let (_node, runner) =
crate::testonly::Instance::new(cfg.clone(), block_store, store.batches);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
// Fill in all the blocks.
for b in &setup.blocks {
persistent.queue_next_block(ctx, b.clone()).await?;
}

// Connect to the node.
let (conn, runner) = gossip::testonly::connect(ctx, &cfg, setup.genesis.hash())
.await
.unwrap();
// The connection runner is expected to terminate only via cancellation.
s.spawn_bg(async {
assert_matches!(runner.run(ctx).await, Err(mux::RunError::Canceled(_)));
Ok(())
});

// Truncation point; starts at the genesis first block and advances by 3 per round.
let mut first = setup.genesis.first_block;
loop {
tracing::info!("Truncate up to {first}");
persistent.truncate(first);
first = first + 3;

// Listen to `PublicBlockStoreState` messages.
// Until it is consistent with storage.
loop {
// Each iteration opens a server-side stream for the `push_block_store_state` RPC,
// receives one announced state and acknowledges it with `()`.
let mut stream = conn
.open_server::<rpc::push_block_store_state::Rpc>(ctx)
.await?;
let state = stream.recv(ctx).await.unwrap();
stream.send(ctx, &()).await.unwrap();
// Stop as soon as the announced state equals what the persistent store reports.
if state.0 == *persistent.persisted().borrow() {
break;
}
}

// If there are no blocks left, we are done.
let left = persistent.persisted().borrow().clone();
if left.next() <= left.first {
break;
}
}
Ok(())
})
.await
.unwrap();
}
3 changes: 1 addition & 2 deletions node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::ValidatorAddrs;
use crate::{
gossip::{batch_votes::BatchVotesWatch, handshake, validator_addrs::ValidatorAddrsWatch},
metrics, preface, rpc, testonly,
Expand All @@ -19,8 +20,6 @@ use zksync_concurrency::{
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::testonly::TestMemoryStorage;

use super::ValidatorAddrs;

mod fetch_batches;
mod fetch_blocks;
mod syncing;
Expand Down
94 changes: 93 additions & 1 deletion node/actors/network/src/gossip/tests/syncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use zksync_concurrency::{
time,
};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::testonly::TestMemoryStorage;
use zksync_consensus_storage::{
testonly::{dump, in_memory, TestMemoryStorage},
BlockStore,
};

const EXCHANGED_STATE_COUNT: usize = 5;
const NETWORK_CONNECTIVITY_CASES: [(usize, usize); 5] = [(2, 1), (3, 2), (5, 3), (10, 4), (10, 7)];
Expand Down Expand Up @@ -414,3 +417,92 @@ async fn uncoordinated_batch_syncing(
.await
.unwrap();
}

/// Test checking that if blocks that weren't queued get persisted,
/// the syncing can behave accordingly.
///
/// Flow of the test:
/// 1. One node is started per generated config, each on top of its own in-memory
///    persistent block store; the test keeps a handle to every store in `stores`.
/// 2. The store of node 1 is truncated at `setup.blocks[3]`, blocks `0..5` are queued
///    on node 0, and the test waits until node 1 has persisted the last of them.
///    Node 1's store is then expected to contain exactly blocks `3..5`.
/// 3. The store of node 1 is truncated at `setup.blocks[8]`, blocks `5..10` are queued
///    on node 0, and the test waits until node 1 has persisted the last of them.
///    Node 1's store is then expected to contain exactly blocks `8..10`.
#[tokio::test(flavor = "multi_thread")]
async fn test_sidechannel_sync() {
abort_on_panic();
// Bound the total runtime of the test to 20 seconds.
let _guard = set_timeout(time::Duration::seconds(20));

let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let mut setup = validator::testonly::Setup::new(rng, 2);
setup.push_blocks(rng, 10);
// NOTE(review): the code below indexes `nodes[0]`/`nodes[1]` and `stores[1]`,
// so `new_configs` is expected to yield at least two configs here - confirm.
let cfgs = testonly::new_configs(rng, &setup, 1);
scope::run!(ctx, |ctx, s| async {
// `stores[i]` is the persistent store handle backing `nodes[i]`.
let mut stores = vec![];
let mut nodes = vec![];
for (i, mut cfg) in cfgs.into_iter().enumerate() {
// Set the RPC rates to `Rate::INF` and drop the `get_block` timeout
// (presumably so that rate limiting/timeouts do not interfere with the test - confirm).
cfg.rpc.push_block_store_state_rate = limiter::Rate::INF;
cfg.rpc.get_block_rate = limiter::Rate::INF;
cfg.rpc.get_block_timeout = None;
// NOTE(review): clearing the key presumably makes the node run as a non-validator - confirm.
cfg.validator_key = None;

// Build a custom persistent store, so that we can tweak it later.
let persistent =
in_memory::BlockStore::new(setup.genesis.clone(), setup.genesis.first_block);
// Keep a clone for the test; the original is handed over to the `BlockStore`.
stores.push(persistent.clone());
let (block_store, runner) = BlockStore::new(ctx, Box::new(persistent)).await?;
s.spawn_bg(runner.run(ctx));
// Use the standard batch store since it doesn't matter.
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let (node, runner) = testonly::Instance::new(cfg, block_store, store.batches);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
nodes.push(node);
}

{
// Truncate at the start.
stores[1].truncate(setup.blocks[3].number());

// Sync a block prefix.
let prefix = &setup.blocks[0..5];
// Blocks are queued on node 0 only.
for b in prefix {
nodes[0]
.net
.gossip
.block_store
.queue_block(ctx, b.clone())
.await?;
}
// Wait until node 1 reports the last prefix block as persisted.
nodes[1]
.net
.gossip
.block_store
.wait_until_persisted(ctx, prefix.last().unwrap().number())
.await?;

// Check that the expected block range is actually stored.
assert_eq!(setup.blocks[3..5], dump(ctx, &stores[1]).await);
}

{
// Truncate more than prefix.
stores[1].truncate(setup.blocks[8].number());

// Sync a block suffix.
let suffix = &setup.blocks[5..10];
// Blocks are queued on node 0 only.
for b in suffix {
nodes[0]
.net
.gossip
.block_store
.queue_block(ctx, b.clone())
.await?;
}
// Wait until node 1 reports the last suffix block as persisted.
nodes[1]
.net
.gossip
.block_store
.wait_until_persisted(ctx, suffix.last().unwrap().number())
.await?;

// Check that the expected block range is actually stored.
assert_eq!(setup.blocks[8..10], dump(ctx, &stores[1]).await);
}
Ok(())
})
.await
.unwrap();
}
3 changes: 1 addition & 2 deletions node/actors/network/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Http Server to export debug information
use crate::{consensus, MeteredStreamStats, Network};
use anyhow::Context as _;
use base64::Engine;
use build_html::{Html, HtmlContainer, HtmlPage, Table, TableCell, TableCellType, TableRow};
Expand Down Expand Up @@ -29,8 +30,6 @@ use tokio_rustls::{
use zksync_concurrency::{ctx, scope};
use zksync_consensus_utils::debug_page;

use crate::{consensus, MeteredStreamStats, Network};

const STYLE: &str = include_str!("style.css");

/// Http debug page configuration.
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ pub mod testonly;
#[cfg(test)]
mod tests;
mod watch;
pub use metrics::MeteredStreamStats;

pub use config::*;
pub use metrics::MeteredStreamStats;

/// State of the network actor observable outside of the actor.
pub struct Network {
Expand Down
31 changes: 31 additions & 0 deletions node/libs/concurrency/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,23 @@ pub async fn notified(ctx: &ctx::Ctx, notify: &Notify) -> ctx::OrCanceled<()> {
ctx.wait(notify.notified()).await
}

/// Applies `f` to the watched value and notifies receivers only if `f` succeeds.
///
/// The result of `f` (either `Ok` or `Err`) is handed back to the caller unchanged.
///
/// Note: `f` operates on the watched value through `&mut T`, so any change it makes
/// before returning `Err` is not rolled back here; only the change notification is
/// skipped in that case. Callers presumably should leave the value untouched on error.
pub fn try_send_modify<T, R, E>(
    send: &watch::Sender<T>,
    f: impl FnOnce(&mut T) -> Result<R, E>,
) -> Result<R, E> {
    let mut outcome: Option<Result<R, E>> = None;
    // Stash the result of `f` and report "modified" exactly when it is `Ok`.
    send.send_if_modified(|value| outcome.insert(f(value)).is_ok());
    // `send_if_modified` always invokes the closure, hence `outcome` is populated.
    outcome.unwrap()
}

/// Waits for a watch change notification.
/// Immediately borrows a reference to the new value.
pub async fn changed<'a, T>(
Expand Down Expand Up @@ -138,6 +155,20 @@ pub async fn wait_for<'a, T>(
Err(ctx::Canceled)
}

/// Waits until `pred` yields `Some` for the watched value and returns the inner result.
///
/// The receiver is marked as changed up front, presumably so that the value currently
/// held by the watch is examined as well, rather than only subsequent updates.
/// An error from `changed` (e.g. cancellation of `ctx`) is propagated to the caller.
pub async fn wait_for_some<T, R>(
    ctx: &ctx::Ctx,
    recv: &mut watch::Receiver<T>,
    pred: impl Fn(&T) -> Option<R>,
) -> ctx::OrCanceled<R> {
    recv.mark_changed();
    loop {
        let current = changed(ctx, recv).await?;
        match pred(&*current) {
            Some(found) => return Ok(found),
            // Not satisfied yet: release the borrow and wait for the next change.
            None => continue,
        }
    }
}

struct ExclusiveLockInner<T> {
value: T,
drop_sender: oneshot::Sender<T>,
Expand Down
6 changes: 2 additions & 4 deletions node/libs/crypto/src/secp256k1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
//! ECDSA signatures over the Secp256k1 curve, chosen to work with EVM precompiles.
use std::hash::Hash;

use crate::{keccak256::Keccak256, ByteFmt};
use anyhow::bail;
use std::hash::Hash;
use zeroize::ZeroizeOnDrop;

use crate::{keccak256::Keccak256, ByteFmt};

mod testonly;

#[cfg(test)]
Expand Down
6 changes: 2 additions & 4 deletions node/libs/crypto/src/secp256k1/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::fmt::Debug;

use crate::{secp256k1::*, ByteFmt};
use rand::{
distributions::{Distribution, Standard},
rngs::StdRng,
Rng, SeedableRng,
};

use crate::{secp256k1::*, ByteFmt};
use std::fmt::Debug;

fn make_rng() -> StdRng {
StdRng::seed_from_u64(29483920)
Expand Down
4 changes: 1 addition & 3 deletions node/libs/roles/src/attester/keys/aggregate_signature.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::attester::{Batch, MsgHash, Signers};
use std::fmt;

use zksync_consensus_crypto::{bls12_381, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::Variant;

use crate::attester::{Batch, MsgHash, Signers};

// TODO: Once EIP-2537 is merged, `attester::Signature` could be changed to point at the BLS signature.
// For now these are just a placeholders so we can keep an `AggregateSignature` around.
type BlsSignature = bls12_381::Signature;
Expand Down
3 changes: 1 addition & 2 deletions node/libs/roles/src/attester/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::validator::Payload;

use super::{
AggregateMultiSig, AggregateSignature, Batch, BatchNumber, BatchQC, Committee, Msg, MsgHash,
MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, WeightedAttester,
};
use crate::validator::Payload;
use bit_vec::BitVec;
use rand::{
distributions::{Distribution, Standard},
Expand Down
6 changes: 2 additions & 4 deletions node/libs/storage/src/batch_store.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
//! Defines storage layer for batches of blocks.
use anyhow::Context as _;
use std::collections::VecDeque;
use std::{fmt, sync::Arc};
use std::{collections::VecDeque, fmt, sync::Arc};
use zksync_concurrency::{ctx, scope, sync};
use zksync_consensus_roles::attester;
use zksync_consensus_roles::validator;
use zksync_consensus_roles::{attester, validator};

/// State of the `BatchStore`: continuous range of batches.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit be2610e

Please sign in to comment.