diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index e18f0afa..f0e9da75 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -390,7 +390,7 @@ impl Behaviour { .gossipsub .publish(topic.clone().into(), message_data.clone()) { - warn!(error = ?e, "Could not publish message"); + warn!(error = ?e, topic = ?topic.kind(), "Failed to publish message"); // add to metrics if let Some(v) = metrics::get_int_gauge( diff --git a/node/network/src/config.rs b/node/network/src/config.rs index 616bd8b6..2f44dff6 100644 --- a/node/network/src/config.rs +++ b/node/network/src/config.rs @@ -294,32 +294,6 @@ impl From for NetworkLoad { /// Return a Lighthouse specific `GossipsubConfig` where the `message_id_fn` depends on the current fork. pub fn gossipsub_config(network_load: u8) -> GossipsubConfig { - // The function used to generate a gossipsub message id - // We use the first 8 bytes of SHA256(data) for content addressing - let fast_gossip_message_id = - |message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..8]); - fn prefix(prefix: [u8; 4], message: &GossipsubMessage) -> Vec { - let topic_bytes = message.topic.as_str().as_bytes(); - - // according to: https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/p2p-interface.md#the-gossip-domain-gossipsub - // the derivation of the message-id remains the same in the merge - let topic_len_bytes = topic_bytes.len().to_le_bytes(); - let mut vec = Vec::with_capacity( - prefix.len() + topic_len_bytes.len() + topic_bytes.len() + message.data.len(), - ); - vec.extend_from_slice(&prefix); - vec.extend_from_slice(&topic_len_bytes); - vec.extend_from_slice(topic_bytes); - vec.extend_from_slice(&message.data); - vec - } - - let gossip_message_id = move |message: &GossipsubMessage| { - MessageId::from( - &Sha256::digest(prefix(MESSAGE_DOMAIN_VALID_SNAPPY, message).as_slice())[..20], - ) - }; - let load = NetworkLoad::from(network_load); GossipsubConfigBuilder::default() @@ -368,3 +342,41 @@ fn is_global(addr: &std::net::Ipv4Addr) -> bool { // Make sure the address is not in 0.0.0.0/8 && addr.octets()[0] != 0 } + +fn fast_gossip_message_id(message: &RawGossipsubMessage) -> FastMessageId { + // source | topic | data | nonce + let topic_bytes = message.topic.as_str().as_bytes(); + let mut buf = Vec::with_capacity(64 + topic_bytes.len() + message.data.len() + 8); + + if let Some(peer_id) = message.source { + buf.extend_from_slice(&peer_id.to_bytes()); + } + + buf.extend_from_slice(&topic_bytes); + buf.extend_from_slice(&message.data); + + if let Some(nonce) = message.sequence_number { + buf.extend_from_slice(&nonce.to_le_bytes()); + } + + FastMessageId::from(&Sha256::digest(&buf)[..8]) +} + +fn gossip_message_id(message: &GossipsubMessage) -> MessageId { + // prefix | source | topic | data | nonce + let topic_bytes = message.topic.as_str().as_bytes(); + let mut vec = Vec::with_capacity( + MESSAGE_DOMAIN_VALID_SNAPPY.len() + 64 + topic_bytes.len() + message.data.len() + 8, + ); + vec.extend_from_slice(&MESSAGE_DOMAIN_VALID_SNAPPY); + if let Some(peer_id) = message.source { + vec.extend_from_slice(&peer_id.to_bytes()); + } + vec.extend_from_slice(topic_bytes); + vec.extend_from_slice(&message.data); + if let Some(nonce) = message.sequence_number { + vec.extend_from_slice(&nonce.to_le_bytes()); + } + + MessageId::from(&Sha256::digest(&vec)[..20]) +} diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 29a503f2..2ed2f8f2 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -97,9 +97,11 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F /// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey. /// - v2: Publish NewFile to neighbors only and announce file via RPC message. /// - v3: Add shard config in Status message. +/// - v4: Refactor pubsub messages. pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1]; pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1]; pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0]; +pub const PROTOCOL_VERSION_V4: [u8; 3] = [0, 4, 0]; /// Application level requests sent to the network. #[derive(Debug, Clone, Copy)] diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 190ff0c7..f9d4160e 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -343,6 +343,7 @@ impl RouterService { let msg = PubsubMessage::NewFile(new_file.into()); self.libp2p.swarm.behaviour_mut().publish(vec![msg]); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); + debug!(?new_file, "Publish NewFile message"); } NetworkMessage::UPnPMappingEstablished { tcp_socket, diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 0ba2a1eb..5f765594 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -43,13 +43,9 @@ impl ZgsConfig { chain_id, flow_address, p2p_protocol_version: ProtocolVersion { - major: network::PROTOCOL_VERSION_V3[0], - minor: network::PROTOCOL_VERSION_V3[1], - build: if self.sync.neighbors_only { - network::PROTOCOL_VERSION_V3[2] + 1 - } else { - network::PROTOCOL_VERSION_V3[2] - }, + major: network::PROTOCOL_VERSION_V4[0], + minor: network::PROTOCOL_VERSION_V4[1], + build: network::PROTOCOL_VERSION_V4[2], }, }; network_config.network_id = local_network_id.clone(); diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 0865b3ed..cba6b698 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -677,7 +677,7 @@ impl SerialSyncController { } else { // FindFile timeout if since.elapsed() >= self.config.peer_find_timeout { - if self.config.neighbors_only { + if self.goal.is_all_chunks() && self.config.neighbors_only { self.state = SyncState::Failed { reason: FailureReason::TimeoutFindFile, }; @@ -1694,7 +1694,10 @@ mod tests { let file_location_cache = create_file_location_cache(peer_id, vec![tx_id]); let controller = SerialSyncController::new( - Config::default(), + Config { + neighbors_only: false, + ..Default::default() + }, tx_id, 0, FileSyncGoal::new_file(num_chunks as u64), diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 3f08d853..cfe2df50 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -68,7 +68,7 @@ impl Default for Config { fn default() -> Self { Self { // sync service config - neighbors_only: false, + neighbors_only: true, heartbeat_interval: Duration::from_secs(5), auto_sync_enabled: false, max_sync_files: 8, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 90e1c316..646da5be 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -941,8 +941,14 @@ mod tests { } async fn spawn_sync_service(&mut self, with_peer_store: bool) -> SyncSender { - self.spawn_sync_service_with_config(with_peer_store, Config::default()) - .await + self.spawn_sync_service_with_config( + with_peer_store, + Config { + neighbors_only: false, + ..Default::default() + }, + ) + .await } async fn spawn_sync_service_with_config( diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index d39b57de..5434154e 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -232,10 +232,6 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true -# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index f3083cca..a59047af 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -244,10 +244,6 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true -# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config.toml b/run/config.toml index fe7ab123..2c43742f 100644 --- a/run/config.toml +++ b/run/config.toml @@ -246,10 +246,6 @@ # all files, and sufficient disk space is required. # auto_sync_enabled = false -# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/tests/crash_test.py b/tests/crash_test.py index 503e614a..22c09c18 100755 --- a/tests/crash_test.py +++ b/tests/crash_test.py @@ -20,8 +20,9 @@ def run_test(self): segment = submit_data(self.nodes[0], chunk_data) self.log.info("segment: %s", segment) + wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True) - for i in range(self.num_nodes): + for i in range(1, self.num_nodes): wait_until( lambda: self.nodes[i].zgs_get_file_info(data_root) is not None ) diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py index f584283d..a00c5593 100644 --- a/tests/sync_auto_random_v2_test.py +++ b/tests/sync_auto_random_v2_test.py @@ -14,7 +14,6 @@ def setup_params(self): "auto_sync_enabled": True, "max_sequential_workers": 0, "max_random_workers": 3, - "neighbors_only": True, } } diff --git a/tests/sync_test.py b/tests/sync_test.py index 45f5bb5f..e2e8f7fa 100755 --- a/tests/sync_test.py +++ b/tests/sync_test.py @@ -45,13 +45,15 @@ def __test_sync_file_by_rpc(self): wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) time.sleep(3) assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) - assert(client2.admin_get_file_location(0) is None) + # file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore. + # assert(client2.admin_get_file_location(0) is None) # Trigger file sync by rpc assert(client2.admin_start_sync_file(0) is None) wait_until(lambda: client2.sync_status_is_completed_or_unknown(0)) wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"]) - assert(client2.admin_get_file_location(0) is not None) + # file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore. + # assert(client2.admin_get_file_location(0) is not None) # Validate data assert_equal(