Added "Isolated" mode for the sync_handling. Running the node in this…
Browse files Browse the repository at this point in the history
… mode will make the node not care about peers when switching to KeepUp mode. This allows to set up a node with existing lmdb database and query data through the binary port.
Jakub Zajkowski committed Dec 12, 2024
1 parent c881b11 commit 48d56a9
Showing 4 changed files with 181 additions and 17 deletions.
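
For context, a minimal sketch of switching the new mode on is shown below. It is not part of this commit: it mirrors the way the tests in this diff override the field (config.node.sync_handling = SyncHandling::Genesis, etc.) and assumes the node's top-level Config type implements Default.

// Sketch only: enable the new mode programmatically, using the Config and
// SyncHandling types touched by this commit; Config::default() is an assumption.
let mut config = Config::default();
config.node.sync_handling = SyncHandling::Isolated;
// For a deployed node the equivalent is the sync_handling entry of the node
// section in config.toml; the exact value spelling for the new variant depends
// on how SyncHandling is deserialized, so check the shipped example config.
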
11 changes: 11 additions & 0 deletions node/src/reactor/main_reactor/control.rs
@@ -61,6 +61,17 @@ impl MainReactor {
match self.initialize_next_component(effect_builder) {
Some(effects) => (initialization_logic_default_delay.into(), effects),
None => {
if self.sync_handling.is_isolated() {
// If node is "isolated" it doesn't care about peers
if let Err(msg) = self.refresh_contract_runtime() {
return (
Duration::ZERO,
fatal!(effect_builder, "{}", msg).ignore(),
);
}
self.state = ReactorState::KeepUp;
return (Duration::ZERO, Effects::new());
}
if false == self.net.has_sufficient_fully_connected_peers() {
info!("Initialize: awaiting sufficient fully-connected peers");
return (initialization_logic_default_delay.into(), Effects::new());
28 changes: 18 additions & 10 deletions node/src/reactor/main_reactor/keep_up.rs
@@ -236,16 +236,24 @@ impl MainReactor {
) -> Option<KeepUpInstruction> {
match sync_instruction {
SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
if !self.sync_handling.is_isolated() {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
} else {
// If the node operates in isolated mode, the assumption is that it might not
// have any peers, so going back to CatchUp to query their notion of tip could
// effectively prevent the node's components from responding.
// That's why, for isolated mode, we bypass this mechanism.
None
}
}
SyncInstruction::BlockSync { block_hash } => {
debug!("KeepUp: BlockSync: {:?}", block_hash);
151 changes: 144 additions & 7 deletions node/src/reactor/main_reactor/tests.rs
@@ -2,7 +2,7 @@ mod binary_port;
mod transactions;

use std::{
collections::{BTreeMap, BTreeSet},
collections::{BTreeMap, BTreeSet, HashMap},
convert::TryFrom,
iter,
net::SocketAddr,
@@ -11,13 +11,22 @@ use std::{
time::Duration,
};

use casper_binary_port::{
BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, InformationRequest,
Uptime,
};
use either::Either;
use futures::{SinkExt, StreamExt};
use num::Zero;
use num_rational::Ratio;
use num_traits::One;
use rand::Rng;
use tempfile::TempDir;
use tokio::time::{self, error::Elapsed};
use tokio::{
net::TcpStream,
time::{self, error::Elapsed, timeout},
};
use tokio_util::codec::Framed;
use tracing::{error, info};

use casper_storage::{
Expand All @@ -29,6 +38,7 @@ use casper_storage::{
global_state::state::{StateProvider, StateReader},
};
use casper_types::{
bytesrepr::{FromBytes, ToBytes},
execution::{ExecutionResult, ExecutionResultV2, TransformKindV2, TransformV2},
system::{
auction::{BidAddr, BidKind, BidsExt, DelegationRate, DelegatorKind},
@@ -121,6 +131,7 @@ struct ConfigsOverride {
chain_name: Option<String>,
gas_hold_balance_handling: Option<HoldBalanceHandling>,
transaction_v1_override: Option<TransactionV1Config>,
node_config_override: NodeConfigOverride,
}

impl ConfigsOverride {
@@ -209,6 +220,11 @@ impl ConfigsOverride {
}
}

#[derive(Clone, Default)]
struct NodeConfigOverride {
sync_handling_override: Option<SyncHandling>,
}

impl Default for ConfigsOverride {
fn default() -> Self {
ConfigsOverride {
@@ -237,6 +253,7 @@ impl Default for ConfigsOverride {
chain_name: None,
gas_hold_balance_handling: None,
transaction_v1_override: None,
node_config_override: NodeConfigOverride::default(),
}
}
}
@@ -357,6 +374,7 @@ impl TestFixture {
chain_name,
gas_hold_balance_handling,
transaction_v1_override,
node_config_override,
} = spec_override.unwrap_or_default();
if era_duration != TimeDiff::from_millis(0) {
chainspec.core_config.era_duration = era_duration;
@@ -422,8 +440,12 @@ impl TestFixture {
};

for secret_key in secret_keys {
let (config, storage_dir) =
fixture.create_node_config(secret_key.as_ref(), None, storage_multiplier);
let (config, storage_dir) = fixture.create_node_config(
secret_key.as_ref(),
None,
storage_multiplier,
node_config_override.clone(),
);
fixture.add_node(secret_key, config, storage_dir).await;
}

@@ -545,6 +567,7 @@ impl TestFixture {
secret_key: &SecretKey,
maybe_trusted_hash: Option<BlockHash>,
storage_multiplier: u8,
node_config_override: NodeConfigOverride,
) -> (Config, TempDir) {
// Set the network configuration.
let network_cfg = match self.node_contexts.first() {
@@ -569,6 +592,12 @@ impl TestFixture {
},
..Default::default()
};
let NodeConfigOverride {
sync_handling_override,
} = node_config_override;
if let Some(sync_handling) = sync_handling_override {
cfg.node.sync_handling = sync_handling;
}

// Additionally set up storage in a temporary directory.
let (storage_cfg, temp_dir) = storage::Config::new_for_tests(storage_multiplier);
@@ -1156,7 +1185,12 @@ async fn historical_sync_with_era_height_1() {
// Create a joiner node.
let secret_key = SecretKey::random(&mut fixture.rng);
let trusted_hash = *fixture.highest_complete_block().hash();
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::Genesis;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
@@ -1213,7 +1247,12 @@ async fn should_not_historical_sync_no_sync_node() {
);
info!("joining node using block {trusted_height} {trusted_hash}");
let secret_key = SecretKey::random(&mut fixture.rng);
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::NoSync;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
@@ -1297,7 +1336,12 @@ async fn should_catch_up_and_shutdown() {

info!("joining node using block {trusted_height} {trusted_hash}");
let secret_key = SecretKey::random(&mut fixture.rng);
let (mut config, storage_dir) = fixture.create_node_config(&secret_key, Some(trusted_hash), 1);
let (mut config, storage_dir) = fixture.create_node_config(
&secret_key,
Some(trusted_hash),
1,
NodeConfigOverride::default(),
);
config.node.sync_handling = SyncHandling::CompleteBlock;
let joiner_id = fixture
.add_node(Arc::new(secret_key), config, storage_dir)
@@ -1339,6 +1383,99 @@ async fn should_catch_up_and_shutdown() {
);
}

fn network_is_in_keepup(
nodes: &HashMap<NodeId, Runner<ConditionCheckReactor<FilterReactor<MainReactor>>>>,
) -> bool {
nodes
.values()
.all(|node| node.reactor().inner().inner().state == ReactorState::KeepUp)
}

const MESSAGE_SIZE: u32 = 1024 * 1024 * 10;

async fn setup_network_and_get_binary_port_handle(
initial_stakes: InitialStakes,
spec_override: ConfigsOverride,
) -> (
Framed<TcpStream, BinaryMessageCodec>,
impl futures::Future<Output = (TestingNetwork<FilterReactor<MainReactor>>, TestRng)>,
) {
let mut fixture = timeout(
Duration::from_secs(10),
TestFixture::new(initial_stakes, Some(spec_override)),
)
.await
.unwrap();
let mut rng = fixture.rng_mut().create_child();
let net = fixture.network_mut();
net.settle_on(&mut rng, network_is_in_keepup, Duration::from_secs(59))
.await;
let (_, first_node) = net
.nodes()
.iter()
.next()
.expect("should have at least one node");
let binary_port_addr = first_node
.main_reactor()
.binary_port
.bind_address()
.unwrap();
let finish_cranking = fixture.run_until_stopped(rng.create_child());
let address = format!("localhost:{}", binary_port_addr.port());
let stream = TcpStream::connect(address.clone())
.await
.expect("should create stream");
let client = Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE));
(client, finish_cranking)
}

#[tokio::test]
async fn should_start_in_isolation() {
let initial_stakes = InitialStakes::Random { count: 1 };
let spec_override = ConfigsOverride {
node_config_override: NodeConfigOverride {
sync_handling_override: Some(SyncHandling::Isolated),
},
..Default::default()
};
let (mut client, finish_cranking) =
setup_network_and_get_binary_port_handle(initial_stakes, spec_override).await;
let request = BinaryRequest::Get(
InformationRequest::Uptime
.try_into()
.expect("should convert"),
);
let header =
BinaryRequestHeader::new(ProtocolVersion::from_parts(2, 0, 0), request.tag(), 1_u16);
let header_bytes = ToBytes::to_bytes(&header).expect("should serialize");
let original_request_bytes = header_bytes
.iter()
.chain(
ToBytes::to_bytes(&request)
.expect("should serialize")
.iter(),
)
.cloned()
.collect::<Vec<_>>();
client
.send(BinaryMessage::new(original_request_bytes.clone()))
.await
.expect("should send message");
let response = timeout(Duration::from_secs(10), client.next())
.await
.unwrap_or_else(|err| panic!("should complete without timeout: {}", err))
.unwrap_or_else(|| panic!("should have bytes"))
.unwrap_or_else(|err| panic!("should have ok response: {}", err));
let uptime: Uptime = FromBytes::from_bytes(response.payload())
.expect("Uptime should be deserializable")
.0;
assert!(uptime.into_inner() > 0);

let (_net, _rng) = timeout(Duration::from_secs(10), finish_cranking)
.await
.unwrap_or_else(|_| panic!("should finish cranking without timeout"));
}

#[tokio::test]
async fn run_equivocator_network() {
let mut rng = crate::new_rng();
8 changes: 8 additions & 0 deletions node/src/types/node_config.rs
@@ -23,6 +23,9 @@ pub enum SyncHandling {
/// Don't attempt to sync historical blocks and shut down node instead of switching to KeepUp
/// after acquiring the first complete block
CompleteBlock,
/// The node operates in isolation: no peers are needed, and the node won't wait for peers
/// before switching to KeepUp.
Isolated,
}

impl SyncHandling {
@@ -45,6 +48,11 @@ impl SyncHandling {
pub fn is_complete_block(&self) -> bool {
matches!(self, SyncHandling::CompleteBlock)
}

/// Isolated?
pub fn is_isolated(&self) -> bool {
matches!(self, SyncHandling::Isolated)
}
}

/// Node fast-sync configuration.
