diff --git a/node/network/src/types/mod.rs b/node/network/src/types/mod.rs index b93cf42c..8a198514 100644 --- a/node/network/src/types/mod.rs +++ b/node/network/src/types/mod.rs @@ -7,8 +7,7 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::{ - AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile, - PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig, - SignedMessage, SnappyTransform, + AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage, + SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage, }; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index d5dd4e16..b516ef73 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -6,7 +6,7 @@ use libp2p::{ gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage}, Multiaddr, PeerId, }; -use shared_types::TxID; +use shared_types::{timestamp_now, ShardConfig, TxID}; use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -162,14 +162,28 @@ pub struct AnnounceChunks { } #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] -pub struct AnnounceShardConfig { - pub num_shard: usize, - pub shard_id: usize, - pub peer_id: WrappedPeerId, - pub at: WrappedMultiaddr, +pub struct TimedMessage { + pub inner: T, pub timestamp: u32, } +impl Deref for TimedMessage { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl From for TimedMessage { + fn from(value: T) -> Self { + Self { + inner: value, + timestamp: timestamp_now(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] pub struct SignedMessage { pub inner: T, @@ -210,7 +224,6 @@ impl HasSignature for SignedMessage { } pub type SignedAnnounceFile = SignedMessage; -pub type SignedAnnounceShardConfig = SignedMessage; pub type SignedAnnounceChunks = SignedMessage; type SignedAnnounceFiles = Vec; @@ -222,7 +235,7 @@ pub enum PubsubMessage { FindFile(FindFile), FindChunks(FindChunks), AnnounceFile(Vec), - AnnounceShardConfig(SignedAnnounceShardConfig), + AnnounceShardConfig(TimedMessage), AnnounceChunks(SignedAnnounceChunks), } @@ -342,7 +355,7 @@ impl PubsubMessage { .map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig( - SignedAnnounceShardConfig::from_ssz_bytes(data) + TimedMessage::::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, )), } diff --git a/node/network/src/types/topics.rs b/node/network/src/types/topics.rs index ae087a8b..888193ba 100644 --- a/node/network/src/types/topics.rs +++ b/node/network/src/types/topics.rs @@ -13,7 +13,7 @@ pub const FIND_FILE_TOPIC: &str = "find_file"; pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; -pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config"; +pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2"; /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 146d1fec..b4ee95f3 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -6,7 +6,7 @@ use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; use network::rpc::methods::FileAnnouncement; -use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig}; +use network::types::{NewFile, TimedMessage}; use network::{ rpc::StatusMessage, types::{ @@ -383,7 +383,7 @@ impl Libp2pEventHandler { } PubsubMessage::AnnounceShardConfig(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1); - self.on_announce_shard_config(propagation_source, msg) + self.on_announce_shard_config(propagation_source, source, msg) } } } @@ -553,35 +553,6 @@ impl Libp2pEventHandler { Some(signed) } - pub async fn construct_announce_shard_config_message( - &self, - shard_config: ShardConfig, - ) -> Option { - let peer_id = *self.network_globals.peer_id.read(); - let addr = self.construct_announced_ip().await?; - let timestamp = timestamp_now(); - - let msg = AnnounceShardConfig { - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - peer_id: peer_id.into(), - at: addr.into(), - timestamp, - }; - - let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) { - Ok(signed) => signed, - Err(e) => { - error!(%e, "Failed to sign AnnounceShardConfig message"); - return None; - } - }; - - signed.resend_timestamp = timestamp; - - Some(PubsubMessage::AnnounceShardConfig(signed)) - } - async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance { let FindFile { tx_id, timestamp, .. @@ -872,52 +843,34 @@ impl Libp2pEventHandler { fn on_announce_shard_config( &self, propagation_source: PeerId, - msg: SignedAnnounceShardConfig, + source: PeerId, + msg: TimedMessage, ) -> MessageAcceptance { - // verify message signature - if !verify_signature(&msg, &msg.peer_id, propagation_source) { - return MessageAcceptance::Reject; - } - - // verify public ip address if required - let addr = msg.at.clone().into(); - if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) { - return MessageAcceptance::Reject; - } - - // verify announced ip address if required - if !self.config.private_ip_enabled - && self.config.check_announced_ip - && !self.verify_announced_address(&msg.peer_id, &addr) - { - return MessageAcceptance::Reject; - } - - // propagate gossip to peers + // validate timestamp let d = duration_since( - msg.resend_timestamp, + msg.timestamp, metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT { - debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message"); + debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message"); return MessageAcceptance::Ignore; } - let shard_config = ShardConfig { - shard_id: msg.shard_id, - num_shard: msg.num_shard, + let shard_config = match ShardConfig::try_from(msg.inner) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, }; + + // insert message to cache + self.file_location_cache + .insert_peer_config(source, shard_config); + // notify sync layer self.send_to_sync(SyncMessage::AnnounceShardConfig { shard_config, - peer_id: msg.peer_id.clone().into(), - addr, + peer_id: source, }); - // insert message to cache - self.file_location_cache - .insert_peer_config(msg.peer_id.clone().into(), shard_config); - MessageAcceptance::Accept } diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 8b46a41d..29efccf3 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -384,13 +384,10 @@ impl RouterService { PrunerMessage::ChangeShardConfig(shard_config) => { self.libp2p_event_handler .send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config)); - if let Some(msg) = self - .libp2p_event_handler - .construct_announce_shard_config_message(shard_config) - .await - { - self.libp2p_event_handler.publish(msg) - } + + let shard_config = shared_types::ShardConfig::from(shard_config); + self.libp2p_event_handler + .publish(PubsubMessage::AnnounceShardConfig(shard_config.into())); } } } diff --git a/node/shared_types/src/lib.rs b/node/shared_types/src/lib.rs index 54f773da..f0dbd989 100644 --- a/node/shared_types/src/lib.rs +++ b/node/shared_types/src/lib.rs @@ -403,6 +403,13 @@ pub enum TxSeqOrRoot { Root(DataRoot), } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)] +#[serde(rename_all = "camelCase")] +pub struct ShardConfig { + pub num_shard: usize, + pub shard_id: usize, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/node/storage/src/config.rs b/node/storage/src/config.rs index 2b7160e7..ef83c632 100644 --- a/node/storage/src/config.rs +++ b/node/storage/src/config.rs @@ -60,6 +60,23 @@ impl TryFrom> for ShardConfig { } } +impl TryFrom for ShardConfig { + type Error = String; + + fn try_from(value: shared_types::ShardConfig) -> Result { + Self::new(value.shard_id, value.num_shard) + } +} + +impl From for shared_types::ShardConfig { + fn from(value: ShardConfig) -> Self { + Self { + num_shard: value.num_shard, + shard_id: value.shard_id, + } + } +} + impl ShardConfig { pub fn new(id: usize, num: usize) -> Result { let config = ShardConfig { diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index a913426b..6a6c09ef 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -65,7 +65,6 @@ pub enum SyncMessage { AnnounceShardConfig { shard_config: ShardConfig, peer_id: PeerId, - addr: Multiaddr, }, AnnounceChunksGossip { msg: AnnounceChunks,