Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor announce shard config pubsub message #292

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions node/network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;

pub use globals::NetworkGlobals;
pub use pubsub::{
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
SignedMessage, SnappyTransform,
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage,
SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage,
};
pub use topics::{GossipEncoding, GossipKind, GossipTopic};
31 changes: 22 additions & 9 deletions node/network/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use libp2p::{
gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage},
Multiaddr, PeerId,
};
use shared_types::TxID;
use shared_types::{timestamp_now, ShardConfig, TxID};
use snap::raw::{decompress_len, Decoder, Encoder};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
Expand Down Expand Up @@ -162,14 +162,28 @@ pub struct AnnounceChunks {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct AnnounceShardConfig {
pub num_shard: usize,
pub shard_id: usize,
pub peer_id: WrappedPeerId,
pub at: WrappedMultiaddr,
pub struct TimedMessage<T: Encode + Decode> {
pub inner: T,
pub timestamp: u32,
}

impl<T: Encode + Decode> Deref for TimedMessage<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T: Encode + Decode> From<T> for TimedMessage<T> {
fn from(value: T) -> Self {
Self {
inner: value,
timestamp: timestamp_now(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct SignedMessage<T: Encode + Decode> {
pub inner: T,
Expand Down Expand Up @@ -210,7 +224,6 @@ impl<T: Encode + Decode> HasSignature for SignedMessage<T> {
}

pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;

type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
Expand All @@ -222,7 +235,7 @@ pub enum PubsubMessage {
FindFile(FindFile),
FindChunks(FindChunks),
AnnounceFile(Vec<SignedAnnounceFile>),
AnnounceShardConfig(SignedAnnounceShardConfig),
AnnounceShardConfig(TimedMessage<ShardConfig>),
AnnounceChunks(SignedAnnounceChunks),
}

Expand Down Expand Up @@ -342,7 +355,7 @@ impl PubsubMessage {
.map_err(|e| format!("{:?}", e))?,
)),
GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig(
SignedAnnounceShardConfig::from_ssz_bytes(data)
TimedMessage::<ShardConfig>::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
)),
}
Expand Down
2 changes: 1 addition & 1 deletion node/network/src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2";

/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded.
Expand Down
79 changes: 16 additions & 63 deletions node/router/src/libp2p_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
use network::types::{NewFile, TimedMessage};
use network::{
rpc::StatusMessage,
types::{
Expand Down Expand Up @@ -383,7 +383,7 @@ impl Libp2pEventHandler {
}
PubsubMessage::AnnounceShardConfig(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
self.on_announce_shard_config(propagation_source, msg)
self.on_announce_shard_config(propagation_source, source, msg)
}
}
}
Expand Down Expand Up @@ -553,35 +553,6 @@ impl Libp2pEventHandler {
Some(signed)
}

pub async fn construct_announce_shard_config_message(
&self,
shard_config: ShardConfig,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();

let msg = AnnounceShardConfig {
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
peer_id: peer_id.into(),
at: addr.into(),
timestamp,
};

let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
Ok(signed) => signed,
Err(e) => {
error!(%e, "Failed to sign AnnounceShardConfig message");
return None;
}
};

signed.resend_timestamp = timestamp;

Some(PubsubMessage::AnnounceShardConfig(signed))
}

async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
let FindFile {
tx_id, timestamp, ..
Expand Down Expand Up @@ -872,52 +843,34 @@ impl Libp2pEventHandler {
fn on_announce_shard_config(
&self,
propagation_source: PeerId,
msg: SignedAnnounceShardConfig,
source: PeerId,
msg: TimedMessage<shared_types::ShardConfig>,
) -> MessageAcceptance {
// verify message signature
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
return MessageAcceptance::Reject;
}

// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}

// verify announced ip address if required
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}

// propagate gossip to peers
// validate timestamp
let d = duration_since(
msg.resend_timestamp,
msg.timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message");
return MessageAcceptance::Ignore;
}

let shard_config = ShardConfig {
shard_id: msg.shard_id,
num_shard: msg.num_shard,
let shard_config = match ShardConfig::try_from(msg.inner) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};

// insert message to cache
self.file_location_cache
.insert_peer_config(source, shard_config);

// notify sync layer
self.send_to_sync(SyncMessage::AnnounceShardConfig {
shard_config,
peer_id: msg.peer_id.clone().into(),
addr,
peer_id: source,
});

// insert message to cache
self.file_location_cache
.insert_peer_config(msg.peer_id.clone().into(), shard_config);

MessageAcceptance::Accept
}

Expand Down
11 changes: 4 additions & 7 deletions node/router/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,10 @@ impl RouterService {
PrunerMessage::ChangeShardConfig(shard_config) => {
self.libp2p_event_handler
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
if let Some(msg) = self
.libp2p_event_handler
.construct_announce_shard_config_message(shard_config)
.await
{
self.libp2p_event_handler.publish(msg)
}

let shard_config = shared_types::ShardConfig::from(shard_config);
self.libp2p_event_handler
.publish(PubsubMessage::AnnounceShardConfig(shard_config.into()));
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions node/shared_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ pub enum TxSeqOrRoot {
Root(DataRoot),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)]
#[serde(rename_all = "camelCase")]
pub struct ShardConfig {
pub num_shard: usize,
pub shard_id: usize,
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
17 changes: 17 additions & 0 deletions node/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ impl TryFrom<Option<String>> for ShardConfig {
}
}

impl TryFrom<shared_types::ShardConfig> for ShardConfig {
type Error = String;

fn try_from(value: shared_types::ShardConfig) -> Result<Self, Self::Error> {
Self::new(value.shard_id, value.num_shard)
}
}

impl From<ShardConfig> for shared_types::ShardConfig {
fn from(value: ShardConfig) -> Self {
Self {
num_shard: value.num_shard,
shard_id: value.shard_id,
}
}
}

impl ShardConfig {
pub fn new(id: usize, num: usize) -> Result<Self, String> {
let config = ShardConfig {
Expand Down
1 change: 0 additions & 1 deletion node/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub enum SyncMessage {
AnnounceShardConfig {
shard_config: ShardConfig,
peer_id: PeerId,
addr: Multiaddr,
},
AnnounceChunksGossip {
msg: AnnounceChunks,
Expand Down
Loading