From 761a523df47b59c894da1dba8c1a5e1249b24284 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Mon, 29 Jul 2024 10:42:24 -0400 Subject: [PATCH] refactor: offer rpc endpoints (#1347) --- ethportal-api/src/beacon.rs | 14 ++++- ethportal-api/src/history.rs | 14 ++++- ethportal-api/src/state.rs | 3 +- ethportal-api/src/types/jsonrpc/endpoints.rs | 20 ++++--- .../src/scenarios/offer_accept.rs | 18 +++---- ethportal-peertest/src/scenarios/state.rs | 2 +- portalnet/src/overlay/protocol.rs | 53 +++++++------------ portalnet/src/overlay/service.rs | 3 +- rpc/src/beacon_rpc.rs | 21 ++++++-- rpc/src/history_rpc.rs | 21 ++++++-- rpc/src/state_rpc.rs | 6 +-- trin-beacon/src/jsonrpc.rs | 50 +++++++++-------- trin-execution/src/storage/evm_db.rs | 1 - trin-history/src/jsonrpc.rs | 49 +++++++++-------- trin-state/src/jsonrpc.rs | 35 ++++-------- 15 files changed, 171 insertions(+), 139 deletions(-) diff --git a/ethportal-api/src/beacon.rs b/ethportal-api/src/beacon.rs index 9ac8c88ef..ee66031bc 100644 --- a/ethportal-api/src/beacon.rs +++ b/ethportal-api/src/beacon.rs @@ -99,6 +99,7 @@ pub trait BeaconNetworkApi { ) -> RpcResult; /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "beaconOffer")] @@ -106,7 +107,18 @@ pub trait BeaconNetworkApi { &self, enr: Enr, content_key: BeaconContentKey, - content_value: Option, + content_value: BeaconContentValue, + ) -> RpcResult; + + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a + /// response. Requires the content keys to be stored locally. + /// Returns the content keys bitlist upon successful content transmission or empty bitlist + /// receive. + #[method(name = "beaconWireOffer")] + async fn wire_offer( + &self, + enr: Enr, + content_keys: Vec, ) -> RpcResult; /// Store content key with a content data to the local database. diff --git a/ethportal-api/src/history.rs b/ethportal-api/src/history.rs index b4dd4dcd2..e09fbbdd5 100644 --- a/ethportal-api/src/history.rs +++ b/ethportal-api/src/history.rs @@ -99,6 +99,7 @@ pub trait HistoryNetworkApi { ) -> RpcResult; /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "historyOffer")] @@ -106,7 +107,18 @@ pub trait HistoryNetworkApi { &self, enr: Enr, content_key: HistoryContentKey, - content_value: Option, + content_value: HistoryContentValue, + ) -> RpcResult; + + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a + /// response. Requires the content keys to be stored locally. + /// Returns the content keys bitlist upon successful content transmission or empty bitlist + /// receive. + #[method(name = "historyWireOffer")] + async fn wire_offer( + &self, + enr: Enr, + content_keys: Vec, ) -> RpcResult; /// Store content key with a content data to the local database. diff --git a/ethportal-api/src/state.rs b/ethportal-api/src/state.rs index bc18b664e..2be2c01f8 100644 --- a/ethportal-api/src/state.rs +++ b/ethportal-api/src/state.rs @@ -92,6 +92,7 @@ pub trait StateNetworkApi { ) -> RpcResult; /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. + /// Does not store the content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. #[method(name = "stateOffer")] @@ -99,7 +100,7 @@ pub trait StateNetworkApi { &self, enr: Enr, content_key: StateContentKey, - content_value: Option, + content_value: StateContentValue, ) -> RpcResult; /// Store content key with a content data to the local database. diff --git a/ethportal-api/src/types/jsonrpc/endpoints.rs b/ethportal-api/src/types/jsonrpc/endpoints.rs index 57c96af48..83f3980e7 100644 --- a/ethportal-api/src/types/jsonrpc/endpoints.rs +++ b/ethportal-api/src/types/jsonrpc/endpoints.rs @@ -42,9 +42,11 @@ pub enum StateEndpoint { TraceRecursiveFindContent(StateContentKey), /// params: [content_key, content_value] Store(StateContentKey, StateContentValue), - /// params: [enr, content_key] - Offer(Enr, StateContentKey, Option), - /// params: [content_key, content_value] + /// params: [enr, content_key, content_value] + Offer(Enr, StateContentKey, StateContentValue), + /// WireOffer is not supported in the state network, since locally + /// stored values do not contain the proofs necessary for valid gossip. + /// params: [enr, content_key, content_value] Gossip(StateContentKey, StateContentValue), /// params: [content_key, content_value] TraceGossip(StateContentKey, StateContentValue), @@ -75,8 +77,10 @@ pub enum HistoryEndpoint { Gossip(HistoryContentKey, HistoryContentValue), /// params: [content_key, content_value] TraceGossip(HistoryContentKey, HistoryContentValue), - /// params: [enr, content_key] - Offer(Enr, HistoryContentKey, Option), + /// params: [enr, content_key, content_value] + Offer(Enr, HistoryContentKey, HistoryContentValue), + /// params: [enr, [content_key]] + WireOffer(Enr, Vec), /// params: [enr] Ping(Enr), /// params: content_key @@ -119,8 +123,10 @@ pub enum BeaconEndpoint { Gossip(BeaconContentKey, BeaconContentValue), /// params: [content_key, content_value] TraceGossip(BeaconContentKey, BeaconContentValue), - /// params: [enr, content_key] - Offer(Enr, BeaconContentKey, Option), + /// params: [enr, content_key, content_value] + Offer(Enr, BeaconContentKey, BeaconContentValue), + /// params: [enr, [content_key]] + WireOffer(Enr, Vec), /// params: enr Ping(Enr), /// params: content_key diff --git a/ethportal-peertest/src/scenarios/offer_accept.rs b/ethportal-peertest/src/scenarios/offer_accept.rs index fe52aa240..adf2562f2 100644 --- a/ethportal-peertest/src/scenarios/offer_accept.rs +++ b/ethportal-peertest/src/scenarios/offer_accept.rs @@ -29,12 +29,11 @@ pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) { assert!(store_result); - // Send unpopulated offer request from testnode to bootnode + // Send wire offer request from testnode to bootnode let result = target - .offer( + .wire_offer( Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), - content_key.clone(), - None, + vec![content_key.clone()], ) .await .unwrap(); @@ -59,12 +58,11 @@ pub async fn test_unpopulated_offer_fails_with_missing_content( let (content_key, _content_value) = fixture_header_with_proof(); - // validate that unpopulated offer fails if content not available locally + // validate that wire offer fails if content not available locally match target - .offer( + .wire_offer( Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), - content_key.clone(), - None, + vec![content_key.clone()], ) .await { @@ -85,7 +83,7 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) { .offer( Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(), content_key.clone(), - Some(content_value.clone()), + content_value.clone(), ) .await .unwrap(); @@ -150,7 +148,7 @@ pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client) .offer( fresh_enr.clone(), content_key.clone(), - Some(content_value.clone()), + content_value.clone(), ) .await .unwrap(); diff --git a/ethportal-peertest/src/scenarios/state.rs b/ethportal-peertest/src/scenarios/state.rs index 13d0caed9..fe5c48e23 100644 --- a/ethportal-peertest/src/scenarios/state.rs +++ b/ethportal-peertest/src/scenarios/state.rs @@ -47,7 +47,7 @@ async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &Peerte .offer( peer.enr.clone(), fixture.content_data.key.clone(), - Some(fixture.content_data.offer_value.clone()), + fixture.content_data.offer_value.clone(), ) .await .unwrap(); diff --git a/portalnet/src/overlay/protocol.rs b/portalnet/src/overlay/protocol.rs index 993e3c7f6..b632dd798 100644 --- a/portalnet/src/overlay/protocol.rs +++ b/portalnet/src/overlay/protocol.rs @@ -42,8 +42,8 @@ use ethportal_api::{ distance::{Distance, Metric}, enr::Enr, portal_wire::{ - Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Offer, Ping, - Pong, PopulatedOffer, ProtocolId, Request, Response, + Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Ping, Pong, + PopulatedOffer, ProtocolId, Request, Response, }, }, utils::bytes::hex_encode, @@ -518,48 +518,31 @@ where /// Offer is sent in order to store content to k nodes with radii that contain content-id /// Offer is also sent to nodes after FindContent (POKE) - pub async fn send_offer( + pub async fn send_wire_offer( &self, - content_keys: Vec, enr: Enr, + content_keys: Vec, ) -> Result { + let content_items = content_keys + .into_iter() + .map(|key| match self.store.read().get(&key) { + Ok(Some(content)) => Ok((key.into(), content.clone())), + _ => Err(OverlayRequestError::ContentNotFound { + message: format!("Content key not found in local store: {key:02X?}"), + utp: false, + trace: None, + }), + }) + .collect::)>, OverlayRequestError>>()?; // Construct the request. - let request = Offer { - content_keys: content_keys.clone(), - }; + let request = PopulatedOffer { content_items }; let direction = RequestDirection::Outgoing { destination: enr.clone(), }; - // Validate that the content keys are available in the local store, before sending the - // offer - for content_key in content_keys.into_iter() { - let content_key = TContentKey::try_from(content_key.clone()).map_err(|err| { - OverlayRequestError::ContentNotFound { - message: format!( - "Error decoding content key for content key: {content_key:02X?} - {err}" - ), - utp: false, - trace: None, - } - })?; - match self.store.read().get(&content_key) { - Ok(Some(_)) => {} - _ => { - return Err(OverlayRequestError::ContentNotFound { - message: format!( - "Content key not found in local store: {content_key:02X?}" - ), - utp: false, - trace: None, - }); - } - } - } - // Send the request and wait on the response. match self - .send_overlay_request(Request::Offer(request), direction) + .send_overlay_request(Request::PopulatedOffer(request), direction) .await { Ok(Response::Accept(accept)) => Ok(accept), @@ -569,7 +552,7 @@ where } /// Send Offer request without storing the content into db - pub async fn send_populated_offer( + pub async fn send_offer( &self, enr: Enr, content_key: RawContentKey, diff --git a/portalnet/src/overlay/service.rs b/portalnet/src/overlay/service.rs index 30d175c0a..9f4aa6dd7 100644 --- a/portalnet/src/overlay/service.rs +++ b/portalnet/src/overlay/service.rs @@ -872,8 +872,7 @@ where protocol = %self.protocol, request.source = %source, request.discv5.id = %request_id, - "Handling Ping message {}", - request + "Handling Ping message {request}", ); let enr_seq = self.local_enr().seq(); diff --git a/rpc/src/beacon_rpc.rs b/rpc/src/beacon_rpc.rs index 6aec3797f..54dedd7d7 100644 --- a/rpc/src/beacon_rpc.rs +++ b/rpc/src/beacon_rpc.rs @@ -230,16 +230,14 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// If the content value is provided, a "populated" offer is used, which will not store the - /// content locally. Otherwise a regular offer is sent, after validating that the content is - /// available locally. + /// Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, content_key: BeaconContentKey, - content_value: Option, + content_value: BeaconContentValue, ) -> RpcResult { let endpoint = BeaconEndpoint::Offer(enr, content_key, content_value); let result = self.proxy_query_to_beacon_subnet(endpoint).await?; @@ -247,6 +245,21 @@ impl BeaconNetworkApiServer for BeaconNetworkApi { Ok(result) } + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a + /// response. Requires the content keys to be stored locally. + /// Returns the content keys bitlist upon successful content transmission or empty bitlist + /// receive. + async fn wire_offer( + &self, + enr: Enr, + content_keys: Vec, + ) -> RpcResult { + let endpoint = BeaconEndpoint::WireOffer(enr, content_keys); + let result = self.proxy_query_to_beacon_subnet(endpoint).await?; + let result: AcceptInfo = from_value(result)?; + Ok(result) + } + /// Store content key with a content data to the local database. async fn store( &self, diff --git a/rpc/src/history_rpc.rs b/rpc/src/history_rpc.rs index bd4082821..2c21f60b6 100644 --- a/rpc/src/history_rpc.rs +++ b/rpc/src/history_rpc.rs @@ -170,16 +170,14 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// If the content value is provided, a "populated" offer is used, which will not store the - /// content locally. Otherwise a regular offer is sent, after validating that the content is - /// available locally. + /// Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, content_key: HistoryContentKey, - content_value: Option, + content_value: HistoryContentValue, ) -> RpcResult { let endpoint = HistoryEndpoint::Offer(enr, content_key, content_value); let result = proxy_query_to_history_subnet(&self.network, endpoint).await?; @@ -187,6 +185,21 @@ impl HistoryNetworkApiServer for HistoryNetworkApi { Ok(result) } + /// Send an OFFER request with given ContentKeys, to the designated peer and wait for a + /// response. Requires the content keys to be stored locally. + /// Returns the content keys bitlist upon successful content transmission or empty bitlist + /// receive. + async fn wire_offer( + &self, + enr: Enr, + content_keys: Vec, + ) -> RpcResult { + let endpoint = HistoryEndpoint::WireOffer(enr, content_keys); + let result = proxy_query_to_history_subnet(&self.network, endpoint).await?; + let result: AcceptInfo = from_value(result)?; + Ok(result) + } + /// Store content key with a content data to the local database. async fn store( &self, diff --git a/rpc/src/state_rpc.rs b/rpc/src/state_rpc.rs index 9e0e60368..5ad2f3a85 100644 --- a/rpc/src/state_rpc.rs +++ b/rpc/src/state_rpc.rs @@ -190,16 +190,14 @@ impl StateNetworkApiServer for StateNetworkApi { } /// Send an OFFER request with given ContentKey, to the designated peer and wait for a response. - /// If the content value is provided, a "populated" offer is used, which will not store the - /// content locally. Otherwise a regular offer is sent, after validating that the content is - /// available locally. + /// Does not store content locally. /// Returns the content keys bitlist upon successful content transmission or empty bitlist /// receive. async fn offer( &self, enr: Enr, content_key: StateContentKey, - content_value: Option, + content_value: StateContentValue, ) -> RpcResult { let endpoint = StateEndpoint::Offer(enr, content_key, content_value); let result = self.proxy_query_to_state_subnet(endpoint).await?; diff --git a/trin-beacon/src/jsonrpc.rs b/trin-beacon/src/jsonrpc.rs index 51308696a..500274a89 100644 --- a/trin-beacon/src/jsonrpc.rs +++ b/trin-beacon/src/jsonrpc.rs @@ -12,7 +12,7 @@ use ethportal_api::{ query_trace::QueryTrace, }, utils::bytes::hex_encode, - BeaconContentKey, BeaconContentValue, OverlayContentKey, RawContentKey, + BeaconContentKey, BeaconContentValue, OverlayContentKey, }; use portalnet::overlay::errors::OverlayRequestError; use serde_json::{json, Value}; @@ -75,6 +75,9 @@ async fn complete_request(network: Arc, request: BeaconJsonRpcReq BeaconEndpoint::Offer(enr, content_key, content_value) => { offer(network, enr, content_key, content_value).await } + BeaconEndpoint::WireOffer(enr, content_keys) => { + wire_offer(network, enr, content_keys).await + } BeaconEndpoint::Ping(enr) => ping(network, enr).await, BeaconEndpoint::RoutingTableInfo => { serde_json::to_value(network.overlay.routing_table_info()) @@ -331,28 +334,31 @@ async fn offer( network: Arc, enr: discv5::enr::Enr, content_key: BeaconContentKey, - content_value: Option, + content_value: BeaconContentValue, ) -> Result { - if let Some(content_value) = content_value { - let content_value = content_value.encode(); - match network - .overlay - .send_populated_offer(enr, content_key.into(), content_value) - .await - { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("Populated Offer request timeout: {msg:?}")), - } - } else { - let content_key: Vec = vec![content_key.to_bytes()]; - match network.overlay.send_offer(content_key, enr).await { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("Offer request timeout: {msg:?}")), - } + match network + .overlay + .send_offer(enr, content_key.into(), content_value.encode()) + .await + { + Ok(accept) => Ok(json!(AcceptInfo { + content_keys: accept.content_keys, + })), + Err(msg) => Err(format!("Offer request timeout: {msg:?}")), + } +} + +/// Constructs a JSON call for the WireOffer method. +async fn wire_offer( + network: Arc, + enr: discv5::enr::Enr, + content_keys: Vec, +) -> Result { + match network.overlay.send_wire_offer(enr, content_keys).await { + Ok(accept) => Ok(json!(AcceptInfo { + content_keys: accept.content_keys, + })), + Err(msg) => Err(format!("WireOffer request timeout: {msg:?}")), } } diff --git a/trin-execution/src/storage/evm_db.rs b/trin-execution/src/storage/evm_db.rs index 55721577f..f9d1843c7 100644 --- a/trin-execution/src/storage/evm_db.rs +++ b/trin-execution/src/storage/evm_db.rs @@ -24,7 +24,6 @@ use super::{ const REVERSE_HASH_LOOKUP_PREFIX: &[u8] = b"reverse hash lookup"; #[derive(Debug, Clone)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct EvmDB { /// State config pub config: StateConfig, diff --git a/trin-history/src/jsonrpc.rs b/trin-history/src/jsonrpc.rs index 19cc4f391..f8db1e11f 100644 --- a/trin-history/src/jsonrpc.rs +++ b/trin-history/src/jsonrpc.rs @@ -11,7 +11,7 @@ use ethportal_api::{ query_trace::QueryTrace, }, utils::bytes::hex_encode, - ContentValue, HistoryContentKey, OverlayContentKey, RawContentKey, + ContentValue, HistoryContentKey, HistoryContentValue, OverlayContentKey, }; use portalnet::overlay::errors::OverlayRequestError; use serde_json::{json, Value}; @@ -74,6 +74,9 @@ async fn complete_request(network: Arc, request: HistoryJsonRpcR HistoryEndpoint::Offer(enr, content_key, content_value) => { offer(network, enr, content_key, content_value).await } + HistoryEndpoint::WireOffer(enr, content_keys) => { + wire_offer(network, enr, content_keys).await + } HistoryEndpoint::Ping(enr) => ping(network, enr).await, HistoryEndpoint::RoutingTableInfo => { serde_json::to_value(network.overlay.routing_table_info()) @@ -330,31 +333,33 @@ async fn offer( network: Arc, enr: discv5::enr::Enr, content_key: HistoryContentKey, - content_value: Option, + content_value: HistoryContentValue, ) -> Result { - if let Some(content_value) = content_value { - let content_value = content_value.encode(); - match network - .overlay - .send_populated_offer(enr, content_key.into(), content_value) - .await - { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("Populated Offer request timeout: {msg:?}")), - } - } else { - let content_key: Vec = vec![content_key.to_bytes()]; - match network.overlay.send_offer(content_key, enr).await { - Ok(accept) => Ok(json!(AcceptInfo { - content_keys: accept.content_keys, - })), - Err(msg) => Err(format!("Offer request timeout: {msg:?}")), - } + match network + .overlay + .send_offer(enr, content_key.into(), content_value.encode()) + .await + { + Ok(accept) => Ok(json!(AcceptInfo { + content_keys: accept.content_keys, + })), + Err(msg) => Err(format!("Offer request timeout: {msg:?}")), } } +/// Constructs a JSON call for the WireOffer method. +async fn wire_offer( + network: Arc, + enr: discv5::enr::Enr, + content_keys: Vec, +) -> Result { + match network.overlay.send_wire_offer(enr, content_keys).await { + Ok(accept) => Ok(json!(AcceptInfo { + content_keys: accept.content_keys, + })), + Err(msg) => Err(format!("WireOffer request timeout: {msg:?}")), + } +} /// Constructs a JSON call for the Ping method. async fn ping( network: Arc, diff --git a/trin-state/src/jsonrpc.rs b/trin-state/src/jsonrpc.rs index 4b6a84a55..8dc4f005c 100644 --- a/trin-state/src/jsonrpc.rs +++ b/trin-state/src/jsonrpc.rs @@ -304,31 +304,18 @@ async fn offer( network: Arc, enr: Enr, content_key: StateContentKey, - content_value: Option, + content_value: StateContentValue, ) -> Result { - if let Some(content_value) = content_value { - to_json_result( - "Populate Offer", - network - .overlay - .send_populated_offer(enr, content_key.into(), content_value.encode()) - .await - .map(|accept| AcceptInfo { - content_keys: accept.content_keys, - }), - ) - } else { - to_json_result( - "Offer", - network - .overlay - .send_offer(vec![content_key.into()], enr) - .await - .map(|accept| AcceptInfo { - content_keys: accept.content_keys, - }), - ) - } + to_json_result( + "Offer", + network + .overlay + .send_offer(enr, content_key.into(), content_value.encode()) + .await + .map(|accept| AcceptInfo { + content_keys: accept.content_keys, + }), + ) } async fn gossip(