From 387cafa762fd0009f63290634cda810a586852c1 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 18:25:25 +0800 Subject: [PATCH 1/6] fix: panic in download_file --- src/daemon/bundle.rs | 5 ++--- src/networks/actors_bundle.rs | 1 - src/tool/subcommands/state_migration_cmd.rs | 1 - src/utils/net/download_file.rs | 8 +++++--- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs index b286289bc079..e1c3c47d14b6 100644 --- a/src/daemon/bundle.rs +++ b/src/daemon/bundle.rs @@ -15,7 +15,6 @@ use once_cell::sync::Lazy; use std::mem::discriminant; use std::path::PathBuf; use std::{io::Cursor, path::Path}; -use tokio::io::BufReader; use tracing::{info, warn}; /// Tries to load the missing actor bundles to the blockstore. If the bundle is @@ -115,14 +114,14 @@ pub async fn load_actor_bundles_from_server( }; let bytes = std::fs::read(&result.path)?; - let mut stream = CarStream::new(BufReader::new(Cursor::new(bytes))).await?; + let mut stream = CarStream::new(Cursor::new(bytes)).await?; while let Some(block) = stream.try_next().await? 
{ db.put_keyed_persistent(&block.cid, &block.data)?; } let header = stream.header_v1; anyhow::ensure!(header.roots.len() == 1); anyhow::ensure!(header.roots.first() == root); - Ok(*header.roots.first()) + Ok(*root) } ), ) diff --git a/src/networks/actors_bundle.rs b/src/networks/actors_bundle.rs index 3e247abd80a8..dad25f18b81d 100644 --- a/src/networks/actors_bundle.rs +++ b/src/networks/actors_bundle.rs @@ -186,7 +186,6 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> { let bytes = std::fs::read(&result.path)?; let car = CarStream::new(Cursor::new(bytes)).await?; - ensure!(car.header_v1.version == 1); ensure!(car.header_v1.roots.len() == 1); ensure!(car.header_v1.roots.first() == root); anyhow::Ok((*root, car.try_collect::>().await?)) diff --git a/src/tool/subcommands/state_migration_cmd.rs b/src/tool/subcommands/state_migration_cmd.rs index 066eb7ef5d6f..8a4c55f38788 100644 --- a/src/tool/subcommands/state_migration_cmd.rs +++ b/src/tool/subcommands/state_migration_cmd.rs @@ -27,7 +27,6 @@ impl StateMigrationCommands { let metadata = get_actor_bundles_metadata().await?; let metadata_json = serde_json::to_string_pretty(&metadata)?; println!("{}", metadata_json); - Ok(()) } } diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index 95c5bc29097e..f7b5cfa64ca0 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -38,8 +38,8 @@ pub async fn download_file_with_cache( if file_md5 == url_md5 { true } else { - println!( - "md5 hash mismatch, url: {url}, local: {}, remote: {}", + tracing::warn!( + "download again due to md5 hash mismatch, url: {url}, local cache: {}, remote: {}", hex::encode(&file_md5), hex::encode(&url_md5) ); @@ -53,7 +53,9 @@ pub async fn download_file_with_cache( None => false, }; - if !cache_hit { + if cache_hit { + tracing::debug!(%url, "loaded from cache"); + } else { download_file_with_retry( url, cache_file_path.parent().unwrap_or_else(|| Path::new(".")), From 
5020fce482c41c96683d3dc50d013a8a7bc6ea49 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 19:15:02 +0800 Subject: [PATCH 2/6] introduce DownloadFileOption --- src/cli_shared/snapshot.rs | 4 +- src/daemon/bundle.rs | 6 +-- src/daemon/db_util.rs | 9 ++++- src/networks/actors_bundle.rs | 6 +-- src/tool/offline_server/server.rs | 9 ++++- src/tool/subcommands/api_cmd/test_snapshot.rs | 7 +++- src/utils/net.rs | 39 +++++++++++++------ src/utils/net/download_file.rs | 32 ++++++++++++--- 8 files changed, 81 insertions(+), 31 deletions(-) diff --git a/src/cli_shared/snapshot.rs b/src/cli_shared/snapshot.rs index e484e9a1fe83..929a1d04a006 100644 --- a/src/cli_shared/snapshot.rs +++ b/src/cli_shared/snapshot.rs @@ -7,8 +7,8 @@ use std::{ str::FromStr, }; -use crate::networks::NetworkChain; use crate::{cli_shared::snapshot::parse::ParsedFilename, utils::net::download_file_with_retry}; +use crate::{networks::NetworkChain, utils::net::DownloadFileOption}; use anyhow::{bail, Context as _}; use chrono::NaiveDate; use url::Url; @@ -67,7 +67,7 @@ pub async fn fetch( .date_and_height_and_forest(); let filename = filename(vendor, chain, date, height, forest_format); - download_file_with_retry(&url, directory, &filename).await + download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await } /// Returns diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs index e1c3c47d14b6..9377743cd65a 100644 --- a/src/daemon/bundle.rs +++ b/src/daemon/bundle.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::db::PersistentStore; -use crate::utils::net::download_file_with_cache; +use crate::utils::net::{download_file_with_cache, DownloadFileOption}; use crate::{ networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES}, utils::db::car_stream::{CarBlock, CarStream}, @@ -105,12 +105,12 @@ pub async fn load_actor_bundles_from_server( version, }| async move { let result = if let Ok(response) = - download_file_with_cache(url, 
&ACTOR_BUNDLE_CACHE_DIR).await + download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await { response } else { warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL"); - download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await? + download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await? }; let bytes = std::fs::read(&result.path)?; diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 638ed36becfb..69181c840175 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -8,7 +8,7 @@ use crate::networks::Height; use crate::state_manager::StateManager; use crate::utils::db::car_stream::CarStream; use crate::utils::io::EitherMmapOrRandomAccessFile; -use crate::utils::net::download_to; +use crate::utils::net::{download_to, DownloadFileOption}; use anyhow::{bail, Context}; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; @@ -108,7 +108,12 @@ pub async fn import_chain_as_forest_car( let downloaded_car_temp_path = tempfile::NamedTempFile::new_in(forest_car_db_dir)?.into_temp_path(); if let Ok(url) = Url::parse(&from_path.display().to_string()) { - download_to(&url, &downloaded_car_temp_path).await?; + download_to( + &url, + &downloaded_car_temp_path, + DownloadFileOption::Resumable, + ) + .await?; } else { move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?; } diff --git a/src/networks/actors_bundle.rs b/src/networks/actors_bundle.rs index dad25f18b81d..be91e46ff631 100644 --- a/src/networks/actors_bundle.rs +++ b/src/networks/actors_bundle.rs @@ -23,7 +23,7 @@ use tracing::warn; use crate::daemon::bundle::{load_actor_bundles_from_server, ACTOR_BUNDLE_CACHE_DIR}; use crate::shim::machine::BuiltinActorManifest; use crate::utils::db::car_stream::{CarStream, CarWriter}; -use crate::utils::net::download_file_with_cache; +use crate::utils::net::{download_file_with_cache, DownloadFileOption}; use std::str::FromStr; @@ 
-174,14 +174,14 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> { version, }| async move { let result = if let Ok(response) = - download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await + download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await { response } else { warn!( "failed to download bundle {network}-{version} from primary URL, trying alternative URL" ); - download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await? + download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await? }; let bytes = std::fs::read(&result.path)?; diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 2d0d11b07f46..3002fa7168f8 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -18,7 +18,7 @@ use crate::rpc::eth::filter::EthEventHandler; use crate::rpc::{start_rpc, RPCState}; use crate::shim::address::{CurrentNetwork, Network}; use crate::state_manager::StateManager; -use crate::utils::net::download_to; +use crate::utils::net::{download_to, DownloadFileOption}; use crate::JWT_IDENTIFIER; use anyhow::Context as _; use fvm_ipld_blockstore::Blockstore; @@ -223,7 +223,12 @@ async fn handle_snapshots( indicatif::HumanBytes(num_bytes) ); let downloaded_snapshot_path = std::env::current_dir()?.join(path); - download_to(&snapshot_url, &downloaded_snapshot_path).await?; + download_to( + &snapshot_url, + &downloaded_snapshot_path, + DownloadFileOption::Resumable, + ) + .await?; info!("Snapshot downloaded"); Ok(vec![downloaded_snapshot_path]) } diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 420bfc5a57f1..db3a6e924cc3 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -143,7 +143,7 @@ async fn ctx( #[cfg(test)] mod tests { use super::*; - use crate::utils::net::download_file_with_cache; + use 
crate::utils::net::{download_file_with_cache, DownloadFileOption}; use directories::ProjectDirs; use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools as _; @@ -175,7 +175,10 @@ mod tests { let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| { let cache_dir = cache_dir.clone(); async move { - let result = download_file_with_cache(&url, &cache_dir).await.unwrap(); + let result = + download_file_with_cache(&url, &cache_dir, DownloadFileOption::Default) + .await + .unwrap(); (filename, result.path) } })); diff --git a/src/utils/net.rs b/src/utils/net.rs index 119eeac0f597..51341b0959a4 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -42,7 +42,9 @@ pub async fn download_ipfs_file_trustlessly( tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))? .into_temp_path(); { - let mut reader = reader(url.as_str()).await?.compat(); + let mut reader = reader(url.as_str(), DownloadFileOption::Resumable) + .await? + .compat(); let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?); let cid_v10 = crate::utils::cid::cid_11_to_10(cid); rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(&cid_v10)) @@ -63,22 +65,37 @@ pub async fn download_ipfs_file_trustlessly( /// - uncompressed /// /// This function returns a reader of uncompressed data. -pub async fn reader(location: &str) -> anyhow::Result { +pub async fn reader( + location: &str, + option: DownloadFileOption, +) -> anyhow::Result { // This isn't the cleanest approach in terms of error-handling, but it works. If the URL is // malformed it'll end up trying to treat it as a local filepath. If that fails - an error // is thrown. 
let (stream, content_length) = match Url::parse(location) { Ok(url) => { info!("Downloading file: {}", url); - let resume_resp = reqwest_resume::get(url).await?; - let resp = resume_resp.response().error_for_status_ref()?; - let content_length = resp.content_length().unwrap_or_default(); - let stream = resume_resp - .bytes_stream() - .map_err(std::io::Error::other) - .pipe(tokio_util::io::StreamReader::new); - - (Left(stream), content_length) + match option { + DownloadFileOption::Resumable => { + let resume_resp = reqwest_resume::get(url).await?; + let resp = resume_resp.response().error_for_status_ref()?; + let content_length = resp.content_length().unwrap_or_default(); + let stream = resume_resp + .bytes_stream() + .map_err(std::io::Error::other) + .pipe(tokio_util::io::StreamReader::new); + (Left(Left(stream)), content_length) + } + DownloadFileOption::Default => { + let resp = reqwest::get(url).await?; + let content_length = resp.content_length().unwrap_or_default(); + let stream = resp + .bytes_stream() + .map_err(std::io::Error::other) + .pipe(tokio_util::io::StreamReader::new); + (Left(Right(stream)), content_length) + } + } } Err(_) => { info!("Reading file: {}", location); diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index f7b5cfa64ca0..f9427af7a965 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -13,6 +13,13 @@ use std::{ }; use url::Url; +#[derive(Debug, Copy, Clone)] +pub enum DownloadFileOption { + /// Non-resumable + Default, + Resumable, +} + #[derive(Debug, Clone)] pub struct DownloadFileResult { pub path: PathBuf, @@ -23,6 +30,7 @@ pub struct DownloadFileResult { pub async fn download_file_with_cache( url: &Url, cache_dir: &Path, + option: DownloadFileOption, ) -> anyhow::Result { let cache_file_path = cache_dir.join(url.path().strip_prefix('/').unwrap_or_else(|| url.path())); @@ -68,6 +76,7 @@ pub async fn download_file_with_cache( cache_file_path.display() ) })?, + option, ) 
.await?; } @@ -120,14 +129,19 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result anyhow::Result { +pub async fn download_http( + url: &Url, + directory: &Path, + filename: &str, + option: DownloadFileOption, +) -> anyhow::Result { if !directory.is_dir() { std::fs::create_dir_all(directory)?; } let dst_path = directory.join(filename); let destination = dst_path.display(); tracing::info!(%url, %destination, "downloading snapshot"); - let mut reader = crate::utils::net::reader(url.as_str()).await?; + let mut reader = crate::utils::net::reader(url.as_str(), option).await?; let tmp_dst_path = { // like `crdownload` for the chrome browser const DOWNLOAD_EXTENSION: &str = "frdownload"; @@ -157,18 +171,23 @@ pub async fn download_file_with_retry( url: &Url, directory: &Path, filename: &str, + option: DownloadFileOption, ) -> anyhow::Result { Ok(retry( RetryArgs { timeout: None, ..Default::default() }, - || download_http(url, directory, filename), + || download_http(url, directory, filename, option), ) .await?) 
} -pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> { +pub async fn download_to( + url: &Url, + destination: &Path, + option: DownloadFileOption, +) -> anyhow::Result<()> { download_file_with_retry( url, destination.parent().with_context(|| { @@ -181,6 +200,7 @@ pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> { .file_name() .and_then(OsStr::to_str) .with_context(|| format!("Error getting the file name of {}", destination.display()))?, + option, ) .await?; @@ -215,11 +235,11 @@ mod test { async fn test_download_file_with_cache() { let temp_dir = tempfile::tempdir().unwrap(); let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap(); - let result = download_file_with_cache(&url, temp_dir.path()) + let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default) .await .unwrap(); assert!(!result.cache_hit); - let result = download_file_with_cache(&url, temp_dir.path()) + let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default) .await .unwrap(); assert!(result.cache_hit); From 611933e2e097acf4a32262788828bc0616a268a6 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 19:54:51 +0800 Subject: [PATCH 3/6] Update src/utils/net/download_file.rs Co-authored-by: Hubert --- src/utils/net/download_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index f9427af7a965..e47afe987153 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -16,7 +16,7 @@ use url::Url; #[derive(Debug, Copy, Clone)] pub enum DownloadFileOption { /// Non-resumable - Default, + NonResumable, Resumable, } From 198311b941bb615a23278c054a2ea1e9865a3fa1 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 19:56:17 +0800 Subject: 
[PATCH 4/6] fix build --- src/daemon/bundle.rs | 4 ++-- src/networks/actors_bundle.rs | 4 ++-- src/tool/subcommands/api_cmd/test_snapshot.rs | 2 +- src/utils/net.rs | 2 +- src/utils/net/download_file.rs | 14 ++++++++------ 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs index 9377743cd65a..7d9ae8593d33 100644 --- a/src/daemon/bundle.rs +++ b/src/daemon/bundle.rs @@ -105,12 +105,12 @@ pub async fn load_actor_bundles_from_server( version, }| async move { let result = if let Ok(response) = - download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await + download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await { response } else { warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL"); - download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await? + download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await? }; let bytes = std::fs::read(&result.path)?; diff --git a/src/networks/actors_bundle.rs b/src/networks/actors_bundle.rs index be91e46ff631..072db6e617eb 100644 --- a/src/networks/actors_bundle.rs +++ b/src/networks/actors_bundle.rs @@ -174,14 +174,14 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> { version, }| async move { let result = if let Ok(response) = - download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await + download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await { response } else { warn!( "failed to download bundle {network}-{version} from primary URL, trying alternative URL" ); - download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await? + download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await? 
}; let bytes = std::fs::read(&result.path)?; diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index db3a6e924cc3..7e070c686e76 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -176,7 +176,7 @@ mod tests { let cache_dir = cache_dir.clone(); async move { let result = - download_file_with_cache(&url, &cache_dir, DownloadFileOption::Default) + download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable) .await .unwrap(); (filename, result.path) diff --git a/src/utils/net.rs b/src/utils/net.rs index 51341b0959a4..6dde30df2583 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -86,7 +86,7 @@ pub async fn reader( .pipe(tokio_util::io::StreamReader::new); (Left(Left(stream)), content_length) } - DownloadFileOption::Default => { + DownloadFileOption::NonResumable => { let resp = reqwest::get(url).await?; let content_length = resp.content_length().unwrap_or_default(); let stream = resp diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index e47afe987153..1c9767202609 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -235,13 +235,15 @@ mod test { async fn test_download_file_with_cache() { let temp_dir = tempfile::tempdir().unwrap(); let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap(); - let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default) - .await - .unwrap(); + let result = + download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable) + .await + .unwrap(); assert!(!result.cache_hit); - let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default) - .await - .unwrap(); + let result = + download_file_with_cache(&url, temp_dir.path(), 
DownloadFileOption::NonResumable) + .await + .unwrap(); assert!(result.cache_hit); } } From 21a579ac0b776e0d669a16335fd7e02f1062ac96 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 20:16:03 +0800 Subject: [PATCH 5/6] Update src/utils/net/download_file.rs Co-authored-by: Hubert --- src/utils/net/download_file.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index 1c9767202609..1a0e032f097a 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -15,7 +15,6 @@ use url::Url; #[derive(Debug, Copy, Clone)] pub enum DownloadFileOption { - /// Non-resumable NonResumable, Resumable, } From a9f48d84650c165d0f321ef85b2cdd8a0cc99415 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Feb 2025 21:13:17 +0800 Subject: [PATCH 6/6] limit max concurrent downloading jobs --- src/daemon/bundle.rs | 41 +++++++++++-------- src/tool/subcommands/api_cmd/test_snapshot.rs | 4 ++ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs index 7d9ae8593d33..3745181ac057 100644 --- a/src/daemon/bundle.rs +++ b/src/daemon/bundle.rs @@ -14,7 +14,9 @@ use futures::{stream::FuturesUnordered, TryStreamExt}; use once_cell::sync::Lazy; use std::mem::discriminant; use std::path::PathBuf; +use std::sync::Arc; use std::{io::Cursor, path::Path}; +use tokio::sync::Semaphore; use tracing::{info, warn}; /// Tries to load the missing actor bundles to the blockstore. 
If the bundle is @@ -87,6 +89,7 @@ pub async fn load_actor_bundles_from_server( network: &NetworkChain, bundles: &[ActorBundleInfo], ) -> anyhow::Result> { + let semaphore = Arc::new(Semaphore::new(4)); FuturesUnordered::from_iter( bundles .iter() @@ -103,25 +106,29 @@ pub async fn load_actor_bundles_from_server( alt_url, network, version, - }| async move { - let result = if let Ok(response) = - download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await - { - response - } else { - warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL"); - download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await? - }; + }| { + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire().await?; + let result = if let Ok(response) = + download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await + { + response + } else { + warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL"); + download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await? + }; - let bytes = std::fs::read(&result.path)?; - let mut stream = CarStream::new(Cursor::new(bytes)).await?; - while let Some(block) = stream.try_next().await? { - db.put_keyed_persistent(&block.cid, &block.data)?; + let bytes = std::fs::read(&result.path)?; + let mut stream = CarStream::new(Cursor::new(bytes)).await?; + while let Some(block) = stream.try_next().await? 
{ + db.put_keyed_persistent(&block.cid, &block.data)?; + } + let header = stream.header_v1; + anyhow::ensure!(header.roots.len() == 1); + anyhow::ensure!(header.roots.first() == root); + Ok(*root) } - let header = stream.header_v1; - anyhow::ensure!(header.roots.len() == 1); - anyhow::ensure!(header.roots.first() == root); - Ok(*root) } ), ) diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 7e070c686e76..948ac60004ef 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -147,6 +147,7 @@ mod tests { use directories::ProjectDirs; use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools as _; + use tokio::sync::Semaphore; use url::Url; #[tokio::test(flavor = "multi_thread")] @@ -172,9 +173,12 @@ mod tests { .collect_vec(); let project_dir = ProjectDirs::from("com", "ChainSafe", "Forest").unwrap(); let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots"); + let semaphore = Arc::new(Semaphore::new(4)); let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| { let cache_dir = cache_dir.clone(); + let semaphore = semaphore.clone(); async move { + let _permit = semaphore.acquire().await.unwrap(); let result = download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable) .await