Skip to content

Commit

Permalink
fix: unable to find ENR for NodeId killing network perf (#1348)
Browse files Browse the repository at this point in the history
* fix: unable to find ENR for NodeId killing network perf

* fix: `Discv5UdpSocket` also check sub-network kbuckets

* fix: calculate cache size based off max uTP connection limit

* fix: ci

* fix: cache uses full uTP limit
  • Loading branch information
KolbyML authored Jul 29, 2024
1 parent 761a523 commit a47e748
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 57 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 67 additions & 21 deletions portalnet/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use discv5::{
};
use lru::LruCache;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock as TokioRwLock};
use tracing::{debug, info, warn};
use trin_validation::oracle::HeaderOracle;
use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket};

use super::config::PortalnetConfig;
Expand Down Expand Up @@ -346,13 +347,73 @@ impl Discovery {
}

pub struct Discv5UdpSocket {
talk_reqs: mpsc::UnboundedReceiver<TalkRequest>,
talk_request_receiver: mpsc::UnboundedReceiver<TalkRequest>,
discv5: Arc<Discovery>,
enr_cache: Arc<TokioRwLock<LruCache<NodeId, Enr>>>,
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
}

impl Discv5UdpSocket {
pub fn new(discv5: Arc<Discovery>, talk_reqs: mpsc::UnboundedReceiver<TalkRequest>) -> Self {
Self { discv5, talk_reqs }
pub fn new(
discv5: Arc<Discovery>,
talk_request_receiver: mpsc::UnboundedReceiver<TalkRequest>,
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
enr_cache_capacity: usize,
) -> Self {
let enr_cache = LruCache::new(enr_cache_capacity);
let enr_cache = Arc::new(TokioRwLock::new(enr_cache));
Self {
discv5,
talk_request_receiver,
enr_cache,
header_oracle,
}
}

async fn find_enr(&mut self, node_id: &NodeId) -> io::Result<UtpEnr> {
if let Some(cached_enr) = self.enr_cache.write().await.get(node_id).cloned() {
return Ok(UtpEnr(cached_enr));
}

if let Some(enr) = self.discv5.find_enr(node_id) {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}

if let Some(enr) = self.discv5.cached_node_addr(node_id) {
self.enr_cache.write().await.put(*node_id, enr.enr.clone());
return Ok(UtpEnr(enr.enr));
}

let history_jsonrpc_tx = self.header_oracle.read().await.history_jsonrpc_tx();
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

let state_jsonrpc_tx = self.header_oracle.read().await.state_jsonrpc_tx();
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

let beacon_jsonrpc_tx = self.header_oracle.read().await.beacon_jsonrpc_tx();
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
self.enr_cache.write().await.put(*node_id, enr.clone());
return Ok(UtpEnr(enr));
}
}

debug!(node_id = %node_id, "uTP packet from unknown source");
Err(io::Error::new(
io::ErrorKind::Other,
"ENR not found for talk req destination",
))
}
}

Expand Down Expand Up @@ -418,25 +479,10 @@ impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
}

async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, UtpEnr)> {
match self.talk_reqs.recv().await {
match self.talk_request_receiver.recv().await {
Some(talk_req) => {
let src_node_id = talk_req.node_id();
let enr = match self.discv5.find_enr(src_node_id) {
Some(enr) => UtpEnr(enr),
None => {
let enr = match self.discv5.cached_node_addr(src_node_id) {
Some(node_addr) => Ok(node_addr.enr),
None => {
debug!(node_id = %src_node_id, "uTP packet from unknown source");
Err(io::Error::new(
io::ErrorKind::Other,
"ENR not found for talk req destination",
))
}
}?;
UtpEnr(enr)
}
};
let enr = self.find_enr(src_node_id).await?;
let packet = talk_req.body();
let n = std::cmp::min(buf.len(), packet.len());
buf[..n].copy_from_slice(&packet[..n]);
Expand Down
30 changes: 16 additions & 14 deletions portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,12 +946,12 @@ where
Ok(Content::Content(content))
} else {
// Generate a connection ID for the uTP connection.
let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| {
let enr = self.find_enr(source).ok_or_else(|| {
OverlayRequestError::AcceptError(
"unable to find ENR for NodeId".to_string(),
)
})?;
let enr = UtpEnr(node_addr.enr);
let enr = UtpEnr(enr);
let cid = self.utp_controller.cid(enr, false);
let cid_send = cid.send;

Expand Down Expand Up @@ -1038,7 +1038,7 @@ where

// if we're unable to find the ENR for the source node we throw an error
// since the enr is required for the accept queue, and it is expected to be present
let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| {
let enr = self.find_enr(source).ok_or_else(|| {
OverlayRequestError::AcceptError("unable to find ENR for NodeId".to_string())
})?;
for (i, key) in content_keys.iter().enumerate() {
Expand All @@ -1055,11 +1055,7 @@ where
})?;
if accept {
// accept all keys that are successfully added to the queue
if self
.accept_queue
.write()
.add_key_to_queue(key, &node_addr.enr)
{
if self.accept_queue.write().add_key_to_queue(key, &enr) {
accepted_keys.push(key.clone());
} else {
accept = false;
Expand All @@ -1083,10 +1079,10 @@ where

// Generate a connection ID for the uTP connection if there is data we would like to
// accept.
let node_addr = self.discovery.cached_node_addr(source).ok_or_else(|| {
let enr = self.find_enr(source).ok_or_else(|| {
OverlayRequestError::AcceptError("unable to find ENR for NodeId".to_string())
})?;
let enr = UtpEnr(node_addr.enr);
let enr = UtpEnr(enr);
let enr_str = if enabled!(Level::TRACE) {
enr.0.to_base64()
} else {
Expand Down Expand Up @@ -2656,7 +2652,7 @@ mod tests {
use discv5::kbucket::Entry;
use rstest::*;
use serial_test::serial;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc::unbounded_channel, RwLock as TokioRwLock};
use tokio_test::{assert_pending, assert_ready, task};

use crate::{
Expand All @@ -2674,7 +2670,7 @@ mod tests {
};
use trin_metrics::portalnet::PORTALNET_METRICS;
use trin_storage::{DistanceFunction, MemoryContentStore};
use trin_validation::validator::MockValidator;
use trin_validation::{oracle::HeaderOracle, validator::MockValidator};

macro_rules! poll_command_rx {
($service:ident) => {
Expand All @@ -2691,9 +2687,15 @@ mod tests {
let temp_dir = setup_temp_dir().unwrap().into_path();
let discovery = Arc::new(Discovery::new(portal_config, temp_dir, MAINNET.clone()).unwrap());

let header_oracle = HeaderOracle::default();
let header_oracle = Arc::new(TokioRwLock::new(header_oracle));
let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel();
let discv5_utp =
crate::discovery::Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx);
let discv5_utp = crate::discovery::Discv5UdpSocket::new(
Arc::clone(&discovery),
utp_talk_req_rx,
header_oracle,
50,
);
let utp_socket = utp_rs::socket::UtpSocket::with_socket(discv5_utp);
let metrics = OverlayMetricsReporter {
overlay_metrics: PORTALNET_METRICS.overlay(),
Expand Down
9 changes: 6 additions & 3 deletions portalnet/tests/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use discv5::TalkRequest;
use parking_lot::RwLock;
use tokio::{
sync::{mpsc, mpsc::unbounded_channel},
sync::{mpsc, mpsc::unbounded_channel, RwLock as TokioRwLock},
time::{self, Duration},
};
use utp_rs::socket::UtpSocket;
Expand All @@ -27,7 +27,7 @@ use portalnet::{
utils::db::setup_temp_dir,
};
use trin_storage::{ContentStore, DistanceFunction, MemoryContentStore};
use trin_validation::validator::MockValidator;
use trin_validation::{oracle::HeaderOracle, validator::MockValidator};

async fn init_overlay(
discovery: Arc<Discovery>,
Expand All @@ -39,8 +39,11 @@ async fn init_overlay(
let store = MemoryContentStore::new(node_id, DistanceFunction::Xor);
let store = Arc::new(RwLock::new(store));

let header_oracle = HeaderOracle::default();
let header_oracle = Arc::new(TokioRwLock::new(header_oracle));
let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel();
let discv5_utp = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx);
let discv5_utp =
Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx, header_oracle, 50);
let utp_socket = Arc::new(UtpSocket::with_socket(discv5_utp));

let validator = Arc::new(MockValidator {});
Expand Down
37 changes: 24 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

use std::sync::Arc;

use rpc::{launch_jsonrpc_server, RpcServerHandle};
use tokio::sync::{mpsc, RwLock};
use tracing::info;
use tree_hash::TreeHash;
use utp_rs::socket::UtpSocket;

#[cfg(windows)]
use ethportal_api::types::cli::Web3TransportType;
use ethportal_api::{
Expand All @@ -21,12 +15,17 @@ use portalnet::{
events::PortalnetEvents,
utils::db::{configure_node_data_dir, configure_trin_data_dir},
};
use rpc::{launch_jsonrpc_server, RpcServerHandle};
use tokio::sync::{mpsc, RwLock};
use tracing::info;
use tree_hash::TreeHash;
use trin_beacon::initialize_beacon_network;
use trin_history::initialize_history_network;
use trin_state::initialize_state_network;
use trin_storage::PortalStorageConfig;
use trin_utils::version::get_trin_version;
use trin_validation::oracle::HeaderOracle;
use utp_rs::socket::UtpSocket;

pub async fn run_trin(
trin_config: TrinConfig,
Expand Down Expand Up @@ -68,9 +67,27 @@ pub async fn run_trin(
prometheus_exporter::start(addr)?;
}

// Initialize validation oracle
let header_oracle = HeaderOracle::default();
info!(hash_tree_root = %hex_encode(header_oracle.header_validator.pre_merge_acc.tree_hash_root().0),"Loaded
pre-merge accumulator.");
let header_oracle = Arc::new(RwLock::new(header_oracle));

// Initialize and spawn uTP socket
let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel();
let discv5_utp_socket = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_reqs_rx);

// Set the enr_cache_capacity to the maximum uTP limit between all active networks. This is
// a trade off between memory usage and increased searches from the networks for each Enr.
// utp_transfer_limit is 2x as it would be utp_transfer_limit for incoming and
// utp_transfer_limit for outgoing
let enr_cache_capacity =
portalnet_config.utp_transfer_limit * 2 * trin_config.portal_subnetworks.len();
let discv5_utp_socket = Discv5UdpSocket::new(
Arc::clone(&discovery),
utp_talk_reqs_rx,
header_oracle.clone(),
enr_cache_capacity,
);
let utp_socket = UtpSocket::with_socket(discv5_utp_socket);
let utp_socket = Arc::new(utp_socket);

Expand All @@ -80,12 +97,6 @@ pub async fn run_trin(
discovery.local_enr().node_id(),
)?;

// Initialize validation oracle
let header_oracle = HeaderOracle::default();
info!(hash_tree_root = %hex_encode(header_oracle.header_validator.pre_merge_acc.tree_hash_root().0),"Loaded
pre-merge accumulator.");
let header_oracle = Arc::new(RwLock::new(header_oracle));

// Initialize state sub-network service and event handlers, if selected
let (state_handler, state_network_task, state_event_tx, state_jsonrpc_tx, state_event_stream) =
if trin_config
Expand Down
1 change: 1 addition & 0 deletions trin-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub async fn initialize_state_network(
StateEventStream,
)> {
let (state_jsonrpc_tx, state_jsonrpc_rx) = mpsc::unbounded_channel::<StateJsonRpcRequest>();
header_oracle.write().await.state_jsonrpc_tx = Some(state_jsonrpc_tx.clone());
let (state_event_tx, state_event_rx) = mpsc::unbounded_channel::<OverlayRequest>();

let state_network = StateNetwork::new(
Expand Down
1 change: 1 addition & 0 deletions trin-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ authors = ["https://github.com/ethereum/trin/graphs/contributors"]
[dependencies]
alloy-primitives = { version = "0.7.0", features = ["ssz"] }
anyhow = "1.0.68"
enr = "0.10.0"
eth2_hashing = "0.2.0"
ethereum_ssz = "0.5.3"
ethereum_ssz_derive = "0.5.3"
Expand Down
Loading

0 comments on commit a47e748

Please sign in to comment.