From 5f7788b72a9988af16e5f12dcb740a474f4f9cd9 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 11 Dec 2023 09:17:08 +0900 Subject: [PATCH] Run prefix searching --- .../lighthouse_network/src/discovery/mod.rs | 106 +++++++++++++----- 1 file changed, 80 insertions(+), 26 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index e0ed8f5168a..bda00d9560d 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -131,6 +131,8 @@ impl std::fmt::Debug for SubnetQuery { enum QueryType { /// We are searching for subnet peers. Subnet(Vec), + /// We are prefix searching for subnet peers. + PrefixSearch(Vec), /// We are searching for more peers without ENR or time constraints. FindPeers, } @@ -791,16 +793,32 @@ impl Discovery { // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log); - debug!( - self.log, - "Starting grouped subnet query"; - "subnets" => ?filtered_subnet_queries, - ); - self.start_query( - QueryType::Subnet(filtered_subnet_queries), - TARGET_PEERS_FOR_GROUPED_QUERY, - subnet_predicate, - ); + // TODO: Add CLI flag for this? + let prefix_search_for_subnet = true; + + if prefix_search_for_subnet { + debug!( + self.log, + "Starting prefix search query"; + "subnets" => ?filtered_subnet_queries, + ); + self.start_query( + QueryType::PrefixSearch(filtered_subnet_queries), + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, + ); + } else { + debug!( + self.log, + "Starting grouped subnet query"; + "subnets" => ?filtered_subnet_queries, + ); + self.start_query( + QueryType::Subnet(filtered_subnet_queries), + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, + ); + } } } @@ -831,22 +849,58 @@ impl Discovery { && (enr.tcp4().is_some() || enr.tcp6().is_some()) }; - // General predicate - let predicate: Box bool + Send> = - Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); - - // Build the future - let query_future = self - .discv5 - // Generate a random target node id. - .find_node_predicate(NodeId::random(), predicate, target_peers) - .map(|v| QueryResult { - query_type: query, - result: v, - }); + if let QueryType::PrefixSearch(subnet_queries) = query { + // Split the grouped subnet query into individual queries in order to prefix search. + for subnet_query in subnet_queries { + // Target node + let target_node = match &subnet_query.subnet { + Subnet::Attestation(subnet_id) => { + match self.prefix_mapping.get::(subnet_id) { + Ok(raw) => { + let raw_node_id: [u8; 32] = raw.into(); + NodeId::from(raw_node_id) + } + Err(e) => { + warn!(self.log, "Failed to get target NodeId"; "error" => %e, "subnet_id" => ?subnet_id); + continue; + } + } + } + Subnet::SyncCommittee(_) => NodeId::random(), + }; + // Build the future + let query_future = self + .discv5 + .find_node_predicate( + target_node, + Box::new(eth2_fork_predicate.clone()), + target_peers, + ) + .map(|v| QueryResult { + query_type: QueryType::PrefixSearch(vec![subnet_query]), + result: v, + }); - // Add the future to active queries, to be executed. - self.active_queries.push(Box::pin(query_future)); + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } + } else { + // General predicate + let predicate: Box bool + Send> = + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) + .map(|v| QueryResult { + query_type: query, + result: v, + }); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } } /// Process the completed QueryResult returned from discv5. @@ -878,7 +932,7 @@ impl Discovery { } } } - QueryType::Subnet(queries) => { + QueryType::Subnet(queries) | QueryType::PrefixSearch(queries) => { let subnets_searched_for: Vec = queries.iter().map(|query| query.subnet).collect(); match query.result {