Skip to content

Commit

Permalink
backport/stable2412: litep2p: Provide partial results to speedup GetR…
Browse files Browse the repository at this point in the history
…ecord queries (#7192)

Backport #7099 into `stable2412` from lexnv.

See the
[documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md)
on how to use this bot.

<!--
  # To be used by other automation, do not modify:
  original-pr-number: #${pull_number}
-->

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
Co-authored-by: Egor_P <[email protected]>
  • Loading branch information
4 people authored Jan 17, 2025
1 parent 60832af commit c36fdaa
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 68 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.8.4", features = ["websocket"] }
litep2p = { version = "0.9.0", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_7099.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Provide partial results to speedup GetRecord queries

doc:
- audience: Node Dev
description: |
This PR provides the partial results of the GetRecord kademlia query.

This significantly improves the authority discovery records, from ~37 minutes to ~2/3 minutes.
In contrast, libp2p discovers authority records in around ~10 minutes.

The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes.
In this PR, partial records are provided as soon as they are discovered from the network.

crates:
- name: sc-network
bump: patch
33 changes: 25 additions & 8 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use litep2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
Quorum, Record, RecordKey,
},
ping::{Config as PingConfig, PingEvent},
},
Expand Down Expand Up @@ -129,13 +129,19 @@ pub enum DiscoveryEvent {
address: Multiaddr,
},

/// Record was found from the DHT.
/// `GetRecord` query succeeded.
GetRecordSuccess {
/// Query ID.
query_id: QueryId,
},

/// Records.
records: RecordsType,
/// Record was found from the DHT.
GetRecordPartialResult {
/// Query ID.
query_id: QueryId,

/// Record.
record: PeerRecord,
},

/// Record was successfully stored on the DHT.
Expand Down Expand Up @@ -550,13 +556,24 @@ impl Stream for Discovery {
peers: peers.into_iter().collect(),
}))
},
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
"`GET_RECORD` succeeded for {query_id:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
},
Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
query_id,
record,
}));
},
Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Expand Down
100 changes: 43 additions & 57 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use litep2p::{
protocol::{
libp2p::{
bitswap::Config as BitswapConfig,
kademlia::{QueryId, Record, RecordsType},
kademlia::{QueryId, Record},
},
request_response::ConfigBuilder as RequestResponseConfigBuilder,
},
Expand Down Expand Up @@ -820,35 +820,60 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.peerstore_handle.add_known_peer(peer.into());
}
}
Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
match self.pending_get_values.remove(&query_id) {
None => log::warn!(
Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
if !self.pending_get_values.contains_key(&query_id) {
log::error!(
target: LOG_TARGET,
"`GET_VALUE` succeeded for a non-existent query",
),
"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
);

continue
}

let peer_id: sc_network_types::PeerId = record.peer.into();
let record = PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
};

self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record.into()
)
)
);
}
Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
match self.pending_get_values.remove(&query_id) {
Some((key, started)) => {
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
key,
"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
);
for record in litep2p_to_libp2p_peer_record(records) {
self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record
)
)
);
}

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["value-get"])
.observe(started.elapsed().as_secs_f64());
}
}
},
None => {
log::error!(
target: LOG_TARGET,
"Missing/invalid pending query for `GET_VALUE`: {query_id:?}"
);
debug_assert!(false);
},
}
}
Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
Expand Down Expand Up @@ -1095,42 +1120,3 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
}

// Glue code to convert from a litep2p records type to a libp2p2 PeerRecord.
fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
match records {
litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
vec![PeerRecord {
record: P2PRecord {
key: record.key.to_vec().into(),
value: record.value,
publisher: record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.expires,
},
peer: None,
}]
},
litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
.into_iter()
.map(|record| {
let peer_id: sc_network_types::PeerId = record.peer.into();

PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
}
})
.collect::<Vec<_>>(),
}
}

0 comments on commit c36fdaa

Please sign in to comment.