From 9b4b0436c31bbd57a04981c519e0c3630fceb612 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Fri, 19 Jan 2024 14:04:59 +0800 Subject: [PATCH] Supports to sync partial chunks (#4) * refactor p2p signed message * add new pubsub messages in network layer to find chunks * handle find chunks pubsub message in router * Supports to sync partial chunks * add admin rpc to sync chunks * limit number of chunks to sync at a time * refactor code to sync file and chunks * add more switches to trigger file sync * fix ut failure * refactor code --- node/file_location_cache/src/test_util.rs | 4 +- node/network/src/behaviour/gossip_cache.rs | 26 ++++ node/network/src/types/mod.rs | 5 +- node/network/src/types/pubsub.rs | 82 ++++++++--- node/network/src/types/topics.rs | 17 ++- node/router/src/libp2p_event_handler.rs | 159 +++++++++++++++++--- node/rpc/src/admin/api.rs | 8 + node/rpc/src/admin/impl.rs | 30 ++++ node/src/config/convert.rs | 8 - node/src/config/mod.rs | 5 +- node/src/main.rs | 3 +- node/sync/src/controllers/mod.rs | 37 ++++- node/sync/src/controllers/serial.rs | 77 ++++++---- node/sync/src/lib.rs | 19 +-- node/sync/src/service.rs | 164 ++++++++++++++++----- run/config.toml | 4 +- 16 files changed, 516 insertions(+), 132 deletions(-) diff --git a/node/file_location_cache/src/test_util.rs b/node/file_location_cache/src/test_util.rs index adf20edc..2e2f8be3 100644 --- a/node/file_location_cache/src/test_util.rs +++ b/node/file_location_cache/src/test_util.rs @@ -1,6 +1,6 @@ use network::{ libp2p::identity, - types::{AnnounceFile, SignedAnnounceFile}, + types::{AnnounceFile, SignedAnnounceFile, SignedMessage}, Multiaddr, PeerId, }; use shared_types::{timestamp_now, TxID}; @@ -42,6 +42,6 @@ impl AnnounceFileBuilder { }; let keypair = identity::Keypair::generate_secp256k1(); - msg.into_signed(&keypair).unwrap() + SignedMessage::sign_message(msg, &keypair).unwrap() } } diff --git a/node/network/src/behaviour/gossip_cache.rs b/node/network/src/behaviour/gossip_cache.rs index 4ea3cad1..b3432de5 100644 --- a/node/network/src/behaviour/gossip_cache.rs +++ b/node/network/src/behaviour/gossip_cache.rs @@ -22,8 +22,12 @@ pub struct GossipCache { example: Option, /// Timeout for FindFile messages. find_file: Option, + /// Timeout for FindChunks messages. + find_chunks: Option, /// Timeout for AnnounceFile. announce_file: Option, + /// Timeout for AnnounceChunks. + announce_chunks: Option, } #[derive(Default)] @@ -33,8 +37,12 @@ pub struct GossipCacheBuilder { example: Option, /// Timeout for blocks FindFile messages. find_file: Option, + /// Timeout for blocks FindChunks messages. + find_chunks: Option, /// Timeout for AnnounceFile messages. announce_file: Option, + /// Timeout for AnnounceChunks messages. + announce_chunks: Option, } #[allow(dead_code)] @@ -58,18 +66,32 @@ impl GossipCacheBuilder { self } + /// Timeout for FindChunks messages. + pub fn find_chunks_timeout(mut self, timeout: Duration) -> Self { + self.find_chunks = Some(timeout); + self + } + /// Timeout for AnnounceFile messages. pub fn announce_file_timeout(mut self, timeout: Duration) -> Self { self.announce_file = Some(timeout); self } + /// Timeout for AnnounceChunks messages. + pub fn announce_chunks_timeout(mut self, timeout: Duration) -> Self { + self.announce_chunks = Some(timeout); + self + } + pub fn build(self) -> GossipCache { let GossipCacheBuilder { default_timeout, example, find_file, + find_chunks, announce_file, + announce_chunks, } = self; GossipCache { @@ -77,7 +99,9 @@ impl GossipCacheBuilder { topic_msgs: HashMap::default(), example: example.or(default_timeout), find_file: find_file.or(default_timeout), + find_chunks: find_chunks.or(default_timeout), announce_file: announce_file.or(default_timeout), + announce_chunks: announce_chunks.or(default_timeout), } } } @@ -94,7 +118,9 @@ impl GossipCache { let expire_timeout = match topic.kind() { GossipKind::Example => self.example, GossipKind::FindFile => self.find_file, + GossipKind::FindChunks => self.find_chunks, GossipKind::AnnounceFile => self.announce_file, + GossipKind::AnnounceChunks => self.announce_chunks, }; let expire_timeout = match expire_timeout { diff --git a/node/network/src/types/mod.rs b/node/network/src/types/mod.rs index 0269cf48..05d74dfb 100644 --- a/node/network/src/types/mod.rs +++ b/node/network/src/types/mod.rs @@ -6,5 +6,8 @@ mod topics; pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; -pub use pubsub::{AnnounceFile, FindFile, PubsubMessage, SignedAnnounceFile, SnappyTransform}; +pub use pubsub::{ + AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, PubsubMessage, + SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, +}; pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 81cb8309..4e5a9e08 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -120,6 +120,14 @@ pub struct FindFile { pub timestamp: u32, } +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct FindChunks { + pub tx_id: TxID, + pub index_start: u64, // inclusive + pub index_end: u64, // exclusive + pub timestamp: u32, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] pub struct AnnounceFile { pub tx_id: TxID, @@ -128,46 +136,65 @@ pub struct AnnounceFile { pub timestamp: u32, } -impl AnnounceFile { - pub fn into_signed(self, keypair: &Keypair) -> Result { - let raw = self.as_ssz_bytes(); - let signature = keypair.sign(&raw)?; - - Ok(SignedAnnounceFile { - inner: self, - signature, - resend_timestamp: 0, - }) - } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] +pub struct AnnounceChunks { + pub tx_id: TxID, + pub index_start: u64, // inclusive + pub index_end: u64, // exclusive + pub peer_id: WrappedPeerId, + pub at: WrappedMultiaddr, + pub timestamp: u32, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] -pub struct SignedAnnounceFile { - pub inner: AnnounceFile, +pub struct SignedMessage { + pub inner: T, pub signature: Vec, pub resend_timestamp: u32, } -impl SignedAnnounceFile { - pub fn verify_signature(&self, public_key: &PublicKey) -> bool { - let raw = self.inner.as_ssz_bytes(); - public_key.verify(&raw, &self.signature) +impl SignedMessage { + pub fn sign_message(msg: T, keypair: &Keypair) -> Result, SigningError> { + let raw = msg.as_ssz_bytes(); + let signature = keypair.sign(&raw)?; + + Ok(SignedMessage { + inner: msg, + signature, + resend_timestamp: 0, + }) } } -impl Deref for SignedAnnounceFile { - type Target = AnnounceFile; +impl Deref for SignedMessage { + type Target = T; fn deref(&self) -> &Self::Target { &self.inner } } +pub trait HasSignature { + fn verify_signature(&self, public_key: &PublicKey) -> bool; +} + +impl HasSignature for SignedMessage { + fn verify_signature(&self, public_key: &PublicKey) -> bool { + let raw = self.inner.as_ssz_bytes(); + public_key.verify(&raw, &self.signature) + } +} + +pub type SignedAnnounceFile = SignedMessage; +pub type SignedAnnounceChunks = SignedMessage; + #[derive(Debug, Clone, PartialEq, Eq)] pub enum PubsubMessage { ExampleMessage(u64), FindFile(FindFile), + FindChunks(FindChunks), AnnounceFile(SignedAnnounceFile), + AnnounceChunks(SignedAnnounceChunks), } // Implements the `DataTransform` trait of gossipsub to employ snappy compression @@ -242,7 +269,9 @@ impl PubsubMessage { match self { PubsubMessage::ExampleMessage(_) => GossipKind::Example, PubsubMessage::FindFile(_) => GossipKind::FindFile, + PubsubMessage::FindChunks(_) => GossipKind::FindChunks, PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile, + PubsubMessage::AnnounceChunks(_) => GossipKind::AnnounceChunks, } } @@ -267,9 +296,16 @@ impl PubsubMessage { GossipKind::FindFile => Ok(PubsubMessage::FindFile( FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), + GossipKind::FindChunks => Ok(PubsubMessage::FindChunks( + FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + )), GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile( SignedAnnounceFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), + GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks( + SignedAnnounceChunks::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + )), } } } @@ -285,7 +321,9 @@ impl PubsubMessage { match &self { PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(), PubsubMessage::FindFile(data) => data.as_ssz_bytes(), + PubsubMessage::FindChunks(data) => data.as_ssz_bytes(), PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(), + PubsubMessage::AnnounceChunks(data) => data.as_ssz_bytes(), } } } @@ -299,9 +337,15 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::FindFile(msg) => { write!(f, "FindFile message: {:?}", msg) } + PubsubMessage::FindChunks(msg) => { + write!(f, "FindChunks message: {:?}", msg) + } PubsubMessage::AnnounceFile(msg) => { write!(f, "AnnounceFile message: {:?}", msg) } + PubsubMessage::AnnounceChunks(msg) => { + write!(f, "AnnounceChunks message: {:?}", msg) + } } } } diff --git a/node/network/src/types/topics.rs b/node/network/src/types/topics.rs index 58367815..bc6498b2 100644 --- a/node/network/src/types/topics.rs +++ b/node/network/src/types/topics.rs @@ -9,9 +9,16 @@ pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const EXAMPLE_TOPIC: &str = "example"; pub const FIND_FILE_TOPIC: &str = "find_file"; +pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; +pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; -pub const CORE_TOPICS: [GossipKind; 2] = [GossipKind::FindFile, GossipKind::AnnounceFile]; +pub const CORE_TOPICS: [GossipKind; 4] = [ + GossipKind::FindFile, + GossipKind::FindChunks, + GossipKind::AnnounceFile, + GossipKind::AnnounceChunks, +]; /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. @@ -30,7 +37,9 @@ pub struct GossipTopic { pub enum GossipKind { Example, FindFile, + FindChunks, AnnounceFile, + AnnounceChunks, } /// The known encoding types for gossipsub messages. @@ -67,7 +76,9 @@ impl GossipTopic { let kind = match topic_parts[2] { EXAMPLE_TOPIC => GossipKind::Example, FIND_FILE_TOPIC => GossipKind::FindFile, + FIND_CHUNKS_TOPIC => GossipKind::FindChunks, ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile, + ANNOUNCE_CHUNKS_TOPIC => GossipKind::AnnounceChunks, _ => return Err(format!("Unknown topic: {}", topic)), }; @@ -93,7 +104,9 @@ impl From for String { let kind = match topic.kind { GossipKind::Example => EXAMPLE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, + GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, + GossipKind::AnnounceChunks => ANNOUNCE_CHUNKS_TOPIC, }; format!("/{}/{}/{}", TOPIC_PREFIX, kind, encoding) @@ -109,7 +122,9 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::Example => EXAMPLE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, + GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, + GossipKind::AnnounceChunks => ANNOUNCE_CHUNKS_TOPIC, }; write!(f, "/{}/{}/{}", TOPIC_PREFIX, kind, encoding) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 0b6c25f3..7081d94f 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -3,11 +3,14 @@ use std::{ops::Neg, sync::Arc}; use file_location_cache::FileLocationCache; use network::{ rpc::StatusMessage, - types::{AnnounceFile, FindFile, SignedAnnounceFile}, + types::{ + AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, SignedAnnounceChunks, + SignedAnnounceFile, SignedMessage, + }, Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId, PublicKey, PubsubMessage, Request, RequestId, Response, }; -use shared_types::{timestamp_now, TxID}; +use shared_types::{bytes_to_chunks, timestamp_now, TxID}; use storage_async::Store; use sync::{SyncMessage, SyncSender}; use tokio::sync::{mpsc, RwLock}; @@ -40,6 +43,21 @@ fn peer_id_to_public_key(peer_id: &PeerId) -> Result { }) } +fn verify_signature(msg: &dyn HasSignature, peer_id: &PeerId, propagation_source: PeerId) -> bool { + match peer_id_to_public_key(peer_id) { + Ok(pub_key) => msg.verify_signature(&pub_key), + Err(err) => { + error!( + ?err, + ?peer_id, + ?propagation_source, + "Failed to verify signature" + ); + false + } + } +} + pub struct Libp2pEventHandler { /// A collection of global variables, accessible outside of the network service. network_globals: Arc, @@ -213,7 +231,9 @@ impl Libp2pEventHandler { match message { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::FindFile(msg) => self.on_find_file(msg).await, + PubsubMessage::FindChunks(msg) => self.on_find_chunks(msg).await, PubsubMessage::AnnounceFile(msg) => self.on_announce_file(propagation_source, msg), + PubsubMessage::AnnounceChunks(msg) => self.on_announce_chunks(propagation_source, msg), } } @@ -237,7 +257,7 @@ impl Libp2pEventHandler { timestamp, }; - let mut signed = match msg.into_signed(&self.local_keypair) { + let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) { Ok(signed) => signed, Err(e) => { error!(%tx_id.seq, %e, "Failed to sign AnnounceFile message"); @@ -292,28 +312,106 @@ impl Libp2pEventHandler { MessageAcceptance::Accept } - fn on_announce_file( + pub fn construct_announce_chunks_message( &self, - propagation_source: PeerId, - msg: SignedAnnounceFile, - ) -> MessageAcceptance { - // verify message signature - let pk = match peer_id_to_public_key(&msg.peer_id) { - Ok(pk) => pk, + tx_id: TxID, + index_start: u64, + index_end: u64, + ) -> Option { + let peer_id = *self.network_globals.peer_id.read(); + let addr = self + .network_globals + .listen_multiaddrs + .read() + .first()? + .clone(); + let timestamp = timestamp_now(); + + let msg = AnnounceChunks { + tx_id, + index_start, + index_end, + peer_id: peer_id.into(), + at: addr.into(), + timestamp, + }; + + let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) { + Ok(signed) => signed, Err(e) => { - error!( - "Failed to convert peer id {:?} to public key: {:?}", - msg.peer_id, e - ); + error!(%tx_id.seq, %e, "Failed to sign AnnounceChunks message"); + return None; + } + }; + + signed.resend_timestamp = timestamp; + + Some(PubsubMessage::AnnounceChunks(signed)) + } + + async fn on_find_chunks(&self, msg: FindChunks) -> MessageAcceptance { + // validate message + if msg.index_start >= msg.index_end { + debug!(?msg, "Invalid chunk index range"); + return MessageAcceptance::Reject; + } + + // verify timestamp + let d = duration_since(msg.timestamp); + if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { + debug!(%msg.timestamp, "Invalid timestamp, ignoring FindFile message"); + return MessageAcceptance::Ignore; + } + + // check if we have specified chunks even file not finalized yet + // validate end index + let tx = match self.store.get_tx_by_seq_number(msg.tx_id.seq).await { + Ok(Some(tx)) if tx.id() == msg.tx_id => tx, + _ => return MessageAcceptance::Accept, + }; + + // validate index range + if let Ok(size) = usize::try_from(tx.size) { + let num_chunks = bytes_to_chunks(size); + if msg.index_end > num_chunks as u64 { + debug!(?msg, "Invalid chunk end index for FindChunks message"); return MessageAcceptance::Reject; } + } + + // TODO(qhz): check if there is better way to check existence of requested chunks. + let _ = match self + .store + .get_chunks_by_tx_and_index_range( + msg.tx_id.seq, + msg.index_start as usize, + msg.index_end as usize, + ) + .await + { + Ok(Some(_)) => (), + _ => return MessageAcceptance::Accept, }; - if !msg.verify_signature(&pk) { - warn!( - "Received message with invalid signature from peer {:?}", - propagation_source - ); + debug!(?msg, "Found chunks to respond FindChunks message"); + + match self.construct_announce_chunks_message(msg.tx_id, msg.index_start, msg.index_end) { + Some(msg) => { + self.publish(msg); + MessageAcceptance::Ignore + } + // propagate FindFile query to other nodes + None => MessageAcceptance::Accept, + } + } + + fn on_announce_file( + &self, + propagation_source: PeerId, + msg: SignedAnnounceFile, + ) -> MessageAcceptance { + // verify message signature + if !verify_signature(&msg, &msg.peer_id, propagation_source) { return MessageAcceptance::Reject; } @@ -336,6 +434,29 @@ impl Libp2pEventHandler { MessageAcceptance::Accept } + + fn on_announce_chunks( + &self, + propagation_source: PeerId, + msg: SignedAnnounceChunks, + ) -> MessageAcceptance { + // verify message signature + if !verify_signature(&msg, &msg.peer_id, propagation_source) { + return MessageAcceptance::Reject; + } + + // propagate gossip to peers + let d = duration_since(msg.resend_timestamp); + if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT { + debug!(%msg.resend_timestamp, "Invalid resend timestamp, ignoring AnnounceChunks message"); + return MessageAcceptance::Ignore; + } + + // notify sync layer + self.send_to_sync(SyncMessage::AnnounceChunksGossip { msg: msg.inner }); + + MessageAcceptance::Accept + } } #[cfg(test)] diff --git a/node/rpc/src/admin/api.rs b/node/rpc/src/admin/api.rs index d4bc63db..8c178edb 100644 --- a/node/rpc/src/admin/api.rs +++ b/node/rpc/src/admin/api.rs @@ -12,6 +12,14 @@ pub trait Rpc { #[method(name = "startSyncFile")] async fn start_sync_file(&self, tx_seq: u64) -> RpcResult<()>; + #[method(name = "startSyncChunks")] + async fn start_sync_chunks( + &self, + tx_seq: u64, + start_index: u64, + end_index: u64, // exclusive + ) -> RpcResult<()>; + #[method(name = "getSyncStatus")] async fn get_sync_status(&self, tx_seq: u64) -> RpcResult; diff --git a/node/rpc/src/admin/impl.rs b/node/rpc/src/admin/impl.rs index 0406c626..99a069c6 100644 --- a/node/rpc/src/admin/impl.rs +++ b/node/rpc/src/admin/impl.rs @@ -47,6 +47,36 @@ impl RpcServer for RpcServerImpl { } } + #[tracing::instrument(skip(self), err)] + async fn start_sync_chunks( + &self, + tx_seq: u64, + start_index: u64, + end_index: u64, + ) -> RpcResult<()> { + info!("admin_startSyncChunks({tx_seq}, {start_index}, {end_index})"); + + let response = self + .ctx + .request_sync(SyncRequest::SyncChunks { + tx_seq, + start_index, + end_index, + }) + .await?; + + match response { + SyncResponse::SyncFile { err } => { + if err.is_empty() { + Ok(()) + } else { + Err(error::internal_error(err)) + } + } + _ => Err(error::internal_error("unexpected response type")), + } + } + #[tracing::instrument(skip(self), err)] async fn get_sync_status(&self, tx_seq: u64) -> RpcResult { info!("admin_getSyncStatus({tx_seq})"); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 57be42ca..13db69bf 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -6,7 +6,6 @@ use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig}; use miner::MinerConfig; use network::NetworkConfig; use rpc::RPCConfig; -use std::time::Duration; use storage::StorageConfig; impl ZgsConfig { @@ -148,11 +147,4 @@ impl ZgsConfig { router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec(); Ok(router_config) } - - pub fn sync_config(&self) -> sync::Config { - let mut config = self.sync.clone(); - config.find_peer_timeout = Duration::from_secs(self.find_peer_timeout_secs); - config.enable_chunk_request = self.enable_chunk_request; - config - } } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 5ecfba68..435452ed 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -57,10 +57,6 @@ build_config! { (mine_contract_address, (String), "".to_string()) (miner_id, (Option), None) (miner_key, (Option), None) - - // sync - (find_peer_timeout_secs, (u64), 30) - (enable_chunk_request, (bool), true) } #[derive(Debug, Default, Deserialize)] @@ -68,6 +64,7 @@ build_config! { pub struct ZgsConfig { pub raw_conf: RawConfiguration, + // sync config, configured by [sync] section by `config` crate. pub sync: sync::Config, } diff --git a/node/src/main.rs b/node/src/main.rs index dade5ee8..9a86bb98 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -17,7 +17,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result Result Self { + assert!( + index_start < index_end && index_end <= num_chunks, + "invalid index_end" + ); + Self { + num_chunks, + index_start, + index_end, + } + } + + pub fn new_file(num_chunks: u64) -> Self { + Self::new(num_chunks, 0, num_chunks) + } + + pub fn is_all_chunks(&self) -> bool { + self.index_start == 0 && self.index_end == self.num_chunks + } +} #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct FileSyncInfo { pub elapsed_secs: u64, pub peers: usize, - pub num_chunks: u64, + pub goal: FileSyncGoal, pub next_chunks: u64, pub state: String, } diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 91d672c5..442f93f6 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -1,8 +1,9 @@ use crate::context::SyncNetworkContext; use crate::controllers::peers::{PeerState, SyncPeers}; -use crate::controllers::FileSyncInfo; +use crate::controllers::{FileSyncGoal, FileSyncInfo}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; +use network::types::FindChunks; use network::{ multiaddr::Protocol, rpc::GetChunksRequest, types::FindFile, Multiaddr, NetworkMessage, PeerAction, PeerId, PubsubMessage, SyncId as RequestId, @@ -14,7 +15,7 @@ use std::{ }; use storage_async::Store; -const MAX_CHUNKS_TO_REQUEST: u64 = 2 * 1024; +pub const MAX_CHUNKS_TO_REQUEST: u64 = 2 * 1024; const MAX_REQUEST_FAILURES: usize = 100; const PEER_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(5); @@ -63,8 +64,8 @@ pub struct SerialSyncController { since: Instant, - /// The size of the file to be synced. - num_chunks: u64, + /// File sync goal. + goal: FileSyncGoal, /// The next chunk id that we need to retrieve. next_chunk: u64, @@ -91,7 +92,7 @@ pub struct SerialSyncController { impl SerialSyncController { pub fn new( tx_id: TxID, - num_chunks: u64, + goal: FileSyncGoal, ctx: Arc, store: Store, file_location_cache: Arc, @@ -100,8 +101,8 @@ impl SerialSyncController { tx_seq: tx_id.seq, tx_id, since: Instant::now(), - num_chunks, - next_chunk: 0, + goal, + next_chunk: goal.index_start, failures: 0, state: SyncState::Idle, peers: Default::default(), @@ -115,7 +116,7 @@ impl SerialSyncController { FileSyncInfo { elapsed_secs: self.since.elapsed().as_secs(), peers: self.peers.count(&[PeerState::Connected]), - num_chunks: self.num_chunks, + goal: self.goal, next_chunks: self.next_chunk, state: format!("{:?}", self.state), } @@ -127,7 +128,7 @@ impl SerialSyncController { /// Resets the status to re-sync file when failed. pub fn reset(&mut self) { - self.next_chunk = 0; + self.next_chunk = self.goal.index_start; self.failures = 0; self.state = SyncState::Idle; // remove disconnected peers @@ -137,6 +138,23 @@ impl SerialSyncController { fn try_find_peers(&mut self) { info!(%self.tx_seq, "Finding peers"); + if self.goal.is_all_chunks() { + self.publish_find_file(); + } else { + self.publish_find_chunks(); + } + + let now = Instant::now(); + + let (since, updated) = match self.state { + SyncState::FindingPeers { since, .. } => (since, now), + _ => (now, now), + }; + + self.state = SyncState::FindingPeers { since, updated }; + } + + fn publish_find_file(&mut self) { // try from cache let mut found_new_peer = false; @@ -149,21 +167,23 @@ impl SerialSyncController { found_new_peer = self.on_peer_found(peer_id, addr) || found_new_peer; } - if !found_new_peer { - self.ctx.publish(PubsubMessage::FindFile(FindFile { - tx_id: self.tx_id, - timestamp: timestamp_now(), - })); + if found_new_peer { + return; } - let now = Instant::now(); - - let (since, updated) = match self.state { - SyncState::FindingPeers { since, .. } => (since, now), - _ => (now, now), - }; + self.ctx.publish(PubsubMessage::FindFile(FindFile { + tx_id: self.tx_id, + timestamp: timestamp_now(), + })); + } - self.state = SyncState::FindingPeers { since, updated }; + fn publish_find_chunks(&self) { + self.ctx.publish(PubsubMessage::FindChunks(FindChunks { + tx_id: self.tx_id, + index_start: self.goal.index_start, + index_end: self.goal.index_end, + timestamp: timestamp_now(), + })); } fn try_connect(&mut self) { @@ -201,7 +221,7 @@ impl SerialSyncController { // request next chunk array let from_chunk = self.next_chunk; - let to_chunk = std::cmp::min(from_chunk + MAX_CHUNKS_TO_REQUEST, self.num_chunks); + let to_chunk = std::cmp::min(from_chunk + MAX_CHUNKS_TO_REQUEST, self.goal.index_end); let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id }); @@ -412,11 +432,17 @@ impl SerialSyncController { } // prepare to download next - if self.next_chunk < self.num_chunks { + if self.next_chunk < self.goal.index_end { self.state = SyncState::Idle; return; } + // completed to download chunks + if !self.goal.is_all_chunks() { + self.state = SyncState::Completed; + return; + } + // finalize tx if all chunks downloaded match self .store @@ -1317,7 +1343,8 @@ mod tests { since: Instant::now(), }; - controller.num_chunks = 2048; + controller.goal.num_chunks = 2048; + controller.goal.index_end = 2048; controller.on_response(peer_id, chunks).await; match controller.get_status() { @@ -1499,7 +1526,7 @@ mod tests { let controller = SerialSyncController::new( tx_id, - num_chunks as u64, + FileSyncGoal::new_file(num_chunks as u64), ctx, Store::new(store, task_executor), file_location_cache, diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index dd5be31d..2f24701b 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -13,30 +13,25 @@ use serde::Deserialize; pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService}; use std::time::Duration; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Copy, Debug, Deserialize)] #[serde(default)] pub struct Config { - pub auto_sync_disabled: bool, + pub auto_sync_enabled: bool, pub max_sync_files: usize, #[serde(deserialize_with = "deserialize_duration")] pub find_peer_timeout: Duration, - pub enable_chunk_request: bool, + pub sync_file_by_rpc_enabled: bool, + pub sync_file_on_announcement_enabled: bool, } impl Default for Config { fn default() -> Self { Self { - auto_sync_disabled: false, + auto_sync_enabled: false, max_sync_files: 100, find_peer_timeout: Duration::from_secs(30), - enable_chunk_request: false, + sync_file_by_rpc_enabled: true, + sync_file_on_announcement_enabled: false, } } } - -impl Config { - pub fn disable_auto_sync(mut self) -> Self { - self.auto_sync_disabled = true; - self - } -} diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 5a5a7403..d99ddba2 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1,11 +1,15 @@ use crate::auto_sync::AutoSyncManager; use crate::context::SyncNetworkContext; -use crate::controllers::{FailureReason, FileSyncInfo, SerialSyncController, SyncState}; +use crate::controllers::{ + FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, + MAX_CHUNKS_TO_REQUEST, +}; use crate::Config; use anyhow::{bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; +use network::types::AnnounceChunks; use network::{ rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId, PeerRequestId, SyncId as RequestId, @@ -56,14 +60,31 @@ pub enum SyncMessage { peer_id: PeerId, addr: Multiaddr, }, + AnnounceChunksGossip { + msg: AnnounceChunks, + }, } #[derive(Debug)] pub enum SyncRequest { - SyncStatus { tx_seq: u64 }, - SyncFile { tx_seq: u64 }, - FileSyncInfo { tx_seq: Option }, - TerminateFileSync { tx_seq: u64, is_reverted: bool }, + SyncStatus { + tx_seq: u64, + }, + SyncFile { + tx_seq: u64, + }, + SyncChunks { + tx_seq: u64, + start_index: u64, + end_index: u64, + }, + FileSyncInfo { + tx_seq: Option, + }, + TerminateFileSync { + tx_seq: u64, + is_reverted: bool, + }, } #[derive(Debug)] @@ -134,7 +155,7 @@ impl SyncService { let manager = AutoSyncManager::new(store.clone(), sync_send.clone(), config.clone()).await?; - if !config.auto_sync_disabled { + if config.auto_sync_enabled { manager.spwn(&executor, event_recv); } @@ -218,6 +239,8 @@ impl SyncService { } => { self.on_announce_file_gossip(tx_id, peer_id, addr).await; } + + SyncMessage::AnnounceChunksGossip { msg } => self.on_announce_chunks_gossip(msg).await, } } @@ -237,24 +260,19 @@ impl SyncService { } SyncRequest::SyncFile { tx_seq } => { - if !self.controllers.contains_key(&tx_seq) - && self.controllers.len() >= self.config.max_sync_files - { - let _ = sender.send(SyncResponse::SyncFile { - err: format!( - "max sync file limitation reached: {:?}", - self.config.max_sync_files - ), - }); - return; - } - - let err = match self.on_start_sync_file(tx_seq, None).await { - Ok(()) => "".into(), - Err(err) => err.to_string(), - }; + let result = self.on_sync_file_request(tx_seq, None).await; + let _ = sender.send(SyncResponse::SyncFile { err: result }); + } - let _ = sender.send(SyncResponse::SyncFile { err }); + SyncRequest::SyncChunks { + tx_seq, + start_index, + end_index, + } => { + let result = self + .on_sync_file_request(tx_seq, Some((start_index, end_index))) + .await; + let _ = sender.send(SyncResponse::SyncFile { err: result }); } SyncRequest::FileSyncInfo { tx_seq } => { @@ -348,6 +366,12 @@ impl SyncService { return Ok(()); } + // ban peer if requested too many chunks + if request.index_end - request.index_start > MAX_CHUNKS_TO_REQUEST { + self.ctx.ban_peer(peer_id, "Too many chunks requested"); + return Ok(()); + } + // ban peer if invalid tx requested let tx = match self.store.get_tx_by_seq_number(request.tx_id.seq).await? { Some(tx) => tx, @@ -463,15 +487,37 @@ impl SyncService { } } + async fn on_sync_file_request( + &mut self, + tx_seq: u64, + maybe_range: Option<(u64, u64)>, + ) -> String { + if maybe_range.is_none() && !self.config.sync_file_by_rpc_enabled { + return "Disabled to sync file".into(); + } + + if !self.controllers.contains_key(&tx_seq) + && self.controllers.len() >= self.config.max_sync_files + { + return format!( + "Max sync file limitation reached: {}", + self.config.max_sync_files + ); + } + + match self.on_start_sync_file(tx_seq, maybe_range, None).await { + Ok(()) => "".into(), + Err(e) => e.to_string(), + } + } + async fn on_start_sync_file( &mut self, tx_seq: u64, + maybe_range: Option<(u64, u64)>, maybe_peer: Option<(PeerId, Multiaddr)>, ) -> Result<()> { info!(%tx_seq, "Start to sync file"); - if !self.config.enable_chunk_request { - return Ok(()); - } // remove failed entry if caused by tx reverted, so as to re-sync // file with latest tx_id. @@ -498,7 +544,7 @@ impl SyncService { }; let num_chunks = match usize::try_from(tx.size) { - Ok(size) => bytes_to_chunks(size), + Ok(size) => bytes_to_chunks(size) as u64, Err(_) => { error!(%tx_seq, "Unexpected transaction size: {}", tx.size); bail!("Unexpected transaction size"); @@ -510,9 +556,18 @@ impl SyncService { bail!("File already exists"); } + let (index_start, index_end) = match maybe_range { + Some((start, end)) => (start, end), + None => (0, num_chunks), + }; + + if index_start >= index_end || index_end > num_chunks { + bail!("invalid chunk range"); + } + entry.insert(SerialSyncController::new( tx.id(), - num_chunks as u64, + FileSyncGoal::new(num_chunks, index_start, index_end), self.ctx.clone(), self.store.clone(), self.file_location_cache.clone(), @@ -542,8 +597,16 @@ impl SyncService { // File already in sync if let Some(controller) = self.controllers.get_mut(&tx_seq) { - controller.on_peer_found(peer_id, addr); - controller.transition(); + let info = controller.get_sync_info(); + if info.goal.is_all_chunks() { + controller.on_peer_found(peer_id, addr); + controller.transition(); + } + + return; + } + + if !self.config.sync_file_on_announcement_enabled { return; } @@ -558,12 +621,30 @@ impl SyncService { } // Now, always sync files among all nodes - if let Err(err) = self.on_start_sync_file(tx_seq, Some((peer_id, addr))).await { + if let Err(err) = self + .on_start_sync_file(tx_seq, None, Some((peer_id, addr))) + .await + { // FIXME(zz): This is possible for tx missing. Is it expected? error!(%tx_seq, %err, "Failed to sync file"); } } + async fn on_announce_chunks_gossip(&mut self, msg: AnnounceChunks) { + info!(?msg, "Received AnnounceChunks gossip"); + + if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { + let info = controller.get_sync_info(); + if !info.goal.is_all_chunks() + && info.goal.index_start == msg.index_start + && info.goal.index_end == msg.index_end + { + controller.on_peer_found(msg.peer_id.into(), msg.at.into()); + controller.transition(); + } + } + } + /// Terminate file sync of `min_tx_seq`. /// If `is_reverted` is `true` (means confirmed transactions reverted), /// also terminate `tx_seq` greater than `min_tx_seq` @@ -679,6 +760,15 @@ mod tests { } async fn spawn_sync_service(&self, with_peer_store: bool) -> SyncSender { + self.spawn_sync_service_with_config(with_peer_store, Config::default()) + .await + } + + async fn spawn_sync_service_with_config( + &self, + with_peer_store: bool, + config: Config, + ) -> SyncSender { let store = if with_peer_store { self.peer_store.clone() } else { @@ -686,7 +776,7 @@ mod tests { }; SyncService::spawn_with_config( - Config::default().disable_auto_sync(), + config, self.runtime.task_executor.clone(), self.network_send.clone(), store, @@ -719,7 +809,7 @@ mod tests { .unwrap(); let mut sync = SyncService { - config: Config::default().disable_auto_sync(), + config: Config::default(), msg_recv: sync_recv, ctx: Arc::new(SyncNetworkContext::new(network_send)), store, @@ -754,7 +844,7 @@ mod tests { .unwrap(); let mut sync = SyncService { - config: Config::default().disable_auto_sync(), + config: Config::default(), msg_recv: sync_recv, ctx: Arc::new(SyncNetworkContext::new(network_send)), store, @@ -1070,7 +1160,7 @@ mod tests { let (network_send, mut network_recv) = mpsc::unbounded_channel::(); let (_event_send, event_recv) = broadcast::channel(16); let sync_send = SyncService::spawn_with_config( - Config::default().disable_auto_sync(), + Config::default(), runtime.task_executor.clone(), network_send, store.clone(), @@ -1341,7 +1431,9 @@ mod tests { #[tokio::test] async fn test_announce_file() { let mut runtime = TestSyncRuntime::new(vec![1535], 0); - let sync_send = runtime.spawn_sync_service(false).await; + let mut config = Config::default(); + config.sync_file_on_announcement_enabled = true; + let sync_send = runtime.spawn_sync_service_with_config(false, config).await; let tx_seq = 0u64; let address: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); diff --git a/run/config.toml b/run/config.toml index 293bb695..6f94148d 100644 --- a/run/config.toml +++ b/run/config.toml @@ -27,6 +27,8 @@ log_sync_start_block_number = 134253180 log_page_size = 999 # [sync] -# auto_sync_disabled = false +# auto_sync_enabled = false # max_sync_files = 8 # find_peer_timeout = "30s" +# sync_file_by_rpc_enabled = true +# sync_file_on_announcement_enabled = false