Skip to content

Commit

Permalink
Refactor file sync p2p protocol to NEW_FILE + ASK_FILE + ANSWER_FILE (#…
Browse files Browse the repository at this point in the history
…293)

* refactor new file pubsub message

* extract common struct ShardedFile

* refactor file sync to ASK_FILE & ANSWER_FILE protocol

* simplify code
  • Loading branch information
boqiu authored Dec 6, 2024
1 parent c5ddcc1 commit afa471e
Show file tree
Hide file tree
Showing 19 changed files with 336 additions and 372 deletions.
139 changes: 15 additions & 124 deletions node/network/src/behaviour/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,141 +13,34 @@ use tokio_util::time::delay_queue::{DelayQueue, Key};
/// messages are ignored. This behaviour can be changed using `GossipCacheBuilder::default_timeout`
/// to apply the same delay to every kind. Individual timeouts for specific kinds can be set and
/// will overwrite the default_timeout if present.
#[derive(Default)]
pub struct GossipCache {
/// Expire timeouts for each topic-msg pair.
expirations: DelayQueue<(GossipTopic, Vec<u8>)>,
/// Messages cached for each topic.
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
/// Timeout for Example messages.
example: Option<Duration>,
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for FindFile messages.
find_file: Option<Duration>,
/// Timeout for FindChunks messages.
find_chunks: Option<Duration>,
/// Timeout for AnnounceFile.
announce_file: Option<Duration>,
/// Timeout for AnnounceChunks.
announce_chunks: Option<Duration>,
/// Timeout for AnnounceShardConfig.
announce_shard_config: Option<Duration>,
}

#[derive(Default)]
pub struct GossipCacheBuilder {
default_timeout: Option<Duration>,
/// Timeout for Example messages.
example: Option<Duration>,
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for blocks FindFile messages.
find_file: Option<Duration>,
/// Timeout for blocks FindChunks messages.
find_chunks: Option<Duration>,
/// Timeout for AnnounceFile messages.
announce_file: Option<Duration>,
/// Timeout for AnnounceChunks messages.
announce_chunks: Option<Duration>,
/// Timeout for AnnounceShardConfig messages.
announce_shard_config: Option<Duration>,
}

#[allow(dead_code)]
impl GossipCacheBuilder {
/// By default, all timeouts all disabled. Setting a default timeout will enable all timeout
/// that are not already set.
pub fn default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}

/// Timeout for Example messages.
pub fn example_timeout(mut self, timeout: Duration) -> Self {
self.example = Some(timeout);
self
}

/// Timeout for NewFile messages.
pub fn new_file_timeout(mut self, timeout: Duration) -> Self {
self.new_file = Some(timeout);
self
}

/// Timeout for FindFile messages.
pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
self.find_file = Some(timeout);
self
}

/// Timeout for FindChunks messages.
pub fn find_chunks_timeout(mut self, timeout: Duration) -> Self {
self.find_chunks = Some(timeout);
self
}

/// Timeout for AnnounceFile messages.
pub fn announce_file_timeout(mut self, timeout: Duration) -> Self {
self.announce_file = Some(timeout);
self
}

/// Timeout for AnnounceChunks messages.
pub fn announce_chunks_timeout(mut self, timeout: Duration) -> Self {
self.announce_chunks = Some(timeout);
self
}

/// Timeout for AnnounceShardConfig messages.
pub fn announce_shard_config_timeout(mut self, timeout: Duration) -> Self {
self.announce_shard_config = Some(timeout);
self
}

pub fn build(self) -> GossipCache {
let GossipCacheBuilder {
default_timeout,
example,
new_file,
find_file,
find_chunks,
announce_file,
announce_chunks,
announce_shard_config,
} = self;

GossipCache {
expirations: DelayQueue::default(),
topic_msgs: HashMap::default(),
example: example.or(default_timeout),
new_file: new_file.or(default_timeout),
find_file: find_file.or(default_timeout),
find_chunks: find_chunks.or(default_timeout),
announce_file: announce_file.or(default_timeout),
announce_chunks: announce_chunks.or(default_timeout),
announce_shard_config: announce_shard_config.or(default_timeout),
}
}
/// Timeout for pubsub messages.
timeouts: HashMap<GossipKind, Duration>,
}

impl GossipCache {
/// Get a builder of a `GossipCache`. Topic kinds for which no timeout is defined will be
/// ignored if added in `insert`.
pub fn builder() -> GossipCacheBuilder {
GossipCacheBuilder::default()
#[cfg(test)]
pub fn new_with_default_timeout(timeout: Duration) -> Self {
Self {
default_timeout: Some(timeout),
..Default::default()
}
}

// Insert a message to be sent later.
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() {
GossipKind::Example => self.example,
GossipKind::NewFile => self.new_file,
GossipKind::FindFile => self.find_file,
GossipKind::FindChunks => self.find_chunks,
GossipKind::AnnounceFile => self.announce_file,
GossipKind::AnnounceChunks => self.announce_chunks,
GossipKind::AnnounceShardConfig => self.announce_shard_config,
};
let expire_timeout = self
.timeouts
.get(topic.kind())
.cloned()
.or(self.default_timeout);

let expire_timeout = match expire_timeout {
Some(expire_timeout) => expire_timeout,
Expand Down Expand Up @@ -221,9 +114,7 @@ mod tests {

#[tokio::test]
async fn test_stream() {
let mut cache = GossipCache::builder()
.default_timeout(Duration::from_millis(300))
.build();
let mut cache = GossipCache::new_with_default_timeout(Duration::from_millis(300));
let test_topic =
GossipTopic::new(GossipKind::Example, crate::types::GossipEncoding::SSZSnappy);
cache.insert(test_topic, vec![]);
Expand Down
26 changes: 13 additions & 13 deletions node/network/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::peer_manager::{
ConnectionDirection, PeerManager, PeerManagerEvent,
};
use crate::rpc::methods::DataByHashRequest;
use crate::rpc::methods::FileAnnouncement;
use crate::rpc::methods::GetChunksRequest;
use crate::rpc::*;
use crate::service::Context as ServiceContext;
Expand All @@ -32,7 +31,7 @@ use libp2p::{
},
NetworkBehaviour, PeerId,
};
use shared_types::ChunkArrayWithProof;
use shared_types::{ChunkArrayWithProof, ShardedFile};
use std::{
collections::VecDeque,
sync::Arc,
Expand Down Expand Up @@ -236,6 +235,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
params
.topics
.insert(get_hash(GossipKind::NewFile), TopicScoreParams::default());
params
.topics
.insert(get_hash(GossipKind::AskFile), TopicScoreParams::default());
params
.topics
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
Expand Down Expand Up @@ -270,12 +272,10 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
..config.peer_manager
};

let slot_duration = std::time::Duration::from_secs(12);
// let slot_duration = std::time::Duration::from_secs(12);
// let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot);

let gossip_cache = GossipCache::builder()
.example_timeout(slot_duration) // TODO
.build();
let gossip_cache = GossipCache::default();

Ok(Behaviour {
// Sub-behaviours
Expand Down Expand Up @@ -547,8 +547,8 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
Request::DataByHash { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
}
Request::AnnounceFile { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"])
Request::AnswerFile { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["answer_file"])
}
Request::GetChunks { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
Expand Down Expand Up @@ -780,8 +780,8 @@ where
InboundRequest::DataByHash(req) => {
self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
}
InboundRequest::AnnounceFile(req) => {
self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req))
InboundRequest::AnswerFile(req) => {
self.propagate_request(peer_request_id, peer_id, Request::AnswerFile(req))
}
InboundRequest::GetChunks(req) => {
self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
Expand Down Expand Up @@ -997,8 +997,8 @@ pub enum Request {
Status(StatusMessage),
/// A data by hash request.
DataByHash(DataByHashRequest),
/// An AnnounceFile message.
AnnounceFile(FileAnnouncement),
/// An AnswerFile message.
AnswerFile(ShardedFile),
/// A GetChunks request.
GetChunks(GetChunksRequest),
}
Expand All @@ -1008,7 +1008,7 @@ impl std::convert::From<Request> for OutboundRequest {
match req {
Request::Status(s) => OutboundRequest::Status(s),
Request::DataByHash(r) => OutboundRequest::DataByHash(r),
Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r),
Request::AnswerFile(r) => OutboundRequest::AnswerFile(r),
Request::GetChunks(r) => OutboundRequest::GetChunks(r),
}
}
Expand Down
6 changes: 3 additions & 3 deletions node/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl PeerManager {
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
Protocol::DataByHash => PeerAction::MidToleranceError,
Protocol::AnnounceFile => PeerAction::MidToleranceError,
Protocol::AnswerFile => PeerAction::MidToleranceError,
Protocol::GetChunks => PeerAction::MidToleranceError,
},
},
Expand All @@ -480,7 +480,7 @@ impl PeerManager {
Protocol::Goodbye => return,
Protocol::Status => PeerAction::LowToleranceError,
Protocol::DataByHash => return,
Protocol::AnnounceFile => return,
Protocol::AnswerFile => return,
Protocol::GetChunks => return,
}
}
Expand All @@ -495,7 +495,7 @@ impl PeerManager {
Protocol::Goodbye => return,
Protocol::Status => return,
Protocol::DataByHash => PeerAction::MidToleranceError,
Protocol::AnnounceFile => PeerAction::MidToleranceError,
Protocol::AnswerFile => PeerAction::MidToleranceError,
Protocol::GetChunks => PeerAction::MidToleranceError,
},
},
Expand Down
14 changes: 7 additions & 7 deletions node/network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::rpc::{
};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BytesMut;
use shared_types::ChunkArrayWithProof;
use shared_types::{ChunkArrayWithProof, ShardedFile};
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
use ssz::{Decode, Encode};
Expand Down Expand Up @@ -159,7 +159,7 @@ impl Encoder<OutboundRequest> for SSZSnappyOutboundCodec {
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(),
OutboundRequest::AnswerFile(req) => req.as_ssz_bytes(),
OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
};
// SSZ encoded bytes should be within `max_packet_size`
Expand Down Expand Up @@ -347,8 +347,8 @@ fn handle_v1_request(
Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile(
FileAnnouncement::from_ssz_bytes(decoded_buffer)?,
Protocol::AnswerFile => Ok(Some(InboundRequest::AnswerFile(
ShardedFile::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
Expand Down Expand Up @@ -377,9 +377,9 @@ fn handle_v1_response(
Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
ZgsData::from_ssz_bytes(decoded_buffer)?,
)))),
// This case should be unreachable as `AnnounceFile` has no response.
Protocol::AnnounceFile => Err(RPCError::InvalidData(
"AnnounceFile RPC message has no valid response".to_string(),
// This case should be unreachable as `AnswerFile` has no response.
Protocol::AnswerFile => Err(RPCError::InvalidData(
"AnswerFile RPC message has no valid response".to_string(),
)),
Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
Expand Down
8 changes: 0 additions & 8 deletions node/network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,6 @@ pub struct DataByHashRequest {
pub hashes: VariableList<Hash256, MaxRequestBlocks>,
}

// The message of `AnnounceFile` RPC message.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct FileAnnouncement {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
}

/// Request a chunk array from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct GetChunksRequest {
Expand Down
2 changes: 1 addition & 1 deletion node/network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<Id: ReqId> RPC<Id> {
.n_every(Protocol::Status, 5, Duration::from_secs(15))
.one_every(Protocol::Goodbye, Duration::from_secs(10))
.n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
.n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10))
.n_every(Protocol::AnswerFile, 256, Duration::from_secs(10))
.n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
.build()
.expect("Configuration parameters are valid");
Expand Down
Loading

0 comments on commit afa471e

Please sign in to comment.