Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added "Isolated" mode for the sync_handling. Running the node in this… #5021

Open
wants to merge 1 commit into
base: feat-2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions node/src/reactor/main_reactor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ impl MainReactor {
match self.initialize_next_component(effect_builder) {
Some(effects) => (initialization_logic_default_delay.into(), effects),
None => {
if self.sync_handling.is_isolated() {
// If the node is "isolated" it doesn't care about peers.
if let Err(msg) = self.refresh_contract_runtime() {
return (
Duration::ZERO,
fatal!(effect_builder, "{}", msg).ignore(),
);
}
self.state = ReactorState::KeepUp;
return (Duration::ZERO, Effects::new());
}
if false == self.net.has_sufficient_fully_connected_peers() {
info!("Initialize: awaiting sufficient fully-connected peers");
return (initialization_logic_default_delay.into(), Effects::new());
Expand Down
28 changes: 18 additions & 10 deletions node/src/reactor/main_reactor/keep_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,24 @@ impl MainReactor {
) -> Option<KeepUpInstruction> {
match sync_instruction {
SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
if !self.sync_handling.is_isolated() {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
} else {
// If the node operates in isolated mode, the assumption is that it might not
// have any peers. Going back to CatchUp to query peers' notion of the tip
// could therefore leave the node's components unable to respond.
// That's why - for isolated mode - we bypass this mechanism.
None
}
}
SyncInstruction::BlockSync { block_hash } => {
debug!("KeepUp: BlockSync: {:?}", block_hash);
Expand Down
151 changes: 144 additions & 7 deletions node/src/reactor/main_reactor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod binary_port;
mod transactions;

use std::{
collections::{BTreeMap, BTreeSet},
collections::{BTreeMap, BTreeSet, HashMap},
convert::TryFrom,
iter,
net::SocketAddr,
Expand All @@ -11,13 +11,22 @@ use std::{
time::Duration,
};

use casper_binary_port::{
BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, InformationRequest,
Uptime,
};
use either::Either;
use futures::{SinkExt, StreamExt};
use num::Zero;
use num_rational::Ratio;
use num_traits::One;
use rand::Rng;
use tempfile::TempDir;
use tokio::time::{self, error::Elapsed};
use tokio::{
net::TcpStream,
time::{self, error::Elapsed, timeout},
};
use tokio_util::codec::Framed;
use tracing::{error, info};

use casper_storage::{
Expand All @@ -29,6 +38,7 @@ use casper_storage::{
global_state::state::{StateProvider, StateReader},
};
use casper_types::{
bytesrepr::{FromBytes, ToBytes},
execution::{ExecutionResult, ExecutionResultV2, TransformKindV2, TransformV2},
system::{
auction::{BidAddr, BidKind, BidsExt, DelegationRate, DelegatorKind},
Expand Down Expand Up @@ -121,6 +131,7 @@ struct ConfigsOverride {
chain_name: Option<String>,
gas_hold_balance_handling: Option<HoldBalanceHandling>,
transaction_v1_override: Option<TransactionV1Config>,
node_config_override: NodeConfigOverride,
}

impl ConfigsOverride {
Expand Down Expand Up @@ -209,6 +220,11 @@ impl ConfigsOverride {
}
}

/// Per-node `NodeConfig` overrides applied on top of the generated defaults
/// when the test fixture builds a node's configuration.
#[derive(Clone, Default)]
struct NodeConfigOverride {
    // When `Some`, replaces `cfg.node.sync_handling` in `create_node_config`.
    sync_handling_override: Option<SyncHandling>,
}

impl Default for ConfigsOverride {
fn default() -> Self {
ConfigsOverride {
Expand Down Expand Up @@ -237,6 +253,7 @@ impl Default for ConfigsOverride {
chain_name: None,
gas_hold_balance_handling: None,
transaction_v1_override: None,
node_config_override: NodeConfigOverride::default(),
}
}
}
Expand Down Expand Up @@ -357,6 +374,7 @@ impl TestFixture {
chain_name,
gas_hold_balance_handling,
transaction_v1_override,
node_config_override,
} = spec_override.unwrap_or_default();
if era_duration != TimeDiff::from_millis(0) {
chainspec.core_config.era_duration = era_duration;
Expand Down Expand Up @@ -422,8 +440,12 @@ impl TestFixture {
};

for secret_key in secret_keys {
let (config, storage_dir) =
fixture.create_node_config(secret_key.as_ref(), None, storage_multiplier);
let (config, storage_dir) = fixture.create_node_config(
secret_key.as_ref(),
None,
storage_multiplier,
node_config_override.clone(),
);
fixture.add_node(secret_key, config, storage_dir).await;
}

Expand Down Expand Up @@ -545,6 +567,7 @@ impl TestFixture {
secret_key: &SecretKey,
maybe_trusted_hash: Option<BlockHash>,
storage_multiplier: u8,
node_config_override: NodeConfigOverride,
) -> (Config, TempDir) {
// Set the network configuration.
let network_cfg = match self.node_contexts.first() {
Expand All @@ -569,6 +592,12 @@ impl TestFixture {
},
..Default::default()
};
let NodeConfigOverride {
sync_handling_override,
} = node_config_override;
if let Some(sync_handling) = sync_handling_override {
cfg.node.sync_handling = sync_handling;
}

// Additionally set up storage in a temporary directory.
let (storage_cfg, temp_dir) = storage::Config::new_for_tests(storage_multiplier);
Expand Down Expand Up @@ -1156,7 +1185,12 @@ async fn historical_sync_with_era_height_1() {
// Create a joiner node.
let secret_key = SecretKey::random(&mut fixture.rng);
let trusted_hash = *fixture.highest_complete_block().hash();
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::Genesis;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
Expand Down Expand Up @@ -1213,7 +1247,12 @@ async fn should_not_historical_sync_no_sync_node() {
);
info!("joining node using block {trusted_height} {trusted_hash}");
let secret_key = SecretKey::random(&mut fixture.rng);
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::NoSync;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
Expand Down Expand Up @@ -1297,7 +1336,12 @@ async fn should_catch_up_and_shutdown() {

info!("joining node using block {trusted_height} {trusted_hash}");
let secret_key = SecretKey::random(&mut fixture.rng);
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::CompleteBlock;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
Expand Down Expand Up @@ -1339,6 +1383,99 @@ async fn should_catch_up_and_shutdown() {
);
}

/// Returns `true` once every node in the network has reached
/// `ReactorState::KeepUp`.
fn network_is_in_keepup(
    nodes: &HashMap<NodeId, Runner<ConditionCheckReactor<FilterReactor<MainReactor>>>>,
) -> bool {
    nodes.iter().all(|(_id, runner)| {
        matches!(runner.reactor().inner().inner().state, ReactorState::KeepUp)
    })
}

// Message size limit handed to `BinaryMessageCodec::new` (10 MiB).
const MESSAGE_SIZE: u32 = 1024 * 1024 * 10;

/// Spins up a test network, waits (up to 59s) for every node to reach
/// `ReactorState::KeepUp`, then opens a framed TCP client to the first
/// node's binary port.
///
/// Returns the connected client together with the future that finishes
/// cranking the network; the caller must await the latter to shut the
/// network down cleanly.
async fn setup_network_and_get_binary_port_handle(
    initial_stakes: InitialStakes,
    spec_override: ConfigsOverride,
) -> (
    Framed<TcpStream, BinaryMessageCodec>,
    impl futures::Future<Output = (TestingNetwork<FilterReactor<MainReactor>>, TestRng)>,
) {
    let mut fixture = timeout(
        Duration::from_secs(10),
        TestFixture::new(initial_stakes, Some(spec_override)),
    )
    .await
    .unwrap();
    let mut rng = fixture.rng_mut().create_child();
    let net = fixture.network_mut();
    net.settle_on(&mut rng, network_is_in_keepup, Duration::from_secs(59))
        .await;
    let (_, first_node) = net
        .nodes()
        .iter()
        .next()
        .expect("should have at least one node");
    // Capture the bind address before `run_until_stopped` consumes the
    // fixture.
    let binary_port_addr = first_node
        .main_reactor()
        .binary_port
        .bind_address()
        .unwrap();
    let finish_cranking = fixture.run_until_stopped(rng.create_child());
    let address = format!("localhost:{}", binary_port_addr.port());
    // `address` is not used after this call, so move it instead of cloning.
    let stream = TcpStream::connect(address)
        .await
        .expect("should create stream");
    let client = Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE));
    (client, finish_cranking)
}

#[tokio::test]
async fn should_start_in_isolation() {
    // A single node running with `SyncHandling::Isolated` should reach
    // `KeepUp` without any peers, and its binary port should answer
    // requests.
    let initial_stakes = InitialStakes::Random { count: 1 };
    let spec_override = ConfigsOverride {
        node_config_override: NodeConfigOverride {
            sync_handling_override: Some(SyncHandling::Isolated),
        },
        ..Default::default()
    };
    let (mut client, finish_cranking) =
        setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await;

    // Ask for the node's uptime; a successful response proves the node's
    // components are serving requests despite having no peers.
    let request = BinaryRequest::Get(
        InformationRequest::Uptime
            .try_into()
            .expect("should convert"),
    );
    let header =
        BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16);
    // A binary-port message is the serialized header immediately followed by
    // the serialized request; extend the owned header buffer in place rather
    // than chaining iterators and cloning.
    let mut request_bytes = ToBytes::to_bytes(&header).expect("should serialize");
    request_bytes.extend(ToBytes::to_bytes(&request).expect("should serialize"));
    client
        .send(BinaryMessage::new(request_bytes))
        .await
        .expect("should send message");
    let response = timeout(Duration::from_secs(10), client.next())
        .await
        .unwrap_or_else(|err| panic!("should complete without timeout: {}", err))
        .unwrap_or_else(|| panic!("should have bytes"))
        .unwrap_or_else(|err| panic!("should have ok response: {}", err));
    // NOTE(review): deserializes `Uptime` straight from the payload —
    // assumes the payload carries only the serialized value; confirm against
    // the binary-port response format.
    let uptime: Uptime = FromBytes::from_bytes(response.payload())
        .expect("Uptime should be deserializable")
        .0;
    assert!(uptime.into_inner() > 0);

    let (_net, _rng) = timeout(Duration::from_secs(10), finish_cranking)
        .await
        .unwrap_or_else(|_| panic!("should finish cranking without timeout"));
}

#[tokio::test]
async fn run_equivocator_network() {
let mut rng = crate::new_rng();
Expand Down
8 changes: 8 additions & 0 deletions node/src/types/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum SyncHandling {
/// Don't attempt to sync historical blocks and shut down node instead of switching to KeepUp
/// after acquiring the first complete block
CompleteBlock,
/// The node operates in isolation - no peers are needed, the node won't wait for peers to
/// switch to KeepUp.
Isolated,
}

impl SyncHandling {
Expand All @@ -45,6 +48,11 @@ impl SyncHandling {
pub fn is_complete_block(&self) -> bool {
matches!(self, SyncHandling::CompleteBlock)
}

/// Returns `true` if the sync handling mode is `Isolated`.
pub fn is_isolated(&self) -> bool {
    matches!(self, SyncHandling::Isolated)
}
}

/// Node fast-sync configuration.
Expand Down
Loading