Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network: enable DHT server mode only while consensus is established #3038

Open
wants to merge 1 commit into
base: albatross
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use nimiq_blockchain_proxy::BlockchainProxy;
#[cfg(feature = "full")]
use nimiq_blockchain_proxy::BlockchainReadProxy;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{network::Network, request::request_handler};
use nimiq_network_interface::{
network::{DhtMode, Network},
request::request_handler,
};
use nimiq_time::{interval, Interval};
use nimiq_utils::{spawn, WakerExt};
use nimiq_zkp_component::zkp_component::ZKPComponentProxy;
Expand Down Expand Up @@ -391,8 +394,7 @@ impl<N: Network> Consensus<N> {
if self.is_established() {
if self.num_agents() < self.min_peers {
warn!("Lost consensus!");
self.established_flag.swap(false, Ordering::Release);
return Some(ConsensusEvent::Lost);
return Some(self.on_consensus_lost());
}
// Check if validity window availability changed.
if let (_, Some(event)) = self.check_validity_window() {
Expand All @@ -408,19 +410,12 @@ impl<N: Network> Consensus<N> {
if self.num_agents() >= self.min_peers && self.sync.state_complete() {
if self.sync.accepted_block_announcements() >= Self::MIN_BLOCKS_ESTABLISHED {
info!("Consensus established, number of accepted announcements satisfied.");
self.established_flag.swap(true, Ordering::Release);

// Also stop any other checks.
self.head_requests = None;
self.head_requests_time = None;

self.zkp_proxy
.request_zkp_from_peers(self.sync.peers(), false);

let (synced_validity_window, _) = self.check_validity_window();
return Some(ConsensusEvent::Established {
synced_validity_window,
});
return Some(self.on_consensus_established());
} else {
// The head state check is carried out immediately after we reach the minimum
// number of peers and then after certain time intervals until consensus is reached.
Expand All @@ -430,15 +425,7 @@ impl<N: Network> Consensus<N> {
// We would like that 2/3 of our peers have a known state.
if head_request.num_known_blocks >= 2 * head_request.num_unknown_blocks {
info!("Consensus established, 2/3 of heads known.");
self.established_flag.swap(true, Ordering::Release);

self.zkp_proxy
.request_zkp_from_peers(self.sync.peers(), false);

let (synced_validity_window, _) = self.check_validity_window();
return Some(ConsensusEvent::Established {
synced_validity_window,
});
return Some(self.on_consensus_established());
}
}

Expand All @@ -450,6 +437,30 @@ impl<N: Network> Consensus<N> {
None
}

fn on_consensus_established(&mut self) -> ConsensusEvent {
self.established_flag.swap(true, Ordering::Release);

self.zkp_proxy
.request_zkp_from_peers(self.sync.peers(), false);

let network = Arc::clone(&self.network);
spawn(async move { network.dht_set_mode(DhtMode::Server).await });

let (synced_validity_window, _) = self.check_validity_window();
ConsensusEvent::Established {
synced_validity_window,
}
}

fn on_consensus_lost(&mut self) -> ConsensusEvent {
self.established_flag.swap(false, Ordering::Release);

let network = Arc::clone(&self.network);
spawn(async move { network.dht_set_mode(DhtMode::Client).await });

ConsensusEvent::Lost
}

/// Requests heads from connected peers.
fn request_heads(&mut self) {
// Wait for an ongoing head request to finish.
Expand Down
9 changes: 9 additions & 0 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ pub trait RequestResponse {
type Response: Serialize + Deserialize + Sync;
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum DhtMode {
Client,
Server,
}

#[async_trait]
pub trait Network: Send + Sync + Unpin + 'static {
type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
Expand Down Expand Up @@ -170,6 +176,9 @@ pub trait Network: Send + Sync + Unpin + 'static {
where
T: Topic + Sync;

/// Sets the current operation mode (client/server) for the DHT protocol
async fn dht_set_mode(&self, mode: DhtMode);

/// Gets a value from the distributed hash table
async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
where
Expand Down
5 changes: 5 additions & 0 deletions network-libp2p/src/autonat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ pub(crate) struct NatState {
}

impl NatState {
/// Gets the current NAT status of the local peer
pub fn get_status(&self) -> NatStatus {
self.status
}

/// Adds an address to track its NAT status
pub fn add_address(&mut self, address: Multiaddr) {
self.address_status.insert(address, NatStatus::Unknown);
Expand Down
4 changes: 4 additions & 0 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ impl Behaviour {
#[cfg(feature = "kad")]
if force_dht_server_mode {
dht.set_mode(Some(kad::Mode::Server));
} else {
// Force the DHT mode to Client initially, as we only want to allow Server mode once
// consensus is established.
dht.set_mode(Some(kad::Mode::Client));
}

// Discovery behaviour
Expand Down
15 changes: 13 additions & 2 deletions network-libp2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use libp2p::{
};
use nimiq_network_interface::{
network::{
CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, SubscribeEvents,
Topic,
CloseReason, DhtMode, MsgAcceptance, Network as NetworkInterface, NetworkEvent,
SubscribeEvents, Topic,
},
peer_info::{PeerInfo, Services},
request::{
Expand Down Expand Up @@ -625,6 +625,17 @@ impl NetworkInterface for Network {
.expect("Failed to send reported message validation result: receiver hung up");
}

async fn dht_set_mode(&self, mode: DhtMode) {
if let Err(error) = self
.action_tx
.clone()
.send(NetworkAction::DhtSetMode { mode })
.await
{
error!(?mode, %error, "could not send dht_set_mode action to channel");
};
}

async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
where
K: AsRef<[u8]> + Send + Sync,
Expand Down
5 changes: 4 additions & 1 deletion network-libp2p/src/network_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::{
};
use nimiq_keys::KeyPair;
use nimiq_network_interface::{
network::{CloseReason, MsgAcceptance, PubsubId, Topic},
network::{CloseReason, DhtMode, MsgAcceptance, PubsubId, Topic},
peer_info::Services,
request::{RequestError, RequestType},
};
Expand All @@ -39,6 +39,9 @@ pub(crate) enum NetworkAction {
address: Multiaddr,
output: oneshot::Sender<Result<(), NetworkError>>,
},
DhtSetMode {
mode: DhtMode,
},
DhtGet {
key: Vec<u8>,
output: oneshot::Sender<Result<Vec<u8>, NetworkError>>,
Expand Down
48 changes: 40 additions & 8 deletions network-libp2p/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use libp2p::{
use libp2p::{dns, tcp, websocket};
use log::Instrument;
use nimiq_network_interface::{
network::{CloseReason, NetworkEvent},
network::{CloseReason, DhtMode, NetworkEvent},
peer_info::PeerInfo,
request::{peek_type, InboundRequestError, OutboundRequestError, RequestError},
};
Expand Down Expand Up @@ -170,7 +170,7 @@ pub(crate) async fn swarm_task(
},
action = action_rx.recv() => {
if let Some(action) = action {
perform_action(action, &mut swarm, &mut task_state);
perform_action(action, &mut swarm, &mut task_state, &events_tx);
}
else {
// `action_rx.next()` will return `None` if all senders (i.e. the `Network` object) are dropped.
Expand Down Expand Up @@ -510,7 +510,9 @@ fn handle_dht_event(event: kad::Event, event_info: EventInfo) {
},
} => handle_dht_inbound_put(source, connection, record, event_info),

kad::Event::ModeChanged { new_mode } => handle_dht_mode_change(new_mode, event_info),
kad::Event::ModeChanged { new_mode } => {
handle_dht_mode_change(new_mode, event_info.state, event_info.events_tx)
}

_ => {}
}
Expand Down Expand Up @@ -697,12 +699,16 @@ fn handle_dht_inbound_put(
}

#[cfg(feature = "kad")]
fn handle_dht_mode_change(new_mode: Mode, event_info: EventInfo) {
fn handle_dht_mode_change(
new_mode: Mode,
state: &mut TaskState,
events_tx: &broadcast::Sender<NetworkEvent<PeerId>>,
) {
debug!(%new_mode, "DHT mode changed");
if new_mode == Mode::Server {
event_info.state.dht_server_mode = true;
if event_info.state.dht_bootstrap_state == DhtBootStrapState::Completed {
let _ = event_info.events_tx.send(NetworkEvent::DhtReady);
state.dht_server_mode = true;
if state.dht_bootstrap_state == DhtBootStrapState::Completed {
let _ = events_tx.send(NetworkEvent::DhtReady);
}
}
}
Expand Down Expand Up @@ -1025,7 +1031,12 @@ fn handle_request_response_inbound_failure(
error!(%request_id, %peer_id, %error, "Inbound request failed");
}

fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut TaskState) {
fn perform_action(
action: NetworkAction,
swarm: &mut NimiqSwarm,
state: &mut TaskState,
events_tx: &broadcast::Sender<NetworkEvent<PeerId>>,
) {
match action {
NetworkAction::Dial { peer_id, output } => {
let dial_opts = DialOpts::peer_id(peer_id)
Expand All @@ -1041,6 +1052,27 @@ fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut Tas
let result = swarm.dial(dial_opts).map_err(Into::into);
output.send(result).ok();
}
NetworkAction::DhtSetMode { mode } => {
#[cfg(feature = "kad")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feature annotations support { } to encapsulate a block of code:

#[cfg(feature = "kad")]
{
    ...
}

let mode = match mode {
DhtMode::Client => Some(Mode::Client),
DhtMode::Server => {
if state.nat_status.get_status() == NatStatus::Public {
Some(Mode::Server)
} else {
// Set to auto, such that it automatically switches to server mode in case
// the NAT status changes to public later on.
None
}
}
};
#[cfg(feature = "kad")]
swarm.behaviour_mut().dht.set_mode(mode);
#[cfg(feature = "kad")]
if let Some(mode) = mode {
handle_dht_mode_change(mode, state, events_tx);
}
}
NetworkAction::DhtGet { key, output } => {
#[cfg(feature = "kad")]
let query_id = swarm.behaviour_mut().dht.get_record(key.into());
Expand Down
7 changes: 6 additions & 1 deletion network-mock/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use async_trait::async_trait;
use futures::{stream::BoxStream, StreamExt};
use nimiq_network_interface::{
network::{
CloseReason, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents, Topic,
CloseReason, DhtMode, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents,
Topic,
},
peer_info::{PeerInfo, Services},
request::{
Expand Down Expand Up @@ -534,6 +535,10 @@ impl Network for MockNetwork {
// TODO implement
}

async fn dht_set_mode(&self, _mode: DhtMode) {
// TODO implement
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set an unimplemented!() here? Same for validate_message. Otherwise the caller has no idea this still needs an actual impl

}

async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
where
K: AsRef<[u8]> + Send + Sync,
Expand Down
Loading