fix(l1): peers tcp connections (#1885)
**Motivation**
We observed that our peer count was significantly lower than that of
other execution clients.

**Description**
This PR refines the exploratory work done in #1875. While debugging, we
developed tools to track individual peer traces (see
[here](https://github.com/MarcosNicolau/eth-el-clients-debugging) if
interested). Through this, we discovered that although we were
exchanging messages with many peers, they were not appearing in the
stats. This indicated that they were not being inserted into the
`KademliaTable`.

The root cause was that the target bucket was already full, which
happens quickly since each bucket holds only 16 entries
(`MAX_NODES_PER_BUCKET`). Below is a list of changes that improved the
peer situation:
- Peers were connected but not properly inserted into the table: we
temporarily fixed this by forcing peer insertion, even when it exceeds
the protocol's per-bucket limit (see the sketch after this list). An
alternative solution could be maintaining a secondary table for P2P
peers, but that would be better suited for the incoming refactor.
- Frame decoding verification was causing connection crashes:
specifically, an assertion on `header-data` was failing. While the spec
dictates that it should be RLP-encoded zero, some clients do not send
the encoded values, and as far as we can tell no other client enforces
verification of this field (a lenient variant is sketched after this
list).
- Added logging for message exchanges between peers.
- Refactored functions that were holding the `KademliaTable` lock
longer than necessary: this became particularly noticeable when a high
number of peers were discovered.
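
As a reference for the first item, here is a minimal sketch of the forced-insertion path. `Bucket` and `PeerData` are simplified stand-ins; the real logic is `insert_node_inner` in `crates/networking/p2p/kademlia.rs` (see the diff below):

```rust
// Simplified sketch of forced insertion into a full Kademlia bucket.
// Stand-in types; the real `PeerData`/`Node` live in the p2p crate.
const MAX_NODES_PER_BUCKET: usize = 16;

#[derive(Clone)]
struct PeerData {
    node_id: u64, // placeholder for the real H512 node id
}

struct Bucket {
    peers: Vec<PeerData>,
    replacements: Vec<PeerData>,
}

impl Bucket {
    /// Returns true if the peer ended up in the bucket itself.
    fn insert(&mut self, peer: PeerData, force_push: bool) -> bool {
        if self.peers.len() >= MAX_NODES_PER_BUCKET {
            if force_push {
                // New path in this PR: drop any pending replacement entry
                // and append, even though the bucket is over the k limit.
                self.replacements.retain(|p| p.node_id != peer.node_id);
                self.peers.push(peer);
                return true;
            }
            // Previous behaviour: park the peer as a potential replacement.
            self.replacements.push(peer);
            return false;
        }
        self.replacements.retain(|p| p.node_id != peer.node_id);
        self.peers.push(peer);
        true
    }
}
```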
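For the frame-decoding item: per the devp2p framing spec, `header-data` is the RLP encoding of `[capability-id, context-id]` with both fields zero, which encodes to the bytes `0xc2 0x80 0x80`. A hedged sketch of a lenient check (illustrative names, not the PR's code, which simply drops the assertion):

```rust
// Lenient header-data check: log a mismatch instead of asserting, since
// some clients send other bytes here and no other client enforces this.
fn check_header_data(header_text: &[u8]) {
    // RLP([0, 0]): list prefix 0xc2 followed by two empty strings (0x80).
    const RLP_ZERO_PAIR: [u8; 3] = [0xc2, 0x80, 0x80];
    if header_text.get(3..6) != Some(&RLP_ZERO_PAIR[..]) {
        // The removed code `assert_eq!`-ed here, crashing the connection.
        eprintln!("non-spec header-data, ignoring: {:?}", header_text.get(3..6));
    }
}
```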

**Other Findings**
- Incorrect capability exchange logic: we attempt to connect to peers
advertising the snap capability without first verifying that they
support our eth protocol version, which leads to failed connections. A
separate PR will address this; it is the cause of the observed error
messages about capability exchange failures due to mismatched eth and
snap protocols (a sketch of the missing check follows this list).
- Most disconnections occur because peers have already reached their
connection limit (sending us `Too many peers` as the reason).
- A future improvement could be implementing a reconnection mechanism.
- The majority of our connections are incoming.
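
A hedged sketch of the missing pre-check from the first finding (`Capability` and `SUPPORTED_ETH_VERSIONS` are illustrative stand-ins, not ethrex APIs; the eth version is an assumed example):

```rust
// Illustrative only: dial a peer for snap only if it also advertises an
// eth version we support, avoiding the capability-exchange failures above.
enum Capability {
    Eth(u8),
    Snap(u8),
}

// Assumption for the sketch: we speak eth/68.
const SUPPORTED_ETH_VERSIONS: [u8; 1] = [68];

fn should_attempt_snap(peer_caps: &[Capability]) -> bool {
    let shared_eth = peer_caps.iter().any(|c| match c {
        Capability::Eth(v) => SUPPORTED_ETH_VERSIONS.contains(v),
        _ => false,
    });
    let advertises_snap = peer_caps
        .iter()
        .any(|c| matches!(c, Capability::Snap(_)));
    shared_eth && advertises_snap
}
```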

Closes #1811
MarcosNicolau authored Feb 11, 2025
1 parent d874830 commit c38c601
Showing 6 changed files with 103 additions and 66 deletions.
8 changes: 6 additions & 2 deletions crates/networking/p2p/discv4/server.rs
@@ -627,7 +627,7 @@ impl Discv4Server {
pub(super) mod tests {
use super::*;
use crate::{
network::{node_id_from_signing_key, MAX_MESSAGES_TO_BROADCAST},
network::{node_id_from_signing_key, serve_p2p_requests, MAX_MESSAGES_TO_BROADCAST},
rlpx::message::Message as RLPxMessage,
};
use ethrex_storage::{EngineType, Store};
@@ -693,7 +693,7 @@ pub(super) mod tests {
broadcast,
};

let discv4 = Discv4Server::try_new(ctx).await?;
let discv4 = Discv4Server::try_new(ctx.clone()).await?;

if should_start_server {
tracker.spawn({
@@ -702,6 +702,9 @@
discv4.receive().await;
}
});
// we need to spawn the p2p service, as the nodes will try to connect to each other via tcp once bonded
// if that connection fails, they remove themselves from the table; we want them to stay bonded for these tests
ctx.tracker.spawn(serve_p2p_requests(ctx.clone()));
}

Ok(discv4)
Expand All @@ -715,6 +718,7 @@ pub(super) mod tests {
server_a
.try_add_peer_and_ping(server_b.ctx.local_node, server_a.ctx.table.lock().await)
.await?;

// allow some time for the server to respond
sleep(Duration::from_secs(1)).await;
Ok(())
64 changes: 35 additions & 29 deletions crates/networking/p2p/kademlia.rs
@@ -9,7 +9,7 @@ use ethrex_core::{H256, H512, U256};
use sha3::{Digest, Keccak256};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info};
use tracing::debug;

pub const MAX_NODES_PER_BUCKET: usize = 16;
const NUMBER_OF_BUCKETS: usize = 256;
@@ -66,7 +66,19 @@ impl KademliaTable {
let node_id = node.node_id;
let bucket_idx = bucket_number(node_id, self.local_node_id);

self.insert_node_inner(node, bucket_idx)
self.insert_node_inner(node, bucket_idx, false)
}

/// Inserts a node into the table, even if the bucket is full.
/// # Returns
/// A tuple containing:
/// 1. `Option<PeerData>`: `None` if the peer was already in the table or stored as a potential replacement
/// 2. A `bool` indicating whether the node was inserted into the table
pub fn insert_node_forced(&mut self, node: Node) -> (Option<PeerData>, bool) {
let node_id = node.node_id;
let bucket_idx = bucket_number(node_id, self.local_node_id);

self.insert_node_inner(node, bucket_idx, true)
}

#[cfg(test)]
@@ -75,10 +87,15 @@ impl KademliaTable {
node: Node,
bucket_idx: usize,
) -> (Option<PeerData>, bool) {
self.insert_node_inner(node, bucket_idx)
self.insert_node_inner(node, bucket_idx, false)
}

fn insert_node_inner(&mut self, node: Node, bucket_idx: usize) -> (Option<PeerData>, bool) {
fn insert_node_inner(
&mut self,
node: Node,
bucket_idx: usize,
force_push: bool,
) -> (Option<PeerData>, bool) {
let node_id = node.node_id;

let peer_already_in_table = self.buckets[bucket_idx]
@@ -98,9 +115,15 @@

let peer = PeerData::new(node, NodeRecord::default(), false);

if self.buckets[bucket_idx].peers.len() == MAX_NODES_PER_BUCKET {
self.insert_as_replacement(&peer, bucket_idx);
(Some(peer), false)
if self.buckets[bucket_idx].peers.len() >= MAX_NODES_PER_BUCKET {
if force_push {
self.remove_from_replacements(node_id, bucket_idx);
self.buckets[bucket_idx].peers.push(peer.clone());
(Some(peer), true)
} else {
self.insert_as_replacement(&peer, bucket_idx);
(Some(peer), false)
}
} else {
self.remove_from_replacements(node_id, bucket_idx);
self.buckets[bucket_idx].peers.push(peer.clone());
@@ -226,10 +249,11 @@ impl KademliaTable {
&'a self,
filter: &'a dyn Fn(&'a PeerData) -> bool,
) -> Option<&'a PeerData> {
let filtered_peers: Vec<&PeerData> = self.filter_peers(filter).collect();
let peer_idx = rand::random::<usize>()
.checked_rem(self.filter_peers(filter).count())
.checked_rem(filtered_peers.len())
.unwrap_or_default();
self.filter_peers(filter).nth(peer_idx)
filtered_peers.get(peer_idx).cloned()
}

/// Replaces the peer with the given id with the latest replacement stored.
@@ -284,13 +308,8 @@ impl KademliaTable {
channels: PeerChannels,
capabilities: Vec<Capability>,
) {
let bucket_idx = bucket_number(self.local_node_id, node_id);
if let Some(peer) = self.buckets.get_mut(bucket_idx).and_then(|bucket| {
bucket
.peers
.iter_mut()
.find(|peer| peer.node.node_id == node_id)
}) {
let peer = self.get_by_node_id_mut(node_id);
if let Some(peer) = peer {
peer.channels = Some(channels);
peer.supported_capabilities = capabilities;
peer.is_connected = true;
@@ -312,19 +331,6 @@ impl KademliaTable {
self.get_random_peer_with_filter(&filter)
.and_then(|peer| peer.channels.clone())
}

/// Outputs total amount of peers, active peers, and active peers supporting the Snap Capability to the command line
pub fn show_peer_stats(&self) {
let active_filter = |peer: &PeerData| -> bool { peer.channels.as_ref().is_some() };
let snap_active_filter = |peer: &PeerData| -> bool {
peer.channels.as_ref().is_some()
&& peer.supported_capabilities.contains(&Capability::Snap)
};
let total_peers = self.iter_peers().count();
let active_peers = self.filter_peers(&active_filter).count();
let snap_active_peers = self.filter_peers(&snap_active_filter).count();
info!("Snap Peers: {snap_active_peers} / Active Peers {active_peers} / Total Peers: {total_peers}")
}
}

/// Computes the distance between two nodes according to the discv4 protocol
50 changes: 31 additions & 19 deletions crates/networking/p2p/network.rs
@@ -1,12 +1,16 @@
use crate::discv4::{
helpers::current_unix_time,
server::{DiscoveryError, Discv4Server},
};
use crate::kademlia::KademliaTable;
use crate::kademlia::{self, KademliaTable};
use crate::rlpx::p2p::Capability;
use crate::rlpx::{
connection::RLPxConnBroadcastSender, handshake, message::Message as RLPxMessage,
};
use crate::types::Node;
use crate::{
discv4::{
helpers::current_unix_time,
server::{DiscoveryError, Discv4Server},
},
rlpx::utils::log_peer_error,
};
use ethrex_core::H512;
use ethrex_storage::Store;
use k256::{
@@ -95,7 +99,7 @@ pub async fn start_network(
Ok(())
}

async fn serve_p2p_requests(context: P2PContext) {
pub(crate) async fn serve_p2p_requests(context: P2PContext) {
let tcp_addr = context.local_node.tcp_addr();
let listener = match listener(tcp_addr) {
Ok(result) => result,
@@ -130,9 +134,6 @@ async fn handle_peer_as_receiver(context: P2PContext, peer_addr: SocketAddr, str
match handshake::as_receiver(context, peer_addr, stream).await {
Ok(mut conn) => conn.start(table).await,
Err(e) => {
// TODO We should remove the peer from the table if connection failed
// but currently it will make the tests fail
// table.lock().await.replace_peer(node.node_id);
debug!("Error creating tcp connection with peer at {peer_addr}: {e}")
}
}
@@ -143,21 +144,17 @@ pub async fn handle_peer_as_initiator(context: P2PContext, node: Node) {
let stream = match tcp_stream(addr).await {
Ok(result) => result,
Err(e) => {
// TODO We should remove the peer from the table if connection failed
// but currently it will make the tests fail
// table.lock().await.replace_peer(node.node_id);
debug!("Error establishing tcp connection with peer at {addr}: {e}");
log_peer_error(&node, &format!("Error creating tcp connection {e}"));
context.table.lock().await.replace_peer(node.node_id);
return;
}
};
let table = context.table.clone();
match handshake::as_initiator(context, node, stream).await {
Ok(mut conn) => conn.start(table).await,
Err(e) => {
// TODO We should remove the peer from the table if connection failed
// but currently it will make the tests fail
// table.lock().await.replace_peer(node.node_id);
debug!("Error creating tcp connection with peer at {addr}: {e}")
log_peer_error(&node, &format!("Error creating tcp connection {e}"));
table.lock().await.replace_peer(node.node_id);
}
};
}
@@ -174,10 +171,25 @@ pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 {

/// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval
pub async fn periodically_show_peer_stats(peer_table: Arc<Mutex<KademliaTable>>) {
const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(120);
const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(30);
let mut interval = tokio::time::interval(INTERVAL_DURATION);
loop {
peer_table.lock().await.show_peer_stats();
// clone peers to keep the lock short
let peers: Vec<kademlia::PeerData> =
peer_table.lock().await.iter_peers().cloned().collect();
let total_peers = peers.len();
let active_peers = peers
.iter()
.filter(|peer| -> bool { peer.channels.as_ref().is_some() })
.count();
let snap_active_peers = peers
.iter()
.filter(|peer| -> bool {
peer.channels.as_ref().is_some()
&& peer.supported_capabilities.contains(&Capability::Snap)
})
.count();
info!("Snap Peers: {snap_active_peers} / Active Peers {active_peers} / Total Peers: {total_peers}");
interval.tick().await;
}
}
39 changes: 29 additions & 10 deletions crates/networking/p2p/rlpx/connection.rs
@@ -122,12 +122,15 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
// NOTE: if the peer came from the discovery server it will already be inserted in the table
// but that might not always be the case, so we try to add it to the table
// Note: we don't ping the node; we let the validation service do its job
table.lock().await.insert_node(self.node);
table.lock().await.init_backend_communication(
self.node.node_id,
peer_channels,
capabilities,
);
{
let mut table_lock = table.lock().await;
table_lock.insert_node_forced(self.node);
table_lock.init_backend_communication(
self.node.node_id,
peer_channels,
capabilities,
);
}
if let Err(e) = self.connection_loop(sender, receiver).await {
self.connection_failed("Error during RLPx connection", e, table)
.await;
@@ -159,6 +162,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
&format!("{error_text}: ({error}), discarding peer {remote_node_id}"),
);
table.lock().await.replace_peer(remote_node_id);
let _ = self.framed.close().await;
}

fn match_disconnect_reason(&self, error: &RLPxError) -> Option<u8> {
@@ -180,6 +184,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
// Receive Hello message
match self.receive().await? {
Message::Hello(hello_message) => {
log_peer_debug(
&self.node,
&format!(
"Hello message capabilities {:?}",
hello_message.capabilities
),
);
self.capabilities = hello_message.capabilities;

// Check if we have any capability in common
@@ -223,10 +234,20 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
tokio::select! {
// Expect a message from the remote peer
message = self.receive() => {
self.handle_message(message?, sender.clone()).await?;
match message {
Ok(message) => {
log_peer_debug(&self.node, &format!("Received message {}", message));
self.handle_message(message, sender.clone()).await?;
},
Err(e) => {
log_peer_debug(&self.node, &format!("Received RLPX Error in msg {}", e));
return Err(e);
}
}
}
// Expect a message from the backend
Some(message) = receiver.recv() => {
log_peer_debug(&self.node, &format!("Sending message {}", message));
self.send(message).await?;
}
// This is not ideal, but using the receiver without
Expand Down Expand Up @@ -285,7 +306,6 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
}
Message::Ping(_) => {
self.send(Message::Pong(PongMessage {})).await?;
log_peer_debug(&self.node, "Pong sent");
}
Message::Pong(_) => {
// We ignore received Pong messages
@@ -417,8 +437,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00
match self.receive().await? {
Message::Status(msg_data) => {
// TODO: Check message status is correct.
log_peer_debug(&self.node, "Received Status");
log_peer_debug(&self.node, &format!("Received Status {:?}", msg_data));
backend::validate_status(msg_data, &self.storage)?
}
Message::Disconnect(disconnect) => {
4 changes: 0 additions & 4 deletions crates/networking/p2p/rlpx/frame.rs
@@ -122,10 +122,6 @@ impl Decoder for RLPxCodec {
let mut temp_ingress_aes = self.ingress_aes.clone();
temp_ingress_aes.apply_keystream(header_text);

// header-data = [capability-id, context-id]
// Both are unused, and always zero
assert_eq!(&header_text[3..6], &(0_u8, 0_u8).encode_to_vec());

let frame_size: usize =
u32::from_be_bytes([0, header_text[0], header_text[1], header_text[2]])
.try_into()
4 changes: 2 additions & 2 deletions crates/networking/p2p/rlpx/handshake.rs
@@ -51,7 +51,7 @@
node_id: remote_state.node_id,
};
let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces);
log_peer_debug(&node, "Completed handshake!");
log_peer_debug(&node, "Completed handshake as receiver!");
Ok(RLPxConnection::new(
context.signer,
node,
@@ -77,7 +77,7 @@
let hashed_nonces: [u8; 32] =
Keccak256::digest([remote_state.nonce.0, local_state.nonce.0].concat()).into();
let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces);
log_peer_debug(&node, "Completed handshake!");
log_peer_debug(&node, "Completed handshake as initiator!");
Ok(RLPxConnection::new(
context.signer,
node,
