Merge remote-tracking branch 'origin/main' into gprusak-payload-interface

pompon0 committed Nov 16, 2023
2 parents ba3bbda + b621d63 commit 88dcb42
Showing 17 changed files with 1,685 additions and 1,180 deletions.
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions node/actors/executor/Cargo.toml
@@ -24,6 +24,7 @@ tracing.workspace = true
vise.workspace = true

[dev-dependencies]
test-casing.workspace = true
tokio.workspace = true

[build-dependencies]
20 changes: 10 additions & 10 deletions node/actors/executor/src/testonly.rs
@@ -52,33 +52,33 @@ impl FullValidatorConfig {
}
}

/// Creates a new external node and configures this validator to accept incoming connections from it.
pub fn connect_external_node(&mut self, rng: &mut impl Rng) -> ExternalNodeConfig {
let external_node_config = ExternalNodeConfig::new(rng, self);
/// Creates a new full node and configures this validator to accept incoming connections from it.
pub fn connect_full_node(&mut self, rng: &mut impl Rng) -> FullNodeConfig {
let full_node_config = FullNodeConfig::new(rng, self);
self.node_config
.gossip
.static_inbound
.insert(external_node_config.node_key.public());
external_node_config
.insert(full_node_config.node_key.public());
full_node_config
}
}

/// Configuration for an external node (i.e., non-validator node).
/// Configuration for a full non-validator node.
#[derive(Debug)]
#[non_exhaustive]
pub struct ExternalNodeConfig {
pub struct FullNodeConfig {
/// Executor configuration.
pub node_config: ExecutorConfig,
/// Secret key of the node used for identification in the gossip network.
pub node_key: node::SecretKey,
}

impl ExternalNodeConfig {
impl FullNodeConfig {
fn new(rng: &mut impl Rng, validator: &FullValidatorConfig) -> Self {
let node_key: node::SecretKey = rng.gen();
let external_node_addr = net::tcp::testonly::reserve_listener();
let full_node_addr = net::tcp::testonly::reserve_listener();
let node_config = ExecutorConfig {
server_addr: *external_node_addr,
server_addr: *full_node_addr,
gossip: GossipConfig {
key: node_key.public(),
static_outbound: HashMap::from([(
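
The change in testonly.rs is a mechanical rename: `ExternalNodeConfig` becomes `FullNodeConfig` and `connect_external_node` becomes `connect_full_node`, with no behavioural difference. For orientation, here is a minimal sketch of how the renamed helpers fit together, mirroring the usage in tests.rs below (illustration only, not part of the commit; crate-internal items such as `FullValidatorConfig`, `Executor`, `InMemoryStorage`, and an `rng` are assumed to be in scope):

    // Sketch: wiring a validator and a full node for a test.
    let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
    // Registers the full node's public key in the validator's `static_inbound` gossip set
    // and returns the full node's own config.
    let full_node = validator.connect_full_node(rng);

    let genesis_block = validator.node_config.genesis_block.clone();
    let full_node_storage = Arc::new(InMemoryStorage::new(genesis_block));
    let full_node_executor = Executor::new(
        full_node.node_config,
        full_node.node_key,
        full_node_storage.clone(),
    )
    .unwrap();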
135 changes: 121 additions & 14 deletions node/actors/executor/src/tests.rs
@@ -1,11 +1,36 @@
//! High-level tests for `Executor`.
use super::*;
use crate::testonly::FullValidatorConfig;
use zksync_concurrency::{sync, testonly::abort_on_panic};
use zksync_consensus_roles::validator::{BlockNumber, Payload};
use rand::Rng;
use std::iter;
use test_casing::test_casing;
use zksync_concurrency::{sync, testonly::abort_on_panic, time};
use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload};
use zksync_consensus_storage::{BlockStore, InMemoryStorage};

impl FullValidatorConfig {
fn gen_blocks(&self, rng: &mut impl Rng, count: usize) -> Vec<FinalBlock> {
let genesis_block = self.node_config.genesis_block.clone();
let validators = &self.node_config.validators;
let blocks = iter::successors(Some(genesis_block), |parent| {
let payload: Payload = rng.gen();
let header = validator::BlockHeader {
parent: parent.header.hash(),
number: parent.header.number.next(),
payload: payload.hash(),
};
let commit = self.validator_key.sign_msg(validator::ReplicaCommit {
protocol_version: validator::CURRENT_VERSION,
view: validator::ViewNumber(header.number.0),
proposal: header,
});
let justification = validator::CommitQC::from(&[commit], validators).unwrap();
Some(FinalBlock::new(header, payload, justification))
});
blocks.skip(1).take(count).collect()
}

fn into_executor(self, storage: Arc<InMemoryStorage>) -> Executor<InMemoryStorage> {
let mut executor = Executor::new(self.node_config, self.node_key, storage.clone()).unwrap();
executor
@@ -41,35 +66,117 @@ async fn executing_single_validator() {
}

#[tokio::test]
async fn executing_validator_and_external_node() {
async fn executing_validator_and_full_node() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();

let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
let external_node = validator.connect_external_node(rng);
let full_node = validator.connect_full_node(rng);

let genesis_block = &validator.node_config.genesis_block;
let validator_storage = InMemoryStorage::new(genesis_block.clone());
let validator_storage = Arc::new(validator_storage);
let external_node_storage = InMemoryStorage::new(genesis_block.clone());
let external_node_storage = Arc::new(external_node_storage);
let mut en_subscriber = external_node_storage.subscribe_to_block_writes();
let full_node_storage = InMemoryStorage::new(genesis_block.clone());
let full_node_storage = Arc::new(full_node_storage);
let mut full_node_subscriber = full_node_storage.subscribe_to_block_writes();

let validator = validator.into_executor(validator_storage.clone());
let external_node = Executor::new(
external_node.node_config,
external_node.node_key,
external_node_storage.clone(),
let full_node = Executor::new(
full_node.node_config,
full_node.node_key,
full_node_storage.clone(),
)
.unwrap();

scope::run!(ctx, |ctx, s| async {
s.spawn_bg(validator.run(ctx));
s.spawn_bg(external_node.run(ctx));
s.spawn_bg(full_node.run(ctx));
for _ in 0..5 {
let number = *sync::changed(ctx, &mut en_subscriber).await?;
tracing::trace!(%number, "External node received block");
let number = *sync::changed(ctx, &mut full_node_subscriber).await?;
tracing::trace!(%number, "Full node received block");
}
anyhow::Ok(())
})
.await
.unwrap();
}

#[test_casing(2, [false, true])]
#[tokio::test]
async fn syncing_full_node_from_snapshot(delay_block_storage: bool) {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();

let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
let full_node = validator.connect_full_node(rng);

let genesis_block = &validator.node_config.genesis_block;
let blocks = validator.gen_blocks(rng, 10);
let validator_storage = InMemoryStorage::new(genesis_block.clone());
let validator_storage = Arc::new(validator_storage);
if !delay_block_storage {
// Instead of running consensus on the validator, add the generated blocks manually.
for block in &blocks {
validator_storage.put_block(ctx, block).await.unwrap();
}
}
let validator = Executor::new(
validator.node_config,
validator.node_key,
validator_storage.clone(),
)
.unwrap();

// Start a full node from a snapshot.
let full_node_storage = InMemoryStorage::new(blocks[3].clone());
let full_node_storage = Arc::new(full_node_storage);
let mut full_node_subscriber = full_node_storage.subscribe_to_block_writes();

let full_node = Executor::new(
full_node.node_config,
full_node.node_key,
full_node_storage.clone(),
)
.unwrap();

scope::run!(ctx, |ctx, s| async {
s.spawn_bg(validator.run(ctx));
s.spawn_bg(full_node.run(ctx));

if delay_block_storage {
// Emulate the validator gradually adding new blocks to the storage.
s.spawn_bg(async {
for block in &blocks {
ctx.sleep(time::Duration::milliseconds(500)).await?;
validator_storage.put_block(ctx, block).await?;
}
Ok(())
});
}

loop {
let last_contiguous_full_node_block =
full_node_storage.last_contiguous_block_number(ctx).await?;
tracing::trace!(
%last_contiguous_full_node_block,
"Full node updated last contiguous block"
);
if last_contiguous_full_node_block == BlockNumber(10) {
break; // The full node has received all blocks!
}
// Wait until the node storage is updated.
let number = *sync::changed(ctx, &mut full_node_subscriber).await?;
tracing::trace!(%number, "Full node received block");
}

// Check that the node didn't receive any blocks with number lesser than the initial snapshot block.
for lesser_block_number in 0..3 {
let block = full_node_storage
.block(ctx, BlockNumber(lesser_block_number))
.await?;
assert!(block.is_none());
}
anyhow::Ok(())
})
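
Two notes on the new `syncing_full_node_from_snapshot` test. The `#[test_casing(2, [false, true])]` attribute runs it in two variants: with all blocks written to the validator's storage upfront, and with blocks trickled in every 500 ms while both nodes are already running. Also, `gen_blocks(rng, 10)` yields blocks 1 through 10 chained off the genesis block (number 0), so `blocks[3]`, the full node's snapshot starting point, is block number 4, and the sync loop exits once the full node's last contiguous block reaches 10. A small sketch of those relationships (illustration only, assuming the helpers defined in this file are in scope):

    // Sketch: invariants the snapshot test relies on.
    let blocks = validator.gen_blocks(rng, 10);
    assert_eq!(blocks.len(), 10);
    assert_eq!(blocks[0].header.number, BlockNumber(1)); // genesis (block 0) is skipped
    assert_eq!(blocks[3].header.number, BlockNumber(4)); // the full node's snapshot block
    assert_eq!(blocks[9].header.number, BlockNumber(10)); // the sync target in the test
    // Each header points at its parent's hash, so the chain can be followed from the snapshot onward.
    assert_eq!(blocks[4].header.parent, blocks[3].header.hash());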
2 changes: 0 additions & 2 deletions node/actors/sync_blocks/src/lib.rs
@@ -26,8 +26,6 @@ mod tests;
pub use crate::config::Config;
use crate::peers::PeerStates;

// FIXME(slowli): when run on validator node, the actor creates unnecessary `GetBlocks` requests

/// Block syncing actor responsible for synchronizing L2 blocks with other nodes.
#[derive(Debug)]
pub struct SyncBlocks {
43 changes: 34 additions & 9 deletions node/actors/sync_blocks/src/peers/mod.rs
@@ -3,7 +3,7 @@
use self::events::PeerStateEvent;
use crate::{io, Config};
use anyhow::Context as _;
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, ops, sync::Arc};
use tracing::instrument;
use zksync_concurrency::{
ctx::{self, channel},
@@ -25,10 +25,18 @@ type PeerStateUpdate = (node::PublicKey, SyncState);

#[derive(Debug)]
struct PeerState {
first_stored_block: BlockNumber,
last_contiguous_stored_block: BlockNumber,
get_block_semaphore: Arc<Semaphore>,
}

impl PeerState {
fn has_block(&self, number: BlockNumber) -> bool {
let range = self.first_stored_block..=self.last_contiguous_stored_block;
range.contains(&number)
}
}

/// Handle for [`PeerStates`] allowing to send updates to it.
#[derive(Debug, Clone)]
pub(crate) struct PeerStatesHandle {
@@ -196,8 +204,8 @@ impl PeerStates {
peer_key: node::PublicKey,
state: SyncState,
) -> ctx::OrCanceled<BlockNumber> {
let last_contiguous_stored_block = match self.validate_sync_state(state) {
Ok(block_number) => block_number,
let numbers = match self.validate_sync_state(state) {
Ok(numbers) => numbers,
Err(err) => {
tracing::warn!(%err, "Invalid `SyncState` received from peer");
if let Some(events_sender) = &self.events_sender {
@@ -207,10 +215,13 @@
// TODO: ban peer etc.
}
};
let first_stored_block = *numbers.start();
let last_contiguous_stored_block = *numbers.end();

let mut peers = sync::lock(ctx, &self.peers).await?;
let permits = self.config.max_concurrent_blocks_per_peer;
let peer_state = peers.entry(peer_key.clone()).or_insert_with(|| PeerState {
first_stored_block,
last_contiguous_stored_block,
get_block_semaphore: Arc::new(Semaphore::new(permits)),
});
@@ -223,6 +234,16 @@
({last_contiguous_stored_block}) is lesser than the old value ({prev_contiguous_stored_block})"
);
}

peer_state.first_stored_block = first_stored_block;
// If `first_stored_block` increases, we could cancel getting pruned blocks from the peer here.
// However, the peer will respond such requests with a "missing block" error anyway,
// and new requests won't be routed to it because of updated `PeerState`,
// so having no special handling is fine.
// Likewise, no specialized handling is required for decreasing `first_stored_block`;
// if this leads to an ability to fetch some of the pending blocks, it'll be discovered
// after `sleep_interval_for_get_block` (i.e., soon enough).

tracing::trace!(
%prev_contiguous_stored_block,
%last_contiguous_stored_block,
@@ -237,7 +258,10 @@
Ok(last_contiguous_stored_block)
}

fn validate_sync_state(&self, state: SyncState) -> anyhow::Result<BlockNumber> {
fn validate_sync_state(
&self,
state: SyncState,
) -> anyhow::Result<ops::RangeInclusive<BlockNumber>> {
let numbers = state.numbers();
anyhow::ensure!(
numbers.first_stored_block <= numbers.last_contiguous_stored_block,
@@ -252,10 +276,11 @@
.last_contiguous_stored_block
.verify(&self.config.validator_set, self.config.consensus_threshold)
.context("Failed verifying `last_contiguous_stored_block`")?;
// We don't verify QCs for first and last stored blocks since they are not used
// in the following logic. To reflect this, the method consumes `SyncState` and returns
// the validated block number.
Ok(numbers.last_contiguous_stored_block)
// We don't verify QCs for the last stored block since it is not used
// in the following logic. The first stored block is not verified as well since it doesn't
// extend the set of blocks a peer should have. To reflect this, the method consumes `SyncState`
// and returns the validated block numbers.
Ok(numbers.first_stored_block..=numbers.last_contiguous_stored_block)
}

async fn get_and_save_block(
@@ -347,7 +372,7 @@ impl PeerStates {
) -> Option<(node::PublicKey, sync::OwnedSemaphorePermit)> {
let mut peers_with_no_permits = vec![];
let eligible_peers_info = peers.iter().filter(|(peer_key, state)| {
if state.last_contiguous_stored_block < block_number {
if !state.has_block(block_number) {
return false;
}
let available_permits = state.get_block_semaphore.available_permits();
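
Overall effect of the peers/mod.rs changes: a peer's `SyncState` is now validated into an inclusive range `first_stored_block..=last_contiguous_stored_block`, that range is stored in `PeerState`, and block requests are only routed to peers whose range contains the requested number (the `has_block` check in the last hunk). This is what lets a peer that started from a snapshot advertise that its early blocks are not available. A brief illustration of the routing predicate (sketch only; `PeerState` is private to this module, and the `Semaphore` is assumed to be the same type already used for `get_block_semaphore`):

    // Sketch: a peer that starts from a snapshot at block 4 and has stored up to block 10.
    let peer = PeerState {
        first_stored_block: BlockNumber(4),
        last_contiguous_stored_block: BlockNumber(10),
        get_block_semaphore: Arc::new(Semaphore::new(5)),
    };
    assert!(!peer.has_block(BlockNumber(3)));  // pruned on this peer; requests are not routed here
    assert!(peer.has_block(BlockNumber(4)));   // the range is inclusive on both ends
    assert!(peer.has_block(BlockNumber(10)));
    assert!(!peer.has_block(BlockNumber(11))); // not yet stored contiguously by this peer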
(Diffs for the remaining changed files are not shown.)
