diff --git a/node/src/reactor/main_reactor/control.rs b/node/src/reactor/main_reactor/control.rs index 026ef01c7f..8de6a3394a 100644 --- a/node/src/reactor/main_reactor/control.rs +++ b/node/src/reactor/main_reactor/control.rs @@ -61,6 +61,17 @@ impl MainReactor { match self.initialize_next_component(effect_builder) { Some(effects) => (initialization_logic_default_delay.into(), effects), None => { + if self.sync_handling.is_isolated() { + // If node is "isolated" it doesn't care about peers + if let Err(msg) = self.refresh_contract_runtime() { + return ( + Duration::ZERO, + fatal!(effect_builder, "{}", msg).ignore(), + ); + } + self.state = ReactorState::KeepUp; + return (Duration::ZERO, Effects::new()); + } if false == self.net.has_sufficient_fully_connected_peers() { info!("Initialize: awaiting sufficient fully-connected peers"); return (initialization_logic_default_delay.into(), Effects::new()); diff --git a/node/src/reactor/main_reactor/keep_up.rs b/node/src/reactor/main_reactor/keep_up.rs index 5344f6508d..63c288c9cb 100644 --- a/node/src/reactor/main_reactor/keep_up.rs +++ b/node/src/reactor/main_reactor/keep_up.rs @@ -236,16 +236,24 @@ impl MainReactor { ) -> Option { match sync_instruction { SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => { - // the block accumulator is unsure what our block position is relative to the - // network and wants to check peers for their notion of current tip. - // to do this, we switch back to CatchUp which will engage the necessary - // machinery to poll the network via the SyncLeap mechanic. if it turns out - // we are actually at or near tip after all, we simply switch back to KeepUp - // and continue onward. the accumulator is designed to periodically do this - // if we've received no gossip about new blocks from peers within an interval. - // this is to protect against partitioning and is not problematic behavior - // when / if it occurs. 
- Some(KeepUpInstruction::CatchUp) + if !self.sync_handling.is_isolated() { + // the block accumulator is unsure what our block position is relative to the + // network and wants to check peers for their notion of current tip. + // to do this, we switch back to CatchUp which will engage the necessary + // machinery to poll the network via the SyncLeap mechanic. if it turns out + // we are actually at or near tip after all, we simply switch back to KeepUp + // and continue onward. the accumulator is designed to periodically do this + // if we've received no gossip about new blocks from peers within an interval. + // this is to protect against partitioning and is not problematic behavior + // when / if it occurs. + Some(KeepUpInstruction::CatchUp) + } else { + // If the node operates in isolated mode the assumption is that it might not + // have any peers. So going back to CatchUp to query their + // notion of tip might effectively prevent the node's components from responding. + // That's why - for isolated mode - we bypass this mechanism. 
+ None + } } SyncInstruction::BlockSync { block_hash } => { debug!("KeepUp: BlockSync: {:?}", block_hash); diff --git a/node/src/reactor/main_reactor/tests.rs b/node/src/reactor/main_reactor/tests.rs index 25ae0641b7..dba0ec477c 100644 --- a/node/src/reactor/main_reactor/tests.rs +++ b/node/src/reactor/main_reactor/tests.rs @@ -2,7 +2,7 @@ mod binary_port; mod transactions; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, convert::TryFrom, iter, net::SocketAddr, @@ -11,13 +11,22 @@ use std::{ time::Duration, }; +use casper_binary_port::{ + BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, InformationRequest, + Uptime, +}; use either::Either; +use futures::{SinkExt, StreamExt}; use num::Zero; use num_rational::Ratio; use num_traits::One; use rand::Rng; use tempfile::TempDir; -use tokio::time::{self, error::Elapsed}; +use tokio::{ + net::TcpStream, + time::{self, error::Elapsed, timeout}, +}; +use tokio_util::codec::Framed; use tracing::{error, info}; use casper_storage::{ @@ -29,6 +38,7 @@ use casper_storage::{ global_state::state::{StateProvider, StateReader}, }; use casper_types::{ + bytesrepr::{FromBytes, ToBytes}, execution::{ExecutionResult, ExecutionResultV2, TransformKindV2, TransformV2}, system::{ auction::{BidAddr, BidKind, BidsExt, DelegationRate, DelegatorKind}, @@ -121,6 +131,7 @@ struct ConfigsOverride { chain_name: Option, gas_hold_balance_handling: Option, transaction_v1_override: Option, + node_config_override: NodeConfigOverride, } impl ConfigsOverride { @@ -209,6 +220,11 @@ impl ConfigsOverride { } } +#[derive(Clone, Default)] +struct NodeConfigOverride { + sync_handling_override: Option, +} + impl Default for ConfigsOverride { fn default() -> Self { ConfigsOverride { @@ -237,6 +253,7 @@ impl Default for ConfigsOverride { chain_name: None, gas_hold_balance_handling: None, transaction_v1_override: None, + node_config_override: NodeConfigOverride::default(), } } } @@ -357,6 +374,7 @@ 
impl TestFixture { chain_name, gas_hold_balance_handling, transaction_v1_override, + node_config_override, } = spec_override.unwrap_or_default(); if era_duration != TimeDiff::from_millis(0) { chainspec.core_config.era_duration = era_duration; @@ -422,8 +440,12 @@ impl TestFixture { }; for secret_key in secret_keys { - let (config, storage_dir) = - fixture.create_node_config(secret_key.as_ref(), None, storage_multiplier); + let (config, storage_dir) = fixture.create_node_config( + secret_key.as_ref(), + None, + storage_multiplier, + node_config_override.clone(), + ); fixture.add_node(secret_key, config, storage_dir).await; } @@ -545,6 +567,7 @@ impl TestFixture { secret_key: &SecretKey, maybe_trusted_hash: Option, storage_multiplier: u8, + node_config_override: NodeConfigOverride, ) -> (Config, TempDir) { // Set the network configuration. let network_cfg = match self.node_contexts.first() { @@ -569,6 +592,12 @@ impl TestFixture { }, ..Default::default() }; + let NodeConfigOverride { + sync_handling_override, + } = node_config_override; + if let Some(sync_handling) = sync_handling_override { + cfg.node.sync_handling = sync_handling; + } // Additionally set up storage in a temporary directory. let (storage_cfg, temp_dir) = storage::Config::new_for_tests(storage_multiplier); @@ -1156,7 +1185,12 @@ async fn historical_sync_with_era_height_1() { // Create a joiner node. 
let secret_key = SecretKey::random(&mut fixture.rng); let trusted_hash = *fixture.highest_complete_block().hash(); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::Genesis; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1213,7 +1247,12 @@ async fn should_not_historical_sync_no_sync_node() { ); info!("joining node using block {trusted_height} {trusted_hash}"); let secret_key = SecretKey::random(&mut fixture.rng); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::NoSync; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1297,7 +1336,12 @@ async fn should_catch_up_and_shutdown() { info!("joining node using block {trusted_height} {trusted_hash}"); let secret_key = SecretKey::random(&mut fixture.rng); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::CompleteBlock; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1339,6 +1383,99 @@ async fn should_catch_up_and_shutdown() { ); } +fn network_is_in_keepup( + nodes: &HashMap>>>, +) -> bool { + nodes + .values() + .all(|node| node.reactor().inner().inner().state == ReactorState::KeepUp) +} + +const MESSAGE_SIZE: u32 = 1024 * 1024 * 10; + +async fn setup_network_and_get_binary_port_handle( + initial_stakes: InitialStakes, + spec_override: 
ConfigsOverride, +) -> ( + Framed, + impl futures::Future>, TestRng)>, +) { + let mut fixture = timeout( + Duration::from_secs(10), + TestFixture::new(initial_stakes, Some(spec_override)), + ) + .await + .unwrap(); + let mut rng = fixture.rng_mut().create_child(); + let net = fixture.network_mut(); + net.settle_on(&mut rng, network_is_in_keepup, Duration::from_secs(59)) + .await; + let (_, first_node) = net + .nodes() + .iter() + .next() + .expect("should have at least one node"); + let binary_port_addr = first_node + .main_reactor() + .binary_port + .bind_address() + .unwrap(); + let finish_cranking = fixture.run_until_stopped(rng.create_child()); + let address = format!("localhost:{}", binary_port_addr.port()); + let stream = TcpStream::connect(address.clone()) + .await + .expect("should create stream"); + let client = Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE)); + (client, finish_cranking) +} + +#[tokio::test] +async fn should_start_in_isolation() { + let initial_stakes = InitialStakes::Random { count: 1 }; + let spec_override = ConfigsOverride { + node_config_override: NodeConfigOverride { + sync_handling_override: Some(SyncHandling::Isolated), + }, + ..Default::default() + }; + let (mut client, finish_cranking) = + setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await; + let request = BinaryRequest::Get( + InformationRequest::Uptime + .try_into() + .expect("should convert"), + ); + let header = + BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); + let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); + let original_request_bytes = header_bytes + .iter() + .chain( + ToBytes::to_bytes(&request) + .expect("should serialize") + .iter(), + ) + .cloned() + .collect::>(); + client + .send(BinaryMessage::new(original_request_bytes.clone())) + .await + .expect("should send message"); + let response = timeout(Duration::from_secs(10), client.next()) + .await + 
.unwrap_or_else(|err| panic!("should complete without timeout: {}", err)) + .unwrap_or_else(|| panic!("should have bytes")) + .unwrap_or_else(|err| panic!("should have ok response: {}", err)); + let uptime: Uptime = FromBytes::from_bytes(response.payload()) + .expect("Uptime should be deserializable") + .0; + assert!(uptime.into_inner() > 0); + + let (_net, _rng) = timeout(Duration::from_secs(10), finish_cranking) + .await + .unwrap_or_else(|_| panic!("should finish cranking without timeout")); +} + #[tokio::test] async fn run_equivocator_network() { let mut rng = crate::new_rng(); diff --git a/node/src/types/node_config.rs b/node/src/types/node_config.rs index cec55c6437..617664efd7 100644 --- a/node/src/types/node_config.rs +++ b/node/src/types/node_config.rs @@ -23,6 +23,9 @@ pub enum SyncHandling { /// Don't attempt to sync historical blocks and shut down node instead of switching to KeepUp /// after acquiring the first complete block CompleteBlock, + /// The node operates in isolation - no peers are needed, the node won't wait for peers to + /// switch to KeepUp. + Isolated, } impl SyncHandling { @@ -45,6 +48,11 @@ impl SyncHandling { pub fn is_complete_block(&self) -> bool { matches!(self, SyncHandling::CompleteBlock) } + + /// Isolated? + pub fn is_isolated(&self) -> bool { + matches!(self, SyncHandling::Isolated) + } } /// Node fast-sync configuration.