From f7c886361417d6692443a3531c57c09832bf353a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Thu, 20 Jun 2024 16:58:49 +0200 Subject: [PATCH 1/7] Code cleanups --- execution_engine_testing/tests/Cargo.toml | 2 +- node/build.rs | 9 ++- node/src/app/main.rs | 8 +-- node/src/cli/arglang.rs | 4 +- .../block_accumulator/block_acceptor.rs | 37 ++++++----- .../src/components/block_accumulator/tests.rs | 2 +- .../execution_results_acquisition.rs | 4 +- .../components/block_synchronizer/tests.rs | 14 ++--- node/src/components/consensus.rs | 4 +- .../components/consensus/era_supervisor.rs | 2 +- .../consensus/highway_core/highway_testing.rs | 6 +- .../consensus/protocols/highway/tests.rs | 2 +- .../consensus/protocols/zug/des_testing.rs | 4 +- .../components/contract_runtime/exec_queue.rs | 2 +- node/src/components/diagnostics_port/tasks.rs | 4 +- node/src/components/event_stream_server.rs | 4 +- .../event_stream_server/sse_server.rs | 8 +-- node/src/components/gossiper/gossip_table.rs | 2 +- node/src/components/network.rs | 63 +++++++++---------- .../components/network/message_pack_format.rs | 8 +-- node/src/components/network/outgoing.rs | 24 +++---- node/src/components/network/tasks.rs | 7 +-- node/src/components/storage/tests.rs | 2 +- node/src/components/transaction_buffer.rs | 7 +-- .../components/transaction_buffer/tests.rs | 4 +- node/src/components/upgrade_watcher.rs | 2 +- node/src/effect.rs | 46 +++++++------- node/src/lib.rs | 10 +-- node/src/logging.rs | 14 ++--- node/src/reactor.rs | 27 ++++---- node/src/reactor/main_reactor/control.rs | 2 +- node/src/reactor/main_reactor/event.rs | 2 +- node/src/tls.rs | 10 +-- node/src/types/sync_leap.rs | 2 +- node/src/utils.rs | 7 ++- node/src/utils/work_queue.rs | 2 +- types/src/uref.rs | 2 +- 37 files changed, 166 insertions(+), 193 deletions(-) diff --git a/execution_engine_testing/tests/Cargo.toml b/execution_engine_testing/tests/Cargo.toml index 047fe63e9e..5598ee5a4e 100644 --- a/execution_engine_testing/tests/Cargo.toml +++ b/execution_engine_testing/tests/Cargo.toml @@ -9,7 +9,7 @@ base16 = "0.2.1" casper-engine-test-support = { path = "../test_support" } casper-execution-engine = { path = "../../execution_engine", features = ["test-support"] } casper-storage = { path = "../../storage" } -casper-types = { path = "../../types", default_features = false, features = ["datasize", "json-schema"] } +casper-types = { path = "../../types", default-features = false, features = ["datasize", "json-schema"] } casper-wasm = "0.46.0" clap = "2" fs_extra = "1.2.0" diff --git a/node/build.rs b/node/build.rs index afa9876009..42f0f6a06a 100644 --- a/node/build.rs +++ b/node/build.rs @@ -20,24 +20,23 @@ fn main() { .output() { Ok(output) => { - //In the event the git command is successful, export the properly formatted git hash to + // In the event the git command is successful, export the properly formatted git hash to // cargo at compile time. let git_hash_raw = String::from_utf8(output.stdout).expect("Failed to obtain commit hash to string"); let git_hash = git_hash_raw.trim_end_matches('\n'); - println!("cargo:rustc-env={}={}", NODE_GIT_HASH_ENV_VAR, git_hash); + println!("cargo:rustc-env={NODE_GIT_HASH_ENV_VAR}={git_hash}"); } Err(error) => { - println!("cargo:warning={}", error); + println!("cargo:warning={error}"); println!("cargo:warning=casper-node build version will not include git short hash"); } } println!( - "cargo:rustc-env={}={}", - NODE_BUILD_PROFILE_ENV_VAR, + "cargo:rustc-env={NODE_BUILD_PROFILE_ENV_VAR}={}", env::var(CARGO_BUILD_PROFILE_ENV_VAR).unwrap() ); } diff --git a/node/src/app/main.rs b/node/src/app/main.rs index 607c4e6a5a..9b42acb1ab 100644 --- a/node/src/app/main.rs +++ b/node/src/app/main.rs @@ -25,12 +25,12 @@ fn panic_hook(info: &PanicInfo) { // Print panic info if let Some(s) = info.payload().downcast_ref::<&str>() { - eprintln!("node panicked: {}", s); + eprintln!("node panicked: {s}"); // TODO - use `info.message()` once https://github.com/rust-lang/rust/issues/66745 is fixed // } else if let Some(message) = info.message() { - // eprintln!("{}", message); + // eprintln!("{message}"); } else { - eprintln!("{}", info); + eprintln!("{info}"); } // Abort after a panic, even if only a worker thread panicked. @@ -55,7 +55,7 @@ fn main() -> anyhow::Result<()> { // Parse CLI args and run selected subcommand. let opts = Cli::from_args(); - runtime.block_on(async { opts.run().await })? + runtime.block_on(opts.run())? }; info!(%exit_code, "exiting casper-node"); diff --git a/node/src/cli/arglang.rs b/node/src/cli/arglang.rs index 78fcba8c56..16cc045661 100644 --- a/node/src/cli/arglang.rs +++ b/node/src/cli/arglang.rs @@ -64,7 +64,7 @@ fn tokenize(input: &str) -> Result, Error> { // Check if we need to complete a token. if !buffer.is_empty() { match ch { - Some(' ') | Some('"') | Some('[') | Some(']') | Some(',') | None => { + Some(' ' | '"' | '[' | ']' | ',') | None => { // Try to parse as number or bool first. if let Ok(value) = i64::from_str(&buffer) { tokens.push(Token::I64(value)); @@ -162,7 +162,7 @@ where } } } - Some(t @ Token::CloseBracket) | Some(t @ Token::Comma) => { + Some(t @ (Token::CloseBracket | Token::Comma)) => { return Err(Error::UnexpectedToken(t)); } None => { diff --git a/node/src/components/block_accumulator/block_acceptor.rs b/node/src/components/block_accumulator/block_acceptor.rs index 2b80a455fd..518aa8b873 100644 --- a/node/src/components/block_accumulator/block_acceptor.rs +++ b/node/src/components/block_accumulator/block_acceptor.rs @@ -286,27 +286,26 @@ impl BlockAcceptor { }, faulty_senders, ); - } else { - if meta_block - .state - .register_as_stored() - .was_already_registered() - { - error!( - %block_hash, - block_height = meta_block.block.height(), - meta_block_state = ?meta_block.state, - "should not store the same block more than once" - ); - } - return ( - ShouldStore::SufficientlySignedBlock { - meta_block: meta_block.clone(), - block_signatures, - }, - faulty_senders, + } + if meta_block + .state + .register_as_stored() + .was_already_registered() + { + error!( + %block_hash, + block_height = meta_block.block.height(), + meta_block_state = ?meta_block.state, + "should not store the same block more than once" ); } + return ( + ShouldStore::SufficientlySignedBlock { + meta_block: meta_block.clone(), + block_signatures, + }, + faulty_senders, + ); } } diff --git a/node/src/components/block_accumulator/tests.rs b/node/src/components/block_accumulator/tests.rs index 3ff9adcd73..21fd8d98be 100644 --- a/node/src/components/block_accumulator/tests.rs +++ b/node/src/components/block_accumulator/tests.rs @@ -2126,7 +2126,7 @@ async fn block_accumulator_doesnt_purge_with_delayed_block_execution() { // block can be delayed. Since we would purge an acceptor if the purge interval has passed, // we want to simulate a situation in which the purge interval was exceeded in order to test // the special case that if an acceptor that had sufficient finality, it is not purged. - tokio::time::sleep( + time::sleep( Duration::from(runner.reactor().block_accumulator.purge_interval) + Duration::from_secs(1), ) .await; diff --git a/node/src/components/block_synchronizer/execution_results_acquisition.rs b/node/src/components/block_synchronizer/execution_results_acquisition.rs index 02d6cd08e2..f99afb5483 100644 --- a/node/src/components/block_synchronizer/execution_results_acquisition.rs +++ b/node/src/components/block_synchronizer/execution_results_acquisition.rs @@ -3,7 +3,7 @@ mod tests; use std::{ collections::HashMap, - fmt::{self, Debug, Display, Formatter}, + fmt::{self, Display, Formatter}, }; use datasize::DataSize; @@ -87,7 +87,7 @@ pub(crate) enum Error { } impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Error::BlockHashMismatch { expected, actual } => { write!( diff --git a/node/src/components/block_synchronizer/tests.rs b/node/src/components/block_synchronizer/tests.rs index 83d619ffa1..3d79fa486e 100644 --- a/node/src/components/block_synchronizer/tests.rs +++ b/node/src/components/block_synchronizer/tests.rs @@ -2445,7 +2445,7 @@ async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() { Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -2552,7 +2552,7 @@ async fn historical_sync_no_legacy_block() { Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -2780,7 +2780,7 @@ async fn historical_sync_legacy_block_strict_finality() { Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -2982,7 +2982,7 @@ async fn historical_sync_legacy_block_weak_finality() { Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -3195,7 +3195,7 @@ async fn historical_sync_legacy_block_any_finality() { Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -3865,7 +3865,7 @@ async fn historical_sync_latch_should_not_decrement_for_old_deploy_fetch_respons Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, @@ -4136,7 +4136,7 @@ async fn historical_sync_latch_should_not_decrement_for_old_execution_results() Event::GlobalStateSynced { block_hash: *block.hash(), result: Ok(GlobalStateSynchronizerResponse::new( - super::global_state_synchronizer::RootHash::new(*block.state_root_hash()), + global_state_synchronizer::RootHash::new(*block.state_root_hash()), vec![], )), }, diff --git a/node/src/components/consensus.rs b/node/src/components/consensus.rs index c553cf9d52..0062386ed6 100644 --- a/node/src/components/consensus.rs +++ b/node/src/components/consensus.rs @@ -183,7 +183,7 @@ pub(crate) enum Event { } impl Debug for ConsensusMessage { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { ConsensusMessage::Protocol { era_id, payload: _ } => { write!(f, "Protocol {{ era_id: {:?}, .. }}", era_id) @@ -218,7 +218,7 @@ impl Display for ConsensusMessage { } impl Debug for ConsensusRequestMessage { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, "ConsensusRequestMessage {{ era_id: {:?}, .. }}", diff --git a/node/src/components/consensus/era_supervisor.rs b/node/src/components/consensus/era_supervisor.rs index 4b4efc6dd1..29b3febac3 100644 --- a/node/src/components/consensus/era_supervisor.rs +++ b/node/src/components/consensus/era_supervisor.rs @@ -142,7 +142,7 @@ impl EraSupervisor { registry: &Registry, ) -> Result { let unit_files_folder = storage_dir.join("unit_files"); - std::fs::create_dir_all(&unit_files_folder)?; + fs::create_dir_all(&unit_files_folder)?; info!(our_id = %validator_matrix.public_signing_key(), "EraSupervisor pubkey",); let metrics = Metrics::new(registry)?; diff --git a/node/src/components/consensus/highway_core/highway_testing.rs b/node/src/components/consensus/highway_core/highway_testing.rs index 97a96bf494..a3e1251a49 100644 --- a/node/src/components/consensus/highway_core/highway_testing.rs +++ b/node/src/components/consensus/highway_core/highway_testing.rs @@ -71,7 +71,7 @@ enum HighwayMessage { } impl Debug for HighwayMessage { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { HighwayMessage::Timer(t) => f.debug_tuple("Timer").field(&t.millis()).finish(), HighwayMessage::RequestBlock(bc) => f @@ -153,7 +153,7 @@ pub(crate) enum TestRunError { } impl Display for TestRunError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { TestRunError::NoMessages => write!( f, @@ -1024,7 +1024,7 @@ impl Debug for HashWrapper { } impl Display for HashWrapper { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { Debug::fmt(self, f) } } diff --git a/node/src/components/consensus/protocols/highway/tests.rs b/node/src/components/consensus/protocols/highway/tests.rs index 3ff64c52ff..2f3b17c009 100644 --- a/node/src/components/consensus/protocols/highway/tests.rs +++ b/node/src/components/consensus/protocols/highway/tests.rs @@ -48,7 +48,7 @@ where highway_testing::TEST_ENDORSEMENT_EVIDENCE_LIMIT, ); let weights = weights.into_iter().map(|w| w.into()).collect::>(); - state::State::new(weights, params, vec![], vec![]) + State::new(weights, params, vec![], vec![]) } const INSTANCE_ID_DATA: &[u8; 1] = &[123u8; 1]; diff --git a/node/src/components/consensus/protocols/zug/des_testing.rs b/node/src/components/consensus/protocols/zug/des_testing.rs index e21938820e..fd544efe53 100644 --- a/node/src/components/consensus/protocols/zug/des_testing.rs +++ b/node/src/components/consensus/protocols/zug/des_testing.rs @@ -166,7 +166,7 @@ pub(crate) enum TestRunError { } impl Display for TestRunError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { TestRunError::NoMessages => write!( f, @@ -999,7 +999,7 @@ impl Debug for HashWrapper { } impl Display for HashWrapper { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { Debug::fmt(self, f) } } diff --git a/node/src/components/contract_runtime/exec_queue.rs b/node/src/components/contract_runtime/exec_queue.rs index 95efd19f31..e843b59ec1 100644 --- a/node/src/components/contract_runtime/exec_queue.rs +++ b/node/src/components/contract_runtime/exec_queue.rs @@ -44,7 +44,7 @@ impl ExecQueue { *locked_queue = locked_queue.split_off(&height); - core::convert::TryInto::try_into(locked_queue.len()).unwrap_or(i64::MIN) + TryInto::try_into(locked_queue.len()).unwrap_or(i64::MIN) } } diff --git a/node/src/components/diagnostics_port/tasks.rs b/node/src/components/diagnostics_port/tasks.rs index 0e3c34461b..ee6136c482 100644 --- a/node/src/components/diagnostics_port/tasks.rs +++ b/node/src/components/diagnostics_port/tasks.rs @@ -395,7 +395,7 @@ impl Session { let tempdir = tempfile::tempdir().map_err(ObtainDumpError::CreateTempDir)?; let tempfile_path = tempdir.path().join("queue-dump"); - let tempfile = fs::File::create(&tempfile_path).map_err(ObtainDumpError::CreateTempFile)?; + let tempfile = File::create(&tempfile_path).map_err(ObtainDumpError::CreateTempFile)?; effect_builder .diagnostics_port_dump_queue(self.create_queue_dump_format(tempfile)) @@ -403,7 +403,7 @@ impl Session { // We can now reopen the file and return it. let reopened_tempfile = - fs::File::open(tempfile_path).map_err(ObtainDumpError::ReopenTempFile)?; + File::open(tempfile_path).map_err(ObtainDumpError::ReopenTempFile)?; Ok(reopened_tempfile) } diff --git a/node/src/components/event_stream_server.rs b/node/src/components/event_stream_server.rs index e1f24122f9..ff7f0882bd 100644 --- a/node/src/components/event_stream_server.rs +++ b/node/src/components/event_stream_server.rs @@ -232,7 +232,7 @@ where fn handle_event( &mut self, - _effect_builder: EffectBuilder, + effect_builder: EffectBuilder, _rng: &mut NodeRng, event: Self::Event, ) -> Effects { @@ -256,7 +256,7 @@ where } ComponentState::Initializing => match event { Event::Initialize => { - let (effects, state) = self.bind(self.config.enable_server, _effect_builder); + let (effects, state) = self.bind(self.config.enable_server, effect_builder); >::set_state(self, state); effects } diff --git a/node/src/components/event_stream_server/sse_server.rs b/node/src/components/event_stream_server/sse_server.rs index 02351b99bb..994abad7d8 100644 --- a/node/src/components/event_stream_server/sse_server.rs +++ b/node/src/components/event_stream_server/sse_server.rs @@ -234,7 +234,7 @@ pub(super) struct NewSubscriberInfo { /// Maps the `event` to a warp event, or `None` if it's a malformed event (ie.: `ApiVersion` event /// with `id` set or event other than `ApiVersion` without `id`) -async fn map_server_sent_event( +fn map_server_sent_event( event: &ServerSentEvent, ) -> Option> { let id = match event.id { @@ -292,7 +292,7 @@ async fn map_server_sent_event( /// /// If `query` is not empty, returns a 422 response if `query` doesn't have exactly one entry, /// "starts_from" mapped to a value representing an event ID. -fn parse_query(query: HashMap) -> Result, Response> { +fn parse_query(query: &HashMap) -> Result, Response> { if query.is_empty() { return Ok(None); } @@ -374,7 +374,7 @@ impl ChannelsAndFilter { return create_503(); } - let start_from = match parse_query(query) { + let start_from = match parse_query(&query) { Ok(maybe_id) => maybe_id, Err(error_response) => return error_response, }; @@ -489,7 +489,7 @@ fn stream_to_client( .chain(ongoing_stream) .filter_map(move |result| async move { match result { - Ok(event) => map_server_sent_event(&event).await, + Ok(event) => map_server_sent_event(&event), Err(error) => Some(Err(error)), } }) diff --git a/node/src/components/gossiper/gossip_table.rs b/node/src/components/gossiper/gossip_table.rs index 1e46b01770..565998b19f 100644 --- a/node/src/components/gossiper/gossip_table.rs +++ b/node/src/components/gossiper/gossip_table.rs @@ -673,7 +673,7 @@ mod tests { // Check same complete data from other source causes `Noop` to be returned since we still // have all gossip requests in flight. Check it updates holders. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[0])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0])); let action = gossip_table.new_complete_data(&data_id, Some(node_ids[0]), GossipTarget::All); assert_eq!(GossipAction::Noop, action); check_holders(&node_ids[..1], &gossip_table, &data_id); diff --git a/node/src/components/network.rs b/node/src/components/network.rs index 51a8756075..da81fcfe8b 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -45,7 +45,7 @@ pub(crate) mod tasks; mod tests; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, io, net::{SocketAddr, TcpListener}, @@ -278,7 +278,7 @@ where ); let context = Arc::new(NetworkContext::new( - cfg.clone(), + &cfg, our_identity, node_key_pair.map(NodeKeyPair::new), chain_info_source.into(), @@ -421,7 +421,7 @@ where total_outgoing_manager_connected_peers += 1; if self.outgoing_limiter.is_validator_in_era(era_id, &peer_id) { total_connected_validators_in_era += 1; - self.send_message(peer_id, msg.clone(), None) + self.send_message(peer_id, msg.clone(), None); } } @@ -441,7 +441,7 @@ where msg: Arc>, gossip_target: GossipTarget, count: usize, - exclude: HashSet, + exclude: &HashSet, ) -> HashSet { let is_validator_in_era = |era: EraId, peer_id: &NodeId| self.outgoing_limiter.is_validator_in_era(era, peer_id); @@ -449,7 +449,7 @@ where rng, gossip_target, count, - exclude.clone(), + exclude, self.outgoing_manager.connected_peers(), is_validator_in_era, ); @@ -556,7 +556,7 @@ where if let Some(symmetries) = self.connection_symmetries.get(&peer_id) { let incoming_count = symmetries .incoming_addrs() - .map(|addrs| addrs.len()) + .map(BTreeSet::len) .unwrap_or_default(); if incoming_count >= self.cfg.max_incoming_peer_connections as usize { @@ -635,12 +635,8 @@ where span.in_scope(|| { // Log the outcome. match result { - Ok(()) => { - info!("regular connection closing") - } - Err(ref err) => { - warn!(err = display_error(err), "connection dropped") - } + Ok(()) => info!("regular connection closing"), + Err(ref err) => warn!(err = display_error(err), "connection dropped"), } // Update the connection symmetries. @@ -655,7 +651,6 @@ where /// Determines whether an outgoing peer should be blocked based on the connection error. fn is_blockable_offense_for_outgoing( - &self, error: &ConnectionError, ) -> Option { match error { @@ -721,7 +716,7 @@ where // We perform blocking first, to not trigger a reconnection before blocking. let mut requests = Vec::new(); - if let Some(justification) = self.is_blockable_offense_for_outgoing(&error) { + if let Some(justification) = Self::is_blockable_offense_for_outgoing(&error) { requests.extend(self.outgoing_manager.block_addr( peer_addr, now, @@ -853,7 +848,7 @@ where Arc::new(Message::Payload(*payload)), gossip_target, count, - exclude, + &exclude, ); auto_closing_responder.respond(sent_to).ignore() } @@ -886,7 +881,7 @@ where { let mut effects = Effects::new(); - for request in requests.into_iter() { + for request in requests { trace!(%request, "processing dial request"); match request { DialRequest::Dial { addr, span } => effects.extend( @@ -901,7 +896,7 @@ where // Dropping the `handle` is enough to signal the connection to shutdown. span.in_scope(|| { debug!("dropping connection, as requested"); - }) + }); } DialRequest::SendPing { peer_id, @@ -989,7 +984,7 @@ where ret.insert(node_id, connection.peer_addr.to_string()); } else { // This should never happen unless the state of `OutgoingManager` is corrupt. - warn!(%node_id, "route disappeared unexpectedly") + warn!(%node_id, "route disappeared unexpectedly"); } } @@ -1052,7 +1047,7 @@ where our_id=%self.context.our_id(), err=display_error(err), "could not join server task cleanly" - ) + ); } } } @@ -1074,7 +1069,7 @@ fn choose_gossip_peers( rng: &mut NodeRng, gossip_target: GossipTarget, count: usize, - exclude: HashSet, + exclude: &HashSet, connected_peers: impl Iterator, is_validator_in_era: F, ) -> HashSet @@ -1442,7 +1437,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT + NON_VALIDATOR_COUNT + 1, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1453,7 +1448,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT + NON_VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1465,7 +1460,7 @@ mod gossip_target_tests { &mut rng, TARGET, 2 * VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1479,7 +1474,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1496,7 +1491,7 @@ mod gossip_target_tests { &mut rng, TARGET, 2, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1514,7 +1509,7 @@ mod gossip_target_tests { &mut rng, TARGET, 1, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1538,7 +1533,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT, - exclude.clone(), + &exclude, fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1556,7 +1551,7 @@ mod gossip_target_tests { &mut rng, TARGET, 3, - exclude.clone(), + &exclude, fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1577,7 +1572,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT + NON_VALIDATOR_COUNT + 1, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1588,7 +1583,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT + NON_VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1599,7 +1594,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.validators.iter().copied(), fixture.is_validator_in_era(), ); @@ -1612,7 +1607,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT, - HashSet::new(), + &HashSet::new(), fixture.non_validators.iter().copied(), fixture.is_validator_in_era(), ); @@ -1631,7 +1626,7 @@ mod gossip_target_tests { &mut rng, TARGET, VALIDATOR_COUNT, - exclude.clone(), + &exclude, fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); @@ -1655,7 +1650,7 @@ mod gossip_target_tests { &mut rng, TARGET, 1, - exclude.clone(), + &exclude, fixture.all_peers.iter().copied(), fixture.is_validator_in_era(), ); diff --git a/node/src/components/network/message_pack_format.rs b/node/src/components/network/message_pack_format.rs index 27a9ee2457..b991c6639e 100644 --- a/node/src/components/network/message_pack_format.rs +++ b/node/src/components/network/message_pack_format.rs @@ -4,10 +4,7 @@ //! our network decoder via `Cargo.toml`; using `tokio_serde::MessagePack` would instead tie it //! to the dependency specified in `tokio_serde`'s `Cargo.toml`. -use std::{ - io::{self, Cursor}, - pin::Pin, -}; +use std::{io, pin::Pin}; use bytes::{Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -41,7 +38,6 @@ where #[inline] fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result { - rmp_serde::from_read(Cursor::new(src)) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) + rmp_serde::from_read_ref(src).map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) } } diff --git a/node/src/components/network/outgoing.rs b/node/src/components/network/outgoing.rs index dab88505d9..550a8a6394 100644 --- a/node/src/components/network/outgoing.rs +++ b/node/src/components/network/outgoing.rs @@ -89,9 +89,6 @@ //! precedence over the previous one. This prevents problems when a notification of a terminated //! connection is overtaken by the new connection announcement. -// Clippy has a lot of false positives due to `span.clone()`-closures. -#![allow(clippy::redundant_clone)] - use std::{ collections::{hash_map::Entry, HashMap}, error::Error, @@ -102,7 +99,6 @@ use std::{ }; use datasize::DataSize; - use prometheus::IntGauge; use rand::Rng; use tracing::{debug, error, error_span, field::Empty, info, trace, warn, Span}; @@ -230,9 +226,9 @@ impl DialOutcome { /// Retrieves the socket address from the `DialOutcome`. fn addr(&self) -> SocketAddr { match self { - DialOutcome::Successful { addr, .. } => *addr, - DialOutcome::Failed { addr, .. } => *addr, - DialOutcome::Loopback { addr, .. } => *addr, + DialOutcome::Successful { addr, .. } + | DialOutcome::Failed { addr, .. } + | DialOutcome::Loopback { addr, .. } => *addr, } } } @@ -305,7 +301,7 @@ impl OutgoingConfig { /// `failed_attempts` (n) is the number of previous attempts *before* the current failure (thus /// starting at 0). The backoff time will be double for each attempt. fn calc_backoff(&self, failed_attempts: u8) -> Duration { - 2u32.pow(failed_attempts as u32) * self.base_timeout + (1u32 << failed_attempts as u32) * self.base_timeout } } @@ -539,7 +535,7 @@ where /// Iterates over all connected peer IDs. pub(crate) fn connected_peers(&'_ self) -> impl Iterator + '_ { - self.routes.keys().cloned() + self.routes.keys().copied() } /// Notify about a potentially new address that has been discovered. @@ -718,7 +714,7 @@ where let mut to_reconnect = Vec::new(); let mut to_ping = Vec::new(); - for (&addr, outgoing) in self.outgoing.iter_mut() { + for (&addr, outgoing) in &mut self.outgoing { // Note: `Span::in_scope` is no longer serviceable here due to borrow limitations. let _span_guard = make_span(addr, Some(outgoing)).entered(); @@ -806,12 +802,12 @@ where } // Remove all addresses marked for forgetting. - to_forget.into_iter().for_each(|addr| { + for addr in to_forget { self.outgoing.remove(&addr); - }); + } // Fail connections that are taking way too long to connect. - to_fail.into_iter().for_each(|(addr, failures_so_far)| { + for (addr, failures_so_far) in to_fail { let span = make_span(addr, self.outgoing.get(&addr)); span.in_scope(|| { @@ -824,7 +820,7 @@ where }, ) }); - }); + } let mut dial_requests = Vec::new(); diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 671c2a11f5..16e2a1daf9 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -146,7 +146,7 @@ where Span::current().record("peer_id", &field::display(peer_id)); if peer_id == context.our_id { - info!("incoming loopback connection"); + info!("outgoing loopback connection"); return OutgoingConnection::Loopback { peer_addr }; } @@ -240,7 +240,7 @@ where impl NetworkContext { pub(super) fn new( - cfg: Config, + cfg: &Config, our_identity: Identity, node_key_pair: Option, chain_info: ChainInfo, @@ -608,8 +608,7 @@ pub(super) async fn server( let handler_span = span.clone(); tokio::spawn( async move { - let incoming = - handle_incoming(context.clone(), stream, peer_addr).await; + let incoming = handle_incoming(context, stream, peer_addr).await; event_queue .schedule( Event::IncomingConnection { diff --git a/node/src/components/storage/tests.rs b/node/src/components/storage/tests.rs index ed04c1f8fd..962ea3979c 100644 --- a/node/src/components/storage/tests.rs +++ b/node/src/components/storage/tests.rs @@ -2278,7 +2278,7 @@ fn check_force_resync_with_marker_file() { ); drop(storage); // Remove the marker file. - std::fs::remove_file(&force_resync_file_path).unwrap(); + fs::remove_file(&force_resync_file_path).unwrap(); assert!(!force_resync_file_path.exists()); // Reinitialize storage with force resync enabled. diff --git a/node/src/components/transaction_buffer.rs b/node/src/components/transaction_buffer.rs index 1a5ffde1f1..5ac8a8603e 100644 --- a/node/src/components/transaction_buffer.rs +++ b/node/src/components/transaction_buffer.rs @@ -122,7 +122,7 @@ impl TransactionBuffer { } }; debug!( - blocks = ?blocks.iter().map(|b| b.height()).collect_vec(), + blocks = ?blocks.iter().map(Block::height).collect_vec(), "TransactionBuffer: initialization" ); info!("initialized {}", >::name(self)); @@ -162,7 +162,7 @@ impl TransactionBuffer { // clear expired transaction from all holds, then clear any entries that have no items // remaining self.hold.iter_mut().for_each(|(_, held_transactions)| { - held_transactions.retain(|transaction_hash| !freed.contains_key(transaction_hash)) + held_transactions.retain(|transaction_hash| !freed.contains_key(transaction_hash)); }); self.hold.retain(|_, remaining| !remaining.is_empty()); @@ -208,7 +208,6 @@ impl TransactionBuffer { } fn register_transaction_gossiped( - &mut self, transaction_id: TransactionId, effect_builder: EffectBuilder, ) -> Effects @@ -776,7 +775,7 @@ where Effects::new() } Event::ReceiveTransactionGossiped(transaction_id) => { - self.register_transaction_gossiped(transaction_id, effect_builder) + Self::register_transaction_gossiped(transaction_id, effect_builder) } Event::StoredTransaction(transaction_id, maybe_transaction) => { match maybe_transaction { diff --git a/node/src/components/transaction_buffer/tests.rs b/node/src/components/transaction_buffer/tests.rs index ac05429a91..a9c544ae0b 100644 --- a/node/src/components/transaction_buffer/tests.rs +++ b/node/src/components/transaction_buffer/tests.rs @@ -368,7 +368,7 @@ fn get_appendable_block_when_transfers_are_of_one_category() { get_appendable_block( &mut rng, &mut transaction_buffer, - std::iter::repeat_with(|| MINT_LANE_ID), + iter::repeat_with(|| MINT_LANE_ID), transaction_config .transaction_v1_config .get_max_transaction_count(MINT_LANE_ID) as usize @@ -439,7 +439,7 @@ fn get_appendable_block_when_standards_are_of_one_category() { get_appendable_block( &mut rng, &mut transaction_buffer, - std::iter::repeat_with(|| large_lane_id), + iter::repeat_with(|| large_lane_id), transaction_config .transaction_v1_config .get_max_transaction_count(large_lane_id) as usize diff --git a/node/src/components/upgrade_watcher.rs b/node/src/components/upgrade_watcher.rs index aaeddd05ea..20b333d5c7 100644 --- a/node/src/components/upgrade_watcher.rs +++ b/node/src/components/upgrade_watcher.rs @@ -329,7 +329,7 @@ struct UpgradePoint { impl UpgradePoint { /// Parses a chainspec file at the given path as an `UpgradePoint`. - fn from_chainspec_path + std::fmt::Debug>(path: P) -> Result { + fn from_chainspec_path + fmt::Debug>(path: P) -> Result { let bytes = file_utils::read_file(path.as_ref().join(CHAINSPEC_FILENAME)) .map_err(Error::LoadUpgradePoint)?; Ok(toml::from_str(std::str::from_utf8(&bytes).unwrap())?) diff --git a/node/src/effect.rs b/node/src/effect.rs index 330c7d82a2..b7de2ad642 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -252,12 +252,12 @@ impl AutoClosingResponder { impl AutoClosingResponder { /// Send `Some(data)` to the origin of the request. pub(crate) async fn respond(self, data: T) { - self.into_inner().respond(Some(data)).await + self.into_inner().respond(Some(data)).await; } /// Send `None` to the origin of the request. pub(crate) async fn respond_none(self) { - self.into_inner().respond(None).await + self.into_inner().respond(None).await; } } @@ -273,7 +273,7 @@ impl Drop for AutoClosingResponder { debug!( unsent_value = %self.0, "failed to auto-close responder, ignoring" - ) + ); } } } @@ -615,7 +615,7 @@ impl EffectBuilder { // // If it does happen, we pretend nothing happened instead of crashing. if self.event_queue.shutdown_flag().is_set() { - debug!(%err, channel=?type_name::(), "ignoring closed channel due to shutdown") + debug!(%err, channel=?type_name::(), "ignoring closed channel due to shutdown"); } else { error!(%err, channel=?type_name::(), "request for channel closed, this may be a bug? \ check if a component is stuck from now on"); @@ -653,14 +653,14 @@ impl EffectBuilder { { self.event_queue .schedule(FatalAnnouncement { file, line, msg }, QueueKind::Control) - .await + .await; } /// Sets a timeout. pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration { let then = Instant::now(); time::sleep(timeout).await; - Instant::now() - then + then.elapsed() } /// Retrieve a snapshot of the nodes current metrics formatted as string. @@ -826,7 +826,7 @@ impl EffectBuilder { >::from_incoming(sender, payload), QueueKind::NetworkIncoming, ) - .await + .await; } /// Announces that a gossiper has received a new item, where the item's ID is the complete item. @@ -1013,7 +1013,7 @@ impl EffectBuilder { UpgradeWatcherAnnouncement(maybe_next_upgrade), QueueKind::Control, ) - .await + .await; } /// Announces a committed Step success. @@ -1026,7 +1026,7 @@ impl EffectBuilder { ContractRuntimeAnnouncement::CommitStepSuccess { era_id, effects }, QueueKind::ContractRuntime, ) - .await + .await; } pub(crate) async fn update_contract_runtime_state(self, new_pre_state: ExecutionPreState) @@ -1038,7 +1038,7 @@ impl EffectBuilder { ContractRuntimeRequest::UpdatePreState { new_pre_state }, QueueKind::ContractRuntime, ) - .await + .await; } /// Announces validators for upcoming era. @@ -1057,7 +1057,7 @@ impl EffectBuilder { }, QueueKind::ContractRuntime, ) - .await + .await; } pub(crate) async fn announce_new_era_gas_price(self, era_id: EraId, next_era_gas_price: u8) @@ -1072,7 +1072,7 @@ impl EffectBuilder { }, QueueKind::ContractRuntime, ) - .await + .await; } /// Begins gossiping an item. @@ -1090,7 +1090,7 @@ impl EffectBuilder { }, QueueKind::Gossip, ) - .await + .await; } /// Puts the given block into the linear block store. @@ -1726,7 +1726,7 @@ impl EffectBuilder { }, QueueKind::ToStorage, ) - .await + .await; } /// Gets the requested block and its finality signatures. @@ -1888,7 +1888,7 @@ impl EffectBuilder { }, QueueKind::ContractRuntime, ) - .await + .await; } pub(crate) async fn enqueue_protocol_upgrade( @@ -1910,7 +1910,7 @@ impl EffectBuilder { }, QueueKind::Control, ) - .await + .await; } /// Checks whether the transactions included in the block exist on the network and the block is @@ -1946,7 +1946,7 @@ impl EffectBuilder { ConsensusAnnouncement::Proposed(Box::new(proposed_block)), QueueKind::Consensus, ) - .await + .await; } /// Announces that a block has been finalized. @@ -1959,7 +1959,7 @@ impl EffectBuilder { ConsensusAnnouncement::Finalized(Box::new(finalized_block)), QueueKind::Consensus, ) - .await + .await; } /// Announces that a meta block has been created or its state has changed. @@ -1969,7 +1969,7 @@ impl EffectBuilder { { self.event_queue .schedule(MetaBlockAnnouncement(meta_block), QueueKind::Regular) - .await + .await; } /// Announces that a finalized block has been created, but it was not @@ -1983,7 +1983,7 @@ impl EffectBuilder { UnexecutedBlockAnnouncement(block_height), QueueKind::Regular, ) - .await + .await; } /// An equivocation has been detected. @@ -2004,7 +2004,7 @@ impl EffectBuilder { }, QueueKind::Consensus, ) - .await + .await; } /// Blocks a specific peer due to a transgression. @@ -2026,7 +2026,7 @@ impl EffectBuilder { }, QueueKind::NetworkInfo, ) - .await + .await; } /// Gets the next scheduled upgrade, if any. @@ -2243,7 +2243,7 @@ impl EffectBuilder { }, QueueKind::Control, ) - .await + .await; } /// Activates/deactivates a failpoint from a given activation. diff --git a/node/src/lib.rs b/node/src/lib.rs index a2922d1913..d28c365039 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -145,10 +145,7 @@ mod tests { let (version, sha) = prefix.split_once('-').unwrap_or((prefix, "")); assert_eq!(version, env!("CARGO_PKG_VERSION")); - assert_eq!( - sha, - std::env::var("NODE_GIT_SHA").unwrap_or_default().as_str() - ); + assert_eq!(sha, env::var("NODE_GIT_SHA").unwrap_or_default().as_str()); if env!("NODE_BUILD_PROFILE") == "release" { assert_eq!(profile, ""); } else { @@ -163,10 +160,7 @@ mod tests { let (version, sha) = prefix.split_once('-').unwrap_or((prefix, "")); assert_eq!(version, env!("CARGO_PKG_VERSION")); - assert_eq!( - sha, - std::env::var("NODE_GIT_SHA").unwrap_or_default().as_str() - ); + assert_eq!(sha, env::var("NODE_GIT_SHA").unwrap_or_default().as_str()); if env!("NODE_BUILD_PROFILE") == "release" { assert_eq!(profile, ""); } else { diff --git a/node/src/logging.rs b/node/src/logging.rs index b73f4300c9..06e83611a8 100644 --- a/node/src/logging.rs +++ b/node/src/logging.rs @@ -1,6 +1,6 @@ //! Logging via the tracing crate. -use std::{env, fmt, io}; +use std::{env, fmt, io, string::ToString}; use ansi_term::{Color, Style}; use anyhow::anyhow; @@ -284,8 +284,8 @@ impl ReloadHandle { /// Returns a string representation of the current [`EnvFilter`], if set. fn display_log_filter(&self) -> Result { match self { - ReloadHandle::Text(handle) => handle.with_current(|env_filter| env_filter.to_string()), - ReloadHandle::Json(handle) => handle.with_current(|env_filter| env_filter.to_string()), + ReloadHandle::Text(handle) => handle.with_current(ToString::to_string), + ReloadHandle::Json(handle) => handle.with_current(ToString::to_string), } } } @@ -311,12 +311,12 @@ pub fn display_global_env_filter() -> anyhow::Result { } /// Type alias for the formatting function used. -pub type FormatDebugFn = fn(&mut Writer, &Field, &dyn std::fmt::Debug) -> fmt::Result; +pub type FormatDebugFn = fn(&mut Writer, &Field, &dyn fmt::Debug) -> fmt::Result; fn format_into_debug_writer( writer: &mut Writer, field: &Field, - value: &dyn std::fmt::Debug, + value: &dyn fmt::Debug, ) -> fmt::Result { match field.name() { LOG_FIELD_MESSAGE => write!(writer, "{:?}", value), @@ -346,7 +346,7 @@ pub fn init_with_config(config: &LoggingConfig) -> anyhow::Result<()> { // Setup a new tracing-subscriber writing to `stdout` for logging. LoggingFormat::Text => { let builder = tracing_subscriber::fmt() - .with_writer(io::stdout as fn() -> std::io::Stdout) + .with_writer(io::stdout as fn() -> io::Stdout) .with_env_filter(filter) .fmt_fields(formatter) .event_format(FmtEvent::new(config.color, config.abbreviate_modules)) @@ -360,7 +360,7 @@ pub fn init_with_config(config: &LoggingConfig) -> anyhow::Result<()> { // JSON logging writes to `stdout` as well but uses the JSON format. LoggingFormat::Json => { let builder = tracing_subscriber::fmt() - .with_writer(io::stdout as fn() -> std::io::Stdout) + .with_writer(io::stdout as fn() -> io::Stdout) .with_env_filter(filter) .json() .with_filter_reloading(); diff --git a/node/src/reactor.rs b/node/src/reactor.rs index a4a94cfa93..843b7c5f9f 100644 --- a/node/src/reactor.rs +++ b/node/src/reactor.rs @@ -222,7 +222,7 @@ impl EventQueueHandle { where REv: From, { - self.schedule_with_ancestor(None, event, queue_kind).await + self.schedule_with_ancestor(None, event, queue_kind).await; } /// Schedule an event on a specific queue. @@ -236,7 +236,7 @@ impl EventQueueHandle { { self.scheduler .push((ancestor, event.into()), queue_kind) - .await + .await; } /// Returns number of events in each of the scheduler's queues. @@ -538,8 +538,7 @@ where ); // Run all effects from component instantiation. process_effects(None, scheduler, initial_effects, QueueKind::Regular) - .instrument(debug_span!("process initial effects")) - .await; + .instrument(debug_span!("process initial effects")); info!("reactor main loop is ready"); @@ -658,7 +657,7 @@ where warn!( ?err, "failed to write/flush queue dump using debug format" - ) + ); }) .ok(); }) @@ -710,9 +709,7 @@ where self.scheduler, effects, queue_kind, - ) - .in_current_span() - .await; + ); self.current_event_id += 1; @@ -836,12 +833,10 @@ where let effects = create_effects(effect_builder); - process_effects(None, self.scheduler, effects, QueueKind::Regular) - .instrument(debug_span!( - "process injected effects", - ev = self.current_event_id - )) - .await; + process_effects(None, self.scheduler, effects, QueueKind::Regular).instrument(debug_span!( + "process injected effects", + ev = self.current_event_id + )); } /// Processes a single event if there is one and we haven't previously handled an exit code. @@ -937,7 +932,7 @@ where /// Spawns tasks that will process the given effects. /// /// Result events from processing the events will be scheduled with the given ancestor. -async fn process_effects( +fn process_effects( ancestor: Option, scheduler: &'static Scheduler, effects: Effects, @@ -948,7 +943,7 @@ async fn process_effects( for effect in effects { tokio::spawn(async move { for event in effect.await { - scheduler.push((ancestor, event), queue_kind).await + scheduler.push((ancestor, event), queue_kind).await; } }); } diff --git a/node/src/reactor/main_reactor/control.rs b/node/src/reactor/main_reactor/control.rs index 7cb01e868d..1268114dab 100644 --- a/node/src/reactor/main_reactor/control.rs +++ b/node/src/reactor/main_reactor/control.rs @@ -482,7 +482,7 @@ impl MainReactor { Ok(Some(block_header)) if block_header.is_switch_block() => { block_header.is_last_block_before_activation(&self.chainspec.protocol_config) } - Ok(Some(_)) | Ok(None) => false, + Ok(Some(_) | None) => false, Err(msg) => { error!("{:?}: {}", self.state, msg); false diff --git a/node/src/reactor/main_reactor/event.rs b/node/src/reactor/main_reactor/event.rs index d14584ab4f..ffa2c2dbbd 100644 --- a/node/src/reactor/main_reactor/event.rs +++ b/node/src/reactor/main_reactor/event.rs @@ -562,7 +562,7 @@ impl From for MainEvent { impl From for MainEvent { fn from(request: TrieAccumulatorRequest) -> Self { MainEvent::BlockSynchronizer(block_synchronizer::Event::GlobalStateSynchronizer( - block_synchronizer::GlobalStateSynchronizerEvent::TrieAccumulatorEvent(request.into()), + GlobalStateSynchronizerEvent::TrieAccumulatorEvent(request.into()), )) } } diff --git a/node/src/tls.rs b/node/src/tls.rs index cad3f18468..b5bcaf0003 100644 --- a/node/src/tls.rs +++ b/node/src/tls.rs @@ -63,7 +63,7 @@ mod big_array { big_array! { BigArray; } } -/// The chosen signature algorithm (**ECDSA with SHA512**). +/// The chosen signature algorithm (**ECDSA with SHA512**). const SIGNATURE_ALGORITHM: Nid = Nid::ECDSA_WITH_SHA512; /// The underlying elliptic curve (**P-521**). @@ -556,7 +556,7 @@ fn tls_cert_from_x509_and_key( ec_key: EcKey, ) -> Result { let cert_fingerprint = cert_fingerprint(&cert)?; - let key_fingerprint = key_fingerprint(ec_key)?; + let key_fingerprint = key_fingerprint(&ec_key)?; Ok(TlsCert { x509: cert, cert_fingerprint, @@ -575,7 +575,7 @@ pub(crate) fn cert_fingerprint(cert: &X509) -> Result) -> Result { +pub(crate) fn key_fingerprint(ec_key: &EcKey) -> Result { let mut big_num_context = BigNumContext::new().map_err(ValidationError::BigNumContextNotAvailable)?; let buf = ec_key @@ -710,7 +710,7 @@ fn generate_private_key() -> SslResult> { // TODO: Please verify this for accuracy! let ec_group = ec::EcGroup::from_curve_name(SIGNATURE_CURVE)?; - let ec_key = ec::EcKey::generate(ec_group.as_ref())?; + let ec_key = EcKey::generate(ec_group.as_ref())?; PKey::from_ec_key(ec_key) } @@ -836,7 +836,7 @@ impl Hash for Sha512 { // TODO: Benchmark if this is really worthwhile over the automatic derivation. chunk.copy_from_slice(&self.bytes()[0..8]); - state.write_u64(u64::from_le_bytes(chunk)) + state.write_u64(u64::from_le_bytes(chunk)); } } diff --git a/node/src/types/sync_leap.rs b/node/src/types/sync_leap.rs index ab51f11534..62178da8e3 100644 --- a/node/src/types/sync_leap.rs +++ b/node/src/types/sync_leap.rs @@ -84,7 +84,7 @@ impl SyncLeapIdentifier { } impl Display for SyncLeapIdentifier { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, "{} trusted_ancestor_only: {}", diff --git a/node/src/utils.rs b/node/src/utils.rs index 42b124cefd..b9d372a515 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -175,7 +175,7 @@ impl SharedFlag { /// Set the flag. pub(crate) fn set(self) { - self.0.store(true, Ordering::SeqCst) + self.0.store(true, Ordering::SeqCst); } /// Returns a shared instance of the flag for testing. @@ -287,8 +287,9 @@ impl Source { impl Display for Source { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { - Source::PeerGossiped(node_id) => Display::fmt(node_id, formatter), - Source::Peer(node_id) => Display::fmt(node_id, formatter), + Source::PeerGossiped(node_id) | Source::Peer(node_id) => { + Display::fmt(node_id, formatter) + } Source::Client => write!(formatter, "client"), Source::SpeculativeExec => write!(formatter, "client (speculative exec)"), Source::Ourself => write!(formatter, "ourself"), diff --git a/node/src/utils/work_queue.rs b/node/src/utils/work_queue.rs index 76c5d23c9b..a04bc9b7c7 100644 --- a/node/src/utils/work_queue.rs +++ b/node/src/utils/work_queue.rs @@ -217,7 +217,7 @@ impl JobHandle { impl Drop for JobHandle { fn drop(&mut self) { - self.queue.complete_job() + self.queue.complete_job(); } } diff --git a/types/src/uref.rs b/types/src/uref.rs index 29bd061b68..8e2b039298 100644 --- a/types/src/uref.rs +++ b/types/src/uref.rs @@ -207,7 +207,7 @@ impl URef { /// Removes specific access rights from this URef if present. pub fn disable_access_rights(&mut self, access_rights: AccessRights) { - self.1.remove(access_rights) + self.1.remove(access_rights); } } From fed483e9dad3c010491236d2342cad4aba223f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Wed, 26 Jun 2024 21:33:15 +0200 Subject: [PATCH 2/7] More cleanups --- .../components/consensus/protocols/highway.rs | 3 +- .../consensus/protocols/zug/participation.rs | 3 +- node/src/components/gossiper/gossip_table.rs | 32 +++++++++---------- node/src/reactor/event_queue_metrics.rs | 2 +- node/src/reactor/main_reactor/control.rs | 2 +- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/node/src/components/consensus/protocols/highway.rs b/node/src/components/consensus/protocols/highway.rs index c6589fc004..e0c081d0af 100644 --- a/node/src/components/consensus/protocols/highway.rs +++ b/node/src/components/consensus/protocols/highway.rs @@ -339,9 +339,8 @@ impl HighwayProtocol { }); } return outcomes; - } else { - self.log_proposal(vertex, "proposal does not need validation"); } + self.log_proposal(vertex, "proposal does not need validation"); } // Either consensus value doesn't need validation or it's not a proposal. diff --git a/node/src/components/consensus/protocols/zug/participation.rs b/node/src/components/consensus/protocols/zug/participation.rs index 68200c0cfc..4ca1b22d78 100644 --- a/node/src/components/consensus/protocols/zug/participation.rs +++ b/node/src/components/consensus/protocols/zug/participation.rs @@ -50,9 +50,8 @@ impl ParticipationStatus { { if r_id.saturating_add(2) < zug.current_round { return Some(ParticipationStatus::LastSeenInRound(*r_id)); - } else { - return None; // Seen recently; considered currently active. } + return None; // Seen recently; considered currently active. } } Some(ParticipationStatus::Inactive) diff --git a/node/src/components/gossiper/gossip_table.rs b/node/src/components/gossiper/gossip_table.rs index 565998b19f..7b09cba5dc 100644 --- a/node/src/components/gossiper/gossip_table.rs +++ b/node/src/components/gossiper/gossip_table.rs @@ -680,7 +680,7 @@ mod tests { // Check receiving a gossip response, causes `ShouldGossip` to be returned and holders // updated. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[1])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[1])); let action = gossip_table.already_infected(&data_id, node_ids[1]); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, @@ -691,7 +691,7 @@ mod tests { assert_eq!(expected, action); check_holders(&node_ids[..2], &gossip_table, &data_id); - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[2])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[2])); let action = gossip_table.new_complete_data(&data_id, Some(node_ids[2]), GossipTarget::All); assert_eq!(GossipAction::Noop, action); check_holders(&node_ids[..3], &gossip_table, &data_id); @@ -700,7 +700,7 @@ mod tests { // causes `Noop` to be returned and holders cleared. let limit = 3 + EXPECTED_DEFAULT_INFECTION_TARGET; for node_id in &node_ids[3..limit] { - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); let _ = gossip_table.we_infected(&data_id, *node_id); } let action = gossip_table.new_complete_data(&data_id, None, GossipTarget::All); @@ -739,7 +739,7 @@ mod tests { let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All); let limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1; for node_id in node_ids.iter().take(limit) { - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); let action = gossip_table.we_infected(&data_id, *node_id); assert_eq!(GossipAction::Noop, action); assert!(!gossip_table.finished.contains(&data_id)); @@ -747,7 +747,7 @@ mod tests { // Check recording an infection from an already-recorded infectee doesn't cause us to stop // gossiping. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[limit - 1])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit - 1])); let action = gossip_table.we_infected(&data_id, node_ids[limit - 1]); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, @@ -759,7 +759,7 @@ mod tests { assert!(!gossip_table.finished.contains(&data_id)); // Check third new infection does cause us to stop gossiping. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[limit])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit])); let action = gossip_table.we_infected(&data_id, node_ids[limit]); assert_eq!(GossipAction::AnnounceFinished, action); assert!(gossip_table.finished.contains(&data_id)); @@ -818,7 +818,7 @@ mod tests { let limit = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 1; for node_id in node_ids.iter().take(limit) { let _ = gossip_table.new_complete_data(&data_id, Some(*node_id), GossipTarget::All); - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); assert!(!gossip_table.finished.contains(&data_id)); } @@ -826,7 +826,7 @@ mod tests { // `finished` collection. gossip_table.register_infection_attempt( &data_id, - std::iter::once(&node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT]), + iter::once(&node_ids[EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT]), ); let action = gossip_table.check_timeout( &data_id, @@ -875,7 +875,7 @@ mod tests { let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All); let limit = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 1; for (index, node_id) in node_ids.iter().enumerate().take(limit) { - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); let action = gossip_table.already_infected(&data_id, *node_id); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, @@ -888,7 +888,7 @@ mod tests { // Check recording a non-infection from an already-recorded holder doesn't cause us to stop // gossiping. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[0])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0])); let action = gossip_table.already_infected(&data_id, node_ids[0]); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, @@ -899,7 +899,7 @@ mod tests { assert_eq!(expected, action); // Check 15th non-infection does cause us to stop gossiping. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[limit])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[limit])); let action = gossip_table.we_infected(&data_id, node_ids[limit]); assert_eq!(GossipAction::AnnounceFinished, action); } @@ -917,19 +917,19 @@ mod tests { let _ = gossip_table.new_complete_data(&data_id, None, GossipTarget::All); let infection_limit = EXPECTED_DEFAULT_INFECTION_TARGET - 1; for node_id in &node_ids[0..infection_limit] { - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); let _ = gossip_table.we_infected(&data_id, *node_id); } let attempted_to_infect = EXPECTED_DEFAULT_ATTEMPTED_TO_INFECT_LIMIT - 2; for node_id in &node_ids[infection_limit..attempted_to_infect] { - gossip_table.register_infection_attempt(&data_id, std::iter::once(node_id)); + gossip_table.register_infection_attempt(&data_id, iter::once(node_id)); let _ = gossip_table.already_infected(&data_id, *node_id); } // Check adding 12th non-infection doesn't cause us to stop gossiping. gossip_table - .register_infection_attempt(&data_id, std::iter::once(&node_ids[attempted_to_infect])); + .register_infection_attempt(&data_id, iter::once(&node_ids[attempted_to_infect])); let action = gossip_table.already_infected(&data_id, node_ids[attempted_to_infect]); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, @@ -958,11 +958,11 @@ mod tests { // check_timeout for node 0 should return Noop, and for node 1 it should represent a timed // out response and return ShouldGossip. - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[0])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[0])); let action = gossip_table.check_timeout(&data_id, node_ids[0]); assert_eq!(GossipAction::Noop, action); - gossip_table.register_infection_attempt(&data_id, std::iter::once(&node_ids[1])); + gossip_table.register_infection_attempt(&data_id, iter::once(&node_ids[1])); let action = gossip_table.check_timeout(&data_id, node_ids[1]); let expected = GossipAction::ShouldGossip(ShouldGossip { count: 1, diff --git a/node/src/reactor/event_queue_metrics.rs b/node/src/reactor/event_queue_metrics.rs index a9971bff59..1b19d46839 100644 --- a/node/src/reactor/event_queue_metrics.rs +++ b/node/src/reactor/event_queue_metrics.rs @@ -90,7 +90,7 @@ impl Drop for EventQueueMetrics { .for_each(|(key, queue_gauge)| { self.registry .unregister(Box::new(queue_gauge.clone())) - .unwrap_or_else(|_| error!("unregistering {} failed: was not registered", key)) + .unwrap_or_else(|_| error!("unregistering {} failed: was not registered", key)); }); } } diff --git a/node/src/reactor/main_reactor/control.rs b/node/src/reactor/main_reactor/control.rs index 1268114dab..d59ef1e2cc 100644 --- a/node/src/reactor/main_reactor/control.rs +++ b/node/src/reactor/main_reactor/control.rs @@ -36,7 +36,7 @@ impl MainReactor { effects.extend( async move { if !delay.is_zero() { - tokio::time::sleep(delay).await + tokio::time::sleep(delay).await; } } .event(|_| MainEvent::ReactorCrank), From fc97d5a831593729b930c7bf7e4f95642748a7d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Fri, 28 Jun 2024 13:18:46 +0200 Subject: [PATCH 3/7] Code cleanup --- node/src/cli.rs | 9 ++--- node/src/cli/arglang.rs | 2 +- node/src/components/binary_port.rs | 2 +- .../components/block_synchronizer/tests.rs | 2 +- node/src/components/block_validator.rs | 4 +-- .../consensus/highway_core/state.rs | 2 +- .../consensus/highway_core/state/panorama.rs | 2 +- node/src/components/fetcher/item_fetcher.rs | 8 ++--- node/src/components/gossiper/gossip_table.rs | 3 +- node/src/components/network.rs | 36 +++++++++++-------- node/src/components/network/event.rs | 6 ++-- node/src/components/network/tasks.rs | 8 ++--- node/src/components/rest_server.rs | 29 ++++++++++----- .../src/components/rest_server/http_server.rs | 4 +-- .../components/transaction_buffer/tests.rs | 6 ++-- node/src/types/block/block_payload.rs | 16 ++++----- node/src/types/value_or_chunk.rs | 5 --- node/src/utils.rs | 11 +++--- node/src/utils/ds.rs | 2 +- types/src/chainspec.rs | 2 -- 20 files changed, 82 insertions(+), 77 deletions(-) diff --git a/node/src/cli.rs b/node/src/cli.rs index 960bc6b7de..e83656fe70 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -203,8 +203,7 @@ impl Cli { let old_root = old_config .parent() - .map(|path| path.to_owned()) - .unwrap_or_else(|| "/".into()); + .map_or_else(|| "/".into(), Path::to_path_buf); let encoded_old_config = fs::read_to_string(&old_config) .context("could not read old configuration file") .with_context(|| old_config.display().to_string())?; @@ -225,8 +224,7 @@ impl Cli { let old_root = old_config .parent() - .map(|path| path.to_owned()) - .unwrap_or_else(|| "/".into()); + .map_or_else(|| "/".into(), Path::to_path_buf); let encoded_old_config = fs::read_to_string(&old_config) .context("could not read old configuration file") .with_context(|| old_config.display().to_string())?; @@ -251,8 +249,7 @@ impl Cli { // Otherwise, we default to `/`. let root = config .parent() - .map(|path| path.to_owned()) - .unwrap_or_else(|| "/".into()); + .map_or_else(|| "/".into(), Path::to_path_buf); // The app supports running without a config file, using default values. let encoded_config = fs::read_to_string(config) diff --git a/node/src/cli/arglang.rs b/node/src/cli/arglang.rs index 16cc045661..1d935e6df2 100644 --- a/node/src/cli/arglang.rs +++ b/node/src/cli/arglang.rs @@ -71,7 +71,7 @@ fn tokenize(input: &str) -> Result, Error> { } else if let Ok(value) = bool::from_str(&buffer) { tokens.push(Token::Boolean(value)); } else { - tokens.push(Token::String(buffer.clone())) + tokens.push(Token::String(buffer.clone())); } buffer.clear(); diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 1ddfef39ee..ad15b9b076 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -1416,7 +1416,7 @@ where let response = handle_request(request, effect_builder, &config, &chainspec, &metrics) .await; - responder.respond(response).await + responder.respond(response).await; } .ignore() } diff --git a/node/src/components/block_synchronizer/tests.rs b/node/src/components/block_synchronizer/tests.rs index 3d79fa486e..9fe662f47e 100644 --- a/node/src/components/block_synchronizer/tests.rs +++ b/node/src/components/block_synchronizer/tests.rs @@ -275,7 +275,7 @@ impl BlockSynchronizer { Arc::new(Chainspec::random(rng)), MAX_SIMULTANEOUS_PEERS, validator_matrix, - &prometheus::Registry::new(), + &Registry::new(), ) .expect("Failed to create BlockSynchronizer"); diff --git a/node/src/components/block_validator.rs b/node/src/components/block_validator.rs index ceb3a28139..57fa59acc5 100644 --- a/node/src/components/block_validator.rs +++ b/node/src/components/block_validator.rs @@ -543,7 +543,7 @@ impl BlockValidator { Err(error) => warn!(%transaction_hash, %error, "could not fetch transaction"), } match result { - Ok(FetchedData::FromStorage { item }) | Ok(FetchedData::FromPeer { item, .. }) => { + Ok(FetchedData::FromStorage { item } | FetchedData::FromPeer { item, .. }) => { let item_hash = item.hash(); if item_hash != transaction_hash { // Hard failure - change state to Invalid. @@ -661,7 +661,7 @@ impl BlockValidator { } } match result { - Ok(FetchedData::FromStorage { .. }) | Ok(FetchedData::FromPeer { .. }) => { + Ok(FetchedData::FromStorage { .. } | FetchedData::FromPeer { .. }) => { let mut effects = Effects::new(); for state in self.validation_states.values_mut() { let responders = state.try_add_signature(&finality_signature_id); diff --git a/node/src/components/consensus/highway_core/state.rs b/node/src/components/consensus/highway_core/state.rs index 2352ff8ac5..ef878c4052 100644 --- a/node/src/components/consensus/highway_core/state.rs +++ b/node/src/components/consensus/highway_core/state.rs @@ -452,7 +452,7 @@ impl State { } let idx = evidence.perpetrator(); match self.faults.get(&idx) { - Some(&Fault::Banned) | Some(&Fault::Direct(_)) => return false, + Some(&Fault::Banned | &Fault::Direct(_)) => return false, None | Some(&Fault::Indirect) => (), } // TODO: Should use Display, not Debug! diff --git a/node/src/components/consensus/highway_core/state/panorama.rs b/node/src/components/consensus/highway_core/state/panorama.rs index 320712541b..cc3fe8168c 100644 --- a/node/src/components/consensus/highway_core/state/panorama.rs +++ b/node/src/components/consensus/highway_core/state/panorama.rs @@ -215,7 +215,7 @@ impl Panorama { .find(|(_, unit)| unit.timestamp <= timestamp) .map(|(vh, _)| *vh) .map_or(Observation::None, Observation::Correct), - obs @ Observation::None | obs @ Observation::Faulty => obs.clone(), + obs @ (Observation::None | Observation::Faulty) => obs.clone(), }; Panorama::from(self.iter().map(obs_cutoff).collect_vec()) } diff --git a/node/src/components/fetcher/item_fetcher.rs b/node/src/components/fetcher/item_fetcher.rs index f16be27a53..e86e64c7db 100644 --- a/node/src/components/fetcher/item_fetcher.rs +++ b/node/src/components/fetcher/item_fetcher.rs @@ -241,10 +241,10 @@ pub(super) trait ItemFetcher { } } Err( - error @ Error::Absent { .. } - | error @ Error::Rejected { .. } - | error @ Error::CouldNotConstructGetRequest { .. } - | error @ Error::ValidationMetadataMismatch { .. }, + error @ (Error::Absent { .. } + | Error::Rejected { .. } + | Error::CouldNotConstructGetRequest { .. } + | Error::ValidationMetadataMismatch { .. }), ) => { // For all other error variants we can safely respond with failure as there's no // chance for the request to succeed. diff --git a/node/src/components/gossiper/gossip_table.rs b/node/src/components/gossiper/gossip_table.rs index 7b09cba5dc..a7ce14d921 100644 --- a/node/src/components/gossiper/gossip_table.rs +++ b/node/src/components/gossiper/gossip_table.rs @@ -136,9 +136,8 @@ impl State { exclude_peers: self.attempted_to_infect.clone(), is_already_held: !is_new, }); - } else { - return GossipAction::Noop; } + return GossipAction::Noop; } if is_new { diff --git a/node/src/components/network.rs b/node/src/components/network.rs index da81fcfe8b..5a5f0fe988 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -54,6 +54,7 @@ use std::{ }; use datasize::DataSize; +#[cfg(test)] use futures::{future::BoxFuture, FutureExt}; use itertools::Itertools; use prometheus::Registry; @@ -62,13 +63,14 @@ use rand::{ Rng, }; use serde::{Deserialize, Serialize}; +#[cfg(test)] +use tokio::task::JoinHandle; use tokio::{ net::TcpStream, sync::{ mpsc::{self, UnboundedSender}, watch, }, - task::JoinHandle, }; use tokio_openssl::SslStream; use tokio_util::codec::LengthDelimitedCodec; @@ -103,6 +105,8 @@ use self::{ symmetry::ConnectionSymmetry, tasks::{MessageQueueItem, NetworkContext}, }; +#[cfg(test)] +use crate::reactor::Finalize; use crate::{ components::{gossiper::GossipItem, Component, ComponentState, InitializedComponent}, effect::{ @@ -110,7 +114,7 @@ use crate::{ requests::{BeginGossipRequest, NetworkInfoRequest, NetworkRequest, StorageRequest}, AutoClosingResponder, EffectBuilder, EffectExt, Effects, GossipTarget, }, - reactor::{Finalize, ReactorEvent}, + reactor::ReactorEvent, tls, types::{NodeId, ValidatorMatrix}, utils::{self, display_error, Source}, @@ -119,7 +123,9 @@ use crate::{ const COMPONENT_NAME: &str = "network"; +#[cfg(test)] const MAX_METRICS_DROP_ATTEMPTS: usize = 25; +#[cfg(test)] const DROP_RETRY_DELAY: Duration = Duration::from_millis(100); /// How often to keep attempting to reconnect to a node before giving up. Note that reconnection @@ -179,7 +185,7 @@ where /// Tracks nodes that have announced themselves as nodes that are syncing. syncing_nodes: HashSet, - + #[data_size(skip)] channel_management: Option, /// Networking metrics. @@ -203,26 +209,24 @@ where state: ComponentState, } -#[derive(DataSize)] struct ChannelManagement { /// Channel signaling a shutdown of the network. // Note: This channel is closed when `Network` is dropped, signalling the receivers that // they should cease operation. - #[data_size(skip)] + #[cfg(test)] shutdown_sender: Option>, /// Join handle for the server thread. - #[data_size(skip)] + #[cfg(test)] server_join_handle: Option>, /// Channel signaling a shutdown of the incoming connections. // Note: This channel is closed when we finished syncing, so the `Network` can close all // connections. When they are re-established, the proper value of the now updated `is_syncing` // flag will be exchanged on handshake. - #[data_size(skip)] + #[cfg(test)] close_incoming_sender: Option>, /// Handle used by the `message_reader` task to receive a notification that incoming /// connections should be closed. - #[data_size(skip)] close_incoming_receiver: watch::Receiver<()>, } @@ -354,11 +358,11 @@ where // which we need to shutdown cleanly later on. info!(%local_addr, %public_addr, %protocol_version, "starting server background task"); - let (server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); - let (close_incoming_sender, close_incoming_receiver) = watch::channel(()); + let (_server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); + let (_close_incoming_sender, close_incoming_receiver) = watch::channel(()); let context = self.context.clone(); - let server_join_handle = tokio::spawn( + let _server_join_handle = tokio::spawn( tasks::server( context, tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?, @@ -368,9 +372,12 @@ where ); let channel_management = ChannelManagement { - shutdown_sender: Some(server_shutdown_sender), - server_join_handle: Some(server_join_handle), - close_incoming_sender: Some(close_incoming_sender), + #[cfg(test)] + shutdown_sender: Some(_server_shutdown_sender), + #[cfg(test)] + server_join_handle: Some(_server_join_handle), + #[cfg(test)] + close_incoming_sender: Some(_close_incoming_sender), close_incoming_receiver, }; @@ -1026,6 +1033,7 @@ where } } +#[cfg(test)] impl Finalize for Network where REv: Send + 'static, diff --git a/node/src/components/network/event.rs b/node/src/components/network/event.rs index 59c34f1b52..90570c8216 100644 --- a/node/src/components/network/event.rs +++ b/node/src/components/network/event.rs @@ -1,6 +1,7 @@ use std::{ fmt::{self, Debug, Display, Formatter}, - io, mem, + io, + mem::size_of, net::SocketAddr, sync::Arc, }; @@ -22,8 +23,7 @@ use crate::{ protocol::Message as ProtocolMessage, }; -const _NETWORK_EVENT_SIZE: usize = mem::size_of::>(); -const_assert!(_NETWORK_EVENT_SIZE < 65); +const_assert!(size_of::>() < 65); /// A network event. #[derive(Debug, From, Serialize)] diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 16e2a1daf9..63e859158e 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -28,7 +28,7 @@ use prometheus::IntGauge; use rand::Rng; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{ - net::TcpStream, + net::{TcpListener, TcpStream}, sync::{mpsc::UnboundedReceiver, watch, Semaphore}, }; use tokio_openssl::SslStream; @@ -581,7 +581,7 @@ where /// Runs the server core acceptor loop. pub(super) async fn server( context: Arc>, - listener: tokio::net::TcpListener, + listener: TcpListener, mut shutdown_receiver: watch::Receiver<()>, ) where REv: From> + Send, @@ -630,8 +630,8 @@ pub(super) async fn server( // // The code in its current state will consume 100% CPU if local resource // exhaustion happens, as no distinction is made and no delay introduced. - Err(ref err) => { - warn!(%context.our_id, err=display_error(err), "dropping incoming connection during accept") + Err(err) => { + warn!(%context.our_id, err=display_error(&err), "dropping incoming connection during accept") } } } diff --git a/node/src/components/rest_server.rs b/node/src/components/rest_server.rs index d5c2d7bce4..7d15783a99 100644 --- a/node/src/components/rest_server.rs +++ b/node/src/components/rest_server.rs @@ -28,14 +28,22 @@ mod info; use std::{net::SocketAddr, sync::Arc}; use datasize::DataSize; -use futures::{future::BoxFuture, join, FutureExt}; +use futures::join; +#[cfg(test)] +use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::OnceCell; -use tokio::{sync::oneshot, task::JoinHandle}; -use tracing::{debug, error, info, warn}; +use tokio::sync::oneshot; +#[cfg(test)] +use tokio::task::JoinHandle; +#[cfg(test)] +use tracing::debug; +use tracing::{error, info, warn}; use casper_types::ProtocolVersion; use super::{Component, ComponentState, InitializedComponent}; +#[cfg(test)] +use crate::reactor::Finalize; use crate::{ components::PortBoundComponent, effect::{ @@ -46,7 +54,7 @@ use crate::{ }, EffectBuilder, EffectExt, Effects, }, - reactor::{main_reactor::MainEvent, Finalize}, + reactor::main_reactor::MainEvent, types::{ChainspecInfo, StatusFeed}, utils::{self, ListeningError}, NodeRng, @@ -94,11 +102,13 @@ impl ReactorEventT for REv where #[derive(DataSize, Debug)] pub(crate) struct InnerRestServer { /// When the message is sent, it signals the server loop to exit cleanly. + #[cfg(test)] #[data_size(skip)] shutdown_sender: oneshot::Sender<()>, /// The address the server is listening on. local_addr: Arc>, /// The task handle which will only join once the server loop has exited. + #[cfg(test)] #[data_size(skip)] server_join_handle: Option>, /// The network name, as specified in the chainspec @@ -283,12 +293,12 @@ where effect_builder: EffectBuilder, ) -> Result, Self::Error> { let cfg = &self.config; - let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); + let (_shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); let builder = utils::start_listening(&cfg.address)?; let local_addr: Arc> = Default::default(); - let server_join_handle = if cfg.cors_origin.is_empty() { + let _server_join_handle = if cfg.cors_origin.is_empty() { Some(tokio::spawn(http_server::run( builder, effect_builder, @@ -312,8 +322,10 @@ where let network_name = self.network_name.clone(); self.inner_rest = Some(InnerRestServer { local_addr, - shutdown_sender, - server_join_handle, + #[cfg(test)] + shutdown_sender: _shutdown_sender, + #[cfg(test)] + server_join_handle: _server_join_handle, network_name, }); @@ -321,6 +333,7 @@ where } } +#[cfg(test)] impl Finalize for RestServer { fn finalize(self) -> BoxFuture<'static, ()> { async { diff --git a/node/src/components/rest_server/http_server.rs b/node/src/components/rest_server/http_server.rs index c632044949..55de370dd3 100644 --- a/node/src/components/rest_server/http_server.rs +++ b/node/src/components/rest_server/http_server.rs @@ -54,7 +54,7 @@ pub(super) async fn run( // Shutdown the server gracefully. let _ = server - .with_graceful_shutdown(async { + .with_graceful_shutdown(async move { shutdown_receiver.await.ok(); }) .map_err(|error| { @@ -109,7 +109,7 @@ pub(super) async fn run_with_cors( // Shutdown the server gracefully. let _ = server - .with_graceful_shutdown(async { + .with_graceful_shutdown(async move { shutdown_receiver.await.ok(); }) .map_err(|error| { diff --git a/node/src/components/transaction_buffer/tests.rs b/node/src/components/transaction_buffer/tests.rs index a9c544ae0b..488de296d8 100644 --- a/node/src/components/transaction_buffer/tests.rs +++ b/node/src/components/transaction_buffer/tests.rs @@ -847,7 +847,7 @@ fn register_transactions_and_blocks() { // test if transactions held for proposed blocks which did not get finalized in time // are eligible again let count = rng.gen_range(1..11); - let txns: Vec<_> = std::iter::repeat_with(|| Transaction::Deploy(Deploy::random(&mut rng))) + let txns: Vec<_> = iter::repeat_with(|| Transaction::Deploy(Deploy::random(&mut rng))) .take(count) .collect(); let block = FinalizedBlock::random_with_specifics( @@ -1401,7 +1401,7 @@ fn register_random_deploys_unique_hashes( num_deploys: usize, rng: &mut TestRng, ) { - let deploys = std::iter::repeat_with(|| { + let deploys = iter::repeat_with(|| { let name = format!("{}", rng.gen::()); let call = format!("{}", rng.gen::()); Deploy::random_contract_by_name( @@ -1424,7 +1424,7 @@ fn register_random_deploys_same_hash( num_deploys: usize, rng: &mut TestRng, ) { - let deploys = std::iter::repeat_with(|| { + let deploys = iter::repeat_with(|| { let name = "test".to_owned(); let call = "test".to_owned(); Deploy::random_contract_by_name( diff --git a/node/src/types/block/block_payload.rs b/node/src/types/block/block_payload.rs index 8c3f8b2c06..8474cf1970 100644 --- a/node/src/types/block/block_payload.rs +++ b/node/src/types/block/block_payload.rs @@ -49,7 +49,7 @@ impl BlockPayload { let mut ret = vec![]; if let Some(transactions) = self.transactions.get(&MINT_LANE_ID) { for transaction in transactions { - ret.push(transaction) + ret.push(transaction); } } ret.into_iter() @@ -60,7 +60,7 @@ impl BlockPayload { let mut ret = vec![]; if let Some(transactions) = self.transactions.get(&AUCTION_LANE_ID) { for transaction in transactions { - ret.push(transaction) + ret.push(transaction); } } ret.into_iter() @@ -71,7 +71,7 @@ impl BlockPayload { let mut ret = vec![]; if let Some(transactions) = self.transactions.get(&INSTALL_UPGRADE_LANE_ID) { for transaction in transactions { - ret.push(transaction) + ret.push(transaction); } } ret.into_iter() @@ -85,7 +85,7 @@ impl BlockPayload { let mut ret = vec![]; if let Some(transactions) = self.transactions.get(&category) { for transaction in transactions { - ret.push(transaction) + ret.push(transaction); } } ret.into_iter() @@ -93,7 +93,7 @@ impl BlockPayload { pub(crate) fn finalized_payload(&self) -> BTreeMap> { let mut ret = BTreeMap::new(); - for (category, transactions) in self.transactions.iter() { + for (category, transactions) in &self.transactions { let transactions = transactions.iter().map(|(tx, _)| *tx).collect(); ret.insert(*category, transactions); } @@ -104,11 +104,7 @@ impl BlockPayload { /// Returns count of transactions by category. pub fn count(&self, category: Option) -> usize { match category { - None => self - .transactions - .values() - .map(|transactions| transactions.len()) - .sum(), + None => self.transactions.values().map(Vec::len).sum(), Some(category) => match self.transactions.get(&category) { Some(values) => values.len(), None => 0, diff --git a/node/src/types/value_or_chunk.rs b/node/src/types/value_or_chunk.rs index b6381fba29..d8d071ef0f 100644 --- a/node/src/types/value_or_chunk.rs +++ b/node/src/types/value_or_chunk.rs @@ -107,11 +107,6 @@ impl Display for ValueOrChunk> { } } -/// Error type simply conveying that chunk validation failed. -#[derive(Debug, Error)] -#[error("Chunk validation failed")] -pub(crate) struct ChunkValidationError; - #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, DataSize)] pub struct HashingTrieRaw { inner: TrieRaw, diff --git a/node/src/utils.rs b/node/src/utils.rs index b9d372a515..72d03fa7ef 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -15,18 +15,16 @@ pub(crate) mod specimen; pub(crate) mod umask; pub mod work_queue; +#[cfg(test)] +use std::{any, sync::Arc, time::Duration}; use std::{ - any, fmt::{self, Debug, Display, Formatter}, io, net::{SocketAddr, ToSocketAddrs}, ops::{Add, BitXorAssign, Div}, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::{Duration, Instant, SystemTime}, + sync::atomic::{AtomicBool, Ordering}, + time::{Instant, SystemTime}, }; use datasize::DataSize; @@ -360,6 +358,7 @@ pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) { /// /// Using this function is usually a potential architectural issue and it should be used very /// sparingly. Consider introducing a different access pattern for the value under `Arc`. +#[cfg(test)] pub(crate) async fn wait_for_arc_drop( arc: Arc, attempts: usize, diff --git a/node/src/utils/ds.rs b/node/src/utils/ds.rs index f4f257c278..e412381afa 100644 --- a/node/src/utils/ds.rs +++ b/node/src/utils/ds.rs @@ -53,7 +53,7 @@ where let sampled = vec .as_slice() .choose_multiple(&mut rng, SAMPLE_SIZE) - .map(|v| v.estimate_heap_size()) + .map(DataSize::estimate_heap_size) .sum(); base_size + scale_sample(vec.len(), sampled) } diff --git a/types/src/chainspec.rs b/types/src/chainspec.rs index a695b45336..b1e4d60273 100644 --- a/types/src/chainspec.rs +++ b/types/src/chainspec.rs @@ -51,8 +51,6 @@ pub use core_config::{ DEFAULT_GAS_HOLD_INTERVAL, }; pub use fee_handling::FeeHandling; -#[cfg(any(feature = "testing", test))] -pub use genesis_config::DEFAULT_AUCTION_DELAY; #[cfg(any(feature = "std", test))] pub use genesis_config::{GenesisConfig, GenesisConfigBuilder}; pub use global_state_update::{GlobalStateUpdate, GlobalStateUpdateConfig, GlobalStateUpdateError}; From 9114aadfa00cfaf93a78019163bbdc33b6b713ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Mon, 1 Jul 2024 12:18:47 +0200 Subject: [PATCH 4/7] Handle server shutdown_receiver as for testing only --- node/src/components/network.rs | 12 ++++++------ node/src/components/network/tasks.rs | 25 ++++++++++++++++--------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/node/src/components/network.rs b/node/src/components/network.rs index 5a5f0fe988..c3830b33b0 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -223,7 +223,6 @@ struct ChannelManagement { // Note: This channel is closed when we finished syncing, so the `Network` can close all // connections. When they are re-established, the proper value of the now updated `is_syncing` // flag will be exchanged on handshake. - #[cfg(test)] close_incoming_sender: Option>, /// Handle used by the `message_reader` task to receive a notification that incoming /// connections should be closed. @@ -358,14 +357,16 @@ where // which we need to shutdown cleanly later on. info!(%local_addr, %public_addr, %protocol_version, "starting server background task"); - let (_server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); - let (_close_incoming_sender, close_incoming_receiver) = watch::channel(()); + #[cfg(test)] + let (server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); + let (close_incoming_sender, close_incoming_receiver) = watch::channel(()); let context = self.context.clone(); let _server_join_handle = tokio::spawn( tasks::server( context, tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?, + #[cfg(test)] server_shutdown_receiver, ) .in_current_span(), @@ -373,11 +374,10 @@ where let channel_management = ChannelManagement { #[cfg(test)] - shutdown_sender: Some(_server_shutdown_sender), + shutdown_sender: Some(server_shutdown_sender), #[cfg(test)] server_join_handle: Some(_server_join_handle), - #[cfg(test)] - close_incoming_sender: Some(_close_incoming_sender), + close_incoming_sender: Some(close_incoming_sender), close_incoming_receiver, }; diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 63e859158e..34f314cb0e 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -582,6 +582,7 @@ where pub(super) async fn server( context: Arc>, listener: TcpListener, + #[cfg(test)] mut shutdown_receiver: watch::Receiver<()>, ) where REv: From> + Send, @@ -637,16 +638,22 @@ pub(super) async fn server( } }; - let shutdown_messages = async move { while shutdown_receiver.changed().await.is_ok() {} }; + #[cfg(not(test))] + accept_connections.await; - // Now we can wait for either the `shutdown` channel's remote end to do be dropped or the - // infinite loop to terminate, which never happens. - match future::select(Box::pin(shutdown_messages), Box::pin(accept_connections)).await { - Either::Left(_) => info!( - %context.our_id, - "shutting down socket, no longer accepting incoming connections" - ), - Either::Right(_) => unreachable!(), + #[cfg(test)] + { + let shutdown_messages = async move { while shutdown_receiver.changed().await.is_ok() {} }; + + // Now we can wait for either the `shutdown` channel's remote end to do be dropped or the + // infinite loop to terminate, which never happens. + match future::select(Box::pin(shutdown_messages), Box::pin(accept_connections)).await { + Either::Left(_) => info!( + %context.our_id, + "shutting down socket, no longer accepting incoming connections" + ), + Either::Right(_) => unreachable!(), + } } } From 82f9a2ae64fabe849aec2cdb190d0e1e833a760d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Mon, 1 Jul 2024 12:31:44 +0200 Subject: [PATCH 5/7] Apply cargo fmt --- node/src/components/network/tasks.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 34f314cb0e..9de7cba63f 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -582,8 +582,7 @@ where pub(super) async fn server( context: Arc>, listener: TcpListener, - #[cfg(test)] - mut shutdown_receiver: watch::Receiver<()>, + #[cfg(test)] mut shutdown_receiver: watch::Receiver<()>, ) where REv: From> + Send, P: Payload, From 09f16662bb4421ed078320c6f68e02a87b0a21d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Thu, 4 Jul 2024 12:18:02 +0200 Subject: [PATCH 6/7] Remove unused connection symmetries; more code cleanup --- node/src/components/binary_port.rs | 4 +- node/src/components/network.rs | 31 ++++++----- node/src/components/network/metrics.rs | 4 +- node/src/components/network/tasks.rs | 2 +- node/src/components/storage.rs | 72 ++++++++++++-------------- node/src/components/storage/tests.rs | 56 +++++--------------- 6 files changed, 69 insertions(+), 100 deletions(-) diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index ad15b9b076..5f3d451af6 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -929,7 +929,7 @@ where }; let block_rewards = match era_end.rewards() { Rewards::V2(rewards) => rewards, - _ => { + Rewards::V1(_) => { return BinaryResponse::new_error( ErrorCode::UnsupportedRewardsV1Request, protocol_version, @@ -974,7 +974,7 @@ where .await .map_or_else( |err| BinaryResponse::new_error(err.into(), protocol_version), - |_| BinaryResponse::new_empty(protocol_version), + |()| BinaryResponse::new_empty(protocol_version), ) } diff --git a/node/src/components/network.rs b/node/src/components/network.rs index c3830b33b0..9aabc8a980 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -45,7 +45,10 @@ pub(crate) mod tasks; mod tests; use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{ + hash_map::{Entry, HashMap}, + BTreeMap, BTreeSet, HashSet, + }, fmt::{self, Debug, Display, Formatter}, io, net::{SocketAddr, TcpListener}, @@ -223,7 +226,7 @@ struct ChannelManagement { // Note: This channel is closed when we finished syncing, so the `Network` can close all // connections. When they are re-established, the proper value of the now updated `is_syncing` // flag will be exchanged on handshake. - close_incoming_sender: Option>, + _close_incoming_sender: Option>, /// Handle used by the `message_reader` task to receive a notification that incoming /// connections should be closed. close_incoming_receiver: watch::Receiver<()>, @@ -359,7 +362,7 @@ where #[cfg(test)] let (server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); - let (close_incoming_sender, close_incoming_receiver) = watch::channel(()); + let (_close_incoming_sender, close_incoming_receiver) = watch::channel(()); let context = self.context.clone(); let _server_join_handle = tokio::spawn( @@ -377,7 +380,7 @@ where shutdown_sender: Some(server_shutdown_sender), #[cfg(test)] server_join_handle: Some(_server_join_handle), - close_incoming_sender: Some(close_incoming_sender), + _close_incoming_sender: Some(_close_incoming_sender), close_incoming_receiver, }; @@ -647,10 +650,11 @@ where } // Update the connection symmetries. - self.connection_symmetries - .entry(peer_id) - .or_default() - .remove_incoming(peer_addr, Instant::now()); + if let Entry::Occupied(mut entry) = self.connection_symmetries.entry(peer_id) { + if entry.get_mut().remove_incoming(peer_addr, Instant::now()) { + entry.remove(); + } + } Effects::new() }) @@ -871,10 +875,11 @@ where .outgoing_manager .handle_connection_drop(peer_addr, Instant::now()); - self.connection_symmetries - .entry(peer_id) - .or_default() - .unmark_outgoing(Instant::now()); + if let Entry::Occupied(mut entry) = self.connection_symmetries.entry(peer_id) { + if entry.get_mut().unmark_outgoing(Instant::now()) { + entry.remove(); + } + } self.outgoing_limiter.remove_connected_validator(&peer_id); @@ -1044,7 +1049,7 @@ where if let Some(mut channel_management) = self.channel_management.take() { // Close the shutdown socket, causing the server to exit. drop(channel_management.shutdown_sender.take()); - drop(channel_management.close_incoming_sender.take()); + drop(channel_management._close_incoming_sender.take()); // Wait for the server to exit cleanly. if let Some(join_handle) = channel_management.server_join_handle.take() { diff --git a/node/src/components/network/metrics.rs b/node/src/components/network/metrics.rs index 9cf382a2b4..3f296f7a86 100644 --- a/node/src/components/network/metrics.rs +++ b/node/src/components/network/metrics.rs @@ -474,11 +474,11 @@ impl Metrics { } MessageKind::BlockGossip => { metrics.out_bytes_block_gossip.inc_by(size); - metrics.out_count_block_gossip.inc() + metrics.out_count_block_gossip.inc(); } MessageKind::FinalitySignatureGossip => { metrics.out_bytes_finality_signature_gossip.inc_by(size); - metrics.out_count_finality_signature_gossip.inc() + metrics.out_count_finality_signature_gossip.inc(); } MessageKind::AddressGossip => { metrics.out_bytes_address_gossip.inc_by(size); diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 9de7cba63f..952693cb40 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -631,7 +631,7 @@ pub(super) async fn server( // The code in its current state will consume 100% CPU if local resource // exhaustion happens, as no distinction is made and no delay introduced. Err(err) => { - warn!(%context.our_id, err=display_error(&err), "dropping incoming connection during accept") + warn!(%context.our_id, err=display_error(&err), "dropping incoming connection during accept"); } } } diff --git a/node/src/components/storage.rs b/node/src/components/storage.rs index 496e1c59ee..f3ccaeeb8a 100644 --- a/node/src/components/storage.rs +++ b/node/src/components/storage.rs @@ -608,7 +608,7 @@ impl Storage { } => { let mut era_ids = HashSet::new(); let txn = self.block_store.checkout_ro()?; - for transaction_hash in transaction_hashes.iter() { + for transaction_hash in &transaction_hashes { let maybe_block_info: Option = txn.read(*transaction_hash)?; if let Some(block_info) = maybe_block_info { @@ -670,7 +670,7 @@ impl Storage { responder, } => { let ro_txn = self.block_store.checkout_ro()?; - let maybe_transaction = match self.get_transaction_with_finalized_approvals( + let maybe_transaction = match Self::get_transaction_with_finalized_approvals( &ro_txn, &transaction_id.transaction_hash(), )? { @@ -694,9 +694,10 @@ impl Storage { let ro_txn = self.block_store.checkout_ro()?; let transaction = if with_finalized_approvals { - match self - .get_transaction_with_finalized_approvals(&ro_txn, &transaction_hash)? - { + match Self::get_transaction_with_finalized_approvals( + &ro_txn, + &transaction_hash, + )? { Some((transaction, maybe_approvals)) => { if let Some(approvals) = maybe_approvals { transaction.with_approvals(approvals) @@ -747,9 +748,10 @@ impl Storage { } => { let txn = self.block_store.checkout_ro()?; responder - .respond( - self.get_execution_results_with_transaction_headers(&txn, &block_hash)?, - ) + .respond(Self::get_execution_results_with_transaction_headers( + &txn, + &block_hash, + )?) .ignore() } StorageRequest::GetBlockExecutionResultsOrChunk { id, responder } => responder @@ -793,7 +795,7 @@ impl Storage { only_from_available_block_range, responder, } => { - if !(self.should_return_block(block_height, only_from_available_block_range)?) { + if !(self.should_return_block(block_height, only_from_available_block_range)) { return Ok(responder.respond(None).ignore()); } @@ -1023,7 +1025,7 @@ impl Storage { block_height: u64, only_from_available_block_range: bool, ) -> Result, FatalStorageError> { - if !(self.should_return_block(block_height, only_from_available_block_range)?) { + if !(self.should_return_block(block_height, only_from_available_block_range)) { Ok(None) } else { let txn = self.block_store.checkout_ro()?; @@ -1050,7 +1052,7 @@ impl Storage { transaction_hashes .map(|transaction_hash| { - self.get_transaction_with_finalized_approvals(&ro_txn, transaction_hash) + Self::get_transaction_with_finalized_approvals(&ro_txn, transaction_hash) .map_err(FatalStorageError::from) }) .collect() @@ -1202,9 +1204,7 @@ impl Storage { /// Retrieves the height of the highest complete block (if any). pub(crate) fn highest_complete_block_height(&self) -> Option { - self.completed_blocks - .highest_sequence() - .map(|sequence| sequence.high()) + self.completed_blocks.highest_sequence().map(Sequence::high) } /// Retrieves the contiguous segment of the block chain starting at the highest known switch @@ -1337,7 +1337,7 @@ impl Storage { .into_iter() .flatten() { - transactions.push(transaction) + transactions.push(transaction); } Ok(Some((block, transactions))) @@ -1424,7 +1424,7 @@ impl Storage { None => return Ok(None), }; - if !(self.should_return_block(block_header.height(), only_from_available_block_range)?) { + if !(self.should_return_block(block_header.height(), only_from_available_block_range)) { return Ok(None); } @@ -1506,7 +1506,7 @@ impl Storage { )), }, }; - result.push(SignedBlockHeader::new(block_header, block_signatures)) + result.push(SignedBlockHeader::new(block_header, block_signatures)); } None => return Ok(None), } @@ -1615,7 +1615,7 @@ impl Storage { let transaction_hash = TransactionHash::from(deploy_hash); let txn = self.block_store.checkout_ro()?; let transaction = - match self.get_transaction_with_finalized_approvals(&txn, &transaction_hash)? { + match Self::get_transaction_with_finalized_approvals(&txn, &transaction_hash)? { Some((transaction, maybe_approvals)) => { if let Some(approvals) = maybe_approvals { transaction.with_approvals(approvals) @@ -1695,7 +1695,6 @@ impl Storage { /// Retrieves a single transaction along with its finalized approvals. #[allow(clippy::type_complexity)] fn get_transaction_with_finalized_approvals( - &self, txn: &(impl DataReader + DataReader>), transaction_hash: &TransactionHash, @@ -1827,11 +1826,11 @@ impl Storage { &self, block_height: u64, only_from_available_block_range: bool, - ) -> Result { + ) -> bool { if only_from_available_block_range { - Ok(self.get_available_block_range().contains(block_height)) + self.get_available_block_range().contains(block_height) } else { - Ok(true) + true } } @@ -1903,7 +1902,7 @@ impl Storage { ) -> Result, FatalStorageError> { let txn = self.block_store.checkout_ro()?; - let execution_results = match self.get_execution_results(&txn, request.block_hash())? { + let execution_results = match Self::get_execution_results(&txn, request.block_hash())? { Some(execution_results) => execution_results .into_iter() .map(|(_deploy_hash, execution_result)| execution_result) @@ -1952,7 +1951,6 @@ impl Storage { } fn get_execution_results( - &self, txn: &(impl DataReader + DataReader), block_hash: &BlockHash, ) -> Result>, FatalStorageError> { @@ -1991,14 +1989,13 @@ impl Storage { #[allow(clippy::type_complexity)] fn get_execution_results_with_transaction_headers( - &self, txn: &(impl DataReader + DataReader + DataReader), block_hash: &BlockHash, ) -> Result>, FatalStorageError> { - let execution_results = match self.get_execution_results(txn, block_hash)? { + let execution_results = match Self::get_execution_results(txn, block_hash)? { Some(execution_results) => execution_results, None => return Ok(None), }; @@ -2077,14 +2074,15 @@ fn should_move_storage_files_to_network_subdir( let mut files_found = vec![]; let mut files_not_found = vec![]; - file_names.iter().for_each(|file_name| { + for file_name in file_names { let file_path = root.join(file_name); - match file_path.exists() { - true => files_found.push(file_path), - false => files_not_found.push(file_path), + if file_path.exists() { + files_found.push(file_path); + } else { + files_not_found.push(file_path); } - }); + } let should_move_files = files_found.len() == file_names.len(); @@ -2179,7 +2177,7 @@ impl Storage { .block_store .checkout_ro() .expect("could not create RO transaction"); - self.get_transaction_with_finalized_approvals(&txn, transaction_hash) + Self::get_transaction_with_finalized_approvals(&txn, transaction_hash) .expect("could not retrieve a transaction with finalized approvals from storage") } @@ -2281,10 +2279,7 @@ impl Storage { .expect("should create ro txn"); let block: Block = ro_txn.read(block_hash).expect("should read block")?; - if !(self - .should_return_block(block.height(), only_from_available_block_range) - .expect("should check if block is in range")) - { + if !(self.should_return_block(block.height(), only_from_available_block_range)) { return None; } if block_hash != *block.hash() { @@ -2313,10 +2308,7 @@ impl Storage { height: u64, only_from_available_block_range: bool, ) -> Option { - if !(self - .should_return_block(height, only_from_available_block_range) - .expect("should check if block is in range")) - { + if !(self.should_return_block(height, only_from_available_block_range)) { return None; } let ro_txn = self diff --git a/node/src/components/storage/tests.rs b/node/src/components/storage/tests.rs index 962ea3979c..7b7f64abe8 100644 --- a/node/src/components/storage/tests.rs +++ b/node/src/components/storage/tests.rs @@ -2147,51 +2147,23 @@ fn should_restrict_returned_blocks() { // Without restriction, the node should attempt to return any requested block // regardless if it is in the disjoint sequences. - assert!(storage - .should_return_block(0, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(1, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(2, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(3, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(4, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(5, false) - .expect("should return block failed")); - assert!(storage - .should_return_block(6, false) - .expect("should return block failed")); + assert!(storage.should_return_block(0, false)); + assert!(storage.should_return_block(1, false)); + assert!(storage.should_return_block(2, false)); + assert!(storage.should_return_block(3, false)); + assert!(storage.should_return_block(4, false)); + assert!(storage.should_return_block(5, false)); + assert!(storage.should_return_block(6, false)); // With restriction, the node should attempt to return only the blocks that are // on the highest disjoint sequence, i.e blocks 4 and 5 only. - assert!(!storage - .should_return_block(0, true) - .expect("should return block failed")); - assert!(!storage - .should_return_block(1, true) - .expect("should return block failed")); - assert!(!storage - .should_return_block(2, true) - .expect("should return block failed")); - assert!(!storage - .should_return_block(3, true) - .expect("should return block failed")); - assert!(storage - .should_return_block(4, true) - .expect("should return block failed")); - assert!(storage - .should_return_block(5, true) - .expect("should return block failed")); - assert!(!storage - .should_return_block(6, true) - .expect("should return block failed")); + assert!(!storage.should_return_block(0, true)); + assert!(!storage.should_return_block(1, true)); + assert!(!storage.should_return_block(2, true)); + assert!(!storage.should_return_block(3, true)); + assert!(storage.should_return_block(4, true)); + assert!(storage.should_return_block(5, true)); + assert!(!storage.should_return_block(6, true)); } #[test] From 9000755814918d1895eb363657bd84b8a35f63d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Thu, 19 Sep 2024 11:46:13 +0200 Subject: [PATCH 7/7] Revert the changes regardin join_handle to fix upgrade --- node/src/components/network.rs | 31 +++++++---------------- node/src/components/network/tasks.rs | 37 ++++++++++++---------------- node/src/components/rest_server.rs | 29 ++++++---------------- node/src/utils.rs | 11 +++++---- 4 files changed, 39 insertions(+), 69 deletions(-) diff --git a/node/src/components/network.rs b/node/src/components/network.rs index 9aabc8a980..14067acf28 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -57,7 +57,6 @@ use std::{ }; use datasize::DataSize; -#[cfg(test)] use futures::{future::BoxFuture, FutureExt}; use itertools::Itertools; use prometheus::Registry; @@ -66,14 +65,13 @@ use rand::{ Rng, }; use serde::{Deserialize, Serialize}; -#[cfg(test)] -use tokio::task::JoinHandle; use tokio::{ net::TcpStream, sync::{ mpsc::{self, UnboundedSender}, watch, }, + task::JoinHandle, }; use tokio_openssl::SslStream; use tokio_util::codec::LengthDelimitedCodec; @@ -108,8 +106,6 @@ use self::{ symmetry::ConnectionSymmetry, tasks::{MessageQueueItem, NetworkContext}, }; -#[cfg(test)] -use crate::reactor::Finalize; use crate::{ components::{gossiper::GossipItem, Component, ComponentState, InitializedComponent}, effect::{ @@ -117,7 +113,7 @@ use crate::{ requests::{BeginGossipRequest, NetworkInfoRequest, NetworkRequest, StorageRequest}, AutoClosingResponder, EffectBuilder, EffectExt, Effects, GossipTarget, }, - reactor::ReactorEvent, + reactor::{Finalize, ReactorEvent}, tls, types::{NodeId, ValidatorMatrix}, utils::{self, display_error, Source}, @@ -126,9 +122,7 @@ use crate::{ const COMPONENT_NAME: &str = "network"; -#[cfg(test)] const MAX_METRICS_DROP_ATTEMPTS: usize = 25; -#[cfg(test)] const DROP_RETRY_DELAY: Duration = Duration::from_millis(100); /// How often to keep attempting to reconnect to a node before giving up. Note that reconnection @@ -216,17 +210,15 @@ struct ChannelManagement { /// Channel signaling a shutdown of the network. // Note: This channel is closed when `Network` is dropped, signalling the receivers that // they should cease operation. - #[cfg(test)] shutdown_sender: Option>, /// Join handle for the server thread. - #[cfg(test)] server_join_handle: Option>, /// Channel signaling a shutdown of the incoming connections. // Note: This channel is closed when we finished syncing, so the `Network` can close all // connections. When they are re-established, the proper value of the now updated `is_syncing` // flag will be exchanged on handshake. - _close_incoming_sender: Option>, + close_incoming_sender: Option>, /// Handle used by the `message_reader` task to receive a notification that incoming /// connections should be closed. close_incoming_receiver: watch::Receiver<()>, @@ -360,27 +352,23 @@ where // which we need to shutdown cleanly later on. info!(%local_addr, %public_addr, %protocol_version, "starting server background task"); - #[cfg(test)] let (server_shutdown_sender, server_shutdown_receiver) = watch::channel(()); - let (_close_incoming_sender, close_incoming_receiver) = watch::channel(()); + let (close_incoming_sender, close_incoming_receiver) = watch::channel(()); let context = self.context.clone(); - let _server_join_handle = tokio::spawn( + let server_join_handle = tokio::spawn( tasks::server( context, tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?, - #[cfg(test)] server_shutdown_receiver, ) .in_current_span(), ); let channel_management = ChannelManagement { - #[cfg(test)] shutdown_sender: Some(server_shutdown_sender), - #[cfg(test)] - server_join_handle: Some(_server_join_handle), - _close_incoming_sender: Some(_close_incoming_sender), + server_join_handle: Some(server_join_handle), + close_incoming_sender: Some(close_incoming_sender), close_incoming_receiver, }; @@ -996,7 +984,7 @@ where ret.insert(node_id, connection.peer_addr.to_string()); } else { // This should never happen unless the state of `OutgoingManager` is corrupt. - warn!(%node_id, "route disappeared unexpectedly"); + warn!(%node_id, "route disappeared unexpectedly") } } @@ -1038,7 +1026,6 @@ where } } -#[cfg(test)] impl Finalize for Network where REv: Send + 'static, @@ -1049,7 +1036,7 @@ where if let Some(mut channel_management) = self.channel_management.take() { // Close the shutdown socket, causing the server to exit. drop(channel_management.shutdown_sender.take()); - drop(channel_management._close_incoming_sender.take()); + drop(channel_management.close_incoming_sender.take()); // Wait for the server to exit cleanly. if let Some(join_handle) = channel_management.server_join_handle.take() { diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index 952693cb40..f2a637a384 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -28,7 +28,7 @@ use prometheus::IntGauge; use rand::Rng; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{ - net::{TcpListener, TcpStream}, + net::TcpStream, sync::{mpsc::UnboundedReceiver, watch, Semaphore}, }; use tokio_openssl::SslStream; @@ -581,8 +581,8 @@ where /// Runs the server core acceptor loop. pub(super) async fn server( context: Arc>, - listener: TcpListener, - #[cfg(test)] mut shutdown_receiver: watch::Receiver<()>, + listener: tokio::net::TcpListener, + mut shutdown_receiver: watch::Receiver<()>, ) where REv: From> + Send, P: Payload, @@ -608,7 +608,8 @@ pub(super) async fn server( let handler_span = span.clone(); tokio::spawn( async move { - let incoming = handle_incoming(context, stream, peer_addr).await; + let incoming = + handle_incoming(context.clone(), stream, peer_addr).await; event_queue .schedule( Event::IncomingConnection { @@ -630,29 +631,23 @@ pub(super) async fn server( // // The code in its current state will consume 100% CPU if local resource // exhaustion happens, as no distinction is made and no delay introduced. - Err(err) => { - warn!(%context.our_id, err=display_error(&err), "dropping incoming connection during accept"); + Err(ref err) => { + warn!(%context.our_id, err=display_error(err), "dropping incoming connection during accept") } } } }; - #[cfg(not(test))] - accept_connections.await; + let shutdown_messages = async move { while shutdown_receiver.changed().await.is_ok() {} }; - #[cfg(test)] - { - let shutdown_messages = async move { while shutdown_receiver.changed().await.is_ok() {} }; - - // Now we can wait for either the `shutdown` channel's remote end to do be dropped or the - // infinite loop to terminate, which never happens. - match future::select(Box::pin(shutdown_messages), Box::pin(accept_connections)).await { - Either::Left(_) => info!( - %context.our_id, - "shutting down socket, no longer accepting incoming connections" - ), - Either::Right(_) => unreachable!(), - } + // Now we can wait for either the `shutdown` channel's remote end to do be dropped or the + // infinite loop to terminate, which never happens. + match future::select(Box::pin(shutdown_messages), Box::pin(accept_connections)).await { + Either::Left(_) => info!( + %context.our_id, + "shutting down socket, no longer accepting incoming connections" + ), + Either::Right(_) => unreachable!(), } } diff --git a/node/src/components/rest_server.rs b/node/src/components/rest_server.rs index 7d15783a99..d5c2d7bce4 100644 --- a/node/src/components/rest_server.rs +++ b/node/src/components/rest_server.rs @@ -28,22 +28,14 @@ mod info; use std::{net::SocketAddr, sync::Arc}; use datasize::DataSize; -use futures::join; -#[cfg(test)] -use futures::{future::BoxFuture, FutureExt}; +use futures::{future::BoxFuture, join, FutureExt}; use once_cell::sync::OnceCell; -use tokio::sync::oneshot; -#[cfg(test)] -use tokio::task::JoinHandle; -#[cfg(test)] -use tracing::debug; -use tracing::{error, info, warn}; +use tokio::{sync::oneshot, task::JoinHandle}; +use tracing::{debug, error, info, warn}; use casper_types::ProtocolVersion; use super::{Component, ComponentState, InitializedComponent}; -#[cfg(test)] -use crate::reactor::Finalize; use crate::{ components::PortBoundComponent, effect::{ @@ -54,7 +46,7 @@ use crate::{ }, EffectBuilder, EffectExt, Effects, }, - reactor::main_reactor::MainEvent, + reactor::{main_reactor::MainEvent, Finalize}, types::{ChainspecInfo, StatusFeed}, utils::{self, ListeningError}, NodeRng, @@ -102,13 +94,11 @@ impl ReactorEventT for REv where #[derive(DataSize, Debug)] pub(crate) struct InnerRestServer { /// When the message is sent, it signals the server loop to exit cleanly. - #[cfg(test)] #[data_size(skip)] shutdown_sender: oneshot::Sender<()>, /// The address the server is listening on. local_addr: Arc>, /// The task handle which will only join once the server loop has exited. - #[cfg(test)] #[data_size(skip)] server_join_handle: Option>, /// The network name, as specified in the chainspec @@ -293,12 +283,12 @@ where effect_builder: EffectBuilder, ) -> Result, Self::Error> { let cfg = &self.config; - let (_shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); let builder = utils::start_listening(&cfg.address)?; let local_addr: Arc> = Default::default(); - let _server_join_handle = if cfg.cors_origin.is_empty() { + let server_join_handle = if cfg.cors_origin.is_empty() { Some(tokio::spawn(http_server::run( builder, effect_builder, @@ -322,10 +312,8 @@ where let network_name = self.network_name.clone(); self.inner_rest = Some(InnerRestServer { local_addr, - #[cfg(test)] - shutdown_sender: _shutdown_sender, - #[cfg(test)] - server_join_handle: _server_join_handle, + shutdown_sender, + server_join_handle, network_name, }); @@ -333,7 +321,6 @@ where } } -#[cfg(test)] impl Finalize for RestServer { fn finalize(self) -> BoxFuture<'static, ()> { async { diff --git a/node/src/utils.rs b/node/src/utils.rs index 72d03fa7ef..b9d372a515 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -15,16 +15,18 @@ pub(crate) mod specimen; pub(crate) mod umask; pub mod work_queue; -#[cfg(test)] -use std::{any, sync::Arc, time::Duration}; use std::{ + any, fmt::{self, Debug, Display, Formatter}, io, net::{SocketAddr, ToSocketAddrs}, ops::{Add, BitXorAssign, Div}, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, - time::{Instant, SystemTime}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::{Duration, Instant, SystemTime}, }; use datasize::DataSize; @@ -358,7 +360,6 @@ pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) { /// /// Using this function is usually a potential architectural issue and it should be used very /// sparingly. Consider introducing a different access pattern for the value under `Arc`. -#[cfg(test)] pub(crate) async fn wait_for_arc_drop( arc: Arc, attempts: usize,