Skip to content

Commit

Permalink
refactor: offer rpc endpoints (#1347)
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita authored Jul 29, 2024
1 parent 14020a1 commit 761a523
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 139 deletions.
14 changes: 13 additions & 1 deletion ethportal-api/src/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,26 @@ pub trait BeaconNetworkApi {
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "beaconOffer")]
async fn offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: Option<BeaconContentValue>,
content_value: BeaconContentValue,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "beaconWireOffer")]
async fn wire_offer(
&self,
enr: Enr,
content_keys: Vec<BeaconContentKey>,
) -> RpcResult<AcceptInfo>;

/// Store content key with a content data to the local database.
Expand Down
14 changes: 13 additions & 1 deletion ethportal-api/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,26 @@ pub trait HistoryNetworkApi {
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "historyOffer")]
async fn offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: Option<HistoryContentValue>,
content_value: HistoryContentValue,
) -> RpcResult<AcceptInfo>;

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "historyWireOffer")]
async fn wire_offer(
&self,
enr: Enr,
content_keys: Vec<HistoryContentKey>,
) -> RpcResult<AcceptInfo>;

/// Store content key with a content data to the local database.
Expand Down
3 changes: 2 additions & 1 deletion ethportal-api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ pub trait StateNetworkApi {
) -> RpcResult<TraceGossipInfo>;

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// Does not store the content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
#[method(name = "stateOffer")]
async fn offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: Option<StateContentValue>,
content_value: StateContentValue,
) -> RpcResult<AcceptInfo>;

/// Store content key with a content data to the local database.
Expand Down
20 changes: 13 additions & 7 deletions ethportal-api/src/types/jsonrpc/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ pub enum StateEndpoint {
TraceRecursiveFindContent(StateContentKey),
/// params: [content_key, content_value]
Store(StateContentKey, StateContentValue),
/// params: [enr, content_key]
Offer(Enr, StateContentKey, Option<StateContentValue>),
/// params: [content_key, content_value]
/// params: [enr, content_key, content_value]
Offer(Enr, StateContentKey, StateContentValue),
/// WireOffer is not supported in the state network, since locally
/// stored values do not contain the proofs necessary for valid gossip.
/// params: [enr, content_key, content_value]
Gossip(StateContentKey, StateContentValue),
/// params: [content_key, content_value]
TraceGossip(StateContentKey, StateContentValue),
Expand Down Expand Up @@ -75,8 +77,10 @@ pub enum HistoryEndpoint {
Gossip(HistoryContentKey, HistoryContentValue),
/// params: [content_key, content_value]
TraceGossip(HistoryContentKey, HistoryContentValue),
/// params: [enr, content_key]
Offer(Enr, HistoryContentKey, Option<HistoryContentValue>),
/// params: [enr, content_key, content_value]
Offer(Enr, HistoryContentKey, HistoryContentValue),
/// params: [enr, [content_key]]
WireOffer(Enr, Vec<HistoryContentKey>),
/// params: [enr]
Ping(Enr),
/// params: content_key
Expand Down Expand Up @@ -119,8 +123,10 @@ pub enum BeaconEndpoint {
Gossip(BeaconContentKey, BeaconContentValue),
/// params: [content_key, content_value]
TraceGossip(BeaconContentKey, BeaconContentValue),
/// params: [enr, content_key]
Offer(Enr, BeaconContentKey, Option<BeaconContentValue>),
/// params: [enr, content_key, content_value]
Offer(Enr, BeaconContentKey, BeaconContentValue),
/// params: [enr, [content_key]]
WireOffer(Enr, Vec<BeaconContentKey>),
/// params: enr
Ping(Enr),
/// params: content_key
Expand Down
18 changes: 8 additions & 10 deletions ethportal-peertest/src/scenarios/offer_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ pub async fn test_unpopulated_offer(peertest: &Peertest, target: &Client) {

assert!(store_result);

// Send unpopulated offer request from testnode to bootnode
// Send wire offer request from testnode to bootnode
let result = target
.offer(
.wire_offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
None,
vec![content_key.clone()],
)
.await
.unwrap();
Expand All @@ -59,12 +58,11 @@ pub async fn test_unpopulated_offer_fails_with_missing_content(

let (content_key, _content_value) = fixture_header_with_proof();

// validate that unpopulated offer fails if content not available locally
// validate that wire offer fails if content not available locally
match target
.offer(
.wire_offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
None,
vec![content_key.clone()],
)
.await
{
Expand All @@ -85,7 +83,7 @@ pub async fn test_populated_offer(peertest: &Peertest, target: &Client) {
.offer(
Enr::from_str(&peertest.bootnode.enr.to_base64()).unwrap(),
content_key.clone(),
Some(content_value.clone()),
content_value.clone(),
)
.await
.unwrap();
Expand Down Expand Up @@ -150,7 +148,7 @@ pub async fn test_offer_propagates_gossip(peertest: &Peertest, target: &Client)
.offer(
fresh_enr.clone(),
content_key.clone(),
Some(content_value.clone()),
content_value.clone(),
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion ethportal-peertest/src/scenarios/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &Peerte
.offer(
peer.enr.clone(),
fixture.content_data.key.clone(),
Some(fixture.content_data.offer_value.clone()),
fixture.content_data.offer_value.clone(),
)
.await
.unwrap();
Expand Down
53 changes: 18 additions & 35 deletions portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use ethportal_api::{
distance::{Distance, Metric},
enr::Enr,
portal_wire::{
Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Offer, Ping,
Pong, PopulatedOffer, ProtocolId, Request, Response,
Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Ping, Pong,
PopulatedOffer, ProtocolId, Request, Response,
},
},
utils::bytes::hex_encode,
Expand Down Expand Up @@ -518,48 +518,31 @@ where

/// Offer is sent in order to store content to k nodes with radii that contain content-id
/// Offer is also sent to nodes after FindContent (POKE)
pub async fn send_offer(
pub async fn send_wire_offer(
&self,
content_keys: Vec<RawContentKey>,
enr: Enr,
content_keys: Vec<TContentKey>,
) -> Result<Accept, OverlayRequestError> {
let content_items = content_keys
.into_iter()
.map(|key| match self.store.read().get(&key) {
Ok(Some(content)) => Ok((key.into(), content.clone())),
_ => Err(OverlayRequestError::ContentNotFound {
message: format!("Content key not found in local store: {key:02X?}"),
utp: false,
trace: None,
}),
})
.collect::<Result<Vec<(RawContentKey, Vec<u8>)>, OverlayRequestError>>()?;
// Construct the request.
let request = Offer {
content_keys: content_keys.clone(),
};
let request = PopulatedOffer { content_items };
let direction = RequestDirection::Outgoing {
destination: enr.clone(),
};

// Validate that the content keys are available in the local store, before sending the
// offer
for content_key in content_keys.into_iter() {
let content_key = TContentKey::try_from(content_key.clone()).map_err(|err| {
OverlayRequestError::ContentNotFound {
message: format!(
"Error decoding content key for content key: {content_key:02X?} - {err}"
),
utp: false,
trace: None,
}
})?;
match self.store.read().get(&content_key) {
Ok(Some(_)) => {}
_ => {
return Err(OverlayRequestError::ContentNotFound {
message: format!(
"Content key not found in local store: {content_key:02X?}"
),
utp: false,
trace: None,
});
}
}
}

// Send the request and wait on the response.
match self
.send_overlay_request(Request::Offer(request), direction)
.send_overlay_request(Request::PopulatedOffer(request), direction)
.await
{
Ok(Response::Accept(accept)) => Ok(accept),
Expand All @@ -569,7 +552,7 @@ where
}

/// Send Offer request without storing the content into db
pub async fn send_populated_offer(
pub async fn send_offer(
&self,
enr: Enr,
content_key: RawContentKey,
Expand Down
3 changes: 1 addition & 2 deletions portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,7 @@ where
protocol = %self.protocol,
request.source = %source,
request.discv5.id = %request_id,
"Handling Ping message {}",
request
"Handling Ping message {request}",
);

let enr_seq = self.local_enr().seq();
Expand Down
21 changes: 17 additions & 4 deletions rpc/src/beacon_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,36 @@ impl BeaconNetworkApiServer for BeaconNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: BeaconContentKey,
content_value: Option<BeaconContentValue>,
content_value: BeaconContentValue,
) -> RpcResult<AcceptInfo> {
let endpoint = BeaconEndpoint::Offer(enr, content_key, content_value);
let result = self.proxy_query_to_beacon_subnet(endpoint).await?;
let result: AcceptInfo = from_value(result)?;
Ok(result)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn wire_offer(
&self,
enr: Enr,
content_keys: Vec<BeaconContentKey>,
) -> RpcResult<AcceptInfo> {
let endpoint = BeaconEndpoint::WireOffer(enr, content_keys);
let result = self.proxy_query_to_beacon_subnet(endpoint).await?;
let result: AcceptInfo = from_value(result)?;
Ok(result)
}

/// Store content key with a content data to the local database.
async fn store(
&self,
Expand Down
21 changes: 17 additions & 4 deletions rpc/src/history_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,36 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: HistoryContentKey,
content_value: Option<HistoryContentValue>,
content_value: HistoryContentValue,
) -> RpcResult<AcceptInfo> {
let endpoint = HistoryEndpoint::Offer(enr, content_key, content_value);
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: AcceptInfo = from_value(result)?;
Ok(result)
}

/// Send an OFFER request with given ContentKeys, to the designated peer and wait for a
/// response. Requires the content keys to be stored locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn wire_offer(
&self,
enr: Enr,
content_keys: Vec<HistoryContentKey>,
) -> RpcResult<AcceptInfo> {
let endpoint = HistoryEndpoint::WireOffer(enr, content_keys);
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: AcceptInfo = from_value(result)?;
Ok(result)
}

/// Store content key with a content data to the local database.
async fn store(
&self,
Expand Down
6 changes: 2 additions & 4 deletions rpc/src/state_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,14 @@ impl StateNetworkApiServer for StateNetworkApi {
}

/// Send an OFFER request with given ContentKey, to the designated peer and wait for a response.
/// If the content value is provided, a "populated" offer is used, which will not store the
/// content locally. Otherwise a regular offer is sent, after validating that the content is
/// available locally.
/// Does not store content locally.
/// Returns the content keys bitlist upon successful content transmission or empty bitlist
/// receive.
async fn offer(
&self,
enr: Enr,
content_key: StateContentKey,
content_value: Option<StateContentValue>,
content_value: StateContentValue,
) -> RpcResult<AcceptInfo> {
let endpoint = StateEndpoint::Offer(enr, content_key, content_value);
let result = self.proxy_query_to_state_subnet(endpoint).await?;
Expand Down
Loading

0 comments on commit 761a523

Please sign in to comment.