diff --git a/node/CHANGELOG.md b/node/CHANGELOG.md index f613a469f0..a11531c278 100644 --- a/node/CHANGELOG.md +++ b/node/CHANGELOG.md @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. The format ### Added * Add `BinaryPort` interface along with the relevant config entries. * Added chainspec settings `finders_fee`, `finality_signature_proportion` and `signature_rewards_max_delay` to control behavior of the new seigniorage model. +* Isolated sync handling, which comes online with only local data and rejects peers. Useful for testing, auditing, and similar scenarios. ### Changed * All SSE events are emitted via the `/events` endpoint. None of the previous ones (`/events/main`, `/events/deploys`, and `/events/sigs`) is available any longer. diff --git a/node/src/components/network.rs b/node/src/components/network.rs index d1a96ede50..c141c660a1 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -248,6 +248,7 @@ where registry: &Registry, chain_info_source: C, validator_matrix: ValidatorMatrix, + allow_handshake: bool, ) -> Result> { let net_metrics = Arc::new(Metrics::new(registry)?); @@ -285,6 +286,7 @@ where node_key_pair.map(NodeKeyPair::new), chain_info_source.into(), &net_metrics, + allow_handshake, )); let component = Network { @@ -662,6 +664,7 @@ where | ConnectionError::TlsHandshake(_) | ConnectionError::HandshakeSend(_) | ConnectionError::HandshakeRecv(_) + | ConnectionError::HandshakeNotAllowed | ConnectionError::IncompatibleVersion(_) => None, // These errors are potential bugs on our side. @@ -1121,6 +1124,10 @@ where { type Event = Event

; + fn name(&self) -> &str { + COMPONENT_NAME + } + fn handle_event( &mut self, effect_builder: EffectBuilder, @@ -1281,10 +1288,6 @@ where }, } } - - fn name(&self) -> &str { - COMPONENT_NAME - } } impl InitializedComponent for Network diff --git a/node/src/components/network/error.rs b/node/src/components/network/error.rs index 2a641c99a3..ef7e4bdf81 100644 --- a/node/src/components/network/error.rs +++ b/node/src/components/network/error.rs @@ -196,6 +196,9 @@ pub enum ConnectionError { /// This is usually a bug. #[error("handshake sink/stream could not be reunited")] FailedToReuniteHandshakeSinkAndStream, + /// Handshake not allowed (Isolated mode) + #[error("handshake not allowed (Isolated mode)")] + HandshakeNotAllowed, } /// IO operation that can time out or close. diff --git a/node/src/components/network/tasks.rs b/node/src/components/network/tasks.rs index f2a637a384..f4ddcd56cc 100644 --- a/node/src/components/network/tasks.rs +++ b/node/src/components/network/tasks.rs @@ -236,6 +236,8 @@ where max_in_flight_demands: usize, /// Flag indicating whether this node is syncing. is_syncing: AtomicBool, + /// If false, will not allow handshake. + allow_handshake: bool, } impl NetworkContext { @@ -245,6 +247,7 @@ impl NetworkContext { node_key_pair: Option, chain_info: ChainInfo, net_metrics: &Arc, + allow_handshake: bool, ) -> Self { // Set the demand max from configuration, regarding `0` as "unlimited". let max_in_flight_demands = if cfg.max_in_flight_demands == 0 { @@ -277,6 +280,7 @@ impl NetworkContext { tarpit_chance: cfg.tarpit_chance, max_in_flight_demands, is_syncing: AtomicBool::new(false), + allow_handshake, } } @@ -363,6 +367,14 @@ where peer_consensus_public_key, is_peer_syncing: _, }) => { + if !context.allow_handshake { + return IncomingConnection::Failed { + peer_addr, + peer_id, + error: ConnectionError::HandshakeNotAllowed, + }; + } + if let Some(ref public_key) = peer_consensus_public_key { Span::current().record("consensus_key", &field::display(public_key)); } diff --git a/node/src/components/network/tests.rs b/node/src/components/network/tests.rs index b2c43aaf56..73bd10a1ab 100644 --- a/node/src/components/network/tests.rs +++ b/node/src/components/network/tests.rs @@ -21,13 +21,13 @@ use tracing::{debug, info}; use casper_types::{Chainspec, ChainspecRawBytes, SecretKey}; use super::{ - chain_info::ChainInfo, Config, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity, + chain_info::ChainInfo, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity, MessageKind, Network, Payload, }; use crate::{ components::{ gossiper::{self, GossipItem, Gossiper}, - Component, InitializedComponent, + network, Component, InitializedComponent, }, effect::{ announcements::{ControlAnnouncement, GossiperAnnouncement, PeerBehaviorAnnouncement}, @@ -39,13 +39,13 @@ use crate::{ EffectBuilder, Effects, }, protocol, - reactor::{self, EventQueueHandle, Finalize, Reactor, Runner}, + reactor::{self, main_reactor::Config, EventQueueHandle, Finalize, Reactor, Runner}, testing::{ self, init_logging, network::{NetworkedReactor, Nodes, TestingNetwork}, ConditionCheckReactor, }, - types::{NodeId, ValidatorMatrix}, + types::{NodeId, SyncHandling, ValidatorMatrix}, NodeRng, }; @@ -182,43 +182,6 @@ impl Reactor for TestReactor { type Config = Config; type Error = anyhow::Error; - fn new( - cfg: Self::Config, - _chainspec: Arc, - _chainspec_raw_bytes: Arc, - our_identity: Identity, - registry: &Registry, - _event_queue: EventQueueHandle, - rng: &mut NodeRng, - ) -> anyhow::Result<(Self, Effects)> { - let secret_key = SecretKey::random(rng); - let mut net = Network::new( - cfg, - our_identity, - None, - registry, - ChainInfo::create_for_testing(), - ValidatorMatrix::new_with_validator(Arc::new(secret_key)), - )?; - let gossiper_config = gossiper::Config::new_with_small_timeouts(); - let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( - "address_gossiper", - gossiper_config, - registry, - )?; - - net.start_initialization(); - let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()]; - - Ok(( - TestReactor { - net, - address_gossiper, - }, - effects, - )) - } - fn dispatch_event( &mut self, effect_builder: EffectBuilder, @@ -277,6 +240,45 @@ impl Reactor for TestReactor { Event::BlocklistAnnouncement(_announcement) => Effects::new(), } } + + fn new( + cfg: Self::Config, + _chainspec: Arc, + _chainspec_raw_bytes: Arc, + our_identity: Identity, + registry: &Registry, + _event_queue: EventQueueHandle, + rng: &mut NodeRng, + ) -> anyhow::Result<(Self, Effects)> { + let secret_key = SecretKey::random(rng); + let allow_handshake = cfg.node.sync_handling != SyncHandling::Isolated; + let mut net = Network::new( + cfg.network.clone(), + our_identity, + None, + registry, + ChainInfo::create_for_testing(), + ValidatorMatrix::new_with_validator(Arc::new(secret_key)), + allow_handshake, + )?; + let gossiper_config = gossiper::Config::new_with_small_timeouts(); + let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( + "address_gossiper", + gossiper_config, + registry, + )?; + + net.start_initialization(); + let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()]; + + Ok(( + TestReactor { + net, + address_gossiper, + }, + effects, + )) + } } impl NetworkedReactor for TestReactor { @@ -351,13 +353,15 @@ async fn run_two_node_network_five_times() { let mut net = TestingNetwork::new(); let start = Instant::now(); - net.add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); + net.add_node_with_config(cfg, &mut rng).await.unwrap(); + + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); let end = Instant::now(); @@ -417,12 +421,11 @@ async fn bind_to_real_network_interface() { .ip(); let port = testing::unused_port_on_localhost(); - let local_net_config = Config::new((local_addr, port).into()); + let cfg = + Config::default().with_network_config(network::Config::new((local_addr, port).into())); let mut net = TestingNetwork::::new(); - net.add_node_with_config(local_net_config, &mut rng) - .await - .unwrap(); + net.add_node_with_config(cfg, &mut rng).await.unwrap(); // The network should be fully connected. let timeout = Duration::from_secs(2); @@ -452,17 +455,16 @@ async fn check_varying_size_network_connects() { // Pick a random port in the higher ranges that is likely to be unused. let first_node_port = testing::unused_port_on_localhost(); + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); - let _ = net - .add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); + let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap(); + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); for _ in 1..number_of_nodes { - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); } @@ -506,16 +508,17 @@ async fn ensure_peers_metric_is_correct() { // Pick a random port in the higher ranges that is likely to be unused. let first_node_port = testing::unused_port_on_localhost(); - let _ = net - .add_node_with_config( - Config::default_local_net_first_node(first_node_port), - &mut rng, - ) - .await - .unwrap(); + let cfg = Config::default().with_network_config( + network::Config::default_local_net_first_node(first_node_port), + ); + + let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap(); + + let cfg = Config::default() + .with_network_config(network::Config::default_local_net(first_node_port)); for _ in 1..number_of_nodes { - net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng) + net.add_node_with_config(cfg.clone(), &mut rng) .await .unwrap(); } diff --git a/node/src/reactor/main_reactor.rs b/node/src/reactor/main_reactor.rs index 0cd799f49f..7ddd3b25dc 100644 --- a/node/src/reactor/main_reactor.rs +++ b/node/src/reactor/main_reactor.rs @@ -1130,6 +1130,8 @@ impl reactor::Reactor for MainReactor { registry, )?; + let allow_handshake = config.node.sync_handling != SyncHandling::Isolated; + let network = Network::new( config.network.clone(), network_identity, @@ -1137,6 +1139,7 @@ impl reactor::Reactor for MainReactor { registry, chainspec.as_ref(), validator_matrix.clone(), + allow_handshake, )?; let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new( diff --git a/node/src/reactor/main_reactor/config.rs b/node/src/reactor/main_reactor/config.rs index 0432a56c63..510eddb48c 100644 --- a/node/src/reactor/main_reactor/config.rs +++ b/node/src/reactor/main_reactor/config.rs @@ -72,4 +72,11 @@ impl Config { chainspec.transaction_config.max_timestamp_leeway; } } + + /// Set network config. + #[cfg(test)] + pub(crate) fn with_network_config(mut self, network_config: NetworkConfig) -> Self { + self.network = network_config; + self + } } diff --git a/node/src/reactor/main_reactor/control.rs b/node/src/reactor/main_reactor/control.rs index 026ef01c7f..8de6a3394a 100644 --- a/node/src/reactor/main_reactor/control.rs +++ b/node/src/reactor/main_reactor/control.rs @@ -61,6 +61,17 @@ impl MainReactor { match self.initialize_next_component(effect_builder) { Some(effects) => (initialization_logic_default_delay.into(), effects), None => { + if self.sync_handling.is_isolated() { + // If node is "isolated" it doesn't care about peers + if let Err(msg) = self.refresh_contract_runtime() { + return ( + Duration::ZERO, + fatal!(effect_builder, "{}", msg).ignore(), + ); + } + self.state = ReactorState::KeepUp; + return (Duration::ZERO, Effects::new()); + } if false == self.net.has_sufficient_fully_connected_peers() { info!("Initialize: awaiting sufficient fully-connected peers"); return (initialization_logic_default_delay.into(), Effects::new()); diff --git a/node/src/reactor/main_reactor/keep_up.rs b/node/src/reactor/main_reactor/keep_up.rs index 5344f6508d..63c288c9cb 100644 --- a/node/src/reactor/main_reactor/keep_up.rs +++ b/node/src/reactor/main_reactor/keep_up.rs @@ -236,16 +236,24 @@ impl MainReactor { ) -> Option { match sync_instruction { SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => { - // the block accumulator is unsure what our block position is relative to the - // network and wants to check peers for their notion of current tip. - // to do this, we switch back to CatchUp which will engage the necessary - // machinery to poll the network via the SyncLeap mechanic. if it turns out - // we are actually at or near tip after all, we simply switch back to KeepUp - // and continue onward. the accumulator is designed to periodically do this - // if we've received no gossip about new blocks from peers within an interval. - // this is to protect against partitioning and is not problematic behavior - // when / if it occurs. - Some(KeepUpInstruction::CatchUp) + if !self.sync_handling.is_isolated() { + // the block accumulator is unsure what our block position is relative to the + // network and wants to check peers for their notion of current tip. + // to do this, we switch back to CatchUp which will engage the necessary + // machinery to poll the network via the SyncLeap mechanic. if it turns out + // we are actually at or near tip after all, we simply switch back to KeepUp + // and continue onward. the accumulator is designed to periodically do this + // if we've received no gossip about new blocks from peers within an interval. + // this is to protect against partitioning and is not problematic behavior + // when / if it occurs. + Some(KeepUpInstruction::CatchUp) + } else { + // If the node operates in isolated mode the assumption is that it might not + // have any peers. So going back to CatchUp to query their + // notion of tip might effectively disable nodes components to respond. + // That's why - for isolated mode - we bypass this mechanism. + None + } } SyncInstruction::BlockSync { block_hash } => { debug!("KeepUp: BlockSync: {:?}", block_hash); diff --git a/node/src/reactor/main_reactor/tests.rs b/node/src/reactor/main_reactor/tests.rs index 25ae0641b7..57174611f6 100644 --- a/node/src/reactor/main_reactor/tests.rs +++ b/node/src/reactor/main_reactor/tests.rs @@ -2,7 +2,7 @@ mod binary_port; mod transactions; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, convert::TryFrom, iter, net::SocketAddr, @@ -11,13 +11,22 @@ use std::{ time::Duration, }; +use casper_binary_port::{ + BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, + BinaryResponseAndRequest, InformationRequest, Uptime, +}; use either::Either; +use futures::{SinkExt, StreamExt}; use num::Zero; use num_rational::Ratio; use num_traits::One; use rand::Rng; use tempfile::TempDir; -use tokio::time::{self, error::Elapsed}; +use tokio::{ + net::TcpStream, + time::{self, error::Elapsed, timeout}, +}; +use tokio_util::codec::Framed; use tracing::{error, info}; use casper_storage::{ @@ -29,6 +38,7 @@ use casper_storage::{ global_state::state::{StateProvider, StateReader}, }; use casper_types::{ + bytesrepr::{FromBytes, ToBytes}, execution::{ExecutionResult, ExecutionResultV2, TransformKindV2, TransformV2}, system::{ auction::{BidAddr, BidKind, BidsExt, DelegationRate, DelegatorKind}, @@ -38,9 +48,9 @@ use casper_types::{ AccountConfig, AccountsConfig, ActivationPoint, AddressableEntityHash, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockV2, CLValue, Chainspec, ChainspecRawBytes, ConsensusProtocolName, Deploy, EraId, FeeHandling, Gas, HoldBalanceHandling, Key, Motes, - NextUpgrade, PricingHandling, PricingMode, ProtocolVersion, PublicKey, RefundHandling, Rewards, - SecretKey, StoredValue, SystemHashRegistry, TimeDiff, Timestamp, Transaction, TransactionHash, - TransactionV1Config, ValidatorConfig, U512, + NextUpgrade, Peers, PricingHandling, PricingMode, ProtocolVersion, PublicKey, RefundHandling, + Rewards, SecretKey, StoredValue, SystemHashRegistry, TimeDiff, Timestamp, Transaction, + TransactionHash, TransactionV1Config, ValidatorConfig, U512, }; use crate::{ @@ -121,6 +131,7 @@ struct ConfigsOverride { chain_name: Option, gas_hold_balance_handling: Option, transaction_v1_override: Option, + node_config_override: NodeConfigOverride, } impl ConfigsOverride { @@ -209,6 +220,11 @@ impl ConfigsOverride { } } +#[derive(Clone, Default)] +struct NodeConfigOverride { + sync_handling_override: Option, +} + impl Default for ConfigsOverride { fn default() -> Self { ConfigsOverride { @@ -237,6 +253,7 @@ impl Default for ConfigsOverride { chain_name: None, gas_hold_balance_handling: None, transaction_v1_override: None, + node_config_override: NodeConfigOverride::default(), } } } @@ -357,6 +374,7 @@ impl TestFixture { chain_name, gas_hold_balance_handling, transaction_v1_override, + node_config_override, } = spec_override.unwrap_or_default(); if era_duration != TimeDiff::from_millis(0) { chainspec.core_config.era_duration = era_duration; @@ -422,8 +440,12 @@ impl TestFixture { }; for secret_key in secret_keys { - let (config, storage_dir) = - fixture.create_node_config(secret_key.as_ref(), None, storage_multiplier); + let (config, storage_dir) = fixture.create_node_config( + secret_key.as_ref(), + None, + storage_multiplier, + node_config_override.clone(), + ); fixture.add_node(secret_key, config, storage_dir).await; } @@ -545,6 +567,7 @@ impl TestFixture { secret_key: &SecretKey, maybe_trusted_hash: Option, storage_multiplier: u8, + node_config_override: NodeConfigOverride, ) -> (Config, TempDir) { // Set the network configuration. let network_cfg = match self.node_contexts.first() { @@ -569,6 +592,12 @@ impl TestFixture { }, ..Default::default() }; + let NodeConfigOverride { + sync_handling_override, + } = node_config_override; + if let Some(sync_handling) = sync_handling_override { + cfg.node.sync_handling = sync_handling; + } // Additionally set up storage in a temporary directory. let (storage_cfg, temp_dir) = storage::Config::new_for_tests(storage_multiplier); @@ -1156,7 +1185,12 @@ async fn historical_sync_with_era_height_1() { // Create a joiner node. let secret_key = SecretKey::random(&mut fixture.rng); let trusted_hash = *fixture.highest_complete_block().hash(); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::Genesis; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1213,7 +1247,12 @@ async fn should_not_historical_sync_no_sync_node() { ); info!("joining node using block {trusted_height} {trusted_hash}"); let secret_key = SecretKey::random(&mut fixture.rng); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::NoSync; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1297,7 +1336,12 @@ async fn should_catch_up_and_shutdown() { info!("joining node using block {trusted_height} {trusted_hash}"); let secret_key = SecretKey::random(&mut fixture.rng); - let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1); + let (mut config, storage_dir) = fixture.create_node_config( + &secret_key, + Some(trusted_hash), + 1, + NodeConfigOverride::default(), + ); config.node.sync_handling = SyncHandling::CompleteBlock; let joiner_id = fixture .add_node(Arc::new(secret_key), config, storage_dir) @@ -1339,6 +1383,161 @@ async fn should_catch_up_and_shutdown() { ); } +fn network_is_in_keepup( + nodes: &HashMap>>>, +) -> bool { + nodes + .values() + .all(|node| node.reactor().inner().inner().state == ReactorState::KeepUp) +} + +const MESSAGE_SIZE: u32 = 1024 * 1024 * 10; + +async fn setup_network_and_get_binary_port_handle( + initial_stakes: InitialStakes, + spec_override: ConfigsOverride, +) -> ( + Framed, + impl futures::Future>, TestRng)>, +) { + let mut fixture = timeout( + Duration::from_secs(10), + TestFixture::new(initial_stakes, Some(spec_override)), + ) + .await + .unwrap(); + let mut rng = fixture.rng_mut().create_child(); + let net = fixture.network_mut(); + net.settle_on(&mut rng, network_is_in_keepup, Duration::from_secs(59)) + .await; + let (_, first_node) = net + .nodes() + .iter() + .next() + .expect("should have at least one node"); + let binary_port_addr = first_node + .main_reactor() + .binary_port + .bind_address() + .unwrap(); + let finish_cranking = fixture.run_until_stopped(rng.create_child()); + let address = format!("localhost:{}", binary_port_addr.port()); + let stream = TcpStream::connect(address.clone()) + .await + .expect("should create stream"); + let client = Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE)); + (client, finish_cranking) +} + +#[tokio::test] +async fn should_start_in_isolation() { + let initial_stakes = InitialStakes::Random { count: 1 }; + let spec_override = ConfigsOverride { + node_config_override: NodeConfigOverride { + sync_handling_override: Some(SyncHandling::Isolated), + }, + ..Default::default() + }; + let (mut client, finish_cranking) = + setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await; + + let uptime_request_bytes = { + let request = BinaryRequest::Get( + InformationRequest::Uptime + .try_into() + .expect("should convert"), + ); + let header = + BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); + let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); + header_bytes + .iter() + .chain( + ToBytes::to_bytes(&request) + .expect("should serialize") + .iter(), + ) + .cloned() + .collect::>() + }; + client + .send(BinaryMessage::new(uptime_request_bytes)) + .await + .expect("should send message"); + let response = timeout(Duration::from_secs(20), client.next()) + .await + .unwrap_or_else(|err| panic!("should complete uptime request without timeout: {}", err)) + .unwrap_or_else(|| panic!("should have bytes")) + .unwrap_or_else(|err| panic!("should have ok response: {}", err)); + let (binary_response_and_request, _): (BinaryResponseAndRequest, _) = + FromBytes::from_bytes(response.payload()).expect("should deserialize response"); + let response = binary_response_and_request.response().payload(); + let (uptime, remainder): (Uptime, _) = + FromBytes::from_bytes(response).expect("Peers should be deserializable"); + assert!(remainder.is_empty()); + assert!(uptime.into_inner() > 0); + let (_net, _rng) = timeout(Duration::from_secs(20), finish_cranking) + .await + .unwrap_or_else(|_| panic!("should finish cranking without timeout")); +} + +#[tokio::test] +async fn should_be_peerless_in_isolation() { + let initial_stakes = InitialStakes::Random { count: 1 }; + let spec_override = ConfigsOverride { + node_config_override: NodeConfigOverride { + sync_handling_override: Some(SyncHandling::Isolated), + }, + ..Default::default() + }; + let (mut client, finish_cranking) = + setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await; + + let peers_request_bytes = { + let request = BinaryRequest::Get( + InformationRequest::Peers + .try_into() + .expect("should convert"), + ); + let header = + BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16); + let header_bytes = ToBytes::to_bytes(&header).expect("should serialize"); + header_bytes + .iter() + .chain( + ToBytes::to_bytes(&request) + .expect("should serialize") + .iter(), + ) + .cloned() + .collect::>() + }; + client + .send(BinaryMessage::new(peers_request_bytes)) + .await + .expect("should send message"); + let response = timeout(Duration::from_secs(20), client.next()) + .await + .unwrap_or_else(|err| panic!("should complete peers request without timeout: {}", err)) + .unwrap_or_else(|| panic!("should have bytes")) + .unwrap_or_else(|err| panic!("should have ok response: {}", err)); + let (binary_response_and_request, _): (BinaryResponseAndRequest, _) = + FromBytes::from_bytes(response.payload()).expect("should deserialize response"); + let response = binary_response_and_request.response().payload(); + + let (peers, remainder): (Peers, _) = + FromBytes::from_bytes(response).expect("Peers should be deserializable"); + assert!(remainder.is_empty()); + assert!( + peers.into_inner().is_empty(), + "should not have peers in isolated mode" + ); + + let (_net, _rng) = timeout(Duration::from_secs(20), finish_cranking) + .await + .unwrap_or_else(|_| panic!("should finish cranking without timeout")); +} + #[tokio::test] async fn run_equivocator_network() { let mut rng = crate::new_rng(); diff --git a/node/src/types/node_config.rs b/node/src/types/node_config.rs index cec55c6437..f0f2081a3d 100644 --- a/node/src/types/node_config.rs +++ b/node/src/types/node_config.rs @@ -10,7 +10,7 @@ const DEFAULT_SHUTDOWN_FOR_UPGRADE_TIMEOUT: &str = "2min"; const DEFAULT_UPGRADE_TIMEOUT: &str = "30sec"; /// Node sync configuration. -#[derive(DataSize, Debug, Deserialize, Serialize, Clone, Default)] +#[derive(DataSize, Debug, Deserialize, Serialize, Clone, Default, Eq, PartialEq)] #[serde(rename_all = "lowercase")] pub enum SyncHandling { /// Attempt to acquire all historical state back to genesis. @@ -23,6 +23,9 @@ pub enum SyncHandling { /// Don't attempt to sync historical blocks and shut down node instead of switching to KeepUp /// after acquiring the first complete block CompleteBlock, + /// The node operates in isolation - no peers are needed, the node won't wait for peers to + /// switch to KeepUp. + Isolated, } impl SyncHandling { @@ -45,6 +48,11 @@ impl SyncHandling { pub fn is_complete_block(&self) -> bool { matches!(self, SyncHandling::CompleteBlock) } + + /// Isolated? + pub fn is_isolated(&self) -> bool { + matches!(self, SyncHandling::Isolated) + } } /// Node fast-sync configuration. diff --git a/resources/local/config.toml b/resources/local/config.toml index 64eaa48f7e..3110f23a84 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -10,6 +10,7 @@ # 'genesis' (node will attempt to acquire all block data back to genesis) # 'ttl' (node will attempt to acquire all block data to comply with time to live enforcement) # 'nosync' (node will only acquire blocks moving forward) +# 'isolated' (node will initialize without peers and will not accept peers) # note: ttl is a chainsepc configured behavior on a given network; consult the `max_ttl` chainspec setting # (it is currently ~18 hours by default on production and production-like networks but subject to change). # note: `nosync` is incompatible with validator behavior; a nosync node is prevented from participating diff --git a/resources/production/config-example.toml b/resources/production/config-example.toml index 530632f9e5..1a56bafb5c 100644 --- a/resources/production/config-example.toml +++ b/resources/production/config-example.toml @@ -8,13 +8,17 @@ # Historical sync behavior for this node. Options are: # 'genesis' (node will attempt to acquire all block data back to genesis) +# note: as time goes on, the time to sync all the way back to genesis takes progressively longer. # 'ttl' (node will attempt to acquire all block data to comply with time to live enforcement) -# 'nosync' (node will only acquire blocks moving forward) # note: ttl is a chainsepc configured behavior on a given network; consult the `max_ttl` chainspec setting # (it is currently ~18 hours by default on production and production-like networks but subject to change). +# 'nosync' (node will only acquire blocks moving forward) # note: `nosync` is incompatible with validator behavior; a nosync node is prevented from participating # in consensus / switching to validate mode. it is primarily for lightweight nodes that are # only interested in recent activity. +# 'isolated' (node will initialize without peers and will not accept peers) +# note: an isolated node will not connect to, sync with, or keep up with the network, but will respond to +# binary port, rest server, event server, and diagnostic port connections. sync_handling = 'ttl' # Idle time after which the syncing process is considered stalled. diff --git a/types/src/bytesrepr.rs b/types/src/bytesrepr.rs index 2ca8547a0b..fef64fdda6 100644 --- a/types/src/bytesrepr.rs +++ b/types/src/bytesrepr.rs @@ -159,10 +159,6 @@ impl Display for Error { } impl ToBytes for Error { - fn write_bytes(&self, writer: &mut Vec) -> Result<(), Error> { - (*self as u8).write_bytes(writer) - } - fn to_bytes(&self) -> Result, Error> { (*self as u8).to_bytes() } @@ -170,6 +166,10 @@ impl ToBytes for Error { fn serialized_length(&self) -> usize { U8_SERIALIZED_LENGTH } + + fn write_bytes(&self, writer: &mut Vec) -> Result<(), Error> { + (*self as u8).write_bytes(writer) + } } impl FromBytes for Error { @@ -541,7 +541,7 @@ fn vec_from_vec(bytes: Vec) -> Result<(Vec, Vec), Error /// 4096 bytes. This function will never return less than 1. #[inline] fn cautious(hint: usize) -> usize { - let el_size = core::mem::size_of::(); + let el_size = mem::size_of::(); core::cmp::max(core::cmp::min(hint, 4096 / el_size), 1) } diff --git a/types/src/peers_map.rs b/types/src/peers_map.rs index 1ac5da20a1..caaaf1c4c5 100644 --- a/types/src/peers_map.rs +++ b/types/src/peers_map.rs @@ -134,4 +134,18 @@ mod tests { let val = Peers::random(rng); bytesrepr::test_serialization_roundtrip(&val); } + + #[test] + fn bytesrepr_empty_roundtrip() { + let val = Peers(vec![]); + bytesrepr::test_serialization_roundtrip(&val); + } + + #[test] + fn bytesrepr_empty_vec_should_have_count_0() { + let val = Peers(vec![]); + let x = Peers::to_bytes(&val).expect("should have vec"); + let (count, _) = u32::from_bytes(&x).expect("should have count"); + assert!(count == 0, "count should be 0"); + } }