Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ValidatorInfo to PeerContacts. #3039

Open
wants to merge 20 commits into
base: albatross
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
80fce1a
Add ValidatorInfo to the PeerContacts. Do basic verification
nibhar Nov 12, 2024
a07c03b
Compile fix
ii-cruz Nov 25, 2024
98e0d1f
Add/remove peer ids from validator addresses
viquezclaudio Nov 25, 2024
ab3fc09
Network: prepare network interface for validator discovery
styppo Nov 25, 2024
63403bf
Populate validator_info
nibhar Nov 25, 2024
20dc107
ValidatorNetwork: resolve validator peer ids from peer contact book
styppo Nov 25, 2024
802d32a
Validator: register callback for the validator record
nibhar Nov 25, 2024
51136f0
DhtVerifier: Remove unused and commented code
nibhar Nov 25, 2024
833bdda
DHT: fix clippy warnings
styppo Nov 25, 2024
8f7fe54
ValidatorNetwork: use correct validator_id when fetching peer ids fro…
styppo Nov 25, 2024
c867f84
Network: fix filtering of `local_only` contacts
styppo Nov 25, 2024
4586742
Fix clippy warnings
viquezclaudio Nov 25, 2024
e592e10
Fixing the feature compilation
ii-cruz Nov 26, 2024
3058be2
ValidatorNetwork: use contact book peer ids where required
styppo Nov 26, 2024
09e6a5d
Validator: register signing callback in constructor
styppo Nov 26, 2024
a84c105
Network: correctly update own peer contact
styppo Nov 26, 2024
9b8c194
ValidatorNetwork: cache verified validator peer ids from the contact …
styppo Nov 26, 2024
c94fdb5
Network: periodically re-verify local_only contacts in peer contact book
styppo Nov 28, 2024
bb0a8b7
Network: close connection for all discovery errors
styppo Jan 15, 2025
14a7780
Network: fix wasm compilation error
styppo Jan 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ nimiq-blockchain-interface = { workspace = true }
nimiq-blockchain-proxy = { workspace = true, features = ["full"] }
nimiq-keys = { workspace = true }
nimiq-log = { workspace = true, optional = true }
nimiq-network-interface = { workspace = true }
nimiq-network-libp2p = { workspace = true }
nimiq-serde = { workspace = true }
nimiq-utils = { workspace = true }
Expand Down
117 changes: 67 additions & 50 deletions dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_keys::{Address, KeyPair};
use nimiq_network_interface::{
network::Network as NetworkInterface, validator_record::ValidatorRecord,
};
use nimiq_network_libp2p::{
dht::{DhtRecord, DhtVerifierError, Verifier as DhtVerifier},
discovery::peer_contacts::{ValidatorInfoError, ValidatorRecordVerifier},
libp2p::kad::Record,
PeerId,
Network, PeerId,
};
use nimiq_serde::Deserialize;
use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned};
use nimiq_validator_network::validator_record::ValidatorRecord;

pub struct Verifier {
blockchain: BlockchainProxy,
Expand All @@ -17,71 +20,46 @@ impl Verifier {
pub fn new(blockchain: BlockchainProxy) -> Self {
Self { blockchain }
}
}

fn verify_validator_record(&self, record: &Record) -> Result<DhtRecord, DhtVerifierError> {
// Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error.
let validator_record =
TaggedSigned::<ValidatorRecord<PeerId>, KeyPair>::deserialize_from_vec(&record.value)
.map_err(DhtVerifierError::MalformedValue)?;

// Make sure the peer who signed the record is also the one presented in the record.
if let Some(publisher) = record.publisher {
if validator_record.record.peer_id != publisher {
return Err(DhtVerifierError::PublisherMismatch(
publisher,
validator_record.record.peer_id,
));
}
} else {
log::warn!("Validating a dht record without a publisher");
return Err(DhtVerifierError::PublisherMissing);
}

// Deserialize the key of the record which is an Address. If it fails return an error.
let validator_address = Address::deserialize_from_vec(record.key.as_ref())
.map_err(DhtVerifierError::MalformedKey)?;

// Make sure the validator address used as key is identical to the one in the record.
if validator_record.record.validator_address != validator_address {
return Err(DhtVerifierError::AddressMismatch(
validator_address,
validator_record.record.validator_address,
));
}

impl ValidatorRecordVerifier for Verifier {
fn verify_validator_record(
&self,
signed_record: &TaggedSigned<
ValidatorRecord<<Network as NetworkInterface>::PeerId>,
KeyPair,
>,
) -> Result<(), ValidatorInfoError> {
// Acquire blockchain read access. For now exclude Light clients.
let blockchain = match self.blockchain {
BlockchainProxy::Light(ref _light_blockchain) => {
return Err(DhtVerifierError::UnknownTag)
}
BlockchainProxy::Full(ref full_blockchain) => full_blockchain,
BlockchainProxy::Full(ref blockchain) => blockchain,
BlockchainProxy::Light(_) => return Err(ValidatorInfoError::StateIncomplete),
};
let blockchain_read = blockchain.read();

// Get the staking contract to retrieve the public key for verification.
let staking_contract = blockchain_read
.get_staking_contract_if_complete(None)
.ok_or(DhtVerifierError::StateIncomplete)?;
.ok_or(ValidatorInfoError::StateIncomplete)?;

// Get the public key needed for verification.
let data_store = blockchain_read.get_staking_contract_store();
let txn = blockchain_read.read_transaction();
let public_key = staking_contract
.get_validator(&data_store.read(&txn), &validator_address)
.ok_or(DhtVerifierError::UnknownValidator(validator_address))?
.get_validator(
&data_store.read(&txn),
&signed_record.record.validator_address,
)
.ok_or(ValidatorInfoError::UnknownValidator(
signed_record.record.validator_address.clone(),
))?
.signing_key;

// Verify the record.
validator_record
signed_record
.verify(&public_key)
.then(|| {
DhtRecord::Validator(
record.publisher.unwrap(),
validator_record.record,
record.clone(),
)
})
.ok_or(DhtVerifierError::InvalidSignature)
.then_some(())
.ok_or(ValidatorInfoError::InvalidSignature)
}
}

Expand All @@ -96,7 +74,46 @@ impl DhtVerifier for Verifier {

// Depending on tag perform the verification.
match tag {
ValidatorRecord::<PeerId>::TAG => self.verify_validator_record(record),
ValidatorRecord::<PeerId>::TAG => {
// Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error.
let validator_record =
TaggedSigned::<ValidatorRecord<PeerId>, KeyPair>::deserialize_from_vec(
&record.value,
)
.map_err(DhtVerifierError::MalformedValue)?;

// Make sure the peer who published the record is also the one signed into the record.
if record.publisher.ok_or(DhtVerifierError::MissingPublisher)?
!= validator_record.record.peer_id
{
return Err(DhtVerifierError::PublisherMismatch(
record.publisher.unwrap(),
validator_record.record.peer_id,
));
}

// Deserialize the key of the record which is an Address. If it fails return an error.
let validator_address = Address::deserialize_from_vec(record.key.as_ref())
.map_err(DhtVerifierError::MalformedKey)?;

// Make sure the address used as key is also the one signed into the record.
if validator_address != validator_record.record.validator_address {
return Err(DhtVerifierError::AddressMismatch(
validator_address,
validator_record.record.validator_address,
));
}

self.verify_validator_record(&validator_record)
.map_err(|_| DhtVerifierError::ValidatorInfoError)
.map(|_| {
DhtRecord::Validator(
validator_record.record.peer_id,
validator_record.record,
record.clone(),
)
})
}
_ => {
log::error!(tag, "DHT invalid record tag received");
Err(DhtVerifierError::UnknownTag)
Expand Down
4 changes: 2 additions & 2 deletions handel/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl std::fmt::Debug for Protocol {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound = "C: AggregatableContribution")]
struct SerializableLevelUpdate<C: AggregatableContribution> {
aggregate: C,
Expand Down Expand Up @@ -164,7 +164,7 @@ impl<C: AggregatableContribution> From<LevelUpdate<C>> for SerializableLevelUpda
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound = "C: AggregatableContribution")]
struct Update<C: AggregatableContribution>(pub SerializableLevelUpdate<C>);

Expand Down
1 change: 1 addition & 0 deletions network-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ thiserror = "2.0"
tokio = { version = "1.43", features = ["rt"] }
tokio-stream = { version = "0.1", features = ["default", "sync"] }

nimiq-keys = { workspace = true, features = ["serde-derive"] }
nimiq-serde = { workspace = true }
nimiq-utils = { workspace = true, features = ["tagged-signing"] }
1 change: 1 addition & 0 deletions network-interface/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod network;
pub mod peer_info;
pub mod request;
pub mod validator_record;

pub use multiaddr::{multiaddr, Multiaddr, Protocol};
34 changes: 32 additions & 2 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::{
collections::HashSet,
fmt::{Debug, Display},
hash::Hash,
time::Duration,
};

use async_trait::async_trait;
use futures::stream::BoxStream;
use nimiq_keys::{Address, KeyPair};
use nimiq_serde::{Deserialize, DeserializeError, Serialize};
use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable};
use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned};
use thiserror::Error;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

use crate::{
peer_info::*,
request::{Message, Request, RequestError},
validator_record::ValidatorRecord,
};

/// Network events that the network will report when subscribing
Expand Down Expand Up @@ -87,7 +90,17 @@ pub enum SendError {

#[async_trait]
pub trait Network: Send + Sync + Unpin + 'static {
type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
type PeerId: Copy
+ Debug
+ Display
+ Ord
+ Hash
+ Send
+ Sync
+ Unpin
+ Serialize
+ Deserialize
+ 'static;
type AddressType: Debug + Display + 'static;
type Error: std::error::Error;
type PubsubId: PubsubId<Self::PeerId> + Send + Sync + Unpin;
Expand All @@ -113,6 +126,14 @@ pub trait Network: Send + Sync + Unpin + 'static {
min_peers: usize,
) -> Result<Vec<Self::PeerId>, Self::Error>;

/// Returns all peer ids that are known for the given validator.
/// The returned list might contain unverified mappings.
fn get_peers_by_validator(
&self,
validator_address: &Address,
include_unverified: bool,
) -> HashSet<Self::PeerId>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;

Expand Down Expand Up @@ -218,4 +239,13 @@ pub trait Network: Send + Sync + Unpin + 'static {
request_id: Self::RequestId,
response: Req::Response,
) -> Result<(), Self::Error>;

/// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp.
fn register_validator_signing_callback(
&self,
callback: impl Fn(Self::PeerId, u64) -> TaggedSigned<ValidatorRecord<Self::PeerId>, KeyPair>
+ Send
+ Sync
+ 'static,
);
}
2 changes: 1 addition & 1 deletion network-interface/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl RequestKind for MessageMarker {
}

pub trait RequestCommon:
Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + 'static
Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + Clone + 'static
{
type Kind: RequestKind;
const TYPE_ID: u16;
Expand Down
17 changes: 15 additions & 2 deletions network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ nimiq-utils = { workspace = true, features = [
"tagged-signing",
"libp2p",
"time",
"spawn",
] }
nimiq-validator-network = { workspace = true }

Expand Down Expand Up @@ -85,12 +86,24 @@ libp2p = { version = "0.54", default-features = false, features = [

[dev-dependencies]
# In dev/testing we require more tokio features
tokio = { version = "1.43", features = ["macros", "rt", "rt-multi-thread", "test-util", "time", "tracing"] }
tokio = { version = "1.43", features = [
"macros",
"rt",
"rt-multi-thread",
"test-util",
"time",
"tracing",
] }

nimiq-test-log = { workspace = true }
nimiq-test-utils = { workspace = true }

[features]
kad = []
metrics = ["prometheus-client"]
tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"]
tokio-websocket = [
"libp2p/dns",
"libp2p/tcp",
"libp2p/tokio",
"libp2p/websocket",
]
13 changes: 9 additions & 4 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use parking_lot::RwLock;
use rand::rngs::OsRng;

use crate::{
connection_pool,
connection_pool::behaviour::Config as PoolConfig,
discovery::{self, peer_contacts::PeerContactBook},
connection_pool::{self, behaviour::Config as PoolConfig},
discovery::{
self,
peer_contacts::{PeerContactBook, ValidatorRecordVerifier},
},
dispatch::codecs::MessageCodec,
Config,
};
Expand Down Expand Up @@ -50,6 +52,7 @@ impl Behaviour {
contacts: Arc<RwLock<PeerContactBook>>,
peer_score_params: gossipsub::PeerScoreParams,
force_dht_server_mode: bool,
#[cfg(feature = "kad")] verifier: Arc<dyn ValidatorRecordVerifier>,
) -> Self {
let public_key = config.keypair.public();
let peer_id = public_key.to_peer_id();
Expand All @@ -68,6 +71,8 @@ impl Behaviour {
config.discovery.clone(),
config.keypair.clone(),
Arc::clone(&contacts),
#[cfg(feature = "kad")]
verifier,
);

// Gossipsub behaviour
Expand Down Expand Up @@ -169,6 +174,6 @@ impl Behaviour {
/// Updates the scores of all peers in the peer contact book.
/// Updates are performed with the score values of Gossipsub
pub fn update_scores(&self, contacts: Arc<RwLock<PeerContactBook>>) {
contacts.read().update_scores(&self.gossipsub);
contacts.write().update_scores(&self.gossipsub);
}
}
4 changes: 2 additions & 2 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ impl Behaviour {
let own_peer_id = own_contact.peer_id();

contacts
.query(self.required_services)
.query(self.required_services, true)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
Expand Down Expand Up @@ -562,7 +562,7 @@ impl Behaviour {
let own_peer_id = own_contact.peer_id();

contacts
.query(services)
.query(services, true)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
Expand Down
Loading
Loading