Retry custody requests after peer metadata updates #6975

Open

wants to merge 3 commits into base: unstable

Changes from 1 commit
15 changes: 12 additions & 3 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -712,8 +712,9 @@ impl<E: EthSpec> PeerManager<E> {
     }

     /// Received a metadata response from a peer.
-    pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
+    pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
         let mut invalid_meta_data = false;
+        let mut updated_cgc = false;

         if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
             if let Some(known_meta_data) = &peer_info.meta_data() {
@@ -732,12 +733,16 @@ impl<E: EthSpec> PeerManager<E> {
                     "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
             }

+            let known_custody_group_count = peer_info
+                .meta_data()
+                .and_then(|meta_data| meta_data.custody_group_count().copied().ok());
+
             let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
             peer_info.set_meta_data(meta_data);

             if self.network_globals.spec.is_peer_das_scheduled() {
-                // Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
-                // prioritize PeerDAS peers.
+                // Gracefully ignore metadata/v2 peers.
+                // TODO(das) Potentially downscore after PeerDAS to prioritize PeerDAS peers.
                 if let Some(custody_group_count) = custody_group_count_opt {
                     match self.compute_peer_custody_groups(peer_id, custody_group_count) {
                         Ok(custody_groups) => {
@@ -759,6 +764,8 @@ impl<E: EthSpec> PeerManager<E> {
                                 })
                                 .collect();
                             peer_info.set_custody_subnets(custody_subnets);
+
+                            updated_cgc = Some(custody_group_count) != known_custody_group_count;
                         }
                         Err(err) => {
                             debug!(self.log, "Unable to compute peer custody groups from metadata";
@@ -781,6 +788,8 @@ impl<E: EthSpec> PeerManager<E> {
         if invalid_meta_data {
             self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
         }
+
+        updated_cgc
     }

     /// Updates the gossipsub scores for all known peers in gossipsub.
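The heart of this hunk is the CGC comparison: `Some(custody_group_count) != known_custody_group_count` is true both when the peer never advertised a custody group count before and when the advertised value changed. A standalone sketch of those semantics, reduced to plain `Option<u64>` values rather than Lighthouse's metadata types:

```rust
// Sketch only: the CGC lives inside peer metadata in Lighthouse; here it is
// reduced to an Option<u64> to show the comparison semantics.
fn cgc_updated(known: Option<u64>, incoming: Option<u64>) -> bool {
    // `Some(new) != known` holds both when no CGC was known before and
    // when the advertised value changed.
    incoming.is_some() && incoming != known
}

fn main() {
    assert!(cgc_updated(None, Some(4))); // first CGC learned for this peer
    assert!(cgc_updated(Some(4), Some(8))); // peer changed its CGC
    assert!(!cgc_updated(Some(4), Some(4))); // nothing changed
    assert!(!cgc_updated(Some(4), None)); // metadata/v2 peer: no CGC field
}
```

Note that a metadata/v2 response (no CGC field) never sets the flag, matching the `if let Some(custody_group_count)` guard in the diff.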
5 changes: 4 additions & 1 deletion beacon_node/lighthouse_network/src/service/api_types.rs
@@ -1,6 +1,6 @@
 use crate::rpc::{
     methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
-    SubstreamId,
+    MetaData, SubstreamId,
 };
 use libp2p::swarm::ConnectionId;
 use std::fmt::{Display, Formatter};
@@ -147,6 +147,8 @@ pub enum RequestId {
 // `RPCCodedResponse`.
 #[derive(Debug, Clone, PartialEq)]
 pub enum Response<E: EthSpec> {
+    /// A Metadata message.
+    MetaData(MetaData<E>, /* updated_cgc */ bool),
     /// A Status message.
     Status(StatusMessage),
     /// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
@@ -198,6 +200,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RpcResponse<E> {
             Some(d) => RpcResponse::Success(RpcSuccessResponse::DataColumnsByRange(d)),
             None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange),
         },
+        Response::MetaData(m, _) => RpcResponse::Success(RpcSuccessResponse::MetaData(m)),
         Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)),
         Response::LightClientBootstrap(b) => {
             RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b))
8 changes: 5 additions & 3 deletions beacon_node/lighthouse_network/src/service/mod.rs
@@ -1600,9 +1600,11 @@ impl<E: EthSpec> Network<E> {
                 None
             }
             RpcSuccessResponse::MetaData(meta_data) => {
-                self.peer_manager_mut()
-                    .meta_data_response(&peer_id, meta_data);
-                None
+                let updated_cgc = self
+                    .peer_manager_mut()
+                    .meta_data_response(&peer_id, meta_data.clone());
+                // Send event after calling into peer_manager so the PeerDB is updated
+                self.build_response(id, peer_id, Response::MetaData(meta_data, updated_cgc))
             }
             /* Network propagated protocols */
             RpcSuccessResponse::Status(msg) => {
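The comment in this hunk encodes an ordering invariant: the peer manager mutates the PeerDB first, and only then is the `Response::MetaData` event built, so any downstream handler already observes the updated peer entry. A toy illustration of that update-before-notify pattern, using simplified stand-in types rather than the Lighthouse ones:

```rust
// Toy stand-in: `PeerDb` plays the role of the shared peer database.
struct PeerDb {
    cgc: Option<u64>,
}

fn on_metadata(db: &mut PeerDb, new_cgc: u64, notify: impl Fn(&PeerDb)) {
    let updated = db.cgc != Some(new_cgc);
    // 1) Update the database first...
    db.cgc = Some(new_cgc);
    // 2) ...then emit the event, so consumers observe the new state.
    if updated {
        notify(db);
    }
}

fn main() {
    let mut db = PeerDb { cgc: None };
    on_metadata(&mut db, 8, |db| {
        // A consumer reacting to the event already sees the new CGC.
        assert_eq!(db.cgc, Some(8));
    });
}
```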
9 changes: 9 additions & 0 deletions beacon_node/network/src/router.rs
@@ -335,6 +335,9 @@ impl<T: BeaconChainTypes> Router<T> {
         response: Response<T::EthSpec>,
     ) {
         match response {
+            Response::MetaData(_meta_data, updated_cgc) => {
+                self.on_meta_data_response(peer_id, updated_cgc)
+            }
             Response::Status(status_message) => {
                 debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status_message);
                 self.handle_beacon_processor_send_result(
@@ -614,6 +617,12 @@ impl<T: BeaconChainTypes> Router<T> {
         )
     }

+    pub fn on_meta_data_response(&mut self, peer_id: PeerId, updated_cgc: bool) {
+        if updated_cgc {
+            self.send_to_sync(SyncMessage::UpdatedPeerCgc(peer_id));
+        }
+    }
+
     /// Handle a `BlocksByRange` response from the peer.
     /// A `beacon_block` behaves as a stream which is terminated on a `None` response.
     pub fn on_blocks_by_range_response(
14 changes: 14 additions & 0 deletions beacon_node/network/src/sync/manager.rs
@@ -105,6 +105,9 @@ pub enum SyncMessage<E: EthSpec> {
         head_slot: Option<Slot>,
     },

+    /// Peer manager has received a MetaData of a peer with a new or updated CGC value.
+    UpdatedPeerCgc(PeerId),
+
     /// A block has been received from the RPC.
     RpcBlock {
         request_id: SyncRequestId,
@@ -483,6 +486,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         }
     }

+    fn updated_peer_cgc(&mut self, _peer_id: PeerId) {
[Review thread on `updated_peer_cgc`]

Member: I think we want to resume the by-range requests as well?

Author: Fixed, I resume range sync as well.

Member: Nice, I'll run some tests today to confirm this fixes the issue.

Member: Doesn't look like this fixes the issue. I still don't see range requests until a few minutes later, when a new finalized chain is added.

Right after startup, waiting for custody peers:

Feb 12 04:33:07.181 DEBG Waiting for peers to be available on sampling column subnets, chain: 1, service: range_sync, service: sync, module: network::sync::range_sync::chain:1057

Got the peer's metadata response after 15s:

Feb 12 04:33:22.271 DEBG Obtained peer's metadata, new_seq_no: 6, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompkoge8T2x4KFH4KbzDkh7zz6uN2JX, service: libp2p, module: lighthouse_network::peer_manager:732

No range requests until ~5 mins later:

Feb 12 04:38:07.303 DEBG Finalization sync peer joined, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompkoge8T2x4KFH4KbzDkh7zz6uN2JX, service: range_sync, service: sync, module: network::sync::range_sync::range:143
Feb 12 04:38:07.305 DEBG New chain added to sync, id: 2, from: 38, to: 1071, end_root: 0x3be00d7ce6e52f7938fd588d909055f72469d0f09ce545d7b23077f2d6b40e8a, current_target: 38, batches: 0, peers: 1, state: Stopped, sync_type: Finalized, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompkoge8T2x4KFH4KbzDkh7zz6uN2JX, service: range_sync, service: sync, module: network::sync::range_sync::chain_collection:506
Feb 12 04:38:07.306 DEBG Sync RPC request sent, id: 4/3/RangeSync/39/1, peer: 16Uiu2HAmAAZ5wP6fvpe1b9tWNmgA2Wn8MsrNYVhkw5WohcAoaHKR, epoch: 39, slots: 32, method: BlocksByRange, service: sync, module: network::sync::network_context:788
Feb 12 04:38:07.307 DEBG Sync RPC request sent, id: 5/3/RangeSync/39/1, peer: 16Uiu2HAm9PijSZpm5QUphXRoBtkhUZPkGJ4Rgxk4Bny91oZPYZLG, columns: [74, 30, 39, 19, 63, 41, 52, 47, 58], epoch: 39, slots: 32, method: DataColumnsByRange, service: sync, module: network::sync::network_context:870

Author: Did you see this log? "Received updated peer CGC message"

Member: I don't recall seeing it. I think I was using the right locally-built image, but it could be worth re-testing to confirm if you have time.

+        // Try to make progress on custody requests that are waiting for peers
+        for (id, result) in self.network.continue_custody_by_root_requests() {
+            self.on_custody_by_root_result(id, result);
+        }
+    }
+
     /// Handles RPC errors related to requests that were emitted from the sync manager.
     fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) {
         trace!(self.log, "Sync manager received a failed RPC");
@@ -758,6 +768,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             } => {
                 self.add_peers_force_range_sync(&peers, head_root, head_slot);
             }
+            SyncMessage::UpdatedPeerCgc(peer_id) => {
+                debug!(self.log, "Received updated peer CGC message"; "peer" => ?peer_id);
+                self.updated_peer_cgc(peer_id);
+            }
             SyncMessage::RpcBlock {
                 request_id,
                 peer_id,
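Taken together, the hunks wire a single signal through three layers: the peer manager returns `updated_cgc`, the network service attaches it to `Response::MetaData`, the router converts it into `SyncMessage::UpdatedPeerCgc`, and the sync manager retries custody requests that were parked waiting for custody peers. A compact, self-contained sketch of the receiving end of that pipeline (all types simplified; only the message name mirrors the diff):

```rust
use std::collections::HashMap;

// Simplified stand-ins: PeerId as u64, block roots as strings.
#[derive(Debug)]
enum SyncMessage {
    UpdatedPeerCgc(u64),
}

struct SyncManager {
    // Custody-by-root requests parked while waiting for custody peers.
    pending_custody_requests: HashMap<u32, String>,
}

impl SyncManager {
    fn handle_message(&mut self, msg: SyncMessage) {
        match msg {
            SyncMessage::UpdatedPeerCgc(peer_id) => {
                // In the PR this calls `continue_custody_by_root_requests()`
                // (and, per the review thread, should also resume range
                // sync); here we simply drain and retry the parked requests.
                for (id, root) in self.pending_custody_requests.drain() {
                    println!("peer {peer_id}: retrying custody request {id} for {root}");
                }
            }
        }
    }
}

fn main() {
    let mut sync = SyncManager {
        pending_custody_requests: HashMap::from([(1, "0xabc".to_string())]),
    };
    // The router would emit this after a metadata response with a changed CGC.
    sync.handle_message(SyncMessage::UpdatedPeerCgc(42));
}
```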