diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs
index 17d4df2db886..69ac84371598 100644
--- a/substrate/utils/frame/remote-externalities/src/lib.rs
+++ b/substrate/utils/frame/remote-externalities/src/lib.rs
@@ -405,36 +405,24 @@ where
 	/// Get keys with `prefix` at `block` in a parallel manner.
 	async fn rpc_get_keys_parallel(
 		&self,
-		prefix: StorageKey,
+		prefix: &StorageKey,
 		block: B::Hash,
 		parallel: usize,
-		// FIXME eagr: remove it
-		cutoff: u32,
 	) -> Result<Vec<StorageKey>, &'static str> {
 		// Divide the workload and return the start key of each chunks. Guarantee to return a
 		// non-empty list.
-		//
-		// Assuming 256-bit hashes, given a prefix of length L,
-		// SCALE = 32 - L, in the sense of the scale of the problem.
-		// We don't want to divide the workload too fine, which is also counter-productive.
-		// For that purpose, we use CUTOFF to control how fine the chunks are.
-		// The number of resulted chunks is determined as (2 ^ (SCALE / CUTOFF)) * SCALE.
-		fn gen_start_keys(prefix: &StorageKey, cutoff: u32) -> Vec<StorageKey> {
+		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
 			let mut prefix = prefix.as_ref().to_vec();
 			let scale = 32usize.saturating_sub(prefix.len());
 
 			// no need to divide workload
-			if scale < 8 {
+			if scale < 9 {
 				prefix.extend(vec![0; scale]);
 				return vec![StorageKey(prefix)]
 			}
 
-			// FIXME eagr: use literal after determining the best cutoff
-			// let cutoff = cutoff;
-
-			// grow coefficient faster for larger scale
-			let coefficient = 2usize.saturating_pow(scale as u32 / cutoff);
-			let chunks = coefficient * scale;
-
+			// FIXME: figure out a better algo for dividing workload
+			let chunks = 16;
 			let step = 0x10000 / chunks;
 			let ext = scale - 2;
 
@@ -449,7 +437,7 @@ where
 				.collect()
 		}
 
-		let start_keys = gen_start_keys(&prefix, cutoff);
+		let start_keys = gen_start_keys(&prefix);
 		let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
 		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
 		end_keys.push(None);
@@ -708,8 +696,7 @@ where
 		let start = Instant::now();
 		let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
 		let keys = self
-			// FIXME eagr: remove cutoff
-			.rpc_get_keys_parallel(prefix.clone(), at, parallel, 8)
+			.rpc_get_keys_parallel(&prefix, at, parallel)
 			.await?
 			.into_iter()
 			.collect::<Vec<_>>();
@@ -955,6 +942,7 @@ where
 		let mut keys_and_values = Vec::new();
 		for prefix in &config.hashed_prefixes {
 			let now = std::time::Instant::now();
+			// By observation, 4 parallel tasks almost always perform the best.
 			let additional_key_values =
 				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 4).await?;
 			let elapsed = now.elapsed();
@@ -1158,46 +1146,6 @@ where
 
 		Ok(ext)
 	}
-
-	/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
-	#[cfg(test)]
-	async fn rpc_get_keys_paged(
-		&self,
-		prefix: StorageKey,
-		hash: B::Hash,
-	) -> Result<Vec<StorageKey>, &'static str> {
-		let mut last_key: Option<StorageKey> = None;
-		let mut all_keys: Vec<StorageKey> = vec![];
-		let keys = loop {
-			// This loop can hit the node with very rapid requests, occasionally causing it to
-			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
-			let retry_strategy =
-				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
-			let get_page_closure =
-				|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), hash);
-			let page = Retry::spawn(retry_strategy, get_page_closure).await?;
-			let page_len = page.len();
-
-			all_keys.extend(page);
-
-			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
-				log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
-				break all_keys
-			} else {
-				let new_last_key =
-					all_keys.last().expect("all_keys is populated; has .last(); qed");
-				log::debug!(
-					target: LOG_TARGET,
-					"new total = {}, full page received: {}",
-					all_keys.len(),
-					HexDisplay::from(new_last_key)
-				);
-				last_key = Some(new_last_key.clone());
-			};
-		};
-
-		Ok(keys)
-	}
 }
 
 // Public methods
@@ -1592,49 +1540,7 @@ mod remote_tests {
 	}
 
 	#[tokio::test]
-	async fn can_fetch_parallel() {
-		use std::time::Instant;
-
-		init_logger();
-
-		let uri = String::from("wss://polkadot-try-runtime-node.parity-chains.parity.io:443");
-		let mut builder = Builder::<Block>::new()
-			.mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
-		builder.init_remote_client().await.unwrap();
-
-		let at = builder.as_online().at.unwrap();
-
-		// scrape all
-		let prefix = StorageKey(vec![]);
-		// original
-		let start = Instant::now();
-		builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		log::error!("rpc_get_keys_paged: {:?}", start.elapsed());
-		// 16*32 chunks
-		let start = Instant::now();
-		let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap();
-		log::error!("p4c8: {:?}", start.elapsed());
-		// 8*32 chunks
-		let start = Instant::now();
-		let p4c10 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 10).await.unwrap();
-		log::error!("p4c10: {:?}", start.elapsed());
-		// 4*32 chunks
-		let start = Instant::now();
-		let p4c16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
-		log::error!("p4c16: {:?}", start.elapsed());
-		// 2*32 chunks
-		let start = Instant::now();
-		let p4c20 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 20).await.unwrap();
-		log::error!("p4c20: {:?}", start.elapsed());
-		// 1*32 chunks
-		let start = Instant::now();
-		let p4c33 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 33).await.unwrap();
-		log::error!("p4c33: {:?}", start.elapsed());
-		assert_eq!(p4c8.len(), p4c33.len());
-	}
-
-	#[tokio::test]
-	async fn can_fetch_parallel_correctly() {
+	async fn can_fetch_in_parallel() {
 		init_logger();
 
 		let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
@@ -1645,13 +1551,13 @@ mod remote_tests {
 		let at = builder.as_online().at.unwrap();
 
 		let prefix = StorageKey(vec![13]);
-		let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
+		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
 		assert_eq!(paged, para);
 
 		let prefix = StorageKey(vec![]);
-		let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
-		let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 32).await.unwrap();
+		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
 		assert_eq!(paged, para);
 	}
 }
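For reference, below is a minimal, self-contained sketch of the chunking scheme the reworked `rpc_get_keys_parallel` relies on: the prefix is padded towards a 32-byte key, the two bytes immediately after the prefix are stepped through 0x0000..0xffff in 16 equal chunks, and consecutive start keys are then paired into (start, end) ranges for the parallel workers. This is not the crate's actual code: `StorageKey` here is a bare stand-in for `sp_storage::StorageKey`, and the body of the `map` closure is an assumption, since the diff elides those context lines.

// Sketch only; assumes 32-byte storage keys, as the diff's `scale` arithmetic does.
#[derive(Clone, Debug, PartialEq)]
struct StorageKey(Vec<u8>);

fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
    let mut prefix = prefix.0.clone();
    let scale = 32usize.saturating_sub(prefix.len());

    // A prefix this close to a full key is not worth splitting: return one chunk.
    if scale < 9 {
        prefix.extend(vec![0; scale]);
        return vec![StorageKey(prefix)];
    }

    // Step the two bytes right after the prefix through 0x0000..0xffff in 16 chunks,
    // padding the rest of each start key with zeros.
    let chunks: u32 = 16;
    let step = 0x10000 / chunks;
    let ext = scale - 2;

    (0..chunks)
        .map(|i| {
            let mut key = prefix.clone();
            key.extend_from_slice(&((i * step) as u16).to_be_bytes());
            key.extend(vec![0u8; ext]);
            StorageKey(key)
        })
        .collect()
}

fn main() {
    let starts = gen_start_keys(&StorageKey(vec![0x13]));
    assert_eq!(starts.len(), 16);

    // Consecutive start keys become (start, end) ranges, as in the diff; the last
    // range is open-ended, so everything under the prefix is covered exactly once.
    let starts: Vec<Option<&StorageKey>> = starts.iter().map(Some).collect();
    let mut ends: Vec<Option<&StorageKey>> = starts[1..].to_vec();
    ends.push(None);

    for (start, end) in starts.iter().zip(ends.iter()).take(3) {
        println!("{:02x?} .. {:02x?}", start.map(|k| &k.0[..4]), end.map(|k| &k.0[..4]));
    }
}

Running the sketch prints the first few range boundaries for a one-byte prefix, which is the shape of work each of the `parallel` tasks receives; the removed `cutoff` knob previously controlled how many such ranges were produced, whereas the new code fixes the count at 16.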