Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
5021: Added "Isolated" mode for the sync_handling. Running the node in this… r=EdHastingsCasperAssociation a=zajko

… mode will make the node not care about peers when switching to KeepUp mode. This allows to set up a node with existing lmdb database and query data through the binary port.

This PR should have no downstream impact.


Co-authored-by: Jakub Zajkowski <[email protected]>
Co-authored-by: Ed Hastings <[email protected]>
Co-authored-by: zajko <[email protected]>
  • Loading branch information
4 people authored Dec 17, 2024
2 parents 61b78ea + 79128d4 commit ffb8064
Show file tree
Hide file tree
Showing 15 changed files with 376 additions and 99 deletions.
1 change: 1 addition & 0 deletions node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. The format
### Added
* Add `BinaryPort` interface along with the relevant config entries.
* Added chainspec settings `finders_fee`, `finality_signature_proportion` and `signature_rewards_max_delay` to control behavior of the new seigniorage model.
* Isolated sync handling, which comes online with only local data and rejects peers. Useful for testing, auditing, and similar scenarios.

### Changed
* All SSE events are emitted via the `<IP:Port>/events` endpoint. None of the previous ones (`/events/main`, `/events/deploys`, and `/events/sigs`) is available any longer.
Expand Down
11 changes: 7 additions & 4 deletions node/src/components/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ where
registry: &Registry,
chain_info_source: C,
validator_matrix: ValidatorMatrix,
allow_handshake: bool,
) -> Result<Network<REv, P>> {
let net_metrics = Arc::new(Metrics::new(registry)?);

Expand Down Expand Up @@ -285,6 +286,7 @@ where
node_key_pair.map(NodeKeyPair::new),
chain_info_source.into(),
&net_metrics,
allow_handshake,
));

let component = Network {
Expand Down Expand Up @@ -662,6 +664,7 @@ where
| ConnectionError::TlsHandshake(_)
| ConnectionError::HandshakeSend(_)
| ConnectionError::HandshakeRecv(_)
| ConnectionError::HandshakeNotAllowed
| ConnectionError::IncompatibleVersion(_) => None,

// These errors are potential bugs on our side.
Expand Down Expand Up @@ -1121,6 +1124,10 @@ where
{
type Event = Event<P>;

fn name(&self) -> &str {
COMPONENT_NAME
}

fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
Expand Down Expand Up @@ -1281,10 +1288,6 @@ where
},
}
}

fn name(&self) -> &str {
COMPONENT_NAME
}
}

impl<REv, P> InitializedComponent<REv> for Network<REv, P>
Expand Down
3 changes: 3 additions & 0 deletions node/src/components/network/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ pub enum ConnectionError {
/// This is usually a bug.
#[error("handshake sink/stream could not be reunited")]
FailedToReuniteHandshakeSinkAndStream,
/// Handshake not allowed (Isolated mode)
#[error("handshake not allowed (Isolated mode)")]
HandshakeNotAllowed,
}

/// IO operation that can time out or close.
Expand Down
12 changes: 12 additions & 0 deletions node/src/components/network/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ where
max_in_flight_demands: usize,
/// Flag indicating whether this node is syncing.
is_syncing: AtomicBool,
/// If false, will not allow handshake.
allow_handshake: bool,
}

impl<REv> NetworkContext<REv> {
Expand All @@ -245,6 +247,7 @@ impl<REv> NetworkContext<REv> {
node_key_pair: Option<NodeKeyPair>,
chain_info: ChainInfo,
net_metrics: &Arc<Metrics>,
allow_handshake: bool,
) -> Self {
// Set the demand max from configuration, regarding `0` as "unlimited".
let max_in_flight_demands = if cfg.max_in_flight_demands == 0 {
Expand Down Expand Up @@ -277,6 +280,7 @@ impl<REv> NetworkContext<REv> {
tarpit_chance: cfg.tarpit_chance,
max_in_flight_demands,
is_syncing: AtomicBool::new(false),
allow_handshake,
}
}

Expand Down Expand Up @@ -363,6 +367,14 @@ where
peer_consensus_public_key,
is_peer_syncing: _,
}) => {
if !context.allow_handshake {
return IncomingConnection::Failed {
peer_addr,
peer_id,
error: ConnectionError::HandshakeNotAllowed,
};
}

if let Some(ref public_key) = peer_consensus_public_key {
Span::current().record("consensus_key", &field::display(public_key));
}
Expand Down
139 changes: 71 additions & 68 deletions node/src/components/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use tracing::{debug, info};
use casper_types::{Chainspec, ChainspecRawBytes, SecretKey};

use super::{
chain_info::ChainInfo, Config, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity,
chain_info::ChainInfo, Event as NetworkEvent, FromIncoming, GossipedAddress, Identity,
MessageKind, Network, Payload,
};
use crate::{
components::{
gossiper::{self, GossipItem, Gossiper},
Component, InitializedComponent,
network, Component, InitializedComponent,
},
effect::{
announcements::{ControlAnnouncement, GossiperAnnouncement, PeerBehaviorAnnouncement},
Expand All @@ -39,13 +39,13 @@ use crate::{
EffectBuilder, Effects,
},
protocol,
reactor::{self, EventQueueHandle, Finalize, Reactor, Runner},
reactor::{self, main_reactor::Config, EventQueueHandle, Finalize, Reactor, Runner},
testing::{
self, init_logging,
network::{NetworkedReactor, Nodes, TestingNetwork},
ConditionCheckReactor,
},
types::{NodeId, ValidatorMatrix},
types::{NodeId, SyncHandling, ValidatorMatrix},
NodeRng,
};

Expand Down Expand Up @@ -182,43 +182,6 @@ impl Reactor for TestReactor {
type Config = Config;
type Error = anyhow::Error;

fn new(
cfg: Self::Config,
_chainspec: Arc<Chainspec>,
_chainspec_raw_bytes: Arc<ChainspecRawBytes>,
our_identity: Identity,
registry: &Registry,
_event_queue: EventQueueHandle<Self::Event>,
rng: &mut NodeRng,
) -> anyhow::Result<(Self, Effects<Self::Event>)> {
let secret_key = SecretKey::random(rng);
let mut net = Network::new(
cfg,
our_identity,
None,
registry,
ChainInfo::create_for_testing(),
ValidatorMatrix::new_with_validator(Arc::new(secret_key)),
)?;
let gossiper_config = gossiper::Config::new_with_small_timeouts();
let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new(
"address_gossiper",
gossiper_config,
registry,
)?;

net.start_initialization();
let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()];

Ok((
TestReactor {
net,
address_gossiper,
},
effects,
))
}

fn dispatch_event(
&mut self,
effect_builder: EffectBuilder<Self::Event>,
Expand Down Expand Up @@ -277,6 +240,45 @@ impl Reactor for TestReactor {
Event::BlocklistAnnouncement(_announcement) => Effects::new(),
}
}

fn new(
cfg: Self::Config,
_chainspec: Arc<Chainspec>,
_chainspec_raw_bytes: Arc<ChainspecRawBytes>,
our_identity: Identity,
registry: &Registry,
_event_queue: EventQueueHandle<Self::Event>,
rng: &mut NodeRng,
) -> anyhow::Result<(Self, Effects<Self::Event>)> {
let secret_key = SecretKey::random(rng);
let allow_handshake = cfg.node.sync_handling != SyncHandling::Isolated;
let mut net = Network::new(
cfg.network.clone(),
our_identity,
None,
registry,
ChainInfo::create_for_testing(),
ValidatorMatrix::new_with_validator(Arc::new(secret_key)),
allow_handshake,
)?;
let gossiper_config = gossiper::Config::new_with_small_timeouts();
let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new(
"address_gossiper",
gossiper_config,
registry,
)?;

net.start_initialization();
let effects = smallvec![async { smallvec![Event::Net(NetworkEvent::Initialize)] }.boxed()];

Ok((
TestReactor {
net,
address_gossiper,
},
effects,
))
}
}

impl NetworkedReactor for TestReactor {
Expand Down Expand Up @@ -351,13 +353,15 @@ async fn run_two_node_network_five_times() {
let mut net = TestingNetwork::new();

let start = Instant::now();
net.add_node_with_config(
Config::default_local_net_first_node(first_node_port),
&mut rng,
)
.await
.unwrap();
net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng)

let cfg = Config::default().with_network_config(
network::Config::default_local_net_first_node(first_node_port),
);
net.add_node_with_config(cfg, &mut rng).await.unwrap();

let cfg = Config::default()
.with_network_config(network::Config::default_local_net(first_node_port));
net.add_node_with_config(cfg.clone(), &mut rng)
.await
.unwrap();
let end = Instant::now();
Expand Down Expand Up @@ -417,12 +421,11 @@ async fn bind_to_real_network_interface() {
.ip();
let port = testing::unused_port_on_localhost();

let local_net_config = Config::new((local_addr, port).into());
let cfg =
Config::default().with_network_config(network::Config::new((local_addr, port).into()));

let mut net = TestingNetwork::<TestReactor>::new();
net.add_node_with_config(local_net_config, &mut rng)
.await
.unwrap();
net.add_node_with_config(cfg, &mut rng).await.unwrap();

// The network should be fully connected.
let timeout = Duration::from_secs(2);
Expand Down Expand Up @@ -452,17 +455,16 @@ async fn check_varying_size_network_connects() {

// Pick a random port in the higher ranges that is likely to be unused.
let first_node_port = testing::unused_port_on_localhost();
let cfg = Config::default().with_network_config(
network::Config::default_local_net_first_node(first_node_port),
);

let _ = net
.add_node_with_config(
Config::default_local_net_first_node(first_node_port),
&mut rng,
)
.await
.unwrap();
let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap();
let cfg = Config::default()
.with_network_config(network::Config::default_local_net(first_node_port));

for _ in 1..number_of_nodes {
net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng)
net.add_node_with_config(cfg.clone(), &mut rng)
.await
.unwrap();
}
Expand Down Expand Up @@ -506,16 +508,17 @@ async fn ensure_peers_metric_is_correct() {
// Pick a random port in the higher ranges that is likely to be unused.
let first_node_port = testing::unused_port_on_localhost();

let _ = net
.add_node_with_config(
Config::default_local_net_first_node(first_node_port),
&mut rng,
)
.await
.unwrap();
let cfg = Config::default().with_network_config(
network::Config::default_local_net_first_node(first_node_port),
);

let _ = net.add_node_with_config(cfg, &mut rng).await.unwrap();

let cfg = Config::default()
.with_network_config(network::Config::default_local_net(first_node_port));

for _ in 1..number_of_nodes {
net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng)
net.add_node_with_config(cfg.clone(), &mut rng)
.await
.unwrap();
}
Expand Down
3 changes: 3 additions & 0 deletions node/src/reactor/main_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,13 +1130,16 @@ impl reactor::Reactor for MainReactor {
registry,
)?;

let allow_handshake = config.node.sync_handling != SyncHandling::Isolated;

let network = Network::new(
config.network.clone(),
network_identity,
Some((our_secret_key, our_public_key)),
registry,
chainspec.as_ref(),
validator_matrix.clone(),
allow_handshake,
)?;

let address_gossiper = Gossiper::<{ GossipedAddress::ID_IS_COMPLETE_ITEM }, _>::new(
Expand Down
7 changes: 7 additions & 0 deletions node/src/reactor/main_reactor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,11 @@ impl Config {
chainspec.transaction_config.max_timestamp_leeway;
}
}

/// Set network config.
#[cfg(test)]
pub(crate) fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
self.network = network_config;
self
}
}
11 changes: 11 additions & 0 deletions node/src/reactor/main_reactor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ impl MainReactor {
match self.initialize_next_component(effect_builder) {
Some(effects) => (initialization_logic_default_delay.into(), effects),
None => {
if self.sync_handling.is_isolated() {
// If node is "isolated" it doesn't care about peers
if let Err(msg) = self.refresh_contract_runtime() {
return (
Duration::ZERO,
fatal!(effect_builder, "{}", msg).ignore(),
);
}
self.state = ReactorState::KeepUp;
return (Duration::ZERO, Effects::new());
}
if false == self.net.has_sufficient_fully_connected_peers() {
info!("Initialize: awaiting sufficient fully-connected peers");
return (initialization_logic_default_delay.into(), Effects::new());
Expand Down
28 changes: 18 additions & 10 deletions node/src/reactor/main_reactor/keep_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,24 @@ impl MainReactor {
) -> Option<KeepUpInstruction> {
match sync_instruction {
SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
if !self.sync_handling.is_isolated() {
// the block accumulator is unsure what our block position is relative to the
// network and wants to check peers for their notion of current tip.
// to do this, we switch back to CatchUp which will engage the necessary
// machinery to poll the network via the SyncLeap mechanic. if it turns out
// we are actually at or near tip after all, we simply switch back to KeepUp
// and continue onward. the accumulator is designed to periodically do this
// if we've received no gossip about new blocks from peers within an interval.
// this is to protect against partitioning and is not problematic behavior
// when / if it occurs.
Some(KeepUpInstruction::CatchUp)
} else {
// If the node operates in isolated mode the assumption is that it might not
// have any peers. So going back to CatchUp to query their
// notion of tip might effectively disable nodes components to respond.
// That's why - for isolated mode - we bypass this mechanism.
None
}
}
SyncInstruction::BlockSync { block_hash } => {
debug!("KeepUp: BlockSync: {:?}", block_hash);
Expand Down
Loading

0 comments on commit ffb8064

Please sign in to comment.