diff --git a/node/Cargo.lock b/node/Cargo.lock index bbd75b91..4bde4f56 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -802,7 +802,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" dependencies = [ "pkcs8", - "serde", "signature", ] @@ -900,21 +899,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "futures" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.28" @@ -922,7 +906,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -931,40 +914,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" -[[package]] -name = "futures-executor" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" - -[[package]] -name = "futures-macro" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - -[[package]] -name = "futures-sink" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" - [[package]] name = "futures-task" version = "0.3.28" @@ -977,16 +926,10 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ - "futures-channel", "futures-core", - "futures-io", - "futures-macro", - "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", - "slab", ] [[package]] @@ -1367,7 +1310,6 @@ dependencies = [ "async-trait", "concurrency", "crypto", - "futures", "im", "once_cell", "pin-project", @@ -2175,15 +2117,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "slab" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" -dependencies = [ - "autocfg", -] - [[package]] name = "smallvec" version = "1.11.0" diff --git a/node/Cargo.toml b/node/Cargo.toml index b22a7fb6..e1a35c99 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -31,10 +31,8 @@ ark-ec = "0.4.2" ark-serialize = { version = "0.4.2", features = ["std"] } num-traits = "0.2.17" clap = { version = "4.3.3", features = ["derive"] } -ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } -futures = 
"0.3.28" +ed25519-dalek = { version = "2.0.0", features = ["rand_core"] } hex = "0.4.3" -hyper = { version = "0.14.27", features = ["http1", "http2", "server", "tcp"] } im = "15.1.0" once_cell = "1.17.1" pin-project = "1.1.0" diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index 574cc456..701a0a03 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +rand.workspace = true tracing.workspace = true vise.workspace = true @@ -23,5 +24,4 @@ network = { path = "../network" } sync_blocks = { path = "../sync_blocks" } [dev-dependencies] -rand.workspace = true tokio.workspace = true diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 93e987de..a8c7f5ee 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -13,6 +13,7 @@ use utils::pipe; mod config; mod io; mod metrics; +pub mod testonly; #[cfg(test)] mod tests; diff --git a/node/actors/executor/src/testonly.rs b/node/actors/executor/src/testonly.rs new file mode 100644 index 00000000..c3ae74f2 --- /dev/null +++ b/node/actors/executor/src/testonly.rs @@ -0,0 +1,99 @@ +//! Testing extensions for node executor. + +use crate::config::{ConsensusConfig, ExecutorConfig, GossipConfig}; +use concurrency::net; +use consensus::testonly::make_genesis; +use network::testonly::Instance; +use rand::Rng; +use roles::{ + node, + validator::{self, Payload}, +}; +use std::collections::HashMap; + +/// Full validator configuration. +#[derive(Debug)] +#[non_exhaustive] +pub struct FullValidatorConfig { + /// Executor configuration. + pub node_config: ExecutorConfig, + /// Secret key of the node used for identification in the gossip network. + pub node_key: node::SecretKey, + /// Consensus configuration of the validator. + pub consensus_config: ConsensusConfig, + /// Secret key for consensus. + pub validator_key: validator::SecretKey, +} + +impl FullValidatorConfig { + /// Generates a validator config for a network with a single validator. + pub fn for_single_validator(rng: &mut impl Rng, genesis_block_payload: Payload) -> Self { + let mut net_configs = Instance::new_configs(rng, 1, 0); + assert_eq!(net_configs.len(), 1); + let net_config = net_configs.pop().unwrap(); + let consensus_config = net_config.consensus.unwrap(); + let validator_key = consensus_config.key.clone(); + let consensus_config = ConsensusConfig::from(consensus_config); + + let (genesis_block, validators) = + make_genesis(&[validator_key.clone()], genesis_block_payload); + let node_key = net_config.gossip.key.clone(); + let node_config = ExecutorConfig { + server_addr: *net_config.server_addr, + gossip: net_config.gossip.into(), + genesis_block, + validators, + }; + + Self { + node_config, + node_key, + consensus_config, + validator_key, + } + } + + /// Creates a new external node and configures this validator to accept incoming connections from it. + pub fn connect_external_node(&mut self, rng: &mut impl Rng) -> ExternalNodeConfig { + let external_node_config = ExternalNodeConfig::new(rng, self); + self.node_config + .gossip + .static_inbound + .insert(external_node_config.node_key.public()); + external_node_config + } +} + +/// Configuration for an external node (i.e., non-validator node). +#[derive(Debug)] +#[non_exhaustive] +pub struct ExternalNodeConfig { + /// Executor configuration. + pub node_config: ExecutorConfig, + /// Secret key of the node used for identification in the gossip network. 
+ pub node_key: node::SecretKey, +} + +impl ExternalNodeConfig { + fn new(rng: &mut impl Rng, validator: &FullValidatorConfig) -> Self { + let node_key: node::SecretKey = rng.gen(); + let external_node_addr = net::tcp::testonly::reserve_listener(); + let node_config = ExecutorConfig { + server_addr: *external_node_addr, + gossip: GossipConfig { + key: node_key.public(), + static_outbound: HashMap::from([( + validator.node_key.public(), + validator.node_config.server_addr, + )]), + ..validator.node_config.gossip.clone() + }, + ..validator.node_config.clone() + }; + + Self { + node_config, + node_key, + } + } +} diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index ac335e8c..606818a8 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -1,24 +1,11 @@ //! High-level tests for `Executor`. use super::*; +use crate::testonly::FullValidatorConfig; use concurrency::sync; -use consensus::testonly::make_genesis; -use network::testonly::Instance; -use rand::Rng; use roles::validator::{BlockNumber, Payload}; -use std::collections::HashMap; use storage::{BlockStore, InMemoryStorage, StorageError}; -async fn run_executor(ctx: &ctx::Ctx, executor: Executor) -> anyhow::Result<()> { - executor.run(ctx).await.or_else(|err| { - if err.root_cause().is::() { - Ok(()) // Test has successfully finished - } else { - Err(err) - } - }) -} - async fn store_final_blocks( ctx: &ctx::Ctx, mut blocks_receiver: channel::UnboundedReceiver, @@ -37,40 +24,7 @@ async fn store_final_blocks( Ok(()) } -#[derive(Debug)] -struct FullValidatorConfig { - node_config: ExecutorConfig, - node_key: node::SecretKey, - consensus_config: ConsensusConfig, - validator_key: validator::SecretKey, -} - impl FullValidatorConfig { - fn for_single_validator(rng: &mut impl Rng) -> Self { - let mut net_configs = Instance::new_configs(rng, 1, 0); - assert_eq!(net_configs.len(), 1); - let net_config = net_configs.pop().unwrap(); - let consensus_config = net_config.consensus.unwrap(); - let validator_key = consensus_config.key.clone(); - let consensus_config = ConsensusConfig::from(consensus_config); - - let (genesis_block, validators) = make_genesis(&[validator_key.clone()], Payload(vec![])); - let node_key = net_config.gossip.key.clone(); - let node_config = ExecutorConfig { - server_addr: *net_config.server_addr, - gossip: net_config.gossip.into(), - genesis_block, - validators, - }; - - Self { - node_config, - node_key, - consensus_config, - validator_key, - } - } - fn into_executor( self, storage: Arc, @@ -98,14 +52,14 @@ async fn executing_single_validator() { let ctx = &ctx::root(); let rng = &mut ctx.rng(); - let validator = FullValidatorConfig::for_single_validator(rng); + let validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![])); let genesis_block = &validator.node_config.genesis_block; let storage = InMemoryStorage::new(genesis_block.clone()); let storage = Arc::new(storage); let (executor, mut blocks_receiver) = validator.into_executor(storage); scope::run!(ctx, |ctx, s| async { - s.spawn_bg(run_executor(ctx, executor)); + s.spawn_bg(executor.run(ctx)); let mut expected_block_number = BlockNumber(1); while expected_block_number < BlockNumber(5) { @@ -126,28 +80,8 @@ async fn executing_validator_and_external_node() { let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); let rng = &mut ctx.rng(); - let mut validator = FullValidatorConfig::for_single_validator(rng); - - let external_node_key: node::SecretKey = rng.gen(); - let external_node_addr = 
net::tcp::testonly::reserve_listener(); - let external_node_config = ExecutorConfig { - server_addr: *external_node_addr, - gossip: GossipConfig { - key: external_node_key.public(), - static_outbound: HashMap::from([( - validator.node_key.public(), - validator.node_config.server_addr, - )]), - ..validator.node_config.gossip.clone() - }, - ..validator.node_config.clone() - }; - - validator - .node_config - .gossip - .static_inbound - .insert(external_node_key.public()); + let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![])); + let external_node = validator.connect_external_node(rng); let genesis_block = &validator.node_config.genesis_block; let validator_storage = InMemoryStorage::new(genesis_block.clone()); @@ -158,15 +92,15 @@ async fn executing_validator_and_external_node() { let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone()); let external_node = Executor::new( - external_node_config, - external_node_key, + external_node.node_config, + external_node.node_key, external_node_storage.clone(), ) .unwrap(); scope::run!(ctx, |ctx, s| async { - s.spawn_bg(run_executor(ctx, validator)); - s.spawn_bg(run_executor(ctx, external_node)); + s.spawn_bg(validator.run(ctx)); + s.spawn_bg(external_node.run(ctx)); s.spawn_bg(store_final_blocks(ctx, blocks_receiver, validator_storage)); for _ in 0..5 { diff --git a/node/actors/network/Cargo.toml b/node/actors/network/Cargo.toml index 7d24287a..cfa3643b 100644 --- a/node/actors/network/Cargo.toml +++ b/node/actors/network/Cargo.toml @@ -25,7 +25,6 @@ schema = { path = "../../libs/schema" } utils = { path = "../../libs/utils" } [dev-dependencies] -futures.workspace = true pretty_assertions.workspace = true test-casing.workspace = true tokio.workspace = true diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index faf83064..089e47ec 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -16,12 +16,12 @@ async fn test_one_connection_per_validator() { let mut nodes = testonly::Instance::new(rng, 3, 1); scope::run!(ctx, |ctx,s| async { - for (i,n) in nodes.iter().enumerate() { + for (i, node) in nodes.iter().enumerate() { let (network_pipe, _) = pipe::new(); s.spawn_bg(run_network( ctx, - n.state.clone(), + node.state.clone(), network_pipe ).instrument(tracing::info_span!("node", i))); } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index dcc79baf..4e240355 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -362,7 +362,7 @@ pub(crate) async fn run_client( sender: &channel::UnboundedSender, mut receiver: channel::UnboundedReceiver, ) -> anyhow::Result<()> { - let res = scope::run!(ctx, |ctx, s| async { + scope::run!(ctx, |ctx, s| async { // Spawn a tasks handling static outbound connections. 
for (peer, addr) in &state.gossip.cfg.static_outbound { s.spawn::<()>(async { @@ -394,8 +394,8 @@ pub(crate) async fn run_client( Ok(()) } }) - .await; + .await + .ok(); - assert_eq!(Err(ctx::Canceled), res); Ok(()) } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 77ad9f63..b3c44754 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -27,12 +27,12 @@ async fn test_one_connection_per_node() { let mut nodes: Vec<_> = testonly::Instance::new(rng, 5, 2); scope::run!(ctx, |ctx,s| async { - for n in &nodes { + for node in &nodes { let (network_pipe, _) = pipe::new(); s.spawn_bg(run_network( ctx, - n.state.clone(), + node.state.clone(), network_pipe )); } diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index fd047daf..72007f28 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -53,26 +53,26 @@ impl Instance { pub fn new_configs(rng: &mut R, n: usize, gossip_peers: usize) -> Vec { let keys: Vec = (0..n).map(|_| rng.gen()).collect(); let validators = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(); - let mut cfgs: Vec<_> = (0..n) - .map(|i| { - let addr = net::tcp::testonly::reserve_listener(); - Config { - server_addr: addr, - validators: validators.clone(), - consensus: Some(consensus::Config { - key: keys[i].clone(), - public_addr: *addr, - }), - gossip: gossip::Config { - key: rng.gen(), - dynamic_inbound_limit: n as u64, - static_inbound: HashSet::default(), - static_outbound: HashMap::default(), - enable_pings: true, - }, - } - }) - .collect(); + let configs = keys.iter().map(|key| { + let addr = net::tcp::testonly::reserve_listener(); + Config { + server_addr: addr, + validators: validators.clone(), + consensus: Some(consensus::Config { + key: key.clone(), + public_addr: *addr, + }), + gossip: gossip::Config { + key: rng.gen(), + dynamic_inbound_limit: n as u64, + static_inbound: HashSet::default(), + static_outbound: HashMap::default(), + enable_pings: true, + }, + } + }); + let mut cfgs: Vec<_> = configs.collect(); + for i in 0..cfgs.len() { for j in 0..gossip_peers { let j = (i + j + 1) % n; diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index fda4675c..ce42f70b 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -12,10 +12,10 @@ async fn test_metrics() { let rng = &mut ctx.rng(); let nodes = testonly::Instance::new(rng, 3, 1); scope::run!(ctx, |ctx, s| async { - for (i, n) in nodes.iter().enumerate() { + for (i, node) in nodes.iter().enumerate() { let (network_pipe, _) = pipe::new(); s.spawn_bg( - run_network(ctx, n.state.clone(), network_pipe) + run_network(ctx, node.state.clone(), network_pipe) .instrument(tracing::info_span!("node", i)), ); } diff --git a/node/actors/sync_blocks/src/lib.rs b/node/actors/sync_blocks/src/lib.rs index a8028b51..6992470a 100644 --- a/node/actors/sync_blocks/src/lib.rs +++ b/node/actors/sync_blocks/src/lib.rs @@ -13,7 +13,7 @@ use concurrency::{ }; use network::io::SyncState; use std::sync::Arc; -use storage::WriteBlockStore; +use storage::{StorageError, StorageResult, WriteBlockStore}; use tracing::instrument; use utils::pipe::ActorPipe; @@ -72,12 +72,19 @@ impl SyncBlocks { pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let storage = self.message_handler.storage.clone(); - scope::run!(ctx, |ctx, s| async { + let result = scope::run!(ctx, |ctx, s| async { 
s.spawn_bg(Self::emit_state_updates(ctx, storage, &self.state_sender)); s.spawn_bg(self.peer_states.run(ctx)); self.message_handler.process_messages(ctx).await }) - .await + .await; + + // Since we clearly type cancellation errors, it's easier propagate them up to this entry point, + // rather than catching in the constituent tasks. + result.or_else(|err| match err { + StorageError::Canceled(_) => Ok(()), // Cancellation is not propagated as an error + StorageError::Database(err) => Err(err), + }) } #[instrument(level = "trace", skip_all, err)] @@ -85,7 +92,7 @@ impl SyncBlocks { ctx: &ctx::Ctx, storage: Arc, state_sender: &watch::Sender, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { let mut storage_subscriber = storage.subscribe_to_block_writes(); loop { let state = Self::get_sync_state(ctx, storage.as_ref()).await?; @@ -104,7 +111,7 @@ impl SyncBlocks { async fn get_sync_state( ctx: &ctx::Ctx, storage: &dyn WriteBlockStore, - ) -> anyhow::Result { + ) -> StorageResult { let last_contiguous_block_number = storage.last_contiguous_block_number(ctx).await?; let last_contiguous_stored_block = storage .block(ctx, last_contiguous_block_number) diff --git a/node/actors/sync_blocks/src/message_handler.rs b/node/actors/sync_blocks/src/message_handler.rs index e3779c54..16343134 100644 --- a/node/actors/sync_blocks/src/message_handler.rs +++ b/node/actors/sync_blocks/src/message_handler.rs @@ -5,7 +5,7 @@ use concurrency::ctx::{self, channel}; use network::io::{GetBlockError, GetBlockResponse, SyncBlocksRequest}; use roles::validator::BlockNumber; use std::sync::Arc; -use storage::WriteBlockStore; +use storage::{StorageResult, WriteBlockStore}; use tracing::instrument; /// Inner details of `SyncBlocks` actor allowing to process messages. @@ -22,9 +22,8 @@ pub(crate) struct SyncBlocksMessageHandler { impl SyncBlocksMessageHandler { /// Implements the message processing loop. #[instrument(level = "trace", skip_all, err)] - pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - loop { - let input_message = self.message_receiver.recv(ctx).await?; + pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> StorageResult<()> { + while let Ok(input_message) = self.message_receiver.recv(ctx).await { match input_message { InputMessage::Network(SyncBlocksRequest::UpdatePeerSyncState { peer, @@ -42,6 +41,7 @@ impl SyncBlocksMessageHandler { } } } + Ok(()) } /// Gets a block with the specified `number` from the storage. @@ -52,7 +52,7 @@ impl SyncBlocksMessageHandler { &self, ctx: &ctx::Ctx, number: BlockNumber, - ) -> anyhow::Result { + ) -> StorageResult { Ok(self .storage .block(ctx, number) diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 4485a386..1e2d5da6 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -14,7 +14,7 @@ use roles::{ validator::{BlockHeader, BlockNumber, FinalBlock, PayloadHash}, }; use std::{collections::HashMap, sync::Arc}; -use storage::WriteBlockStore; +use storage::{StorageResult, WriteBlockStore}; use tracing::instrument; mod events; @@ -78,7 +78,7 @@ impl PeerStates { /// 1. Get information about missing blocks from the storage. /// 2. Spawn a task processing `SyncState`s from peers. /// 3. Spawn a task to get each missing block. 
- pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> { let updates_receiver = self.updates_receiver.take().unwrap(); let storage = self.storage.as_ref(); let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks); @@ -123,7 +123,7 @@ impl PeerStates { ctx: &ctx::Ctx, mut updates_receiver: channel::UnboundedReceiver, new_blocks_sender: watch::Sender, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { loop { let (peer_key, sync_state) = updates_receiver.recv(ctx).await?; let new_last_block_number = self @@ -151,7 +151,7 @@ impl PeerStates { ctx: &ctx::Ctx, peer_key: node::PublicKey, state: SyncState, - ) -> anyhow::Result { + ) -> ctx::OrCanceled { let last_contiguous_stored_block = match self.validate_sync_state(state) { Ok(block_number) => block_number, Err(err) => { @@ -220,7 +220,7 @@ impl PeerStates { block_number: BlockNumber, get_block_permit: sync::SemaphorePermit<'_>, storage: &dyn WriteBlockStore, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { let block = self.get_block(ctx, block_number).await?; drop(get_block_permit); @@ -236,7 +236,7 @@ impl PeerStates { &self, ctx: &ctx::Ctx, block_number: BlockNumber, - ) -> anyhow::Result { + ) -> ctx::OrCanceled { loop { let Some((peer_key, _permit)) = Self::acquire_peer_permit(&*sync::lock(ctx, &self.peers).await?, block_number) @@ -315,7 +315,7 @@ impl PeerStates { ctx: &ctx::Ctx, recipient: node::PublicKey, number: BlockNumber, - ) -> anyhow::Result> { + ) -> ctx::OrCanceled> { let (response, response_receiver) = oneshot::channel(); let message = SyncBlocksInputMessage::GetBlock { recipient: recipient.clone(), @@ -378,7 +378,7 @@ impl PeerStates { &self, ctx: &ctx::Ctx, peer_key: &node::PublicKey, - ) -> anyhow::Result<()> { + ) -> ctx::OrCanceled<()> { let mut peers = sync::lock(ctx, &self.peers).await?; if let Some(state) = peers.remove(peer_key) { tracing::trace!(?state, "Dropping peer connection state"); diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 9dcf01ad..6eea24fa 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -6,7 +6,7 @@ use concurrency::time; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; use roles::validator; use std::{collections::HashSet, fmt}; -use storage::{BlockStore, InMemoryStorage}; +use storage::{BlockStore, InMemoryStorage, StorageError}; use test_casing::{test_casing, Product}; const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5); @@ -115,12 +115,9 @@ async fn test_peer_states(test: T) { scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { - peer_states.run(ctx).await.or_else(|err| { - if err.root_cause().is::() { - Ok(()) // Swallow cancellation errors after the test is finished - } else { - Err(err) - } + peer_states.run(ctx).await.or_else(|err| match err { + StorageError::Canceled(_) => Ok(()), // Swallow cancellation errors after the test is finished + StorageError::Database(err) => Err(err), }) }); test.test(ctx, test_handles).await diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 7bb78b1f..1d9256bf 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -115,7 +115,7 @@ impl Node { sync_blocks_config, ) .await - .expect("Failed"); + .expect("Failed initializing `sync_blocks` actor"); let sync_states_subscriber = 
sync_blocks.subscribe_to_state_updates(); self.network @@ -158,8 +158,10 @@ impl Node { self.switch_off_receiver .recv_or_disconnected(ctx) - .await? + .await .ok(); + // ^ Unlike with `switch_on_receiver`, the context may get canceled before the receiver + // is dropped, so we swallow both cancellation and disconnect errors here. tracing::trace!("Node stopped"); Ok(()) }) @@ -240,14 +242,9 @@ async fn test_sync_blocks(test: T) { s.spawn_bg(async { let test_validators = test_validators; let key = node.key(); - let err = node.run(ctx, &test_validators).await.unwrap_err(); - + node.run(ctx, &test_validators).await?; tracing::trace!(?key, "Node task completed"); - if err.root_cause().is::() { - Ok(()) // Test has successfully completed - } else { - Err(err) - } + Ok(()) }); } diff --git a/node/libs/concurrency/src/net/tcp/testonly.rs b/node/libs/concurrency/src/net/tcp/testonly.rs index 08ace50b..7406f691 100644 --- a/node/libs/concurrency/src/net/tcp/testonly.rs +++ b/node/libs/concurrency/src/net/tcp/testonly.rs @@ -1,13 +1,23 @@ //! Test-only TCP utilities. use super::{accept, connect, ListenerAddr, Stream, RESERVED_LISTENER_ADDRS}; use crate::{ctx, scope}; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; /// Reserves a random port on localhost for a TCP listener. pub fn reserve_listener() -> ListenerAddr { - let guard = tokio::net::TcpSocket::new_v6().unwrap(); + // Try to bind to an Ipv6 address, then fall back to Ipv4 if Ipv6 is not available. + let localhost_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, 0)); + let mut guard = tokio::net::TcpSocket::new_v6().unwrap(); guard.set_reuseaddr(true).unwrap(); guard.set_reuseport(true).unwrap(); - guard.bind("[::1]:0".parse().unwrap()).unwrap(); + + if guard.bind(localhost_addr).is_err() { + let localhost_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + guard = tokio::net::TcpSocket::new_v4().unwrap(); + guard.set_reuseaddr(true).unwrap(); + guard.set_reuseport(true).unwrap(); + guard.bind(localhost_addr).unwrap(); + } let addr = guard.local_addr().unwrap(); RESERVED_LISTENER_ADDRS.lock().unwrap().insert(addr, guard); ListenerAddr(addr) diff --git a/node/libs/crypto/src/bn254/tests.rs b/node/libs/crypto/src/bn254/tests.rs index 933b914b..d07afbfa 100644 --- a/node/libs/crypto/src/bn254/tests.rs +++ b/node/libs/crypto/src/bn254/tests.rs @@ -1,8 +1,6 @@ -use std::iter::repeat_with; - -use rand::{rngs::StdRng, Rng, SeedableRng}; - use crate::bn254::{AggregateSignature, PublicKey, SecretKey, Signature}; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use std::iter::repeat_with; #[test] fn signature_smoke() { diff --git a/node/libs/crypto/src/sha256/mod.rs b/node/libs/crypto/src/sha256/mod.rs index 7f1a8902..bc7510dc 100644 --- a/node/libs/crypto/src/sha256/mod.rs +++ b/node/libs/crypto/src/sha256/mod.rs @@ -9,11 +9,17 @@ pub mod testonly; pub struct Sha256(pub(crate) [u8; 32]); impl Sha256 { - /// Computes a SHA256 hash of a message. + /// Computes a SHA-256 hash of a message. pub fn new(msg: &[u8]) -> Self { Self(sha2::Sha256::new().chain(msg).finalize().into()) } + /// Interprets the specified `bytes` as a hash digest (i.e., a reverse operation to [`Self::as_bytes()`]). + /// It is caller's responsibility to ensure that `bytes` are actually a SHA-256 hash digest. + pub fn from_bytes(bytes: [u8; 32]) -> Self { + Self(bytes) + } + /// Returns a reference to the bytes of this hash. 
pub fn as_bytes(&self) -> &[u8; 32] { &self.0 diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 770361d3..e4c96180 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -16,16 +16,17 @@ pub struct Payload(pub Vec); pub struct PayloadHash(pub(crate) sha256::Sha256); impl TextFmt for PayloadHash { - fn encode(&self) -> String { - format!("payload:sha256:{}", hex::encode(ByteFmt::encode(&self.0))) - } fn decode(text: Text) -> anyhow::Result { text.strip("payload:sha256:")?.decode_hex().map(Self) } + + fn encode(&self) -> String { + format!("payload:sha256:{}", hex::encode(ByteFmt::encode(&self.0))) + } } impl fmt::Debug for PayloadHash { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.write_str(&TextFmt::encode(self)) } } @@ -61,24 +62,40 @@ impl fmt::Display for BlockNumber { } } -/// Hash of the block. +/// Hash of the block header. #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockHeaderHash(pub(crate) sha256::Sha256); +impl BlockHeaderHash { + /// Interprets the specified `bytes` as a block header hash digest (i.e., a reverse operation to [`Self::as_bytes()`]). + /// It is caller's responsibility to ensure that `bytes` are actually a block header hash digest. + pub fn from_bytes(bytes: [u8; 32]) -> Self { + Self(sha256::Sha256::from_bytes(bytes)) + } + + /// Returns a reference to the bytes of this hash. + pub fn as_bytes(&self) -> &[u8; 32] { + self.0.as_bytes() + } +} + impl TextFmt for BlockHeaderHash { + fn decode(text: Text) -> anyhow::Result { + text.strip("block_header_hash:sha256:")? + .decode_hex() + .map(Self) + } + fn encode(&self) -> String { format!( - "block_hash:sha256:{}", + "block_header_hash:sha256:{}", hex::encode(ByteFmt::encode(&self.0)) ) } - fn decode(text: Text) -> anyhow::Result { - text.strip("block_hash:sha256:")?.decode_hex().map(Self) - } } impl fmt::Debug for BlockHeaderHash { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.write_str(&TextFmt::encode(self)) } } @@ -147,16 +164,18 @@ impl ByteFmt for FinalBlock { fn decode(bytes: &[u8]) -> anyhow::Result { ::schema::decode(bytes) } + fn encode(&self) -> Vec { ::schema::encode(self) } } impl TextFmt for FinalBlock { - fn encode(&self) -> String { - format!("final_block:{}", hex::encode(ByteFmt::encode(self))) - } fn decode(text: Text) -> anyhow::Result { text.strip("final_block:")?.decode_hex() } + + fn encode(&self) -> String { + format!("final_block:{}", hex::encode(ByteFmt::encode(self))) + } }
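// Usage sketch for the extracted `testonly::FullValidatorConfig` API. It mirrors the updated
// `executing_validator_and_external_node` test above; the crate paths (`executor`, `concurrency`,
// `roles`, `storage`) and the wrapper function name are illustrative assumptions, while the calls
// themselves (`for_single_validator`, `connect_external_node`, `Executor::new`) are the ones
// introduced or used in this change.
use concurrency::ctx;
use executor::{testonly::FullValidatorConfig, Executor};
use roles::validator::Payload;
use std::sync::Arc;
use storage::InMemoryStorage;

fn setup_validator_with_external_node() {
    let ctx = &ctx::root();
    let rng = &mut ctx.rng();

    // Single-validator network with an empty genesis payload.
    let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
    // Create an external (non-validator) node and register it as a static inbound gossip peer
    // of the validator.
    let external_node = validator.connect_external_node(rng);

    // Both nodes are bootstrapped from the validator's genesis block.
    let genesis_block = validator.node_config.genesis_block.clone();
    let external_node_storage = Arc::new(InMemoryStorage::new(genesis_block));
    let external_node_executor = Executor::new(
        external_node.node_config,
        external_node.node_key,
        external_node_storage,
    )
    .unwrap();
    // `external_node_executor.run(ctx)` would then be spawned inside a `scope::run!` block,
    // as in the executor tests above.
    let _ = external_node_executor;
}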
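// Round-trip sketch for the new `from_bytes` constructors. `BlockHeaderHash::from_bytes` is
// documented above as the reverse of `as_bytes()`; the import path and helper name below are
// illustrative assumptions.
use roles::validator::BlockHeaderHash;

fn header_hash_round_trip(hash: BlockHeaderHash) -> BlockHeaderHash {
    // Serialize the digest to raw bytes, then reinterpret the same bytes as a hash again.
    // Callers are responsible for only feeding bytes that really came from `as_bytes()`.
    let raw: [u8; 32] = *hash.as_bytes();
    BlockHeaderHash::from_bytes(raw)
}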