clean up
eagr committed Nov 21, 2023
1 parent 64db2e5 commit 605cd8d
Showing 1 changed file with 13 additions and 107 deletions.
120 changes: 13 additions & 107 deletions substrate/utils/frame/remote-externalities/src/lib.rs
@@ -405,36 +405,24 @@ where
 	/// Get keys with `prefix` at `block` in a parallel manner.
 	async fn rpc_get_keys_parallel(
 		&self,
-		prefix: StorageKey,
+		prefix: &StorageKey,
 		block: B::Hash,
 		parallel: usize,
-		// FIXME eagr: remove it
-		cutoff: u32,
 	) -> Result<Vec<StorageKey>, &'static str> {
 		// Divide the workload and return the start key of each chunk. Guaranteed to return a
 		// non-empty list.
-		//
-		// Assuming 256-bit hashes, given a prefix of length L,
-		// SCALE = 32 - L, in the sense of the scale of the problem.
-		// We don't want to divide the workload too fine, which is also counter-productive.
-		// For that purpose, we use CUTOFF to control how fine the chunks are.
-		// The number of resulting chunks is determined as (2 ^ (SCALE / CUTOFF)) * SCALE.
-		fn gen_start_keys(prefix: &StorageKey, cutoff: u32) -> Vec<StorageKey> {
+		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
 			let mut prefix = prefix.as_ref().to_vec();
 			let scale = 32usize.saturating_sub(prefix.len());
 
 			// no need to divide workload
-			if scale < 8 {
+			if scale < 9 {
 				prefix.extend(vec![0; scale]);
 				return vec![StorageKey(prefix)]
 			}
 
-			// FIXME eagr: use literal after determining the best cutoff
-			// let cutoff = cutoff;
-			// grow coefficient faster for larger scale
-			let coefficient = 2usize.saturating_pow(scale as u32 / cutoff);
-			let chunks = coefficient * scale;
+			// FIXME: figure out a better algo for dividing workload
+			let chunks = 16;
 			let step = 0x10000 / chunks;
 			let ext = scale - 2;

@@ -449,7 +437,7 @@ where
 				.collect()
 		}
 
-		let start_keys = gen_start_keys(&prefix, cutoff);
+		let start_keys = gen_start_keys(&prefix);
 		let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
 		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
 		end_keys.push(None);
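
The chunking scheme is easiest to see in isolation. Below is a minimal, self-contained sketch of the idea (the function name `chunk_start_keys` is hypothetical, and the byte layout is inferred from `step = 0x10000 / chunks` and `ext = scale - 2` above, not copied from the crate): split the two bytes that follow the prefix into 16 even strides, and zero-pad each start key to the full 32-byte width. Chunk `i` then covers `[start_i, start_{i+1})`, with the final chunk open-ended.

```rust
/// Sketch: split the keyspace under `prefix` into 16 ranges by generating
/// evenly spaced start keys (assumes 32-byte storage keys).
fn chunk_start_keys(prefix: &[u8]) -> Vec<Vec<u8>> {
    let scale = 32usize.saturating_sub(prefix.len());

    // A short remaining suffix is not worth splitting up.
    if scale < 9 {
        let mut key = prefix.to_vec();
        key.extend(std::iter::repeat(0u8).take(scale));
        return vec![key];
    }

    let chunks: usize = 16;
    let step = 0x10000 / chunks; // stride over the two bytes after the prefix
    let ext = scale - 2; // zero padding after those two bytes

    (0..chunks)
        .map(|i| {
            let mut key = prefix.to_vec();
            // Big-endian keeps the generated keys in ascending order.
            key.extend(((i * step) as u16).to_be_bytes());
            key.extend(std::iter::repeat(0u8).take(ext));
            key
        })
        .collect()
}

fn main() {
    let starts = chunk_start_keys(&[0x0d]);
    assert_eq!(starts.len(), 16);
    // First chunk starts at 0x0d_0000…00, second at 0x0d_1000…00, and so on.
    assert_eq!(&starts[1][..3], &[0x0d, 0x10, 0x00]);
}
```

Each start key here is paired with the next one as its exclusive end bound, which is exactly how the `start_keys`/`end_keys` vectors above are zipped together.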
@@ -708,8 +696,7 @@ where
 		let start = Instant::now();
 		let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
 		let keys = self
-			// FIXME eagr: remove cutoff
-			.rpc_get_keys_parallel(prefix.clone(), at, parallel, 8)
+			.rpc_get_keys_parallel(&prefix, at, parallel)
 			.await?
 			.into_iter()
 			.collect::<Vec<_>>();
@@ -955,6 +942,7 @@ where
 		let mut keys_and_values = Vec::new();
 		for prefix in &config.hashed_prefixes {
 			let now = std::time::Instant::now();
+			// By observation, 4 parallel tasks almost always perform the best.
 			let additional_key_values =
 				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 4).await?;
 			let elapsed = now.elapsed();
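
For context on the `parallel` knob, here is a hedged, standalone sketch of how bounded-concurrency range fetches can be driven; `fetch_range` and `fetch_all` are stand-ins, not the crate's API, and the real implementation may differ. Using `buffered` rather than `buffer_unordered` keeps results in chunk order, so concatenating them reproduces a sequential scan.

```rust
use futures::{stream, StreamExt};

/// Stand-in for one ranged key fetch (hypothetical; the real call is an RPC).
async fn fetch_range(start: Vec<u8>, _end: Option<Vec<u8>>) -> Vec<Vec<u8>> {
    vec![start] // pretend the range held exactly its start key
}

/// Fetch every (start, end) chunk, keeping at most `parallel` requests in flight.
async fn fetch_all(ranges: Vec<(Vec<u8>, Option<Vec<u8>>)>, parallel: usize) -> Vec<Vec<u8>> {
    stream::iter(ranges)
        .map(|(start, end)| fetch_range(start, end))
        .buffered(parallel) // bounded concurrency, results stay in chunk order
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .flatten()
        .collect()
}

#[tokio::main]
async fn main() {
    let ranges = vec![(vec![0x00], Some(vec![0x80])), (vec![0x80], None)];
    let keys = fetch_all(ranges, 4).await;
    assert_eq!(keys, vec![vec![0x00], vec![0x80]]);
}
```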
@@ -1158,46 +1146,6 @@ where
 
 		Ok(ext)
 	}
-
-	/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
-	#[cfg(test)]
-	async fn rpc_get_keys_paged(
-		&self,
-		prefix: StorageKey,
-		hash: B::Hash,
-	) -> Result<Vec<StorageKey>, &'static str> {
-		let mut last_key: Option<StorageKey> = None;
-		let mut all_keys: Vec<StorageKey> = vec![];
-		let keys = loop {
-			// This loop can hit the node with very rapid requests, occasionally causing it to
-			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
-			let retry_strategy =
-				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
-			let get_page_closure =
-				|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), hash);
-			let page = Retry::spawn(retry_strategy, get_page_closure).await?;
-			let page_len = page.len();
-
-			all_keys.extend(page);
-
-			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
-				log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
-				break all_keys
-			} else {
-				let new_last_key =
-					all_keys.last().expect("all_keys is populated; has .last(); qed");
-				log::debug!(
-					target: LOG_TARGET,
-					"new total = {}, full page received: {}",
-					all_keys.len(),
-					HexDisplay::from(new_last_key)
-				);
-				last_key = Some(new_last_key.clone());
-			};
-		};
-
-		Ok(keys)
-	}
 }
 
 // Public methods
@@ -1592,49 +1540,7 @@ mod remote_tests {
 	}
 
 	#[tokio::test]
-	async fn can_fetch_parallel() {
-		use std::time::Instant;
-
-		init_logger();
-
-		let uri = String::from("wss://polkadot-try-runtime-node.parity-chains.parity.io:443");
-		let mut builder = Builder::<Block>::new()
-			.mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
-		builder.init_remote_client().await.unwrap();
-
-		let at = builder.as_online().at.unwrap();
-
-		// scrape all
-		let prefix = StorageKey(vec![]);
-		// original
-		let start = Instant::now();
-		builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		log::error!("rpc_get_keys_paged: {:?}", start.elapsed());
-		// 16*32 chunks
-		let start = Instant::now();
-		let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap();
-		log::error!("p4c8: {:?}", start.elapsed());
-		// 8*32 chunks
-		let start = Instant::now();
-		let p4c10 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 10).await.unwrap();
-		log::error!("p4c10: {:?}", start.elapsed());
-		// 4*32 chunks
-		let start = Instant::now();
-		let p4c16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
-		log::error!("p4c16: {:?}", start.elapsed());
-		// 2*32 chunks
-		let start = Instant::now();
-		let p4c20 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 20).await.unwrap();
-		log::error!("p4c20: {:?}", start.elapsed());
-		// 1*32 chunks
-		let start = Instant::now();
-		let p4c33 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 33).await.unwrap();
-		log::error!("p4c33: {:?}", start.elapsed());
-		assert_eq!(p4c8.len(), p4c33.len());
-	}
-
-	#[tokio::test]
-	async fn can_fetch_parallel_correctly() {
+	async fn can_fetch_in_parallel() {
 		init_logger();
 
 		let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
@@ -1645,13 +1551,13 @@ mod remote_tests {
 		let at = builder.as_online().at.unwrap();
 
 		let prefix = StorageKey(vec![13]);
-		let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
+		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
 		assert_eq!(paged, para);
 
 		let prefix = StorageKey(vec![]);
-		let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 32).await.unwrap();
+		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
 		assert_eq!(paged, para);
 	}
 }
