Skip to content

Commit

Permalink
Sync file from neighbors by default (#295)
Browse files Browse the repository at this point in the history
* sync from neighbors by default

* change p2p protocol version

* minor fix for crash python test

* Fix pubsub message id issue

* fix python tests
  • Loading branch information
boqiu authored Dec 9, 2024
1 parent 6b2420c commit d3a2118
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 55 deletions.
2 changes: 1 addition & 1 deletion node/network/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
.gossipsub
.publish(topic.clone().into(), message_data.clone())
{
warn!(error = ?e, "Could not publish message");
warn!(error = ?e, topic = ?topic.kind(), "Failed to publish message");

// add to metrics
if let Some(v) = metrics::get_int_gauge(
Expand Down
64 changes: 38 additions & 26 deletions node/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,32 +294,6 @@ impl From<u8> for NetworkLoad {

/// Return a Lighthouse specific `GossipsubConfig` where the `message_id_fn` depends on the current fork.
pub fn gossipsub_config(network_load: u8) -> GossipsubConfig {
// The function used to generate a gossipsub message id
// We use the first 8 bytes of SHA256(data) for content addressing
let fast_gossip_message_id =
|message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..8]);
fn prefix(prefix: [u8; 4], message: &GossipsubMessage) -> Vec<u8> {
let topic_bytes = message.topic.as_str().as_bytes();

// according to: https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/p2p-interface.md#the-gossip-domain-gossipsub
// the derivation of the message-id remains the same in the merge
let topic_len_bytes = topic_bytes.len().to_le_bytes();
let mut vec = Vec::with_capacity(
prefix.len() + topic_len_bytes.len() + topic_bytes.len() + message.data.len(),
);
vec.extend_from_slice(&prefix);
vec.extend_from_slice(&topic_len_bytes);
vec.extend_from_slice(topic_bytes);
vec.extend_from_slice(&message.data);
vec
}

let gossip_message_id = move |message: &GossipsubMessage| {
MessageId::from(
&Sha256::digest(prefix(MESSAGE_DOMAIN_VALID_SNAPPY, message).as_slice())[..20],
)
};

let load = NetworkLoad::from(network_load);

GossipsubConfigBuilder::default()
Expand Down Expand Up @@ -368,3 +342,41 @@ fn is_global(addr: &std::net::Ipv4Addr) -> bool {
// Make sure the address is not in 0.0.0.0/8
&& addr.octets()[0] != 0
}

/// Computes a cheap 8-byte message id for a raw (not-yet-decoded) gossipsub message.
///
/// The id is the first 8 bytes of `SHA256(source | topic | data | nonce)`,
/// where the optional `source` peer id and `nonce` (sequence number) are
/// simply omitted from the preimage when absent.
fn fast_gossip_message_id(message: &RawGossipsubMessage) -> FastMessageId {
    let topic = message.topic.as_str().as_bytes();

    // Preallocate: up to 64 bytes for the encoded peer id plus 8 for the nonce.
    let mut bytes = Vec::with_capacity(64 + topic.len() + message.data.len() + 8);

    if let Some(source) = &message.source {
        bytes.extend_from_slice(&source.to_bytes());
    }

    bytes.extend_from_slice(topic);
    bytes.extend_from_slice(&message.data);

    if let Some(seq) = message.sequence_number {
        bytes.extend_from_slice(&seq.to_le_bytes());
    }

    FastMessageId::from(&Sha256::digest(&bytes)[..8])
}

/// Derives the canonical 20-byte gossipsub message id for a decoded message.
///
/// The id is the first 20 bytes of
/// `SHA256(MESSAGE_DOMAIN_VALID_SNAPPY | source | topic | data | nonce)`;
/// the optional `source` peer id and `nonce` (sequence number) are omitted
/// from the preimage when absent.
///
/// NOTE(review): the fields are concatenated without length prefixes, so in
/// principle two different (source, topic, data) splits could produce the
/// same preimage — presumably acceptable because topics come from a fixed
/// set; confirm if topic names ever become externally controlled.
fn gossip_message_id(message: &GossipsubMessage) -> MessageId {
    let topic = message.topic.as_str().as_bytes();

    // Preallocate: domain prefix + up to 64 bytes of peer id + 8-byte nonce.
    let mut bytes = Vec::with_capacity(
        MESSAGE_DOMAIN_VALID_SNAPPY.len() + 64 + topic.len() + message.data.len() + 8,
    );

    bytes.extend_from_slice(&MESSAGE_DOMAIN_VALID_SNAPPY);

    if let Some(source) = &message.source {
        bytes.extend_from_slice(&source.to_bytes());
    }

    bytes.extend_from_slice(topic);
    bytes.extend_from_slice(&message.data);

    if let Some(seq) = message.sequence_number {
        bytes.extend_from_slice(&seq.to_le_bytes());
    }

    MessageId::from(&Sha256::digest(&bytes)[..20])
}
2 changes: 2 additions & 0 deletions node/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused too heavy network load.
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
/// - v3: Add shard config in Status message.
/// - v4: Refactor pubsub messages.
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0];
pub const PROTOCOL_VERSION_V4: [u8; 3] = [0, 4, 0];

/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]
Expand Down
1 change: 1 addition & 0 deletions node/router/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl RouterService {
let msg = PubsubMessage::NewFile(new_file.into());
self.libp2p.swarm.behaviour_mut().publish(vec![msg]);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
debug!(?new_file, "Publish NewFile message");
}
NetworkMessage::UPnPMappingEstablished {
tcp_socket,
Expand Down
10 changes: 3 additions & 7 deletions node/src/config/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ impl ZgsConfig {
chain_id,
flow_address,
p2p_protocol_version: ProtocolVersion {
major: network::PROTOCOL_VERSION_V3[0],
minor: network::PROTOCOL_VERSION_V3[1],
build: if self.sync.neighbors_only {
network::PROTOCOL_VERSION_V3[2] + 1
} else {
network::PROTOCOL_VERSION_V3[2]
},
major: network::PROTOCOL_VERSION_V4[0],
minor: network::PROTOCOL_VERSION_V4[1],
build: network::PROTOCOL_VERSION_V4[2],
},
};
network_config.network_id = local_network_id.clone();
Expand Down
7 changes: 5 additions & 2 deletions node/sync/src/controllers/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ impl SerialSyncController {
} else {
// FindFile timeout
if since.elapsed() >= self.config.peer_find_timeout {
if self.config.neighbors_only {
if self.goal.is_all_chunks() && self.config.neighbors_only {
self.state = SyncState::Failed {
reason: FailureReason::TimeoutFindFile,
};
Expand Down Expand Up @@ -1694,7 +1694,10 @@ mod tests {
let file_location_cache = create_file_location_cache(peer_id, vec![tx_id]);

let controller = SerialSyncController::new(
Config::default(),
Config {
neighbors_only: false,
..Default::default()
},
tx_id,
0,
FileSyncGoal::new_file(num_chunks as u64),
Expand Down
2 changes: 1 addition & 1 deletion node/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Default for Config {
fn default() -> Self {
Self {
// sync service config
neighbors_only: false,
neighbors_only: true,
heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false,
max_sync_files: 8,
Expand Down
10 changes: 8 additions & 2 deletions node/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,8 +941,14 @@ mod tests {
}

async fn spawn_sync_service(&mut self, with_peer_store: bool) -> SyncSender {
self.spawn_sync_service_with_config(with_peer_store, Config::default())
.await
self.spawn_sync_service_with_config(
with_peer_store,
Config {
neighbors_only: false,
..Default::default()
},
)
.await
}

async fn spawn_sync_service_with_config(
Expand Down
4 changes: 0 additions & 4 deletions run/config-testnet-standard.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,6 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true

# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true

# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

Expand Down
4 changes: 0 additions & 4 deletions run/config-testnet-turbo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true

# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true

# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

Expand Down
4 changes: 0 additions & 4 deletions run/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@
# all files, and sufficient disk space is required.
# auto_sync_enabled = false

# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true

# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

Expand Down
3 changes: 2 additions & 1 deletion tests/crash_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def run_test(self):

segment = submit_data(self.nodes[0], chunk_data)
self.log.info("segment: %s", segment)
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True)

for i in range(self.num_nodes):
for i in range(1, self.num_nodes):
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
Expand Down
1 change: 0 additions & 1 deletion tests/sync_auto_random_v2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def setup_params(self):
"auto_sync_enabled": True,
"max_sequential_workers": 0,
"max_random_workers": 3,
"neighbors_only": True,
}
}

Expand Down
6 changes: 4 additions & 2 deletions tests/sync_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ def __test_sync_file_by_rpc(self):
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
time.sleep(3)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
assert(client2.admin_get_file_location(0) is None)
# file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore.
# assert(client2.admin_get_file_location(0) is None)

# Trigger file sync by rpc
assert(client2.admin_start_sync_file(0) is None)
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])
assert(client2.admin_get_file_location(0) is not None)
# file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore.
# assert(client2.admin_get_file_location(0) is not None)

# Validate data
assert_equal(
Expand Down

0 comments on commit d3a2118

Please sign in to comment.