
fix: use non-resumable downloading for actor bundles to avoid CI test flakiness #5286

Merged · 8 commits · Feb 13, 2025
4 changes: 2 additions & 2 deletions src/cli_shared/snapshot.rs
@@ -7,8 +7,8 @@ use std::{
str::FromStr,
};

-use crate::networks::NetworkChain;
use crate::{cli_shared::snapshot::parse::ParsedFilename, utils::net::download_file_with_retry};
+use crate::{networks::NetworkChain, utils::net::DownloadFileOption};
use anyhow::{bail, Context as _};
use chrono::NaiveDate;
use url::Url;
@@ -67,7 +67,7 @@ pub async fn fetch(
.date_and_height_and_forest();
let filename = filename(vendor, chain, date, height, forest_format);

-download_file_with_retry(&url, directory, &filename).await
+download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await
}

/// Returns
44 changes: 25 additions & 19 deletions src/daemon/bundle.rs
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::db::PersistentStore;
-use crate::utils::net::download_file_with_cache;
+use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use crate::{
networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
utils::db::car_stream::{CarBlock, CarStream},
@@ -14,8 +14,9 @@ use futures::{stream::FuturesUnordered, TryStreamExt};
use once_cell::sync::Lazy;
use std::mem::discriminant;
use std::path::PathBuf;
+use std::sync::Arc;
use std::{io::Cursor, path::Path};
-use tokio::io::BufReader;
+use tokio::sync::Semaphore;
use tracing::{info, warn};

/// Tries to load the missing actor bundles to the blockstore. If the bundle is
@@ -88,6 +89,7 @@ pub async fn load_actor_bundles_from_server(
network: &NetworkChain,
bundles: &[ActorBundleInfo],
) -> anyhow::Result<Vec<Cid>> {
+let semaphore = Arc::new(Semaphore::new(4));
Member: Why 4 in particular? And not 3 or 10?

Contributor Author: I think any value between 2 and 8 is fine; 4 is somewhere in the middle.

Member: Why is 9 not fine?

Contributor Author: A larger number is faster but more error-prone. Previously, all tasks ran in parallel and the test was quite flaky.

Contributor Author (@hanabi1224, Feb 13, 2025): Looking at the time cost of the test, 4 makes it more stable and still fast.

Member: We can try, but I'm not 💯 convinced. I'd rather have a deterministically correct and working solution. This seems like reducing the error probability from an unknown chance to another unknown (but likely smaller) chance, while always paying the price of reduced performance (slower startup for fresh nodes).

Contributor Author: We've been using the same logic for the RPC snapshot test and it's much more stable. The only notable difference is that the actor bundles primarily use GitHub assets, while the RPC snapshot test uses the DigitalOcean Spaces CDN.

Contributor Author (@hanabi1224, Feb 13, 2025): I believe it's common practice to limit the maximum number of concurrent downloads; e.g., the author uses 8 in https://patshaughnessy.net/2020/1/20/downloading-100000-files-using-async-rust. The right number really depends on the network environment between the client and the remote server.

FuturesUnordered::from_iter(
bundles
.iter()
@@ -104,25 +106,29 @@
alt_url,
network,
version,
-}| async move {
-let result = if let Ok(response) =
-download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
-{
-response
-} else {
-warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
-download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
-};
+}| {
+let semaphore = semaphore.clone();
+async move {
+let _permit = semaphore.acquire().await?;
+let result = if let Ok(response) =
+download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await
+{
+response
+} else {
+warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
+download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await?
+};

-let bytes = std::fs::read(&result.path)?;
-let mut stream = CarStream::new(BufReader::new(Cursor::new(bytes))).await?;
-while let Some(block) = stream.try_next().await? {
-db.put_keyed_persistent(&block.cid, &block.data)?;
+let bytes = std::fs::read(&result.path)?;
+let mut stream = CarStream::new(Cursor::new(bytes)).await?;
+while let Some(block) = stream.try_next().await? {
+db.put_keyed_persistent(&block.cid, &block.data)?;
+}
+let header = stream.header_v1;
+anyhow::ensure!(header.roots.len() == 1);
+anyhow::ensure!(header.roots.first() == root);
+Ok(*root)
+}
-let header = stream.header_v1;
-anyhow::ensure!(header.roots.len() == 1);
-anyhow::ensure!(header.roots.first() == root);
-Ok(*header.roots.first())
}
),
)
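The review discussion above asks why the permit count is 4. The effect of such a cap can be sketched with a std-only counting semaphore (the PR itself uses `tokio::sync::Semaphore`); `max_concurrency` is a hypothetical helper, not part of the PR, that measures how many simulated downloads overlap:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Std-only counting semaphore, standing in for tokio::sync::Semaphore.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Hypothetical helper: run `jobs` simulated downloads under `limit`
/// permits and report the peak number running at once.
fn max_concurrency(jobs: usize, limit: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let peak = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, peak) = (sem.clone(), peak.clone());
            thread::spawn(move || {
                sem.acquire();
                {
                    let mut p = peak.lock().unwrap();
                    p.0 += 1;
                    p.1 = p.1.max(p.0);
                }
                thread::sleep(Duration::from_millis(5)); // simulated download
                peak.lock().unwrap().0 -= 1;
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let result = peak.lock().unwrap().1;
    result
}

fn main() {
    // With 16 jobs and 4 permits, no more than 4 downloads overlap.
    assert!(max_concurrency(16, 4) <= 4);
}
```

The trade-off debated in the thread is visible here: a larger `limit` shortens the wall-clock time but raises the number of simultaneous connections the remote server sees.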
9 changes: 7 additions & 2 deletions src/daemon/db_util.rs
@@ -8,7 +8,7 @@ use crate::networks::Height;
use crate::state_manager::StateManager;
use crate::utils::db::car_stream::CarStream;
use crate::utils::io::EitherMmapOrRandomAccessFile;
-use crate::utils::net::download_to;
+use crate::utils::net::{download_to, DownloadFileOption};
use anyhow::{bail, Context};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
@@ -108,7 +108,12 @@ pub async fn import_chain_as_forest_car(
let downloaded_car_temp_path =
tempfile::NamedTempFile::new_in(forest_car_db_dir)?.into_temp_path();
if let Ok(url) = Url::parse(&from_path.display().to_string()) {
-download_to(&url, &downloaded_car_temp_path).await?;
+download_to(
+&url,
+&downloaded_car_temp_path,
+DownloadFileOption::Resumable,
+)
+.await?;
} else {
move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
}
7 changes: 3 additions & 4 deletions src/networks/actors_bundle.rs
@@ -23,7 +23,7 @@ use tracing::warn;
use crate::daemon::bundle::{load_actor_bundles_from_server, ACTOR_BUNDLE_CACHE_DIR};
use crate::shim::machine::BuiltinActorManifest;
use crate::utils::db::car_stream::{CarStream, CarWriter};
-use crate::utils::net::download_file_with_cache;
+use crate::utils::net::{download_file_with_cache, DownloadFileOption};

use std::str::FromStr;

@@ -174,19 +174,18 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> {
version,
}| async move {
let result = if let Ok(response) =
-download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
+download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await
{
response
} else {
warn!(
"failed to download bundle {network}-{version} from primary URL, trying alternative URL"
);
-download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
+download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await?
};

let bytes = std::fs::read(&result.path)?;
let car = CarStream::new(Cursor::new(bytes)).await?;
ensure!(car.header_v1.version == 1);
ensure!(car.header_v1.roots.len() == 1);
ensure!(car.header_v1.roots.first() == root);
anyhow::Ok((*root, car.try_collect::<Vec<_>>().await?))
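The `ensure!` checks above enforce that a bundle CAR carries exactly one root matching the expected CID. As a hedged sketch of that invariant (`validate_single_root` and its string CIDs are illustrative stand-ins; the real code compares `Cid` values from `CarStream`'s parsed header):

```rust
/// Illustrative stand-in for the header checks: an actor-bundle CAR
/// must carry exactly one root, and it must match the expected CID.
fn validate_single_root(roots: &[String], expected: &str) -> Result<(), String> {
    if roots.len() != 1 {
        return Err(format!("expected exactly 1 root, got {}", roots.len()));
    }
    if roots[0] != expected {
        return Err(format!("root mismatch: got {}, expected {expected}", roots[0]));
    }
    Ok(())
}

fn main() {
    let roots = vec!["bafyexampleroot".to_string()];
    assert!(validate_single_root(&roots, "bafyexampleroot").is_ok());
    assert!(validate_single_root(&roots, "bafyotherroot").is_err());
    assert!(validate_single_root(&[], "bafyexampleroot").is_err());
}
```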
9 changes: 7 additions & 2 deletions src/tool/offline_server/server.rs
@@ -18,7 +18,7 @@ use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::{start_rpc, RPCState};
use crate::shim::address::{CurrentNetwork, Network};
use crate::state_manager::StateManager;
-use crate::utils::net::download_to;
+use crate::utils::net::{download_to, DownloadFileOption};
use crate::JWT_IDENTIFIER;
use anyhow::Context as _;
use fvm_ipld_blockstore::Blockstore;
@@ -223,7 +223,12 @@ async fn handle_snapshots(
indicatif::HumanBytes(num_bytes)
);
let downloaded_snapshot_path = std::env::current_dir()?.join(path);
-download_to(&snapshot_url, &downloaded_snapshot_path).await?;
+download_to(
+&snapshot_url,
+&downloaded_snapshot_path,
+DownloadFileOption::Resumable,
+)
+.await?;
info!("Snapshot downloaded");
Ok(vec![downloaded_snapshot_path])
}
11 changes: 9 additions & 2 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
@@ -143,10 +143,11 @@ async fn ctx(
#[cfg(test)]
mod tests {
use super::*;
-use crate::utils::net::download_file_with_cache;
+use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use directories::ProjectDirs;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools as _;
+use tokio::sync::Semaphore;
use url::Url;

#[tokio::test(flavor = "multi_thread")]
@@ -172,10 +173,16 @@
.collect_vec();
let project_dir = ProjectDirs::from("com", "ChainSafe", "Forest").unwrap();
let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots");
+let semaphore = Arc::new(Semaphore::new(4));
let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| {
let cache_dir = cache_dir.clone();
+let semaphore = semaphore.clone();
async move {
-let result = download_file_with_cache(&url, &cache_dir).await.unwrap();
+let _permit = semaphore.acquire().await.unwrap();
+let result =
+download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable)
+.await
+.unwrap();
(filename, result.path)
}
}));
1 change: 0 additions & 1 deletion src/tool/subcommands/state_migration_cmd.rs
@@ -27,7 +27,6 @@ impl StateMigrationCommands {
let metadata = get_actor_bundles_metadata().await?;
let metadata_json = serde_json::to_string_pretty(&metadata)?;
println!("{}", metadata_json);
-
Ok(())
}
}
39 changes: 28 additions & 11 deletions src/utils/net.rs
@@ -42,7 +42,9 @@ pub async fn download_ipfs_file_trustlessly(
tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
{
-let mut reader = reader(url.as_str()).await?.compat();
+let mut reader = reader(url.as_str(), DownloadFileOption::Resumable)
+.await?
+.compat();
let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
let cid_v10 = crate::utils::cid::cid_11_to_10(cid);
rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(&cid_v10))
@@ -63,22 +65,37 @@ pub async fn download_ipfs_file_trustlessly(
/// - uncompressed
///
/// This function returns a reader of uncompressed data.
-pub async fn reader(location: &str) -> anyhow::Result<impl AsyncBufRead> {
+pub async fn reader(
+location: &str,
+option: DownloadFileOption,
+) -> anyhow::Result<impl AsyncBufRead> {
// This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
// malformed it'll end up trying to treat it as a local filepath. If that fails - an error
// is thrown.
let (stream, content_length) = match Url::parse(location) {
Ok(url) => {
info!("Downloading file: {}", url);
-let resume_resp = reqwest_resume::get(url).await?;
-let resp = resume_resp.response().error_for_status_ref()?;
-let content_length = resp.content_length().unwrap_or_default();
-let stream = resume_resp
-.bytes_stream()
-.map_err(std::io::Error::other)
-.pipe(tokio_util::io::StreamReader::new);
-
-(Left(stream), content_length)
+match option {
+DownloadFileOption::Resumable => {
+let resume_resp = reqwest_resume::get(url).await?;
+let resp = resume_resp.response().error_for_status_ref()?;
+let content_length = resp.content_length().unwrap_or_default();
+let stream = resume_resp
+.bytes_stream()
+.map_err(std::io::Error::other)
+.pipe(tokio_util::io::StreamReader::new);
+(Left(Left(stream)), content_length)
+}
+DownloadFileOption::NonResumable => {
+let resp = reqwest::get(url).await?;
+let content_length = resp.content_length().unwrap_or_default();
+let stream = resp
+.bytes_stream()
+.map_err(std::io::Error::other)
+.pipe(tokio_util::io::StreamReader::new);
+(Left(Right(stream)), content_length)
+}
+}
}
Err(_) => {
info!("Reading file: {}", location);
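The difference between the two branches above comes down to how an interrupted transfer restarts: the `reqwest_resume` path retries with an HTTP `Range` header so it can continue from the last received byte, while the plain `reqwest::get` path always restarts from byte zero. A hypothetical helper (not part of the PR; the enum is re-declared here for a self-contained sketch) makes the distinction concrete:

```rust
#[derive(Debug, Copy, Clone, PartialEq)]
enum DownloadFileOption {
    NonResumable,
    Resumable,
}

/// Hypothetical helper: the Range header a retry would send after
/// `bytes_received` bytes already arrived. NonResumable always restarts.
fn resume_range_header(option: DownloadFileOption, bytes_received: u64) -> Option<String> {
    match option {
        // Open-ended byte range: "everything from this offset onward".
        DownloadFileOption::Resumable if bytes_received > 0 => {
            Some(format!("bytes={bytes_received}-"))
        }
        _ => None, // fresh GET, no Range header
    }
}

fn main() {
    assert_eq!(
        resume_range_header(DownloadFileOption::Resumable, 4096),
        Some("bytes=4096-".to_string())
    );
    assert_eq!(resume_range_header(DownloadFileOption::NonResumable, 4096), None);
}
```

This also suggests why resumption can be flaky against some hosts: a server (or CDN edge) that mishandles range requests can hand back inconsistent bytes, which is consistent with the PR switching the GitHub-hosted actor bundles to `NonResumable`.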
49 changes: 36 additions & 13 deletions src/utils/net/download_file.rs
@@ -13,6 +13,12 @@ use std::{
};
use url::Url;

+#[derive(Debug, Copy, Clone)]
+pub enum DownloadFileOption {
+NonResumable,
+Resumable,
+}

#[derive(Debug, Clone)]
pub struct DownloadFileResult {
pub path: PathBuf,
@@ -23,6 +29,7 @@ pub struct DownloadFileResult {
pub async fn download_file_with_cache(
url: &Url,
cache_dir: &Path,
option: DownloadFileOption,
) -> anyhow::Result<DownloadFileResult> {
let cache_file_path =
cache_dir.join(url.path().strip_prefix('/').unwrap_or_else(|| url.path()));
Expand All @@ -38,8 +45,8 @@ pub async fn download_file_with_cache(
if file_md5 == url_md5 {
true
} else {
-println!(
-"md5 hash mismatch, url: {url}, local: {}, remote: {}",
+tracing::warn!(
+"download again due to md5 hash mismatch, url: {url}, local cache: {}, remote: {}",
hex::encode(&file_md5),
hex::encode(&url_md5)
);
@@ -53,7 +60,9 @@
None => false,
};

-if !cache_hit {
+if cache_hit {
+tracing::debug!(%url, "loaded from cache");
+} else {
download_file_with_retry(
url,
cache_file_path.parent().unwrap_or_else(|| Path::new(".")),
@@ -66,6 +75,7 @@
cache_file_path.display()
)
})?,
+option,
)
.await?;
}
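The cache check above boils down to: reuse the cached file only when both a local copy and a remote `Content-MD5` exist and agree; any missing or mismatching piece forces a re-download. A small sketch of that decision (`is_cache_hit` is a hypothetical condensation; the real code hashes the file and reads the header via `get_content_md5_hash_from_url`):

```rust
/// Hypothetical condensation of the cache-hit decision in
/// download_file_with_cache: any missing piece forces a re-download.
fn is_cache_hit(local_md5: Option<&[u8]>, remote_md5: Option<&[u8]>) -> bool {
    match (local_md5, remote_md5) {
        (Some(local), Some(remote)) => local == remote, // hashes agree → reuse cache
        _ => false, // no local file or no remote hash → download again
    }
}

fn main() {
    assert!(is_cache_hit(Some(&b"\x01\x02"[..]), Some(&b"\x01\x02"[..])));
    assert!(!is_cache_hit(Some(&b"\x01\x02"[..]), Some(&b"\x03\x04"[..])));
    assert!(!is_cache_hit(None, Some(&b"\x01\x02"[..])));
}
```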
@@ -118,14 +128,19 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result<Option<Vec<u8
}

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
-pub async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
+pub async fn download_http(
+url: &Url,
+directory: &Path,
+filename: &str,
+option: DownloadFileOption,
+) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
tracing::info!(%url, %destination, "downloading snapshot");
-let mut reader = crate::utils::net::reader(url.as_str()).await?;
+let mut reader = crate::utils::net::reader(url.as_str(), option).await?;
let tmp_dst_path = {
// like `crdownload` for the chrome browser
const DOWNLOAD_EXTENSION: &str = "frdownload";
@@ -155,18 +170,23 @@ pub async fn download_file_with_retry(
url: &Url,
directory: &Path,
filename: &str,
+option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
Ok(retry(
RetryArgs {
timeout: None,
..Default::default()
},
-|| download_http(url, directory, filename),
+|| download_http(url, directory, filename, option),
)
.await?)
}
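`download_file_with_retry` wraps `download_http` in the crate's `retry` helper. Its shape is roughly the following std-only sketch (a hypothetical synchronous `retry`; the real `RetryArgs` also carries attempt limits, delays, and an optional timeout):

```rust
/// Std-only sketch of the retry wrapper: try `op` up to `max_attempts`
/// times, returning the first success or the last error.
fn retry<T, E>(max_attempts: usize, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut last_err = None;
    for _ in 0..max_attempts {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => last_err = Some(e), // real code would back off between attempts
        }
    }
    Err(last_err.expect("max_attempts must be > 0"))
}

fn main() {
    // Fails twice, then succeeds: three attempts total.
    let mut calls = 0;
    let result = retry(5, || {
        calls += 1;
        if calls < 3 { Err("flaky network") } else { Ok(calls) }
    });
    assert_eq!(result, Ok(3));
    assert_eq!(calls, 3);
}
```

Note how this interacts with the download option: with `NonResumable`, each retry re-fetches the whole file, trading bandwidth for the determinism the CI fix is after.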

-pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
+pub async fn download_to(
+url: &Url,
+destination: &Path,
+option: DownloadFileOption,
+) -> anyhow::Result<()> {
download_file_with_retry(
url,
destination.parent().with_context(|| {
Expand All @@ -179,6 +199,7 @@ pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
.file_name()
.and_then(OsStr::to_str)
.with_context(|| format!("Error getting the file name of {}", destination.display()))?,
+option,
)
.await?;

@@ -213,13 +234,15 @@ mod test {
async fn test_download_file_with_cache() {
let temp_dir = tempfile::tempdir().unwrap();
let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap();
-let result = download_file_with_cache(&url, temp_dir.path())
-.await
-.unwrap();
+let result =
+download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable)
+.await
+.unwrap();
assert!(!result.cache_hit);
-let result = download_file_with_cache(&url, temp_dir.path())
-.await
-.unwrap();
+let result =
+download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable)
+.await
+.unwrap();
assert!(result.cache_hit);
}
}