Skip to content

Commit

Permalink
Refactor announce shard config pubsub message (#292)
Browse files Browse the repository at this point in the history
* Refactor announce shard config pubsub message

* change topic name for announce shard config

* minor change
  • Loading branch information
boqiu authored Dec 5, 2024
1 parent 4bb2fac commit c5ddcc1
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 84 deletions.
5 changes: 2 additions & 3 deletions node/network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;

pub use globals::NetworkGlobals;
pub use pubsub::{
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
SignedMessage, SnappyTransform,
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage,
SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage,
};
pub use topics::{GossipEncoding, GossipKind, GossipTopic};
31 changes: 22 additions & 9 deletions node/network/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use libp2p::{
gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage},
Multiaddr, PeerId,
};
use shared_types::TxID;
use shared_types::{timestamp_now, ShardConfig, TxID};
use snap::raw::{decompress_len, Decoder, Encoder};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
Expand Down Expand Up @@ -162,14 +162,28 @@ pub struct AnnounceChunks {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct AnnounceShardConfig {
pub num_shard: usize,
pub shard_id: usize,
pub peer_id: WrappedPeerId,
pub at: WrappedMultiaddr,
pub struct TimedMessage<T: Encode + Decode> {
pub inner: T,
pub timestamp: u32,
}

impl<T: Encode + Decode> Deref for TimedMessage<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T: Encode + Decode> From<T> for TimedMessage<T> {
fn from(value: T) -> Self {
Self {
inner: value,
timestamp: timestamp_now(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct SignedMessage<T: Encode + Decode> {
pub inner: T,
Expand Down Expand Up @@ -210,7 +224,6 @@ impl<T: Encode + Decode> HasSignature for SignedMessage<T> {
}

pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;

type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
Expand All @@ -222,7 +235,7 @@ pub enum PubsubMessage {
FindFile(FindFile),
FindChunks(FindChunks),
AnnounceFile(Vec<SignedAnnounceFile>),
AnnounceShardConfig(SignedAnnounceShardConfig),
AnnounceShardConfig(TimedMessage<ShardConfig>),
AnnounceChunks(SignedAnnounceChunks),
}

Expand Down Expand Up @@ -342,7 +355,7 @@ impl PubsubMessage {
.map_err(|e| format!("{:?}", e))?,
)),
GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig(
SignedAnnounceShardConfig::from_ssz_bytes(data)
TimedMessage::<ShardConfig>::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
)),
}
Expand Down
2 changes: 1 addition & 1 deletion node/network/src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2";

/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded.
Expand Down
79 changes: 16 additions & 63 deletions node/router/src/libp2p_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
use network::types::{NewFile, TimedMessage};
use network::{
rpc::StatusMessage,
types::{
Expand Down Expand Up @@ -383,7 +383,7 @@ impl Libp2pEventHandler {
}
PubsubMessage::AnnounceShardConfig(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
self.on_announce_shard_config(propagation_source, msg)
self.on_announce_shard_config(propagation_source, source, msg)
}
}
}
Expand Down Expand Up @@ -553,35 +553,6 @@ impl Libp2pEventHandler {
Some(signed)
}

pub async fn construct_announce_shard_config_message(
&self,
shard_config: ShardConfig,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();

let msg = AnnounceShardConfig {
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
peer_id: peer_id.into(),
at: addr.into(),
timestamp,
};

let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
Ok(signed) => signed,
Err(e) => {
error!(%e, "Failed to sign AnnounceShardConfig message");
return None;
}
};

signed.resend_timestamp = timestamp;

Some(PubsubMessage::AnnounceShardConfig(signed))
}

async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
let FindFile {
tx_id, timestamp, ..
Expand Down Expand Up @@ -872,52 +843,34 @@ impl Libp2pEventHandler {
fn on_announce_shard_config(
&self,
propagation_source: PeerId,
msg: SignedAnnounceShardConfig,
source: PeerId,
msg: TimedMessage<shared_types::ShardConfig>,
) -> MessageAcceptance {
// verify message signature
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
return MessageAcceptance::Reject;
}

// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}

// verify announced ip address if required
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}

// propagate gossip to peers
// validate timestamp
let d = duration_since(
msg.resend_timestamp,
msg.timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message");
return MessageAcceptance::Ignore;
}

let shard_config = ShardConfig {
shard_id: msg.shard_id,
num_shard: msg.num_shard,
let shard_config = match ShardConfig::try_from(msg.inner) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};

// insert message to cache
self.file_location_cache
.insert_peer_config(source, shard_config);

// notify sync layer
self.send_to_sync(SyncMessage::AnnounceShardConfig {
shard_config,
peer_id: msg.peer_id.clone().into(),
addr,
peer_id: source,
});

// insert message to cache
self.file_location_cache
.insert_peer_config(msg.peer_id.clone().into(), shard_config);

MessageAcceptance::Accept
}

Expand Down
11 changes: 4 additions & 7 deletions node/router/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,10 @@ impl RouterService {
PrunerMessage::ChangeShardConfig(shard_config) => {
self.libp2p_event_handler
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
if let Some(msg) = self
.libp2p_event_handler
.construct_announce_shard_config_message(shard_config)
.await
{
self.libp2p_event_handler.publish(msg)
}

let shard_config = shared_types::ShardConfig::from(shard_config);
self.libp2p_event_handler
.publish(PubsubMessage::AnnounceShardConfig(shard_config.into()));
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions node/shared_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ pub enum TxSeqOrRoot {
Root(DataRoot),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)]
#[serde(rename_all = "camelCase")]
pub struct ShardConfig {
pub num_shard: usize,
pub shard_id: usize,
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
17 changes: 17 additions & 0 deletions node/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ impl TryFrom<Option<String>> for ShardConfig {
}
}

impl TryFrom<shared_types::ShardConfig> for ShardConfig {
type Error = String;

fn try_from(value: shared_types::ShardConfig) -> Result<Self, Self::Error> {
Self::new(value.shard_id, value.num_shard)
}
}

impl From<ShardConfig> for shared_types::ShardConfig {
fn from(value: ShardConfig) -> Self {
Self {
num_shard: value.num_shard,
shard_id: value.shard_id,
}
}
}

impl ShardConfig {
pub fn new(id: usize, num: usize) -> Result<Self, String> {
let config = ShardConfig {
Expand Down
1 change: 0 additions & 1 deletion node/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub enum SyncMessage {
AnnounceShardConfig {
shard_config: ShardConfig,
peer_id: PeerId,
addr: Multiaddr,
},
AnnounceChunksGossip {
msg: AnnounceChunks,
Expand Down

0 comments on commit c5ddcc1

Please sign in to comment.