From 80fce1a572b1163ffb023e2fb0161f88f827bdf8 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Mon, 11 Nov 2024 19:41:19 -0600 Subject: [PATCH 01/20] Add ValidatorInfo to the PeerContacts. Do basic verification --- Cargo.lock | 1 + dht/Cargo.toml | 1 + dht/src/lib.rs | 132 +++++++++++------ network-libp2p/src/behaviour.rs | 11 +- .../src/connection_pool/behaviour.rs | 4 +- network-libp2p/src/dht.rs | 2 +- network-libp2p/src/discovery/behaviour.rs | 21 ++- network-libp2p/src/discovery/handler.rs | 106 ++++++++++---- network-libp2p/src/discovery/peer_contacts.rs | 136 ++++++++++++++++-- network-libp2p/src/network.rs | 12 +- network-libp2p/src/swarm.rs | 22 ++- network-libp2p/tests/discovery.rs | 8 +- network-libp2p/tests/network.rs | 39 +++-- 13 files changed, 379 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7de36f845f..d52d64865b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4032,6 +4032,7 @@ dependencies = [ "nimiq-blockchain-proxy", "nimiq-keys", "nimiq-log", + "nimiq-network-interface", "nimiq-network-libp2p", "nimiq-serde", "nimiq-utils", diff --git a/dht/Cargo.toml b/dht/Cargo.toml index ddfd26382b..b63b60ee96 100644 --- a/dht/Cargo.toml +++ b/dht/Cargo.toml @@ -26,6 +26,7 @@ nimiq-blockchain-interface = { workspace = true } nimiq-blockchain-proxy = { workspace = true, features = ["full"] } nimiq-keys = { workspace = true } nimiq-log = { workspace = true, optional = true } +nimiq-network-interface = { workspace = true } nimiq-network-libp2p = { workspace = true } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true } diff --git a/dht/src/lib.rs b/dht/src/lib.rs index afcc7deb30..d7198d16cc 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -1,9 +1,11 @@ use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_keys::{Address, KeyPair}; +use nimiq_network_interface::network::Network as NetworkInterface; use nimiq_network_libp2p::{ dht::{DhtRecord, DhtVerifierError, Verifier as DhtVerifier}, + discovery::peer_contacts::{ValidatorInfoError, ValidatorRecordVerifier}, libp2p::kad::Record, - PeerId, + Network, PeerId, }; use nimiq_serde::Deserialize; use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; @@ -17,42 +19,50 @@ impl Verifier { pub fn new(blockchain: BlockchainProxy) -> Self { Self { blockchain } } +} - fn verify_validator_record(&self, record: &Record) -> Result { - // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. - let validator_record = - TaggedSigned::, KeyPair>::deserialize_from_vec(&record.value) - .map_err(DhtVerifierError::MalformedValue)?; - - // Make sure the peer who signed the record is also the one presented in the record. - if let Some(publisher) = record.publisher { - if validator_record.record.peer_id != publisher { - return Err(DhtVerifierError::PublisherMismatch( - publisher, - validator_record.record.peer_id, - )); - } - } else { - log::warn!("Validating a dht record without a publisher"); - return Err(DhtVerifierError::PublisherMissing); - } +impl ValidatorRecordVerifier for Verifier { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError> { + // // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. + // let validator_record = + // TaggedSigned::, KeyPair>::deserialize_from_vec(&record.record.value) + // .map_err(DhtVerifierError::MalformedValue)?; - // Deserialize the key of the record which is an Address. If it fails return an error. - let validator_address = Address::deserialize_from_vec(record.key.as_ref()) - .map_err(DhtVerifierError::MalformedKey)?; + // // Make sure the peer who signed the record is also the one presented in the record. + // if let Some(publisher) = record.publisher { + // if validator_record.record.peer_id != publisher { + // return Err(DhtVerifierError::PublisherMismatch( + // publisher, + // validator_record.record.peer_id, + // )); + // } + // } else { + // log::warn!("Validating a dht record without a publisher"); + // return Err(DhtVerifierError::PublisherMissing); + // } + + // // Deserialize the key of the record which is an Address. If it fails return an error. + // let validator_address = Address::deserialize_from_vec(record.key.as_ref()) + // .map_err(DhtVerifierError::MalformedKey)?; // Make sure the validator address used as key is identical to the one in the record. - if validator_record.record.validator_address != validator_address { - return Err(DhtVerifierError::AddressMismatch( - validator_address, - validator_record.record.validator_address, - )); - } + // if signed_record.record.validator_address != validator_address { + // return Err(DhtVerifierError::AddressMismatch( + // validator_address, + // validator_record.record.validator_address, + // )); + // } // Acquire blockchain read access. For now exclude Light clients. let blockchain = match self.blockchain { BlockchainProxy::Light(ref _light_blockchain) => { - return Err(DhtVerifierError::UnknownTag) + panic!("Light Blockchains cannot verify validator records.") } BlockchainProxy::Full(ref full_blockchain) => full_blockchain, }; @@ -61,27 +71,26 @@ impl Verifier { // Get the staking contract to retrieve the public key for verification. let staking_contract = blockchain_read .get_staking_contract_if_complete(None) - .ok_or(DhtVerifierError::StateIncomplete)?; + .ok_or(ValidatorInfoError::StateIncomplete)?; // Get the public key needed for verification. let data_store = blockchain_read.get_staking_contract_store(); let txn = blockchain_read.read_transaction(); let public_key = staking_contract - .get_validator(&data_store.read(&txn), &validator_address) - .ok_or(DhtVerifierError::UnknownValidator(validator_address))? + .get_validator( + &data_store.read(&txn), + &signed_record.record.validator_address, + ) + .ok_or(ValidatorInfoError::UnknownValidator( + signed_record.record.validator_address.clone(), + ))? .signing_key; // Verify the record. - validator_record + signed_record .verify(&public_key) - .then(|| { - DhtRecord::Validator( - record.publisher.unwrap(), - validator_record.record, - record.clone(), - ) - }) - .ok_or(DhtVerifierError::InvalidSignature) + .then(|| ()) + .ok_or(ValidatorInfoError::InvalidSignature) } } @@ -96,7 +105,46 @@ impl DhtVerifier for Verifier { // Depending on tag perform the verification. match tag { - ValidatorRecord::::TAG => self.verify_validator_record(record), + ValidatorRecord::::TAG => { + // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. + let validator_record = + TaggedSigned::, KeyPair>::deserialize_from_vec( + &record.value, + ) + .map_err(DhtVerifierError::MalformedValue)?; + + // Make sure the peer who published the record is also the one signed into the record. + if record.publisher.ok_or(DhtVerifierError::MissingPublisher)? + != validator_record.record.peer_id + { + return Err(DhtVerifierError::PublisherMismatch( + record.publisher.unwrap(), + validator_record.record.peer_id, + )); + } + + // Deserialize the key of the record which is an Address. If it fails return an error. + let validator_address = Address::deserialize_from_vec(record.key.as_ref()) + .map_err(DhtVerifierError::MalformedKey)?; + + // Make sure the address used as key is also the one signed into the record. + if validator_address != validator_record.record.validator_address { + return Err(DhtVerifierError::AddressMismatch( + validator_address, + validator_record.record.validator_address, + )); + } + + self.verify_validator_record(&validator_record) + .map_err(DhtVerifierError::ValidatorInfoError) + .and_then(|_| { + Ok(DhtRecord::Validator( + validator_record.record.peer_id, + validator_record.record, + record.clone(), + )) + }) + } _ => { log::error!(tag, "DHT invalid record tag received"); Err(DhtVerifierError::UnknownTag) diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index ac26a382af..cef502500c 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -12,9 +12,11 @@ use parking_lot::RwLock; use rand::rngs::OsRng; use crate::{ - connection_pool, - connection_pool::behaviour::Config as PoolConfig, - discovery::{self, peer_contacts::PeerContactBook}, + connection_pool::{self, behaviour::Config as PoolConfig}, + discovery::{ + self, + peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, + }, dispatch::codecs::MessageCodec, Config, }; @@ -50,6 +52,7 @@ impl Behaviour { contacts: Arc>, peer_score_params: gossipsub::PeerScoreParams, force_dht_server_mode: bool, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { let public_key = config.keypair.public(); let peer_id = public_key.to_peer_id(); @@ -68,6 +71,8 @@ impl Behaviour { config.discovery.clone(), config.keypair.clone(), Arc::clone(&contacts), + #[cfg(feature = "kad")] + verifier, ); // Gossipsub behaviour diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index 757bde0415..e95047dcf1 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -534,7 +534,7 @@ impl Behaviour { let own_peer_id = own_contact.peer_id(); contacts - .query(self.required_services) + .query(self.required_services, true) .filter_map(|contact| { let peer_id = contact.peer_id(); if peer_id != own_peer_id @@ -562,7 +562,7 @@ impl Behaviour { let own_peer_id = own_contact.peer_id(); contacts - .query(services) + .query(services, true) .filter_map(|contact| { let peer_id = contact.peer_id(); if peer_id != own_peer_id diff --git a/network-libp2p/src/dht.rs b/network-libp2p/src/dht.rs index 7502d190c1..a0fbe47307 100644 --- a/network-libp2p/src/dht.rs +++ b/network-libp2p/src/dht.rs @@ -5,7 +5,7 @@ use nimiq_serde::DeserializeError; use nimiq_validator_network::validator_record::ValidatorRecord; pub use crate::network_types::DhtRecord; -use crate::Network; +use crate::{discovery::peer_contacts::ValidatorInfoError, Network}; #[derive(Debug)] pub enum DhtVerifierError { diff --git a/network-libp2p/src/discovery/behaviour.rs b/network-libp2p/src/discovery/behaviour.rs index 4711f96389..8aaa2a3bad 100644 --- a/network-libp2p/src/discovery/behaviour.rs +++ b/network-libp2p/src/discovery/behaviour.rs @@ -11,7 +11,7 @@ use libp2p::{ identity::Keypair, swarm::{ behaviour::{ConnectionClosed, ConnectionEstablished}, - CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm, + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm, }, Multiaddr, PeerId, }; @@ -22,7 +22,7 @@ use parking_lot::RwLock; use super::{ handler::{Handler, HandlerOutEvent}, - peer_contacts::{PeerContact, PeerContactBook}, + peer_contacts::{PeerContact, PeerContactBook, ValidatorRecordVerifier}, }; #[derive(Clone, Debug)] @@ -115,6 +115,10 @@ pub struct Behaviour { /// Timer to do house-keeping in the peer address book. house_keeping_timer: Interval, + + /// dht verifier TODO + #[cfg(feature = "kad")] + verifier: Arc, } impl Behaviour { @@ -122,6 +126,7 @@ impl Behaviour { config: Config, keypair: Keypair, peer_contact_book: Arc>, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { let house_keeping_timer = interval(config.house_keeping_interval); peer_contact_book.write().update_own_contact(&keypair); @@ -139,6 +144,8 @@ impl Behaviour { peer_contact_book, events, house_keeping_timer, + #[cfg(feature = "kad")] + verifier, } } @@ -177,6 +184,8 @@ impl NetworkBehaviour for Behaviour { self.keypair.clone(), self.peer_contact_book(), remote_addr.clone(), + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), )) } @@ -194,6 +203,8 @@ impl NetworkBehaviour for Behaviour { self.keypair.clone(), self.peer_contact_book(), addr.clone(), + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), )) } @@ -288,10 +299,8 @@ impl NetworkBehaviour for Behaviour { .push_back(ToSwarm::NewExternalAddrCandidate(observed_address)); } HandlerOutEvent::Update => self.events.push_back(ToSwarm::GenerateEvent(Event::Update)), - HandlerOutEvent::Error(_) => self.events.push_back(ToSwarm::CloseConnection { - peer_id, - connection: CloseConnection::All, - }), + // Errors must not result in a closed connection as light clients are unable to verify ValidatorRecord. + HandlerOutEvent::Error(error) => log::trace!(?error, "Received invalid contact"), } } } diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index ca2844d074..6b3bcbc67b 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -33,7 +33,10 @@ use thiserror::Error; use super::{ behaviour::Config, message_codec::{MessageReader, MessageWriter}, - peer_contacts::{PeerContactBook, SignedPeerContact}, + peer_contacts::{ + PeerContactBook, PeerContactError, SignedPeerContact, ValidatorInfoError, + ValidatorRecordVerifier, + }, protocol::{ChallengeNonce, DiscoveryMessage, DiscoveryProtocol}, }; use crate::{AUTONAT_DIAL_BACK_PROTOCOL, AUTONAT_DIAL_REQUEST_PROTOCOL}; @@ -134,6 +137,11 @@ pub struct Handler { /// The peer contact book peer_contact_book: Arc>, + /// Used to verify PeerContacts. + /// Required as contacts could contain a ValidatorInfo, for which a current verification key is required. + #[cfg(feature = "kad")] + verifier: Arc, + /// The peer address we're connected to (address that got us connected). peer_address: Multiaddr, @@ -179,6 +187,7 @@ impl Handler { keypair: Keypair, peer_contact_book: Arc>, peer_address: Multiaddr, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { if let Some(peer_contact) = peer_contact_book.write().get(&peer_id) { if let Some(outer_protocol_address) = outer_protocol_address(&peer_address) { @@ -202,6 +211,8 @@ impl Handler { inbound: None, outbound: None, waker: None, + #[cfg(feature = "kad")] + verifier, events: VecDeque::new(), } } @@ -232,7 +243,7 @@ impl Handler { let mut rng = thread_rng(); peer_contact_book - .query(self.services_filter) + .query(self.services_filter, false) .choose_multiple(&mut rng, limit) .into_iter() .map(|c| c.signed().clone()) @@ -287,6 +298,44 @@ pub(crate) fn outer_protocol_address(addr: &Multiaddr) -> Option { .unwrap_or(None) } +fn filter_contact( + timestamp: u64, + #[cfg(feature = "kad")] verifier: Arc, +) -> Box Option> { + Box::new(move |mut peer_contact: SignedPeerContact| { + match peer_contact.verify( + #[cfg(feature = "kad")] + Arc::clone(&verifier), + ) { + // Contacts with too many addresses or an invalid peer signature are rejected + // and removed from the collection + Err(PeerContactError::AdvertisedAddressesExceeded) + | Err(PeerContactError::InvalidSignature) => None, + // If there is a validator record, but the state is incomplete, + // then it cannot be verified and must be checked again at a later time. + Err(PeerContactError::ValidatorRecord(ValidatorInfoError::StateIncomplete)) => { + peer_contact.local_only = true; + Some(peer_contact) + } + // If there is a validator record but either the signature is invalid, or the validator is unknown, + // the head timestamp of the blockchain and the creation timestamp must be compared. + Err(PeerContactError::ValidatorRecord(ValidatorInfoError::InvalidSignature)) + | Err(PeerContactError::ValidatorRecord(ValidatorInfoError::UnknownValidator(_))) => { + // Set it to local only. Some contacts will be discarded by the next condition still. + peer_contact.local_only = true; + + // Retain only contacts which are in the future, as they may still verify then. + if timestamp < peer_contact.inner.timestamp() { + Some(peer_contact) + } else { + None + } + } + Ok(_) => Some(peer_contact), + } + }) +} + impl ConnectionHandler for Handler { type FromBehaviour = (); type ToBehaviour = HandlerOutEvent; @@ -526,7 +575,13 @@ impl ConnectionHandler for Handler { peer_contacts, } => { // Check the peer contact for a valid signature. - if !peer_contact.verify() { + let Some(peer_contact) = filter_contact( + 0, + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + )( + peer_contact.clone() + ) else { return Poll::Ready( ConnectionHandlerEvent::NotifyBehaviour( HandlerOutEvent::Error( @@ -536,7 +591,7 @@ impl ConnectionHandler for Handler { ), ), ); - } + }; if self.peer_id != peer_contact.peer_id() { return Poll::Ready( @@ -574,19 +629,14 @@ impl ConnectionHandler for Handler { ), ); } - for peer_contact in &peer_contacts { - if !peer_contact.verify() { - return Poll::Ready( - ConnectionHandlerEvent::NotifyBehaviour( - HandlerOutEvent::Error( - Error::InvalidPeerContactSignature { - peer_contact: peer_contact.clone(), - }, - ), - ), - ); - } - } + let peer_contacts: Vec = peer_contacts + .into_iter() + .filter_map(filter_contact( + 0, + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + )) + .collect(); let mut peer_contact_book = self.peer_contact_book.write(); @@ -685,19 +735,15 @@ impl ConnectionHandler for Handler { ), ); } - for peer_contact in &peer_contacts { - if !peer_contact.verify() { - return Poll::Ready( - ConnectionHandlerEvent::NotifyBehaviour( - HandlerOutEvent::Error( - Error::InvalidPeerContactSignature { - peer_contact: peer_contact.clone(), - }, - ), - ), - ); - } - } + + let peer_contacts: Vec = peer_contacts + .into_iter() + .filter_map(filter_contact( + 0, + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + )) + .collect(); // Insert the new peer contacts into the peer contact book. self.peer_contact_book.write().insert_all_filtered( diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index c77961e62b..212a0d3874 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -16,7 +16,7 @@ use nimiq_network_interface::{ network::Network as NetworkInterface, peer_info::{PeerInfo, Services}, }; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature}; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature, TaggedSigned}; use nimiq_validator_network::validator_record::ValidatorRecord; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; @@ -28,6 +28,10 @@ use crate::{utils, Network}; pub enum PeerContactError { #[error("Exceeded number of advertised addresses")] AdvertisedAddressesExceeded, + #[error("Validator Record failed to verify")] + ValidatorRecord(ValidatorInfoError), + #[error("Contact signature is invalid")] + InvalidSignature, } /// The validator info contains all information which is not present in a [PeerContact] @@ -43,10 +47,75 @@ pub struct ValidatorInfo { signature: TaggedSignature::PeerId>, KeyPair>, } +#[derive(Debug)] +pub enum ValidatorInfoError { + StateIncomplete, + InvalidSignature, + UnknownValidator(Address), +} + +impl ValidatorInfo { + pub fn new( + validator_address: Address, + signature: TaggedSignature::PeerId>, KeyPair>, + ) -> Self { + Self { + validator_address, + signature, + } + } + + pub fn verify( + &self, + timestamp: u64, + peer_id: PeerId, + verification_key: &::PublicKey, + ) -> Result<(), ValidatorInfoError> { + // Reconstruct the record + let record = ValidatorRecord { + validator_address: self.validator_address.clone(), + timestamp, + peer_id, + }; + + // Reconstruct the signed record. + let signed_record = TaggedSigned::new(record, self.signature.clone()); + + // Verify the record and return the result. + signed_record + .verify(verification_key) + .then_some(()) + .ok_or(ValidatorInfoError::InvalidSignature) + } +} + +pub trait ValidatorRecordVerifier: Send + Sync { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError>; +} + +impl ValidatorRecordVerifier for () { + fn verify_validator_record( + &self, + _signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError> { + Ok(()) + } +} + /// A plain peer contact. This contains: /// /// - A set of multi-addresses for the peer. /// - The peer's public key. +/// - An optional [ValidatorInfo] in case this peer is a running a validator. /// - A bitmask of the services supported by this peer. /// - A timestamp when this contact information was generated. /// @@ -60,6 +129,9 @@ pub struct PeerContact { #[serde(with = "self::serde_public_key")] pub public_key: PublicKey, + /// Optional validator info in case the node is a validator. + pub validator_info: Option, + /// Services supported by this peer. pub services: Services, @@ -91,6 +163,7 @@ impl PeerContact { Ok(Self { addresses, public_key, + validator_info: None, services, validator_info: None, timestamp, @@ -99,7 +172,7 @@ impl PeerContact { /// Derives the peer ID from the public key pub fn peer_id(&self) -> PeerId { - self.public_key.clone().to_peer_id() + self.public_key.to_peer_id() } /// Returns the timestamp of the contact. It is generally set to the time the contact was created. @@ -123,6 +196,7 @@ impl PeerContact { SignedPeerContact { inner: self, signature, + local_only: false, } } @@ -160,10 +234,35 @@ impl PeerContact { /// Verifies whether the lengths of the advertised addresses are within /// the expected limits. This is helpful to verify a received peer contact. - pub fn verify(&self) -> Result<(), PeerContactError> { + pub fn verify( + &self, + #[cfg(feature = "kad")] verifier: Arc, + ) -> Result<(), PeerContactError> { if self.addresses.len() > Self::MAX_ADDRESSES { return Err(PeerContactError::AdvertisedAddressesExceeded); } + + // In case the Contact includes a ValidatorInfo it also needs to be verified. + #[cfg(feature = "kad")] + if let Some(ValidatorInfo { + validator_address, + signature, + }) = self.validator_info.clone() + { + // Reconstruct the record + let record = ValidatorRecord { + validator_address, + timestamp: self.timestamp, + peer_id: self.peer_id(), + }; + + let signed_record = TaggedSigned::new(record, signature); + + verifier + .verify_validator_record(&signed_record) + .map_err(PeerContactError::ValidatorRecord)?; + } + Ok(()) } } @@ -180,17 +279,30 @@ pub struct SignedPeerContact { /// The signature over the serialized peer contact. pub signature: TaggedSignature, + + #[serde(skip)] + pub local_only: bool, } impl SignedPeerContact { /// Verifies that the signature is valid for this peer contact and also does /// intrinsic verification on the inner PeerContact. - pub fn verify(&self) -> bool { - if self.inner.verify().is_err() { - return false; - }; - self.signature + pub fn verify( + &self, + #[cfg(feature = "kad")] verifier: Arc, + ) -> Result<(), PeerContactError> { + // The record signature must be verifid first. + if !self + .signature .tagged_verify(&self.inner, &self.inner.public_key) + { + return Err(PeerContactError::InvalidSignature); + } + + self.inner.verify( + #[cfg(feature = "kad")] + verifier, + ) } /// Gets the public key of this peer contact. @@ -509,11 +621,15 @@ impl PeerContactBook { /// Gets a set of peer contacts given a services filter. /// Every peer contact that matches such services will be returned. - pub fn query(&self, services: Services) -> impl Iterator> + '_ { + pub fn query( + &self, + services: Services, + include_local_only: bool, + ) -> impl Iterator> + '_ { // TODO: This is a naive implementation // TODO: Sort by score? self.peer_contacts.iter().filter_map(move |(_, contact)| { - if contact.matches(services) { + if !contact.contact.local_only || include_local_only && contact.matches(services) { Some(Arc::clone(contact)) } else { None diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index e60e8d471f..ec5f0b6c28 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -38,7 +38,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use crate::network_metrics::NetworkMetrics; use crate::{ dht, - discovery::peer_contacts::PeerContactBook, + discovery::peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, network_types::{GossipsubId, NetworkAction, ValidateMessage}, rate_limiting::RateLimitConfig, swarm::{new_swarm, swarm_task}, @@ -79,7 +79,7 @@ impl Network { /// pub async fn new( config: Config, - #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier + 'static, + #[cfg(feature = "kad")] verifier: impl dht::Verifier + ValidatorRecordVerifier + 'static, ) -> Self { let required_services = config.required_services; // TODO: persist to disk @@ -100,11 +100,17 @@ impl Network { // In memory transport we don't have a mechanism that sets the DHT in server mode such as confirming an address // with Autonat. This is because Autonat v1 only works with IP addresses. let force_dht_server_mode = config.memory_transport; + + #[cfg(feature = "kad")] + let verifier = Arc::new(verifier); + let swarm = new_swarm( config, Arc::clone(&contacts), params.clone(), force_dht_server_mode, + #[cfg(feature = "kad")] + (Arc::clone(&verifier) as Arc), ); let local_peer_id = *Swarm::local_peer_id(&swarm); @@ -128,7 +134,7 @@ impl Network { update_scores, Arc::clone(&contacts), #[cfg(feature = "kad")] - dht_verifier, + (Arc::clone(&verifier) as Arc), force_dht_server_mode, dht_quorum, #[cfg(feature = "metrics")] diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index cb1646b627..70b770f388 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -45,7 +45,10 @@ use crate::network_metrics::NetworkMetrics; use crate::{ autonat::NatStatus, behaviour, dht, - discovery::{self, peer_contacts::PeerContactBook}, + discovery::{ + self, + peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, + }, network_types::{ DhtBootStrapState, DhtRecord, DhtResults, GossipsubTopicInfo, NetworkAction, TaskState, ValidateMessage, @@ -63,7 +66,7 @@ struct EventInfo<'a> { connected_peers: &'a RwLock>, rate_limiting: &'a mut RateLimits, #[cfg(feature = "kad")] - dht_verifier: &'a dyn dht::Verifier, + dht_verifier: Arc, #[cfg(feature = "metrics")] metrics: &'a Arc, } @@ -73,6 +76,7 @@ pub(crate) fn new_swarm( contacts: Arc>, peer_score_params: gossipsub::PeerScoreParams, force_dht_server_mode: bool, + #[cfg(feature = "kad")] verifier: Arc, ) -> Swarm { let keypair = config.keypair.clone(); let transport = new_transport( @@ -83,8 +87,14 @@ pub(crate) fn new_swarm( ) .unwrap(); - let behaviour = - behaviour::Behaviour::new(config, contacts, peer_score_params, force_dht_server_mode); + let behaviour = behaviour::Behaviour::new( + config, + contacts, + peer_score_params, + force_dht_server_mode, + #[cfg(feature = "kad")] + verifier, + ); // TODO add proper config #[cfg(not(target_family = "wasm"))] @@ -114,7 +124,7 @@ pub(crate) async fn swarm_task( connected_peers: Arc>>, mut update_scores: Interval, contacts: Arc>, - #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier, + #[cfg(feature = "kad")] dht_verifier: Arc, force_dht_server_mode: bool, dht_quorum: NonZeroU8, #[cfg(feature = "metrics")] metrics: Arc, @@ -162,7 +172,7 @@ pub(crate) async fn swarm_task( connected_peers: &connected_peers, rate_limiting: &mut rate_limiting, #[cfg(feature = "kad")] - dht_verifier: &dht_verifier, + dht_verifier: Arc::clone(&dht_verifier), #[cfg( feature = "metrics")] metrics: &metrics, }, ); diff --git a/network-libp2p/tests/discovery.rs b/network-libp2p/tests/discovery.rs index 900dbdce01..8d22baa570 100644 --- a/network-libp2p/tests/discovery.rs +++ b/network-libp2p/tests/discovery.rs @@ -82,8 +82,12 @@ impl TestNode { true, ))); - let behaviour = - discovery::Behaviour::new(config, keypair.clone(), Arc::clone(&peer_contact_book)); + let behaviour = discovery::Behaviour::new( + config, + keypair.clone(), + Arc::clone(&peer_contact_book), + Arc::new(()), + ); let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 8fec7df3ef..2a85dc7342 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -15,7 +15,10 @@ use nimiq_network_interface::{ }; use nimiq_network_libp2p::{ dht, - discovery::{self, peer_contacts::PeerContact}, + discovery::{ + self, + peer_contacts::{PeerContact, ValidatorInfoError, ValidatorRecordVerifier}, + }, Config, Network, }; use nimiq_serde::{Deserialize, Serialize}; @@ -203,6 +206,26 @@ impl Verifier { } } +impl ValidatorRecordVerifier for Verifier { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError> { + let keys = self.keys.read(); + let public_key = keys.get(&signed_record.record.validator_address).ok_or( + ValidatorInfoError::UnknownValidator(signed_record.record.validator_address.clone()), + )?; + + signed_record + .verify(&public_key) + .then(|| ()) + .ok_or(ValidatorInfoError::InvalidSignature) + } +} + impl dht::Verifier for Verifier { fn verify( &self, @@ -228,21 +251,15 @@ impl dht::Verifier for Verifier { let validator_address = Address::deserialize_from_vec(record.key.as_ref()) .map_err(dht::DhtVerifierError::MalformedKey)?; - let keys = self.keys.read(); - let public_key = keys - .get(&validator_address) - .ok_or(dht::DhtVerifierError::UnknownValidator(validator_address))?; - - validator_record - .verify(public_key) - .then(|| { + self.verify_validator_record(&validator_record) + .map(|_| { dht::DhtRecord::Validator( - record.publisher.unwrap(), + validator_record.record.peer_id, validator_record.record, record.clone(), ) }) - .ok_or(dht::DhtVerifierError::InvalidSignature) + .map_err(dht::DhtVerifierError::ValidatorInfoError) } } From a07c03b074cf9a8342e11c64ca68b92f75676197 Mon Sep 17 00:00:00 2001 From: ii-cruz Date: Mon, 25 Nov 2024 01:25:48 -0600 Subject: [PATCH 02/20] Compile fix --- dht/src/lib.rs | 2 +- network-libp2p/src/dht.rs | 4 +++- network-libp2p/src/discovery/peer_contacts.rs | 6 +----- network-libp2p/tests/network.rs | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dht/src/lib.rs b/dht/src/lib.rs index d7198d16cc..5ffcc2185b 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -136,7 +136,7 @@ impl DhtVerifier for Verifier { } self.verify_validator_record(&validator_record) - .map_err(DhtVerifierError::ValidatorInfoError) + .map_err(|_| DhtVerifierError::ValidatorInfoError) .and_then(|_| { Ok(DhtRecord::Validator( validator_record.record.peer_id, diff --git a/network-libp2p/src/dht.rs b/network-libp2p/src/dht.rs index a0fbe47307..a6590193f6 100644 --- a/network-libp2p/src/dht.rs +++ b/network-libp2p/src/dht.rs @@ -5,7 +5,7 @@ use nimiq_serde::DeserializeError; use nimiq_validator_network::validator_record::ValidatorRecord; pub use crate::network_types::DhtRecord; -use crate::{discovery::peer_contacts::ValidatorInfoError, Network}; +use crate::Network; #[derive(Debug)] pub enum DhtVerifierError { @@ -22,6 +22,8 @@ pub enum DhtVerifierError { ), StateIncomplete, InvalidSignature, + ValidatorInfoError, + MissingPublisher, } pub trait Verifier: Send + Sync { diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 212a0d3874..091f07eb8c 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -129,9 +129,6 @@ pub struct PeerContact { #[serde(with = "self::serde_public_key")] pub public_key: PublicKey, - /// Optional validator info in case the node is a validator. - pub validator_info: Option, - /// Services supported by this peer. pub services: Services, @@ -165,7 +162,6 @@ impl PeerContact { public_key, validator_info: None, services, - validator_info: None, timestamp, }) } @@ -291,7 +287,7 @@ impl SignedPeerContact { &self, #[cfg(feature = "kad")] verifier: Arc, ) -> Result<(), PeerContactError> { - // The record signature must be verifid first. + // The record signature must be verified first. if !self .signature .tagged_verify(&self.inner, &self.inner.public_key) diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 2a85dc7342..9ed586f355 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -259,7 +259,7 @@ impl dht::Verifier for Verifier { record.clone(), ) }) - .map_err(dht::DhtVerifierError::ValidatorInfoError) + .map_err(|_| dht::DhtVerifierError::ValidatorInfoError) } } From 98e0d1fa8a10262512a06b4b46b002531fa98422 Mon Sep 17 00:00:00 2001 From: viquezclaudio Date: Mon, 25 Nov 2024 02:56:13 -0600 Subject: [PATCH 03/20] Add/remove peer ids from validator addresses --- network-libp2p/src/discovery/peer_contacts.rs | 60 ++++++++++++++++--- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 091f07eb8c..aa0e7ee595 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, time::Duration, }; @@ -429,6 +429,8 @@ pub struct PeerContactBook { /// Contact information for other peers in the network indexed by their /// peer ID. peer_contacts: HashMap>, + /// Reverse map when we + validator_peer_ids: HashMap>, /// Only return secure websocket addresses. /// With this flag non secure websocket addresses will be stored (to still have a valid signature of the peer contact) /// but won't be returned when calling `get_addresses` @@ -458,9 +460,19 @@ impl PeerContactBook { only_secure_addresses, allow_loopback_addresses, memory_transport, + validator_peer_ids: HashMap::new(), } } + /// Obtain a list of peer ids associated to the given validator address + pub fn get_validator_peer_ids(&self, validator_address: &Address) -> Vec { + let Some(peer_ids) = self.validator_peer_ids.get(validator_address) else { + return vec![]; + }; + + peer_ids.iter().cloned().collect() + } + /// Insert a peer contact or update an existing one pub fn insert(&mut self, contact: SignedPeerContact) { // Don't insert our own contact into our peer contacts @@ -468,17 +480,25 @@ impl PeerContactBook { return; } + let peer_id = contact.peer_id(); + log::debug!(peer_id = %contact.peer_id(), addresses = ?contact.inner.addresses, "Adding peer contact"); let current_ts = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); + if let Some(validator) = &contact.inner.validator_info { + self.validator_peer_ids + .entry(validator.validator_address.clone()) + .or_insert(HashSet::new()) + .insert(peer_id); + } + let info = PeerContactInfo::from(contact); - let peer_id = info.peer_id; match self.peer_contacts.entry(peer_id) { - std::collections::hash_map::Entry::Occupied(mut entry) => { + Entry::Occupied(mut entry) => { let entry_value = entry.get_mut(); // Only update the contact if the timestamp is greater than the entry we have for this peer // and if the timestamp of the peer contact is not in the future @@ -488,7 +508,7 @@ impl PeerContactBook { *entry_value = Arc::new(info); } } - std::collections::hash_map::Entry::Vacant(entry) => { + Entry::Vacant(entry) => { entry.insert(Arc::new(info)); } } @@ -539,6 +559,13 @@ impl PeerContactBook { .peer_contacts .insert(info.peer_id, Arc::clone(&info)) .is_none(); + if let Some(validator_info) = &info.contact.inner.validator_info { + self.validator_peer_ids + .entry(validator_info.validator_address.clone()) + .or_insert(HashSet::new()) + .insert(info.peer_id); + } + if is_new { log::trace!( peer_id = %info.peer_id, @@ -701,16 +728,33 @@ impl PeerContactBook { unix_time, ) { debug!(%peer_id, "Removing peer contact because of old age"); - Some(peer_id) + Some(( + peer_id.clone(), + peer_contact.contact.inner.validator_info.clone(), + )) } else { None } }) - .cloned() - .collect::>(); + .collect::)>>(); - for peer_id in delete_peers { + for (peer_id, validator_info) in delete_peers { self.peer_contacts.remove(&peer_id); + if let Some(validator_info) = validator_info { + match self + .validator_peer_ids + .entry(validator_info.validator_address.clone()) + { + Entry::Occupied(mut entry) => { + entry.get_mut().remove(&peer_id); + + if entry.get_mut().is_empty() { + entry.remove(); + } + } + Entry::Vacant(_) => {} + } + } } } } From ab3fc0983377211c4267bef6ce285bab4f9bce49 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 25 Nov 2024 09:21:02 +0000 Subject: [PATCH 04/20] Network: prepare network interface for validator discovery --- Cargo.lock | 2 +- dht/src/lib.rs | 5 ++-- network-interface/Cargo.toml | 1 + network-interface/src/lib.rs | 1 + network-interface/src/network.rs | 26 +++++++++++++++++-- .../src/validator_record.rs | 0 network-libp2p/Cargo.toml | 1 - network-libp2p/src/dht.rs | 5 ++-- network-libp2p/src/discovery/peer_contacts.rs | 2 +- network-libp2p/src/network.rs | 13 ++++++++++ network-libp2p/src/network_types.rs | 2 +- network-libp2p/tests/network.rs | 2 +- network-mock/Cargo.toml | 1 + network-mock/src/network.rs | 16 +++++++++++- validator-network/src/lib.rs | 1 - validator-network/src/network_impl.rs | 2 +- 16 files changed, 66 insertions(+), 14 deletions(-) rename {validator-network => network-interface}/src/validator_record.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index d52d64865b..a61e2ce6c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4493,6 +4493,7 @@ dependencies = [ "bitflags 2.8.0", "futures-util", "multiaddr", + "nimiq-keys", "nimiq-serde", "nimiq-utils", "serde", @@ -4523,7 +4524,6 @@ dependencies = [ "nimiq-test-utils", "nimiq-time", "nimiq-utils", - "nimiq-validator-network", "parking_lot", "pin-project", "pin-project-lite", diff --git a/dht/src/lib.rs b/dht/src/lib.rs index 5ffcc2185b..1b065081e5 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -1,6 +1,8 @@ use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_keys::{Address, KeyPair}; -use nimiq_network_interface::network::Network as NetworkInterface; +use nimiq_network_interface::{ + network::Network as NetworkInterface, validator_record::ValidatorRecord, +}; use nimiq_network_libp2p::{ dht::{DhtRecord, DhtVerifierError, Verifier as DhtVerifier}, discovery::peer_contacts::{ValidatorInfoError, ValidatorRecordVerifier}, @@ -9,7 +11,6 @@ use nimiq_network_libp2p::{ }; use nimiq_serde::Deserialize; use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; pub struct Verifier { blockchain: BlockchainProxy, diff --git a/network-interface/Cargo.toml b/network-interface/Cargo.toml index 56456dd06e..e59f188587 100644 --- a/network-interface/Cargo.toml +++ b/network-interface/Cargo.toml @@ -30,5 +30,6 @@ thiserror = "2.0" tokio = { version = "1.43", features = ["rt"] } tokio-stream = { version = "0.1", features = ["default", "sync"] } +nimiq-keys = { workspace = true } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true, features = ["tagged-signing"] } diff --git a/network-interface/src/lib.rs b/network-interface/src/lib.rs index a001071467..52d403177e 100644 --- a/network-interface/src/lib.rs +++ b/network-interface/src/lib.rs @@ -1,5 +1,6 @@ pub mod network; pub mod peer_info; pub mod request; +pub mod validator_record; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 6c852e86b6..04af848a65 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -6,14 +6,16 @@ use std::{ use async_trait::async_trait; use futures::stream::BoxStream; +use nimiq_keys::{Address, KeyPair}; use nimiq_serde::{Deserialize, DeserializeError, Serialize}; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable}; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}; use thiserror::Error; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use crate::{ peer_info::*, request::{Message, Request, RequestError}, + validator_record::ValidatorRecord, }; /// Network events that the network will report when subscribing @@ -87,7 +89,17 @@ pub enum SendError { #[async_trait] pub trait Network: Send + Sync + Unpin + 'static { - type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static; + type PeerId: Copy + + Debug + + Display + + Ord + + Hash + + Send + + Sync + + Unpin + + Serialize + + Deserialize + + 'static; type AddressType: Debug + Display + 'static; type Error: std::error::Error; type PubsubId: PubsubId + Send + Sync + Unpin; @@ -113,6 +125,10 @@ pub trait Network: Send + Sync + Unpin + 'static { min_peers: usize, ) -> Result, Self::Error>; + /// Returns all peer ids that are known for the given validator. + /// The returned list might contain unverified mappings. + fn get_peers_by_validator(&self, validator_address: Address) -> Vec; + /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; @@ -218,4 +234,10 @@ pub trait Network: Send + Sync + Unpin + 'static { request_id: Self::RequestId, response: Req::Response, ) -> Result<(), Self::Error>; + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + ); } diff --git a/validator-network/src/validator_record.rs b/network-interface/src/validator_record.rs similarity index 100% rename from validator-network/src/validator_record.rs rename to network-interface/src/validator_record.rs diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index a43461ea59..c91ebceefe 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -52,7 +52,6 @@ nimiq-utils = { workspace = true, features = [ "libp2p", "time", ] } -nimiq-validator-network = { workspace = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] libp2p = { version = "0.54", default-features = false, features = [ diff --git a/network-libp2p/src/dht.rs b/network-libp2p/src/dht.rs index a6590193f6..3e529db829 100644 --- a/network-libp2p/src/dht.rs +++ b/network-libp2p/src/dht.rs @@ -1,8 +1,9 @@ use libp2p::{kad::Record, PeerId}; use nimiq_keys::Address; -use nimiq_network_interface::network::Network as NetworkInterface; +use nimiq_network_interface::{ + network::Network as NetworkInterface, validator_record::ValidatorRecord, +}; use nimiq_serde::DeserializeError; -use nimiq_validator_network::validator_record::ValidatorRecord; pub use crate::network_types::DhtRecord; use crate::Network; diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index aa0e7ee595..da37868e71 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -15,9 +15,9 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::Network as NetworkInterface, peer_info::{PeerInfo, Services}, + validator_record::ValidatorRecord, }; use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use thiserror::Error; diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index ec5f0b6c28..3fc9d12e08 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -13,6 +13,7 @@ use futures::{future::BoxFuture, ready, stream::BoxStream, Stream, StreamExt}; use libp2p::{ gossipsub, request_response::InboundRequestId, swarm::NetworkInfo, Multiaddr, PeerId, Swarm, }; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{ CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, SubscribeEvents, @@ -23,6 +24,7 @@ use nimiq_network_interface::{ InboundRequestError, Message, OutboundRequestError, Request, RequestCommon, RequestError, RequestSerialize, RequestType, }, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, Serialize}; use nimiq_time::{interval, timeout}; @@ -528,6 +530,10 @@ impl NetworkInterface for Network { Ok(filtered_peers) } + fn get_peers_by_validator(&self, validator_address: Address) -> Vec { + todo!() + } + fn peer_provides_required_services(&self, peer_id: PeerId) -> bool { if let Some(peer_info) = self.connected_peers.read().get(&peer_id) { peer_info.get_services().contains(self.required_services) @@ -749,4 +755,11 @@ impl NetworkInterface for Network { output_rx.await? } + + fn register_validator_signing_callback( + &self, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + ) { + todo!() + } } diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index 0579b1c8bf..ce66771a23 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -15,10 +15,10 @@ use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, PubsubId, Topic}, peer_info::Services, request::{RequestError, RequestType}, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, DeserializeError}; use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 9ed586f355..8d805bf264 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -12,6 +12,7 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, Topic}, peer_info::Services, + validator_record::ValidatorRecord, }; use nimiq_network_libp2p::{ dht, @@ -30,7 +31,6 @@ use nimiq_utils::{ spawn, tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}, }; -use nimiq_validator_network::validator_record::ValidatorRecord; use parking_lot::RwLock; use rand::{thread_rng, Rng}; diff --git a/network-mock/Cargo.toml b/network-mock/Cargo.toml index b372e240b5..eeef99bd39 100644 --- a/network-mock/Cargo.toml +++ b/network-mock/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1.43", features = [ ] } tokio-stream = "0.1" +nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-serde = { workspace = true } nimiq-time = { workspace = true } diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index aebb148ec3..bcf0cbf815 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -8,6 +8,7 @@ use std::{ use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{ CloseReason, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents, Topic, @@ -17,10 +18,11 @@ use nimiq_network_interface::{ InboundRequestError, Message, OutboundRequestError, Request, RequestCommon, RequestError, RequestKind, RequestSerialize, RequestType, }, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, DeserializeError, Serialize}; use nimiq_time::timeout; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable}; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}; use parking_lot::{Mutex, RwLock}; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -652,4 +654,16 @@ impl Network for MockNetwork { ) -> Result, MockNetworkError> { Ok(self.get_peers()) } + + fn get_peers_by_validator(&self, _validator_address: Address) -> Vec { + // TODO + vec![] + } + + fn register_validator_signing_callback( + &self, + _callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + ) { + // TODO + } } diff --git a/validator-network/src/lib.rs b/validator-network/src/lib.rs index 58a5e035bc..c737c5338c 100644 --- a/validator-network/src/lib.rs +++ b/validator-network/src/lib.rs @@ -1,7 +1,6 @@ pub mod error; pub mod network_impl; pub mod single_response_requester; -pub mod validator_record; use async_trait::async_trait; use futures::stream::BoxStream; diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index d71cc3635c..0fb0954d51 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -7,6 +7,7 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{InboundRequestError, Message, Request, RequestCommon, RequestError}, + validator_record::ValidatorRecord, }; use nimiq_primitives::slots_allocation::{Validator, Validators}; use nimiq_serde::{Deserialize, Serialize}; @@ -15,7 +16,6 @@ use parking_lot::RwLock; use time::OffsetDateTime; use super::{MessageStream, NetworkError, PubsubId, ValidatorNetwork}; -use crate::validator_record::ValidatorRecord; /// Validator `PeerId` cache state #[derive(Clone, Copy)] From 63403bfe002f276afb976e738603ad0d23f976e2 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Mon, 25 Nov 2024 04:15:10 -0600 Subject: [PATCH 05/20] Populate validator_info --- network-interface/src/network.rs | 5 +- network-libp2p/src/discovery/peer_contacts.rs | 53 ++++++++++++++++--- network-libp2p/src/network.rs | 9 +++- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 04af848a65..7914d47249 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -238,6 +238,9 @@ pub trait Network: Send + Sync + Unpin + 'static { /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. fn register_validator_signing_callback( &self, - callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair> + + Send + + Sync + + 'static, ); } diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index da37868e71..00e2b41533 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -1,5 +1,6 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, + fmt::Debug, sync::Arc, time::Duration, }; @@ -11,7 +12,7 @@ use libp2p::{ multiaddr::Protocol, Multiaddr, PeerId, }; -use nimiq_keys::{Address, KeyPair}; +use nimiq_keys::{Address, KeyPair as SchnorrKey}; use nimiq_network_interface::{ network::Network as NetworkInterface, peer_info::{PeerInfo, Services}, @@ -44,7 +45,7 @@ pub struct ValidatorInfo { /// The signature for the [ValidatorRecord]. /// It does _not_ verify for this structure, but only once the [nimiq_utils::tagged_signing::TaggedSigned] is reconstructed /// with the given information of this struct and the corresponding [PeerContact]. - signature: TaggedSignature::PeerId>, KeyPair>, + signature: TaggedSignature::PeerId>, SchnorrKey>, } #[derive(Debug)] @@ -57,7 +58,10 @@ pub enum ValidatorInfoError { impl ValidatorInfo { pub fn new( validator_address: Address, - signature: TaggedSignature::PeerId>, KeyPair>, + signature: TaggedSignature< + ValidatorRecord<::PeerId>, + SchnorrKey, + >, ) -> Self { Self { validator_address, @@ -69,7 +73,7 @@ impl ValidatorInfo { &self, timestamp: u64, peer_id: PeerId, - verification_key: &::PublicKey, + verification_key: &::PublicKey, ) -> Result<(), ValidatorInfoError> { // Reconstruct the record let record = ValidatorRecord { @@ -94,7 +98,7 @@ pub trait ValidatorRecordVerifier: Send + Sync { &self, signed_record: &TaggedSigned< ValidatorRecord<::PeerId>, - KeyPair, + SchnorrKey, >, ) -> Result<(), ValidatorInfoError>; } @@ -104,7 +108,7 @@ impl ValidatorRecordVerifier for () { &self, _signed_record: &TaggedSigned< ValidatorRecord<::PeerId>, - KeyPair, + SchnorrKey, >, ) -> Result<(), ValidatorInfoError> { Ok(()) @@ -420,7 +424,6 @@ impl PeerContactInfo { /// Main structure that holds the peer information that has been obtained or /// discovered by the discovery protocol. -#[derive(Debug)] pub struct PeerContactBook { /// Contact information for our own. own_peer_contact: PeerContactInfo, @@ -439,6 +442,24 @@ pub struct PeerContactBook { allow_loopback_addresses: bool, /// Flag to indicate whether to support memory transport addresses memory_transport: bool, + /// Validator signing callback: + validator_record_signing: Option< + Box TaggedSigned, SchnorrKey> + Send + Sync>, + >, +} + +impl Debug for PeerContactBook { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerContactBook") + .field("own_peer_contact", &self.own_peer_contact) + .field("own_peer_id", &self.own_peer_id) + .field("peer_contacts", &self.peer_contacts) + .field("validator_peer_ids", &self.validator_peer_ids) + .field("only_secure_addresses", &self.only_secure_addresses) + .field("allow_loopback_addresses", &self.allow_loopback_addresses) + .field("memory_transport", &self.memory_transport) + .finish() + } } impl PeerContactBook { @@ -461,6 +482,7 @@ impl PeerContactBook { allow_loopback_addresses, memory_transport, validator_peer_ids: HashMap::new(), + validator_record_signing: None, } } @@ -706,6 +728,13 @@ impl PeerContactBook { // Update timestamp contact.set_current_time(); + contact.validator_info = self.validator_record_signing.as_ref().and_then(|callback| { + let tagged_signed = (callback)(contact.peer_id(), contact.timestamp); + Some(ValidatorInfo { + validator_address: tagged_signed.record.validator_address.clone(), + signature: tagged_signed.signature.clone(), + }) + }); self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); } @@ -815,6 +844,16 @@ impl PeerContactBook { } } } + + pub fn register_validator_signing_callback( + &mut self, + callback: impl Fn(PeerId, u64) -> TaggedSigned, SchnorrKey> + + Send + + Sync + + 'static, + ) { + self.validator_record_signing = Some(Box::new(callback)); + } } mod serde_public_key { diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index 3fc9d12e08..4c534366bf 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -758,8 +758,13 @@ impl NetworkInterface for Network { fn register_validator_signing_callback( &self, - callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair> + + Send + + Sync + + 'static, ) { - todo!() + self.contacts + .write() + .register_validator_signing_callback(callback) } } From 20dc107935e03d2a5851b2e970bad63d3ac73e31 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 25 Nov 2024 10:20:47 +0000 Subject: [PATCH 06/20] ValidatorNetwork: resolve validator peer ids from peer contact book --- handel/tests/mod.rs | 4 +- network-interface/src/network.rs | 2 +- network-interface/src/request/mod.rs | 2 +- network-libp2p/src/network.rs | 6 ++- network-mock/src/network.rs | 2 +- validator-network/src/network_impl.rs | 55 ++++++++++++++++++++------- validator/tests/mock.rs | 2 +- 7 files changed, 52 insertions(+), 21 deletions(-) diff --git a/handel/tests/mod.rs b/handel/tests/mod.rs index e0caefabc1..c2f5cf0fd5 100644 --- a/handel/tests/mod.rs +++ b/handel/tests/mod.rs @@ -135,7 +135,7 @@ impl std::fmt::Debug for Protocol { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(bound = "C: AggregatableContribution")] struct SerializableLevelUpdate { aggregate: C, @@ -164,7 +164,7 @@ impl From> for SerializableLevelUpda } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(bound = "C: AggregatableContribution")] struct Update(pub SerializableLevelUpdate); diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 7914d47249..571db251a4 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -127,7 +127,7 @@ pub trait Network: Send + Sync + Unpin + 'static { /// Returns all peer ids that are known for the given validator. /// The returned list might contain unverified mappings. - fn get_peers_by_validator(&self, validator_address: Address) -> Vec; + fn get_peers_by_validator(&self, validator_address: &Address) -> Vec; /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; diff --git a/network-interface/src/request/mod.rs b/network-interface/src/request/mod.rs index f911ee15a4..98ca1c70b7 100644 --- a/network-interface/src/request/mod.rs +++ b/network-interface/src/request/mod.rs @@ -129,7 +129,7 @@ impl RequestKind for MessageMarker { } pub trait RequestCommon: - Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + 'static + Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + Clone + 'static { type Kind: RequestKind; const TYPE_ID: u16; diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index 4c534366bf..0cf5f94e83 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -530,8 +530,10 @@ impl NetworkInterface for Network { Ok(filtered_peers) } - fn get_peers_by_validator(&self, validator_address: Address) -> Vec { - todo!() + fn get_peers_by_validator(&self, validator_address: &Address) -> Vec { + self.contacts + .read() + .get_validator_peer_ids(validator_address) } fn peer_provides_required_services(&self, peer_id: PeerId) -> bool { diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index bcf0cbf815..b6d3721b1b 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -655,7 +655,7 @@ impl Network for MockNetwork { Ok(self.get_peers()) } - fn get_peers_by_validator(&self, _validator_address: Address) -> Vec { + fn get_peers_by_validator(&self, _validator_address: &Address) -> Vec { // TODO vec![] } diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index 0fb0954d51..a9ef4ed04e 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -11,11 +11,12 @@ use nimiq_network_interface::{ }; use nimiq_primitives::slots_allocation::{Validator, Validators}; use nimiq_serde::{Deserialize, Serialize}; -use nimiq_utils::spawn; +use nimiq_utils::{spawn, stream::FuturesUnordered}; use parking_lot::RwLock; use time::OffsetDateTime; use super::{MessageStream, NetworkError, PubsubId, ValidatorNetwork}; +use crate::NetworkError::UnknownValidator; /// Validator `PeerId` cache state #[derive(Clone, Copy)] @@ -305,7 +306,7 @@ where /// /// This makes it easier for the recipient to check that the sender is indeed a /// currently elected validator. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] struct ValidatorMessage { validator_id: u16, inner: M, @@ -380,25 +381,53 @@ where validator_id: self.local_validator_id()?, inner: msg, }; + // Use the last known peer ID, knowing that it might be already outdated. - // The network doesn't have a way to know if a record is outdated but we mark + // The network doesn't have a way to know if a record is outdated, but we mark // them as potentially outdated when a request/response error happens. // If the cache has a potentially outdated value, it will be updated soon // and then available to use by future calls to this function. - let peer_id = self + let cached_peer_id = self .get_validator_cache(validator_id) - .potentially_outdated_peer_id() - .ok_or_else(|| NetworkError::UnknownValidator(validator_id))?; - - self.network - .message(msg, peer_id) - .map_err(|e| { + .potentially_outdated_peer_id(); + if let Some(peer_id) = cached_peer_id { + if let Err(e) = self.network.message(msg.clone(), peer_id).await { // The validator peer id might have changed and thus caused a connection failure. self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id); + } else { + return Ok(()); + } + } - NetworkError::Request(e) - }) - .await + // Try all validator peer_ids from our peer contact book. + let our_address = { + let own_validator_id = self.own_validator_id.read(); + let Some(own_validator_id) = *own_validator_id else { + return Err(UnknownValidator(validator_id)); + }; + let validators = self.validators.read(); + let Some(validators) = validators.as_ref() else { + return Err(UnknownValidator(validator_id)); + }; + validators + .get_validator_by_slot_band(own_validator_id) + .address + .clone() + }; + + let mut futures = FuturesUnordered::new(); + for peer_id in self.network.get_peers_by_validator(&our_address) { + let network = Arc::clone(&self.network); + let msg = msg.clone(); + futures.push(async move { network.message(msg, peer_id).await }); + } + + let results = futures.collect::>>().await; + if results.iter().any(|result| result.is_ok()) { + Ok(()) + } else { + Err(UnknownValidator(validator_id)) + } } async fn request( diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index 23a5b8bd81..35055e0186 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -30,7 +30,7 @@ use nimiq_validator::aggregation::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] struct SkipBlockMessage(SerializableLevelUpdate); impl RequestCommon for SkipBlockMessage { From 802d32a5a31919df8b93abe24bdf88aa1f6e7b7e Mon Sep 17 00:00:00 2001 From: Sebastian Date: Mon, 25 Nov 2024 04:53:54 -0600 Subject: [PATCH 07/20] Validator: register callback for the validator record --- validator-network/src/lib.rs | 15 +++++++++++++++ validator-network/src/network_impl.rs | 17 ++++++++++++++++- validator/src/validator.rs | 22 +++++++++++++++++++++- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/validator-network/src/lib.rs b/validator-network/src/lib.rs index c737c5338c..0471dec57f 100644 --- a/validator-network/src/lib.rs +++ b/validator-network/src/lib.rs @@ -8,8 +8,10 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{Message, Request, RequestCommon}, + validator_record::ValidatorRecord, }; use nimiq_primitives::slots_allocation::Validators; +use nimiq_utils::tagged_signing::TaggedSigned; pub use crate::error::NetworkError; @@ -97,4 +99,17 @@ pub trait ValidatorNetwork: Send + Sync { /// Returns the network peer ID for the given `validator_id` if it is known. fn get_peer_id(&self, validator_id: u16) -> Option<::PeerId>; + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn( + ::PeerId, + u64, + ) + -> TaggedSigned::PeerId>, KeyPair> + + Send + + Sync + + 'static, + ); } diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index a9ef4ed04e..aa20813787 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -11,7 +11,7 @@ use nimiq_network_interface::{ }; use nimiq_primitives::slots_allocation::{Validator, Validators}; use nimiq_serde::{Deserialize, Serialize}; -use nimiq_utils::{spawn, stream::FuturesUnordered}; +use nimiq_utils::{spawn, stream::FuturesUnordered, tagged_signing::TaggedSigned}; use parking_lot::RwLock; use time::OffsetDateTime; @@ -577,4 +577,19 @@ where self.get_validator_cache(validator_id) .potentially_outdated_peer_id() } + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn( + ::PeerId, + u64, + ) + -> TaggedSigned::PeerId>, KeyPair> + + Send + + Sync + + 'static, + ) { + self.network.register_validator_signing_callback(callback) + } } diff --git a/validator/src/validator.rs b/validator/src/validator.rs index 4c007fd451..4bfe518404 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -32,10 +32,14 @@ use nimiq_mempool_task::MempoolTask; use nimiq_network_interface::{ network::{MsgAcceptance, Network, NetworkEvent, SubscribeEvents}, request::request_handler, + validator_record::ValidatorRecord, }; use nimiq_primitives::{coin::Coin, policy::Policy}; use nimiq_transaction_builder::TransactionBuilder; -use nimiq_utils::spawn; +use nimiq_utils::{ + spawn, + tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}, +}; use nimiq_validator_network::{PubsubId, ValidatorNetwork}; use parking_lot::RwLock; #[cfg(feature = "metrics")] @@ -366,6 +370,22 @@ where // Inform the network about the current validator ID. self.network.set_validator_id(*self.slot_band.read()); + let key = self.signing_key(); + let validator_address = self.validator_address(); + + self.network + .register_validator_signing_callback(move |peer_id, timestamp| { + let record = ValidatorRecord { + timestamp, + peer_id, + validator_address: validator_address.clone(), + }; + + let signature = key.tagged_sign(&record); + + TaggedSigned::new(record, signature) + }); + // Set the elected validators of the current epoch in the network as well. self.network.set_validators(validators); From 51136f01f9a151528d8e258c03403fe8c8b0039b Mon Sep 17 00:00:00 2001 From: Sebastian Date: Mon, 25 Nov 2024 12:06:33 -0600 Subject: [PATCH 08/20] DhtVerifier: Remove unused and commented code --- dht/src/lib.rs | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/dht/src/lib.rs b/dht/src/lib.rs index 1b065081e5..92e65a53ba 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -30,36 +30,6 @@ impl ValidatorRecordVerifier for Verifier { KeyPair, >, ) -> Result<(), ValidatorInfoError> { - // // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. - // let validator_record = - // TaggedSigned::, KeyPair>::deserialize_from_vec(&record.record.value) - // .map_err(DhtVerifierError::MalformedValue)?; - - // // Make sure the peer who signed the record is also the one presented in the record. - // if let Some(publisher) = record.publisher { - // if validator_record.record.peer_id != publisher { - // return Err(DhtVerifierError::PublisherMismatch( - // publisher, - // validator_record.record.peer_id, - // )); - // } - // } else { - // log::warn!("Validating a dht record without a publisher"); - // return Err(DhtVerifierError::PublisherMissing); - // } - - // // Deserialize the key of the record which is an Address. If it fails return an error. - // let validator_address = Address::deserialize_from_vec(record.key.as_ref()) - // .map_err(DhtVerifierError::MalformedKey)?; - - // Make sure the validator address used as key is identical to the one in the record. - // if signed_record.record.validator_address != validator_address { - // return Err(DhtVerifierError::AddressMismatch( - // validator_address, - // validator_record.record.validator_address, - // )); - // } - // Acquire blockchain read access. For now exclude Light clients. let blockchain = match self.blockchain { BlockchainProxy::Light(ref _light_blockchain) => { From 833bddaa857f90b86283e58883cf80f9d6ebb558 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 25 Nov 2024 17:58:56 +0000 Subject: [PATCH 09/20] DHT: fix clippy warnings --- dht/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dht/src/lib.rs b/dht/src/lib.rs index 92e65a53ba..19df588d58 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -60,7 +60,7 @@ impl ValidatorRecordVerifier for Verifier { // Verify the record. signed_record .verify(&public_key) - .then(|| ()) + .then_some(()) .ok_or(ValidatorInfoError::InvalidSignature) } } @@ -108,12 +108,12 @@ impl DhtVerifier for Verifier { self.verify_validator_record(&validator_record) .map_err(|_| DhtVerifierError::ValidatorInfoError) - .and_then(|_| { - Ok(DhtRecord::Validator( + .map(|_| { + DhtRecord::Validator( validator_record.record.peer_id, validator_record.record, record.clone(), - )) + ) }) } _ => { From 8f7fe54b853279f8b9fbddba4c9953955b82c117 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 25 Nov 2024 17:59:13 +0000 Subject: [PATCH 10/20] ValidatorNetwork: use correct validator_id when fetching peer ids from the contact book --- validator-network/src/network_impl.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index aa20813787..bf2b7d0df9 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -400,23 +400,19 @@ where } // Try all validator peer_ids from our peer contact book. - let our_address = { - let own_validator_id = self.own_validator_id.read(); - let Some(own_validator_id) = *own_validator_id else { - return Err(UnknownValidator(validator_id)); - }; + let validator_address = { let validators = self.validators.read(); let Some(validators) = validators.as_ref() else { return Err(UnknownValidator(validator_id)); }; validators - .get_validator_by_slot_band(own_validator_id) + .get_validator_by_slot_band(validator_id) .address .clone() }; let mut futures = FuturesUnordered::new(); - for peer_id in self.network.get_peers_by_validator(&our_address) { + for peer_id in self.network.get_peers_by_validator(&validator_address) { let network = Arc::clone(&self.network); let msg = msg.clone(); futures.push(async move { network.message(msg, peer_id).await }); From c867f84affbf75b614c315fe1bfee00c4f53dca8 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 25 Nov 2024 18:12:49 +0000 Subject: [PATCH 11/20] Network: fix filtering of `local_only` contacts --- network-libp2p/src/discovery/peer_contacts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 00e2b41533..c332d0b23b 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -674,7 +674,7 @@ impl PeerContactBook { // TODO: This is a naive implementation // TODO: Sort by score? self.peer_contacts.iter().filter_map(move |(_, contact)| { - if !contact.contact.local_only || include_local_only && contact.matches(services) { + if contact.matches(services) && (!contact.contact.local_only || include_local_only) { Some(Arc::clone(contact)) } else { None From 4586742e346c90dd82d3323018d9e15f41b6aac8 Mon Sep 17 00:00:00 2001 From: viquezclaudio Date: Mon, 25 Nov 2024 16:46:35 -0600 Subject: [PATCH 12/20] Fix clippy warnings --- network-libp2p/src/discovery/peer_contacts.rs | 9 +++------ validator/src/validator.rs | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index c332d0b23b..4348ec4240 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -513,7 +513,7 @@ impl PeerContactBook { if let Some(validator) = &contact.inner.validator_info { self.validator_peer_ids .entry(validator.validator_address.clone()) - .or_insert(HashSet::new()) + .or_default() .insert(peer_id); } @@ -584,7 +584,7 @@ impl PeerContactBook { if let Some(validator_info) = &info.contact.inner.validator_info { self.validator_peer_ids .entry(validator_info.validator_address.clone()) - .or_insert(HashSet::new()) + .or_default() .insert(info.peer_id); } @@ -757,10 +757,7 @@ impl PeerContactBook { unix_time, ) { debug!(%peer_id, "Removing peer contact because of old age"); - Some(( - peer_id.clone(), - peer_contact.contact.inner.validator_info.clone(), - )) + Some((*peer_id, peer_contact.contact.inner.validator_info.clone())) } else { None } diff --git a/validator/src/validator.rs b/validator/src/validator.rs index 4bfe518404..616153c330 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -38,7 +38,7 @@ use nimiq_primitives::{coin::Coin, policy::Policy}; use nimiq_transaction_builder::TransactionBuilder; use nimiq_utils::{ spawn, - tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}, + tagged_signing::{TaggedKeyPair, TaggedSigned}, }; use nimiq_validator_network::{PubsubId, ValidatorNetwork}; use parking_lot::RwLock; From e592e1057e8149148613da29bd548621dad6410c Mon Sep 17 00:00:00 2001 From: ii-cruz Date: Mon, 25 Nov 2024 22:25:31 -0600 Subject: [PATCH 13/20] Fixing the feature compilation --- Cargo.lock | 1 + network-interface/Cargo.toml | 2 +- network-libp2p/Cargo.toml | 18 ++++++++++++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a61e2ce6c1..e091555fa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4524,6 +4524,7 @@ dependencies = [ "nimiq-test-utils", "nimiq-time", "nimiq-utils", + "nimiq-validator-network", "parking_lot", "pin-project", "pin-project-lite", diff --git a/network-interface/Cargo.toml b/network-interface/Cargo.toml index e59f188587..4ad6363eb9 100644 --- a/network-interface/Cargo.toml +++ b/network-interface/Cargo.toml @@ -30,6 +30,6 @@ thiserror = "2.0" tokio = { version = "1.43", features = ["rt"] } tokio-stream = { version = "0.1", features = ["default", "sync"] } -nimiq-keys = { workspace = true } +nimiq-keys = { workspace = true, features = ["serde-derive"] } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true, features = ["tagged-signing"] } diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index c91ebceefe..2e7de33952 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -51,7 +51,9 @@ nimiq-utils = { workspace = true, features = [ "tagged-signing", "libp2p", "time", + "spawn", ] } +nimiq-validator-network = { workspace = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] libp2p = { version = "0.54", default-features = false, features = [ @@ -84,7 +86,14 @@ libp2p = { version = "0.54", default-features = false, features = [ [dev-dependencies] # In dev/testing we require more tokio features -tokio = { version = "1.43", features = ["macros", "rt", "rt-multi-thread", "test-util", "time", "tracing"] } +tokio = { version = "1.43", features = [ + "macros", + "rt", + "rt-multi-thread", + "test-util", + "time", + "tracing", +] } nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } @@ -92,4 +101,9 @@ nimiq-test-utils = { workspace = true } [features] kad = [] metrics = ["prometheus-client"] -tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"] +tokio-websocket = [ + "libp2p/dns", + "libp2p/tcp", + "libp2p/tokio", + "libp2p/websocket", +] From 3058be2b1b3716b03179a3006c60166d25278f01 Mon Sep 17 00:00:00 2001 From: styppo Date: Tue, 26 Nov 2024 02:56:23 +0000 Subject: [PATCH 14/20] ValidatorNetwork: use contact book peer ids where required --- dht/src/lib.rs | 6 +- network-interface/src/network.rs | 3 +- network-libp2p/src/discovery/peer_contacts.rs | 20 ++-- network-libp2p/src/network.rs | 4 +- network-mock/src/network.rs | 5 +- validator-network/src/network_impl.rs | 92 ++++++++++++------- 6 files changed, 81 insertions(+), 49 deletions(-) diff --git a/dht/src/lib.rs b/dht/src/lib.rs index 19df588d58..c0084f2b84 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -32,10 +32,8 @@ impl ValidatorRecordVerifier for Verifier { ) -> Result<(), ValidatorInfoError> { // Acquire blockchain read access. For now exclude Light clients. let blockchain = match self.blockchain { - BlockchainProxy::Light(ref _light_blockchain) => { - panic!("Light Blockchains cannot verify validator records.") - } - BlockchainProxy::Full(ref full_blockchain) => full_blockchain, + BlockchainProxy::Full(ref blockchain) => blockchain, + BlockchainProxy::Light(_) => return Err(ValidatorInfoError::StateIncomplete), }; let blockchain_read = blockchain.read(); diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 571db251a4..6e2954cd8c 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, fmt::{Debug, Display}, hash::Hash, time::Duration, @@ -127,7 +128,7 @@ pub trait Network: Send + Sync + Unpin + 'static { /// Returns all peer ids that are known for the given validator. /// The returned list might contain unverified mappings. - fn get_peers_by_validator(&self, validator_address: &Address) -> Vec; + fn get_peers_by_validator(&self, validator_address: &Address) -> HashSet; /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 4348ec4240..c5203d77b2 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -487,12 +487,11 @@ impl PeerContactBook { } /// Obtain a list of peer ids associated to the given validator address - pub fn get_validator_peer_ids(&self, validator_address: &Address) -> Vec { - let Some(peer_ids) = self.validator_peer_ids.get(validator_address) else { - return vec![]; - }; - - peer_ids.iter().cloned().collect() + pub fn get_validator_peer_ids(&self, validator_address: &Address) -> HashSet { + self.validator_peer_ids + .get(validator_address) + .cloned() + .unwrap_or_default() } /// Insert a peer contact or update an existing one @@ -504,7 +503,6 @@ impl PeerContactBook { let peer_id = contact.peer_id(); - log::debug!(peer_id = %contact.peer_id(), addresses = ?contact.inner.addresses, "Adding peer contact"); let current_ts = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() @@ -531,6 +529,13 @@ impl PeerContactBook { } } Entry::Vacant(entry) => { + log::trace!( + peer_id = %info.peer_id, + services = ?info.services(), + addresses = ?info.contact.inner.addresses, + validator_address = ?info.contact.inner.validator_info.as_ref().map(|info| info.validator_address.clone()), + "Adding peer contact", + ); entry.insert(Arc::new(info)); } } @@ -593,6 +598,7 @@ impl PeerContactBook { peer_id = %info.peer_id, services = ?info.services(), addresses = ?info.contact.inner.addresses, + validator_address = ?info.contact.inner.validator_info.as_ref().map(|info| info.validator_address.clone()), "Adding peer contact", ); } diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index 0cf5f94e83..aa6299607a 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, future::Future, pin::Pin, sync::Arc, @@ -530,7 +530,7 @@ impl NetworkInterface for Network { Ok(filtered_peers) } - fn get_peers_by_validator(&self, validator_address: &Address) -> Vec { + fn get_peers_by_validator(&self, validator_address: &Address) -> HashSet { self.contacts .read() .get_validator_peer_ids(validator_address) diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index b6d3721b1b..6acf457a2e 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -655,9 +656,9 @@ impl Network for MockNetwork { Ok(self.get_peers()) } - fn get_peers_by_validator(&self, _validator_address: &Address) -> Vec { + fn get_peers_by_validator(&self, _validator_address: &Address) -> HashSet { // TODO - vec![] + HashSet::new() } fn register_validator_signing_callback( diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index bf2b7d0df9..2d5251ed8e 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -1,7 +1,13 @@ -use std::{collections::BTreeMap, error::Error, fmt::Debug, future, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + error::Error, + fmt::Debug, + future, + sync::Arc, +}; use async_trait::async_trait; -use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; use log::warn; use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ @@ -299,6 +305,22 @@ where } } } + + /// Fetches all peer ids from the contact book for the given validator_id. + fn get_validator_peer_ids(&self, validator_id: u16) -> HashSet { + // Try all validator peer_ids from our peer contact book. + let validator_address = { + let validators = self.validators.read(); + let Some(validators) = validators.as_ref() else { + return HashSet::new(); + }; + validators + .get_validator_by_slot_band(validator_id) + .address + .clone() + }; + self.network.get_peers_by_validator(&validator_address) + } } /// Messages sent over the validator network get augmented with the sending @@ -400,19 +422,8 @@ where } // Try all validator peer_ids from our peer contact book. - let validator_address = { - let validators = self.validators.read(); - let Some(validators) = validators.as_ref() else { - return Err(UnknownValidator(validator_id)); - }; - validators - .get_validator_by_slot_band(validator_id) - .address - .clone() - }; - let mut futures = FuturesUnordered::new(); - for peer_id in self.network.get_peers_by_validator(&validator_address) { + for peer_id in self.get_validator_peer_ids(validator_id) { let network = Arc::clone(&self.network); let msg = msg.clone(); futures.push(async move { network.message(msg, peer_id).await }); @@ -438,19 +449,27 @@ where validator_id: self.local_validator_id()?, inner: request, }; + if let Some(peer_id) = self.get_validator_cache(validator_id).current_peer_id() { - self.network - .request(request, peer_id) - .map_err(|e| { - // The validator peer id might have changed and thus caused a connection failure. - self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id); - - NetworkError::Request(e) - }) - .await - } else { - Err(NetworkError::Unreachable) + match self.network.request(request.clone(), peer_id).await { + Ok(response) => return Ok(response), + Err(e) => self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id), + } } + + // Try all validator peer_ids from our peer contact book. + let mut futures = FuturesUnordered::new(); + for peer_id in self.get_validator_peer_ids(validator_id) { + let network = Arc::clone(&self.network); + let request = request.clone(); + futures.push(async move { network.request(request, peer_id).await }); + } + + futures + .filter_map(|result| future::ready(result.ok())) + .next() + .await + .ok_or(UnknownValidator(validator_id)) } fn receive(&self) -> MessageStream @@ -464,17 +483,20 @@ where .filter_map(move |(message, peer_id)| { let self_ = self_.arc_clone(); async move { - let validator_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let cached_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let peer_ids = self_.get_validator_peer_ids(message.validator_id); + // Check that each message actually comes from the peer that it // claims it comes from. Reject it otherwise. - if validator_peer_id + if cached_peer_id .as_ref() .map(|pid| *pid != peer_id) - .unwrap_or(true) + .unwrap_or(true) && !peer_ids.contains(&peer_id) { - warn!(%peer_id, ?validator_peer_id, claimed_validator_id = message.validator_id, "Dropping validator message"); + warn!(%peer_id, ?cached_peer_id, ?peer_ids, claimed_validator_id = message.validator_id, "Dropping validator message"); return None; } + Some((message.inner, message.validator_id)) } }), @@ -491,18 +513,22 @@ where .filter_map(move |(message, request_id, peer_id)| { let self_ = self_.arc_clone(); async move { - let validator_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let cached_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let peer_ids = self_.get_validator_peer_ids(message.validator_id); + // Check that each message actually comes from the peer that it // claims it comes from. Reject it otherwise. - if validator_peer_id + if cached_peer_id .as_ref() .map(|pid| *pid != peer_id) - .unwrap_or(true) + .unwrap_or(true) && !peer_ids.contains(&peer_id) { - warn!(%peer_id, ?validator_peer_id, claimed_validator_id = message.validator_id, "Dropping validator request"); + warn!(%peer_id, ?cached_peer_id, ?peer_ids, claimed_validator_id = message.validator_id, "Dropping validator message"); return None; } + Some((message.inner, request_id, message.validator_id)) + } }) .boxed() From 09e6a5d361cfa9762a0ffa31d9a1d7bfd199a35c Mon Sep 17 00:00:00 2001 From: styppo Date: Tue, 26 Nov 2024 02:57:25 +0000 Subject: [PATCH 15/20] Validator: register signing callback in constructor --- validator/src/validator.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/validator/src/validator.rs b/validator/src/validator.rs index 616153c330..ed051ed788 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -204,6 +204,19 @@ where .await }); + let key = signing_key.clone(); + let validator_address1 = validator_address.clone(); + network.register_validator_signing_callback(move |peer_id, timestamp| { + let record = ValidatorRecord { + timestamp, + peer_id, + validator_address: validator_address1.clone(), + }; + + let signature = key.tagged_sign(&record); + TaggedSigned::new(record, signature) + }); + Self { consensus: consensus.proxy(), blockchain, From a84c105e6ef484d38da184bc985bce5bdc9fd350 Mon Sep 17 00:00:00 2001 From: styppo Date: Tue, 26 Nov 2024 17:27:28 +0000 Subject: [PATCH 16/20] Network: correctly update own peer contact --- network-libp2p/src/discovery/behaviour.rs | 8 ++-- network-libp2p/src/discovery/peer_contacts.rs | 47 ++++++++++--------- network-libp2p/src/network.rs | 3 +- network-libp2p/tests/discovery.rs | 21 ++++----- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/network-libp2p/src/discovery/behaviour.rs b/network-libp2p/src/discovery/behaviour.rs index 8aaa2a3bad..1ee52ba9fa 100644 --- a/network-libp2p/src/discovery/behaviour.rs +++ b/network-libp2p/src/discovery/behaviour.rs @@ -129,7 +129,7 @@ impl Behaviour { #[cfg(feature = "kad")] verifier: Arc, ) -> Self { let house_keeping_timer = interval(config.house_keeping_interval); - peer_contact_book.write().update_own_contact(&keypair); + peer_contact_book.write().refresh_own_contact(); // Report our own known addresses as candidates to the swarm let mut events = VecDeque::new(); @@ -151,9 +151,7 @@ impl Behaviour { /// Adds addresses into our own contact within the peer contact book pub fn add_own_addresses(&self, addresses: Vec) { - self.peer_contact_book - .write() - .add_own_addresses(addresses, &self.keypair) + self.peer_contact_book.write().add_own_addresses(addresses) } /// Returns whether an address in `Multiaddr` format is a dialable websocket address @@ -238,7 +236,7 @@ impl NetworkBehaviour for Behaviour { Poll::Ready(Some(_)) => { trace!("Doing house-keeping in peer address book"); let mut peer_address_book = self.peer_contact_book.write(); - peer_address_book.update_own_contact(&self.keypair); + peer_address_book.refresh_own_contact(); peer_address_book.house_keeping(); } Poll::Ready(None) => unreachable!(), diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index c5203d77b2..31459ce41a 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -429,10 +429,12 @@ pub struct PeerContactBook { own_peer_contact: PeerContactInfo, /// Own Peer ID (also present in `own_peer_contact`) own_peer_id: PeerId, + /// Identity keypair for this node. + key_pair: Keypair, /// Contact information for other peers in the network indexed by their /// peer ID. peer_contacts: HashMap>, - /// Reverse map when we + /// Map from validator address to peer ids. validator_peer_ids: HashMap>, /// Only return secure websocket addresses. /// With this flag non secure websocket addresses will be stored (to still have a valid signature of the peer contact) @@ -468,15 +470,19 @@ impl PeerContactBook { /// Creates a new `PeerContactBook` given our own peer contact information. pub fn new( - own_peer_contact: SignedPeerContact, + mut own_peer_contact: PeerContact, + key_pair: Keypair, only_secure_addresses: bool, allow_loopback_addresses: bool, memory_transport: bool, ) -> Self { - let own_peer_id = own_peer_contact.inner.peer_id(); + let own_peer_id = own_peer_contact.peer_id(); + own_peer_contact.set_current_time(); + Self { - own_peer_contact: own_peer_contact.into(), + own_peer_contact: own_peer_contact.sign(&key_pair).into(), own_peer_id, + key_pair, peer_contacts: HashMap::new(), only_secure_addresses, allow_loopback_addresses, @@ -703,37 +709,33 @@ impl PeerContactBook { } /// Adds a set of addresses to the list of addresses known for our own contact. - pub fn add_own_addresses>( - &mut self, - addresses: I, - keypair: &Keypair, - ) { + pub fn add_own_addresses>(&mut self, addresses: I) { let mut contact = self.own_peer_contact.contact.inner.clone(); let addresses = addresses.into_iter().collect::>(); trace!(?addresses, "Adding own addresses"); contact.add_addresses(addresses); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + self.update_own_contact(contact); } /// Removes a set of addresses from the list of addresses known for our own. - pub fn remove_own_addresses>( - &mut self, - addresses: I, - keypair: &Keypair, - ) { + pub fn remove_own_addresses>(&mut self, addresses: I) { let mut contact = self.own_peer_contact.contact.inner.clone(); let addresses = addresses.into_iter().collect::>(); contact.remove_addresses(addresses); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + self.update_own_contact(contact); } - /// Updates the timestamp of our own contact - pub fn update_own_contact(&mut self, keypair: &Keypair) { - // Not really optimal to clone here, but *shrugs* - let mut contact = self.own_peer_contact.contact.inner.clone(); + /// Updates the timestamp of our own contact. + pub fn refresh_own_contact(&mut self) { + let contact = self.own_peer_contact.contact.inner.clone(); + self.update_own_contact(contact); + } - // Update timestamp + fn update_own_contact(&mut self, mut contact: PeerContact) { + // Update timestamp. contact.set_current_time(); + + // Update validator info. contact.validator_info = self.validator_record_signing.as_ref().and_then(|callback| { let tagged_signed = (callback)(contact.peer_id(), contact.timestamp); Some(ValidatorInfo { @@ -742,7 +744,7 @@ impl PeerContactBook { }) }); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + self.own_peer_contact = PeerContactInfo::from(contact.sign(&self.key_pair)); } /// Gets our own contact information @@ -856,6 +858,7 @@ impl PeerContactBook { + 'static, ) { self.validator_record_signing = Some(Box::new(callback)); + self.refresh_own_contact(); } } diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index aa6299607a..f7aa82aaec 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -87,7 +87,8 @@ impl Network { // TODO: persist to disk let own_peer_contact = config.peer_contact.clone(); let contacts = Arc::new(RwLock::new(PeerContactBook::new( - own_peer_contact.sign(&config.keypair), + own_peer_contact, + config.keypair.clone(), config.only_secure_ws_connections, config.allow_loopback_addresses, config.memory_transport, diff --git a/network-libp2p/tests/discovery.rs b/network-libp2p/tests/discovery.rs index 8d22baa570..f7189c781e 100644 --- a/network-libp2p/tests/discovery.rs +++ b/network-libp2p/tests/discovery.rs @@ -72,11 +72,11 @@ impl TestNode { .unwrap() .as_secs(), ) - .expect("PeerContact must be creatable") - .sign(&keypair); + .expect("PeerContact must be creatable"); let peer_contact_book = Arc::new(RwLock::new(PeerContactBook::new( peer_contact, + keypair.clone(), false, true, true, @@ -127,6 +127,11 @@ impl TestNode { } fn random_peer_contact(n: usize, services: Services) -> SignedPeerContact { + let (keypair, peer_contact) = random_peer_key_and_contact(n, services); + peer_contact.sign(&keypair) +} + +fn random_peer_key_and_contact(n: usize, services: Services) -> (Keypair, PeerContact) { let keypair = Keypair::generate_ed25519(); let peer_contact = PeerContact::new( @@ -144,9 +149,8 @@ fn random_peer_contact(n: usize, services: Services) -> SignedPeerContact { ) .expect("PeerContact must be creatable"); - peer_contact.sign(&keypair) + (keypair, peer_contact) } - fn test_peers_in_contact_book( peer_contact_book: &PeerContactBook, peer_contacts: &[SignedPeerContact], @@ -257,18 +261,13 @@ pub async fn test_dialing_peer_from_contacts() { #[test] fn test_housekeeping() { - let mut peer_contact_book = PeerContactBook::new( - random_peer_contact(1, Services::FULL_BLOCKS), - false, - true, - true, - ); + let (keypair, peer_contact) = random_peer_key_and_contact(1, Services::FULL_BLOCKS); + let mut peer_contact_book = PeerContactBook::new(peer_contact, keypair, false, true, true); let fresh_contact = random_peer_contact(1, Services::FULL_BLOCKS); let old_contact = { let keypair = Keypair::generate_ed25519(); - let peer_contact = PeerContact::new( Some("/dns/test_old.local/tcp/443/wss".parse().unwrap()), keypair.public(), From 9b8c194d88e643d9943b34f380e4891c81fcfe88 Mon Sep 17 00:00:00 2001 From: styppo Date: Tue, 26 Nov 2024 22:37:35 +0000 Subject: [PATCH 17/20] ValidatorNetwork: cache verified validator peer ids from the contact book --- network-interface/src/network.rs | 6 +- network-libp2p/src/discovery/peer_contacts.rs | 27 ++++- network-libp2p/src/network.rs | 8 +- network-mock/src/network.rs | 6 +- validator-network/src/network_impl.rs | 113 +++++++++++------- 5 files changed, 107 insertions(+), 53 deletions(-) diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 6e2954cd8c..48c82af038 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -128,7 +128,11 @@ pub trait Network: Send + Sync + Unpin + 'static { /// Returns all peer ids that are known for the given validator. /// The returned list might contain unverified mappings. - fn get_peers_by_validator(&self, validator_address: &Address) -> HashSet; + fn get_peers_by_validator( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet; /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 31459ce41a..216dd5b828 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -493,11 +493,29 @@ impl PeerContactBook { } /// Obtain a list of peer ids associated to the given validator address - pub fn get_validator_peer_ids(&self, validator_address: &Address) -> HashSet { - self.validator_peer_ids - .get(validator_address) + pub fn get_validator_peer_ids( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet { + let Some(peers) = self.validator_peer_ids.get(validator_address) else { + return HashSet::new(); + }; + + if include_unverified { + return peers.clone(); + } + + peers + .iter() + .filter(|peer_id| { + self.peer_contacts + .get(peer_id) + .map(|contact| !contact.contact.local_only) + .unwrap_or(false) + }) .cloned() - .unwrap_or_default() + .collect() } /// Insert a peer contact or update an existing one @@ -850,6 +868,7 @@ impl PeerContactBook { } } + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. pub fn register_validator_signing_callback( &mut self, callback: impl Fn(PeerId, u64) -> TaggedSigned, SchnorrKey> diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index f7aa82aaec..df8a458f68 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -531,10 +531,14 @@ impl NetworkInterface for Network { Ok(filtered_peers) } - fn get_peers_by_validator(&self, validator_address: &Address) -> HashSet { + fn get_peers_by_validator( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet { self.contacts .read() - .get_validator_peer_ids(validator_address) + .get_validator_peer_ids(validator_address, include_unverified) } fn peer_provides_required_services(&self, peer_id: PeerId) -> bool { diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index 6acf457a2e..89b1bfbcb7 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -656,7 +656,11 @@ impl Network for MockNetwork { Ok(self.get_peers()) } - fn get_peers_by_validator(&self, _validator_address: &Address) -> HashSet { + fn get_peers_by_validator( + &self, + _validator_address: &Address, + _include_unverified: bool, + ) -> HashSet { // TODO HashSet::new() } diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index 2d5251ed8e..bd703f6842 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -114,16 +114,6 @@ where self.own_validator_id.read().ok_or(NetworkError::NotElected) } - /// Given the Validators and a validator_id, returns the Validator represented by the id if it exists. - /// None otherwise. - fn get_validator(validators: Option<&Validators>, validator_id: u16) -> Option<&Validator> { - // Acquire read on the validators and make sure they have been set. Return None otherwise. - validators.and_then(|validators| { - (usize::from(validator_id) < validators.num_validators()) - .then(|| validators.get_validator_by_slot_band(validator_id)) - }) - } - /// Looks up the peer ID for a validator address in the DHT. async fn resolve_peer_id( network: &N, @@ -143,14 +133,11 @@ where network: &N, validator_address: &Address, ) -> Result, NetworkError> { - if let Some(record) = network + let peer_id = network .dht_get::<_, ValidatorRecord, KeyPair>(validator_address) .await? - { - Ok(Some(record.peer_id)) - } else { - Ok(None) - } + .map(|record| record.peer_id); + Ok(peer_id) } /// Looks up the peer ID for a validator address in the DHT and updates @@ -161,24 +148,25 @@ where /// /// The given `validator_id` is used for logging purposes only. async fn update_peer_id_cache(&self, validator_id: u16, validator_address: &Address) { - let cache_value = match Self::resolve_peer_id( + let result = Self::resolve_peer_id( &self.network, validator_address, Arc::clone(&self.dht_fallback), ) - .await - { + .await; + + let mut cache_value = match result { Ok(Some(peer_id)) => { - log::trace!( + log::debug!( %peer_id, validator_id, %validator_address, - "Resolved validator peer ID" + "Resolved validator peer ID from DHT" ); Ok(peer_id) } Ok(None) => { - log::debug!(validator_id, %validator_address, "Unable to resolve validator peer ID: Entry not found in DHT"); + log::debug!(validator_id, %validator_address, "Unable to resolve validator peer ID from DHT: Entry not found"); Err(()) } Err(error) => { @@ -186,12 +174,27 @@ where validator_id, ?error, %validator_address, - "Unable to resolve validator peer ID: Network error" + "Unable to resolve validator peer ID from DHT: Network error" ); Err(()) } }; + // If the DHT lookup failed, check the peer contact book for a verified peer id. + if cache_value.is_err() { + cache_value = self + .get_verified_validator_peer_id(validator_id) + .inspect(|peer_id| { + log::debug!( + %peer_id, + validator_id, + %validator_address, + "Resolved validator peer ID from contact book" + ); + }) + .ok_or(()); + } + match self .validator_peer_id_cache .write() @@ -213,12 +216,11 @@ where /// Look up the peer ID for a validator ID. fn get_validator_cache(&self, validator_id: u16) -> CacheState { - let validators = self.validators.read(); - let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + let Some(validator_address) = self.get_validator_address(validator_id) else { return CacheState::Error(None); }; - if let Some(cache_state) = self.validator_peer_id_cache.read().get(&validator.address) { + if let Some(cache_state) = self.validator_peer_id_cache.read().get(&validator_address) { match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(..) => {} @@ -235,7 +237,7 @@ where { // Re-check the validator Peer ID cache with the write lock taken and update it if necessary let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - if let Some(cache_state) = validator_peer_id_cache.get_mut(&validator.address) { + if let Some(cache_state) = validator_peer_id_cache.get_mut(&validator_address) { new_cache_state = match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(prev_peer_id) => { @@ -255,9 +257,9 @@ where } else { new_cache_state = CacheState::InProgress(None); // No cache entry for this validator ID: we are going to perform the DHT query - validator_peer_id_cache.insert(validator.address.clone(), new_cache_state); + validator_peer_id_cache.insert(validator_address.clone(), new_cache_state); log::debug!( - ?validator.address, + %validator_address, validator_id, "No cache entry found, querying DHT", ); @@ -265,7 +267,6 @@ where } let self_ = self.arc_clone(); - let validator_address = validator.address.clone(); spawn(async move { Self::update_peer_id_cache(&self_, validator_id, &validator_address).await; }); @@ -287,14 +288,13 @@ where // Fetch the validator from the validators. If it does not exist that peer_id is not // assigned in this epoch and there is no cached entry to clear. - let validators = self.validators.read(); - let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + let Some(validator_address) = self.get_validator_address(validator_id) else { return; }; // Fetch the cache. If it does not exist there is no need to clear. let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - let Some(cache_entry) = validator_peer_id_cache.get_mut(&validator.address) else { + let Some(cache_entry) = validator_peer_id_cache.get_mut(&validator_address) else { return; }; @@ -306,20 +306,43 @@ where } } - /// Fetches all peer ids from the contact book for the given validator_id. + /// Fetches all peer ids (including unverified ones) from the contact book for the given validator_id. fn get_validator_peer_ids(&self, validator_id: u16) -> HashSet { - // Try all validator peer_ids from our peer contact book. - let validator_address = { - let validators = self.validators.read(); - let Some(validators) = validators.as_ref() else { - return HashSet::new(); - }; - validators - .get_validator_by_slot_band(validator_id) - .address - .clone() + let Some(validator_address) = self.get_validator_address(validator_id) else { + return HashSet::new(); }; - self.network.get_peers_by_validator(&validator_address) + self.network + .get_peers_by_validator(&validator_address, true) + } + + /// Fetches the verified peer id from the contact book for the given validator_id if it exists. + fn get_verified_validator_peer_id(&self, validator_id: u16) -> Option { + let validator_address = self.get_validator_address(validator_id)?; + let peer_ids = self + .network + .get_peers_by_validator(&validator_address, false); + + if peer_ids.len() > 1 { + warn!( + %validator_address, + num_peer_ids = peer_ids.len(), + "More than one verified peer id found for validator" + ); + } + + // TODO Pick latest peer id instead of random one. + peer_ids.iter().next().cloned() + } + + /// Fetches the validator address for the given validator_id. + fn get_validator_address(&self, validator_id: u16) -> Option
{ + let validators = self.validators.read(); + let address = validators + .as_ref()? + .get_validator_by_slot_band(validator_id) + .address + .clone(); + Some(address) } } From c94fdb5d1fcf1e76a879d688acdfc75074e81ad9 Mon Sep 17 00:00:00 2001 From: styppo Date: Thu, 28 Nov 2024 19:22:43 +0000 Subject: [PATCH 18/20] Network: periodically re-verify local_only contacts in peer contact book --- network-libp2p/src/behaviour.rs | 2 +- network-libp2p/src/discovery/handler.rs | 38 +--- network-libp2p/src/discovery/peer_contacts.rs | 202 +++++++++--------- network-libp2p/src/network.rs | 9 +- network-libp2p/tests/discovery.rs | 18 +- validator-network/src/network_impl.rs | 2 +- 6 files changed, 130 insertions(+), 141 deletions(-) diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index cef502500c..124c3693b3 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -174,6 +174,6 @@ impl Behaviour { /// Updates the scores of all peers in the peer contact book. /// Updates are performed with the score values of Gossipsub pub fn update_scores(&self, contacts: Arc>) { - contacts.read().update_scores(&self.gossipsub); + contacts.write().update_scores(&self.gossipsub); } } diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index 6b3bcbc67b..0140d43a75 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashSet, VecDeque}, future::Future as _, + ops::Deref, pin::Pin, sync::Arc, task::{Context, Poll, Waker}, @@ -189,7 +190,7 @@ impl Handler { peer_address: Multiaddr, #[cfg(feature = "kad")] verifier: Arc, ) -> Self { - if let Some(peer_contact) = peer_contact_book.write().get(&peer_id) { + if let Some(peer_contact) = peer_contact_book.write().get_mut(&peer_id) { if let Some(outer_protocol_address) = outer_protocol_address(&peer_address) { peer_contact.set_outer_protocol_address(outer_protocol_address); } @@ -299,39 +300,22 @@ pub(crate) fn outer_protocol_address(addr: &Multiaddr) -> Option { } fn filter_contact( - timestamp: u64, #[cfg(feature = "kad")] verifier: Arc, ) -> Box Option> { Box::new(move |mut peer_contact: SignedPeerContact| { match peer_contact.verify( #[cfg(feature = "kad")] - Arc::clone(&verifier), + verifier.deref(), ) { - // Contacts with too many addresses or an invalid peer signature are rejected - // and removed from the collection - Err(PeerContactError::AdvertisedAddressesExceeded) - | Err(PeerContactError::InvalidSignature) => None, + Ok(_) => Some(peer_contact), // If there is a validator record, but the state is incomplete, // then it cannot be verified and must be checked again at a later time. Err(PeerContactError::ValidatorRecord(ValidatorInfoError::StateIncomplete)) => { peer_contact.local_only = true; Some(peer_contact) } - // If there is a validator record but either the signature is invalid, or the validator is unknown, - // the head timestamp of the blockchain and the creation timestamp must be compared. - Err(PeerContactError::ValidatorRecord(ValidatorInfoError::InvalidSignature)) - | Err(PeerContactError::ValidatorRecord(ValidatorInfoError::UnknownValidator(_))) => { - // Set it to local only. Some contacts will be discarded by the next condition still. - peer_contact.local_only = true; - - // Retain only contacts which are in the future, as they may still verify then. - if timestamp < peer_contact.inner.timestamp() { - Some(peer_contact) - } else { - None - } - } - Ok(_) => Some(peer_contact), + // Filter contact if verification fails for any other reason. + Err(_) => None, } }) } @@ -575,13 +559,11 @@ impl ConnectionHandler for Handler { peer_contacts, } => { // Check the peer contact for a valid signature. - let Some(peer_contact) = filter_contact( - 0, + let filter_fn = filter_contact( #[cfg(feature = "kad")] Arc::clone(&self.verifier), - )( - peer_contact.clone() - ) else { + ); + let Some(peer_contact) = filter_fn(peer_contact.clone()) else { return Poll::Ready( ConnectionHandlerEvent::NotifyBehaviour( HandlerOutEvent::Error( @@ -632,7 +614,6 @@ impl ConnectionHandler for Handler { let peer_contacts: Vec = peer_contacts .into_iter() .filter_map(filter_contact( - 0, #[cfg(feature = "kad")] Arc::clone(&self.verifier), )) @@ -739,7 +720,6 @@ impl ConnectionHandler for Handler { let peer_contacts: Vec = peer_contacts .into_iter() .filter_map(filter_contact( - 0, #[cfg(feature = "kad")] Arc::clone(&self.verifier), )) diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 216dd5b828..88f0603730 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -1,6 +1,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, fmt::Debug, + ops::Deref, sync::Arc, time::Duration, }; @@ -19,7 +20,6 @@ use nimiq_network_interface::{ validator_record::ValidatorRecord, }; use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature, TaggedSigned}; -use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -68,29 +68,6 @@ impl ValidatorInfo { signature, } } - - pub fn verify( - &self, - timestamp: u64, - peer_id: PeerId, - verification_key: &::PublicKey, - ) -> Result<(), ValidatorInfoError> { - // Reconstruct the record - let record = ValidatorRecord { - validator_address: self.validator_address.clone(), - timestamp, - peer_id, - }; - - // Reconstruct the signed record. - let signed_record = TaggedSigned::new(record, self.signature.clone()); - - // Verify the record and return the result. - signed_record - .verify(verification_key) - .then_some(()) - .ok_or(ValidatorInfoError::InvalidSignature) - } } pub trait ValidatorRecordVerifier: Send + Sync { @@ -236,7 +213,7 @@ impl PeerContact { /// the expected limits. This is helpful to verify a received peer contact. pub fn verify( &self, - #[cfg(feature = "kad")] verifier: Arc, + #[cfg(feature = "kad")] verifier: &dyn ValidatorRecordVerifier, ) -> Result<(), PeerContactError> { if self.addresses.len() > Self::MAX_ADDRESSES { return Err(PeerContactError::AdvertisedAddressesExceeded); @@ -289,7 +266,7 @@ impl SignedPeerContact { /// intrinsic verification on the inner PeerContact. pub fn verify( &self, - #[cfg(feature = "kad")] verifier: Arc, + #[cfg(feature = "kad")] verifier: &dyn ValidatorRecordVerifier, ) -> Result<(), PeerContactError> { // The record signature must be verified first. if !self @@ -325,7 +302,7 @@ struct PeerContactMeta { /// This encapsulates a peer contact (signed), but also pre-computes frequently used values such as `peer_id` and /// `protocols`. It also contains meta-data that can be mutated. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PeerContactInfo { /// The peer ID derived from the public key in the peer contact. peer_id: PeerId, @@ -334,7 +311,7 @@ pub struct PeerContactInfo { contact: SignedPeerContact, /// Mutable meta-data. - meta: RwLock, + meta: PeerContactMeta, } impl From for PeerContactInfo { @@ -344,10 +321,10 @@ impl From for PeerContactInfo { Self { peer_id, contact, - meta: RwLock::new(PeerContactMeta { + meta: PeerContactMeta { score: 0., outer_protocol_address: None, - }), + }, } } } @@ -384,7 +361,10 @@ impl PeerContactInfo { } /// Returns whether the peer contact exceeds its age limit - pub fn exceeds_age(&self, max_age: Duration, unix_time: Duration) -> bool { + pub fn exceeds_age(&self, max_age: Duration) -> bool { + let unix_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); unix_time .checked_sub(Duration::from_secs(self.contact.inner.timestamp())) .is_some_and(|age| age > max_age) @@ -397,28 +377,25 @@ impl PeerContactInfo { /// Gets the peer score pub fn get_score(&self) -> f64 { - self.meta.read().score + self.meta.score } /// Sets the peer score - pub fn set_score(&self, score: f64) { - self.meta.write().score = score; + pub fn set_score(&mut self, score: f64) { + self.meta.score = score; } /// Gets the outer protocol address of the peer. For example `/ip4/x.x.x.x` or `/dns4/foo.bar` pub fn get_outer_protocol_address(&self) -> Option { - self.meta.read().outer_protocol_address.clone() + self.meta.outer_protocol_address.clone() } /// Sets the outer protocol address of the peer once - pub fn set_outer_protocol_address(&self, addr: Multiaddr) { - self.meta - .write() - .outer_protocol_address - .get_or_insert_with(|| { - trace!(peer_id = %self.peer_id, %addr, "Set outer protocol address for peer"); - addr - }); + pub fn set_outer_protocol_address(&mut self, addr: Multiaddr) { + self.meta.outer_protocol_address.get_or_insert_with(|| { + trace!(peer_id = %self.peer_id, %addr, "Set outer protocol address for peer"); + addr + }); } } @@ -433,7 +410,7 @@ pub struct PeerContactBook { key_pair: Keypair, /// Contact information for other peers in the network indexed by their /// peer ID. - peer_contacts: HashMap>, + peer_contacts: HashMap, /// Map from validator address to peer ids. validator_peer_ids: HashMap>, /// Only return secure websocket addresses. @@ -444,10 +421,12 @@ pub struct PeerContactBook { allow_loopback_addresses: bool, /// Flag to indicate whether to support memory transport addresses memory_transport: bool, - /// Validator signing callback: - validator_record_signing: Option< + /// Validator signing callback + validator_record_signer: Option< Box TaggedSigned, SchnorrKey> + Send + Sync>, >, + /// Validator record verifier + validator_record_verifier: Arc, } impl Debug for PeerContactBook { @@ -466,7 +445,7 @@ impl Debug for PeerContactBook { impl PeerContactBook { /// If a peer's age exceeds this value in seconds, it is removed (30 minutes) - pub const MAX_PEER_AGE: u64 = 30 * 60; + pub const MAX_PEER_AGE: Duration = Duration::from_secs(30 * 60); /// Creates a new `PeerContactBook` given our own peer contact information. pub fn new( @@ -475,6 +454,7 @@ impl PeerContactBook { only_secure_addresses: bool, allow_loopback_addresses: bool, memory_transport: bool, + #[cfg(feature = "kad")] validator_record_verifier: Arc, ) -> Self { let own_peer_id = own_peer_contact.peer_id(); own_peer_contact.set_current_time(); @@ -488,7 +468,8 @@ impl PeerContactBook { allow_loopback_addresses, memory_transport, validator_peer_ids: HashMap::new(), - validator_record_signing: None, + validator_record_signer: None, + validator_record_verifier, } } @@ -549,7 +530,7 @@ impl PeerContactBook { if entry_value.contact().timestamp < info.contact().timestamp && info.contact().timestamp <= current_ts { - *entry_value = Arc::new(info); + *entry_value = info; } } Entry::Vacant(entry) => { @@ -560,7 +541,7 @@ impl PeerContactBook { validator_address = ?info.contact.inner.validator_info.as_ref().map(|info| info.validator_address.clone()), "Adding peer contact", ); - entry.insert(Arc::new(info)); + entry.insert(info); } } } @@ -605,10 +586,9 @@ impl PeerContactBook { // TODO Check peer contact timestamp // Call `insert()` here instead? - let info = Arc::new(info); let is_new = self .peer_contacts - .insert(info.peer_id, Arc::clone(&info)) + .insert(info.peer_id, info.clone()) .is_none(); if let Some(validator_info) = &info.contact.inner.validator_info { self.validator_peer_ids @@ -649,10 +629,16 @@ impl PeerContactBook { } } - /// Gets a peer contact if it exists given its peer_id. + /// Gets a reference to a peer contact if it exists given its peer_id. + /// If the peer_id is not found, `None` is returned. + pub fn get(&self, peer_id: &PeerId) -> Option<&PeerContactInfo> { + self.peer_contacts.get(peer_id) + } + + /// Gets a mutable reference to a peer contact if it exists given its peer_id. /// If the peer_id is not found, `None` is returned. - pub fn get(&self, peer_id: &PeerId) -> Option> { - self.peer_contacts.get(peer_id).cloned() + pub fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerContactInfo> { + self.peer_contacts.get_mut(peer_id) } /// Gets the peer contact's addresses if it exists given its peer_id. @@ -700,12 +686,12 @@ impl PeerContactBook { &self, services: Services, include_local_only: bool, - ) -> impl Iterator> + '_ { + ) -> impl Iterator + '_ { // TODO: This is a naive implementation // TODO: Sort by score? self.peer_contacts.iter().filter_map(move |(_, contact)| { if contact.matches(services) && (!contact.contact.local_only || include_local_only) { - Some(Arc::clone(contact)) + Some(contact) } else { None } @@ -714,9 +700,8 @@ impl PeerContactBook { /// Updates the score of every peer in the contact book with the gossipsub /// peer score. - pub fn update_scores(&self, gossipsub: &gossipsub::Behaviour) { - let contacts = self.peer_contacts.iter(); - + pub fn update_scores(&mut self, gossipsub: &gossipsub::Behaviour) { + let contacts = self.peer_contacts.iter_mut(); for contact in contacts { if let Some(score) = gossipsub.peer_score(contact.0) { contact.1.set_score(score); @@ -754,7 +739,7 @@ impl PeerContactBook { contact.set_current_time(); // Update validator info. - contact.validator_info = self.validator_record_signing.as_ref().and_then(|callback| { + contact.validator_info = self.validator_record_signer.as_ref().and_then(|callback| { let tagged_signed = (callback)(contact.peer_id(), contact.timestamp); Some(ValidatorInfo { validator_address: tagged_signed.record.validator_address.clone(), @@ -770,47 +755,6 @@ impl PeerContactBook { &self.own_peer_contact } - /// Removes peer contacts that have already exceeded the maximum age as - /// defined in `MAX_PEER_AGE`. - pub fn house_keeping(&mut self) { - if let Ok(unix_time) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { - let delete_peers = self - .peer_contacts - .iter() - .filter_map(|(peer_id, peer_contact)| { - if peer_contact.exceeds_age( - Duration::from_secs(PeerContactBook::MAX_PEER_AGE), - unix_time, - ) { - debug!(%peer_id, "Removing peer contact because of old age"); - Some((*peer_id, peer_contact.contact.inner.validator_info.clone())) - } else { - None - } - }) - .collect::)>>(); - - for (peer_id, validator_info) in delete_peers { - self.peer_contacts.remove(&peer_id); - if let Some(validator_info) = validator_info { - match self - .validator_peer_ids - .entry(validator_info.validator_address.clone()) - { - Entry::Occupied(mut entry) => { - entry.get_mut().remove(&peer_id); - - if entry.get_mut().is_empty() { - entry.remove(); - } - } - Entry::Vacant(_) => {} - } - } - } - } - } - /// Returns true if an address is valid for dialing. /// It performs basic checks against unsupported addresses. pub fn is_address_dialable(&self, address: &Multiaddr) -> bool { @@ -868,6 +812,58 @@ impl PeerContactBook { } } + /// Removes expired or invalid peer contacts from the contact books. + pub fn house_keeping(&mut self) { + let peers_to_delete = self + .peer_contacts + .iter_mut() + .filter_map(|(peer_id, peer_contact)| { + // Remove expired peers. + if peer_contact.exceeds_age(PeerContactBook::MAX_PEER_AGE) { + debug!(%peer_id, "Removing peer contact because of old age"); + return Some(peer_id.clone()); + } + + // Re-verify local_only contacts. + if peer_contact.contact.local_only { + let result = peer_contact + .contact + .verify(self.validator_record_verifier.deref()); + match result { + // Verification succeeded, clear local_only flag. + Ok(_) => peer_contact.contact.local_only = false, + // State is (still) incomplete, do nothing. + Err(PeerContactError::ValidatorRecord( + ValidatorInfoError::StateIncomplete, + )) => {} + // Verification failed, delete contact. + Err(_) => return Some(peer_id.clone()), + }; + } + + None + }) + .collect::>(); + + for peer_id in peers_to_delete { + let contact = self.peer_contacts.remove(&peer_id).unwrap(); + + let Some(validator_info) = contact.contact.inner.validator_info.as_ref() else { + continue; + }; + + let entry = self + .validator_peer_ids + .entry(validator_info.validator_address.clone()); + if let Entry::Occupied(mut entry) = entry { + entry.get_mut().remove(&peer_id); + if entry.get().is_empty() { + entry.remove(); + } + } + } + } + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. pub fn register_validator_signing_callback( &mut self, @@ -876,7 +872,7 @@ impl PeerContactBook { + Sync + 'static, ) { - self.validator_record_signing = Some(Box::new(callback)); + self.validator_record_signer = Some(Box::new(callback)); self.refresh_own_contact(); } } diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index df8a458f68..827a97432c 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -86,12 +86,18 @@ impl Network { let required_services = config.required_services; // TODO: persist to disk let own_peer_contact = config.peer_contact.clone(); + + #[cfg(feature = "kad")] + let verifier = Arc::new(verifier); + let contacts = Arc::new(RwLock::new(PeerContactBook::new( own_peer_contact, config.keypair.clone(), config.only_secure_ws_connections, config.allow_loopback_addresses, config.memory_transport, + #[cfg(feature = "kad")] + (Arc::clone(&verifier) as Arc), ))); let params = gossipsub::PeerScoreParams { ip_colocation_factor_threshold: 20.0, @@ -104,9 +110,6 @@ impl Network { // with Autonat. This is because Autonat v1 only works with IP addresses. let force_dht_server_mode = config.memory_transport; - #[cfg(feature = "kad")] - let verifier = Arc::new(verifier); - let swarm = new_swarm( config, Arc::clone(&contacts), diff --git a/network-libp2p/tests/discovery.rs b/network-libp2p/tests/discovery.rs index f7189c781e..fb5768cb7e 100644 --- a/network-libp2p/tests/discovery.rs +++ b/network-libp2p/tests/discovery.rs @@ -20,7 +20,7 @@ use nimiq_hash::Blake2bHash; use nimiq_network_interface::peer_info::Services; use nimiq_network_libp2p::discovery::{ self, - peer_contacts::{PeerContact, PeerContactBook, SignedPeerContact}, + peer_contacts::{PeerContact, PeerContactBook, SignedPeerContact, ValidatorRecordVerifier}, }; use nimiq_test_log::test; use nimiq_utils::spawn; @@ -80,6 +80,8 @@ impl TestNode { false, true, true, + #[cfg(feature = "kad")] + (Arc::new(()) as Arc), ))); let behaviour = discovery::Behaviour::new( @@ -262,7 +264,15 @@ pub async fn test_dialing_peer_from_contacts() { #[test] fn test_housekeeping() { let (keypair, peer_contact) = random_peer_key_and_contact(1, Services::FULL_BLOCKS); - let mut peer_contact_book = PeerContactBook::new(peer_contact, keypair, false, true, true); + let mut peer_contact_book = PeerContactBook::new( + peer_contact, + keypair, + false, + true, + true, + #[cfg(feature = "kad")] + (Arc::new(()) as Arc), + ); let fresh_contact = random_peer_contact(1, Services::FULL_BLOCKS); @@ -275,8 +285,8 @@ fn test_housekeeping() { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() - .as_secs() - .saturating_sub(PeerContactBook::MAX_PEER_AGE * 2), + .saturating_sub(PeerContactBook::MAX_PEER_AGE * 2) + .as_secs(), ) .expect("Peer contact must be creatable"); diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index bd703f6842..e4fe662a4b 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -15,7 +15,7 @@ use nimiq_network_interface::{ request::{InboundRequestError, Message, Request, RequestCommon, RequestError}, validator_record::ValidatorRecord, }; -use nimiq_primitives::slots_allocation::{Validator, Validators}; +use nimiq_primitives::slots_allocation::Validators; use nimiq_serde::{Deserialize, Serialize}; use nimiq_utils::{spawn, stream::FuturesUnordered, tagged_signing::TaggedSigned}; use parking_lot::RwLock; From bb0a8b70576025e140349dbca4f12ad8cdb7974a Mon Sep 17 00:00:00 2001 From: styppo Date: Wed, 15 Jan 2025 14:59:31 +0000 Subject: [PATCH 19/20] Network: close connection for all discovery errors --- network-libp2p/src/discovery/behaviour.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/network-libp2p/src/discovery/behaviour.rs b/network-libp2p/src/discovery/behaviour.rs index 1ee52ba9fa..cfe99b8d88 100644 --- a/network-libp2p/src/discovery/behaviour.rs +++ b/network-libp2p/src/discovery/behaviour.rs @@ -11,7 +11,7 @@ use libp2p::{ identity::Keypair, swarm::{ behaviour::{ConnectionClosed, ConnectionEstablished}, - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm, + CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm, }, Multiaddr, PeerId, }; @@ -297,8 +297,10 @@ impl NetworkBehaviour for Behaviour { .push_back(ToSwarm::NewExternalAddrCandidate(observed_address)); } HandlerOutEvent::Update => self.events.push_back(ToSwarm::GenerateEvent(Event::Update)), - // Errors must not result in a closed connection as light clients are unable to verify ValidatorRecord. - HandlerOutEvent::Error(error) => log::trace!(?error, "Received invalid contact"), + HandlerOutEvent::Error(_) => self.events.push_back(ToSwarm::CloseConnection { + peer_id, + connection: CloseConnection::All, + }), } } } From 7fdc90938b5eb301e2828fe47d83d197d31775d6 Mon Sep 17 00:00:00 2001 From: styppo Date: Mon, 27 Jan 2025 13:08:40 +0000 Subject: [PATCH 20/20] Network: fix wasm compilation error --- network-libp2p/src/discovery/peer_contacts.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index 88f0603730..80d37cec1d 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -426,6 +426,7 @@ pub struct PeerContactBook { Box TaggedSigned, SchnorrKey> + Send + Sync>, >, /// Validator record verifier + #[cfg(feature = "kad")] validator_record_verifier: Arc, } @@ -825,6 +826,7 @@ impl PeerContactBook { } // Re-verify local_only contacts. + #[cfg(feature = "kad")] if peer_contact.contact.local_only { let result = peer_contact .contact