Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use non-resumable downloading for actor bundles to avoid CI test flakyness #5286

Merged
merged 8 commits into from
Feb 13, 2025
4 changes: 2 additions & 2 deletions src/cli_shared/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
str::FromStr,
};

use crate::networks::NetworkChain;
use crate::{cli_shared::snapshot::parse::ParsedFilename, utils::net::download_file_with_retry};
use crate::{networks::NetworkChain, utils::net::DownloadFileOption};
use anyhow::{bail, Context as _};
use chrono::NaiveDate;
use url::Url;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub async fn fetch(
.date_and_height_and_forest();
let filename = filename(vendor, chain, date, height, forest_format);

download_file_with_retry(&url, directory, &filename).await
download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await
}

/// Returns
Expand Down
11 changes: 5 additions & 6 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::db::PersistentStore;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use crate::{
networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
utils::db::car_stream::{CarBlock, CarStream},
Expand All @@ -15,7 +15,6 @@ use once_cell::sync::Lazy;
use std::mem::discriminant;
use std::path::PathBuf;
use std::{io::Cursor, path::Path};
use tokio::io::BufReader;
use tracing::{info, warn};

/// Tries to load the missing actor bundles to the blockstore. If the bundle is
Expand Down Expand Up @@ -106,23 +105,23 @@ pub async fn load_actor_bundles_from_server(
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await
{
response
} else {
warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await?
};

let bytes = std::fs::read(&result.path)?;
let mut stream = CarStream::new(BufReader::new(Cursor::new(bytes))).await?;
let mut stream = CarStream::new(Cursor::new(bytes)).await?;
while let Some(block) = stream.try_next().await? {
db.put_keyed_persistent(&block.cid, &block.data)?;
}
let header = stream.header_v1;
anyhow::ensure!(header.roots.len() == 1);
anyhow::ensure!(header.roots.first() == root);
Ok(*header.roots.first())
Ok(*root)
}
),
)
Expand Down
9 changes: 7 additions & 2 deletions src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::networks::Height;
use crate::state_manager::StateManager;
use crate::utils::db::car_stream::CarStream;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use anyhow::{bail, Context};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -108,7 +108,12 @@ pub async fn import_chain_as_forest_car(
let downloaded_car_temp_path =
tempfile::NamedTempFile::new_in(forest_car_db_dir)?.into_temp_path();
if let Ok(url) = Url::parse(&from_path.display().to_string()) {
download_to(&url, &downloaded_car_temp_path).await?;
download_to(
&url,
&downloaded_car_temp_path,
DownloadFileOption::Resumable,
)
.await?;
} else {
move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
}
Expand Down
7 changes: 3 additions & 4 deletions src/networks/actors_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tracing::warn;
use crate::daemon::bundle::{load_actor_bundles_from_server, ACTOR_BUNDLE_CACHE_DIR};
use crate::shim::machine::BuiltinActorManifest;
use crate::utils::db::car_stream::{CarStream, CarWriter};
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};

use std::str::FromStr;

Expand Down Expand Up @@ -174,19 +174,18 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> {
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await
{
response
} else {
warn!(
"failed to download bundle {network}-{version} from primary URL, trying alternative URL"
);
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await?
};

let bytes = std::fs::read(&result.path)?;
let car = CarStream::new(Cursor::new(bytes)).await?;
ensure!(car.header_v1.version == 1);
ensure!(car.header_v1.roots.len() == 1);
ensure!(car.header_v1.roots.first() == root);
anyhow::Ok((*root, car.try_collect::<Vec<_>>().await?))
Expand Down
9 changes: 7 additions & 2 deletions src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::{start_rpc, RPCState};
use crate::shim::address::{CurrentNetwork, Network};
use crate::state_manager::StateManager;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use crate::JWT_IDENTIFIER;
use anyhow::Context as _;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -223,7 +223,12 @@ async fn handle_snapshots(
indicatif::HumanBytes(num_bytes)
);
let downloaded_snapshot_path = std::env::current_dir()?.join(path);
download_to(&snapshot_url, &downloaded_snapshot_path).await?;
download_to(
&snapshot_url,
&downloaded_snapshot_path,
DownloadFileOption::Resumable,
)
.await?;
info!("Snapshot downloaded");
Ok(vec![downloaded_snapshot_path])
}
Expand Down
7 changes: 5 additions & 2 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn ctx(
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use directories::ProjectDirs;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools as _;
Expand Down Expand Up @@ -175,7 +175,10 @@ mod tests {
let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| {
let cache_dir = cache_dir.clone();
async move {
let result = download_file_with_cache(&url, &cache_dir).await.unwrap();
let result =
download_file_with_cache(&url, &cache_dir, DownloadFileOption::Default)
.await
.unwrap();
(filename, result.path)
}
}));
Expand Down
1 change: 0 additions & 1 deletion src/tool/subcommands/state_migration_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ impl StateMigrationCommands {
let metadata = get_actor_bundles_metadata().await?;
let metadata_json = serde_json::to_string_pretty(&metadata)?;
println!("{}", metadata_json);

Ok(())
}
}
Expand Down
39 changes: 28 additions & 11 deletions src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub async fn download_ipfs_file_trustlessly(
tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
{
let mut reader = reader(url.as_str()).await?.compat();
let mut reader = reader(url.as_str(), DownloadFileOption::Resumable)
.await?
.compat();
let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
let cid_v10 = crate::utils::cid::cid_11_to_10(cid);
rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(&cid_v10))
Expand All @@ -63,22 +65,37 @@ pub async fn download_ipfs_file_trustlessly(
/// - uncompressed
///
/// This function returns a reader of uncompressed data.
pub async fn reader(location: &str) -> anyhow::Result<impl AsyncBufRead> {
pub async fn reader(
location: &str,
option: DownloadFileOption,
) -> anyhow::Result<impl AsyncBufRead> {
// This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
// malformed it'll end up trying to treat it as a local filepath. If that fails - an error
// is thrown.
let (stream, content_length) = match Url::parse(location) {
Ok(url) => {
info!("Downloading file: {}", url);
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);

(Left(stream), content_length)
match option {
DownloadFileOption::Resumable => {
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Left(stream)), content_length)
}
DownloadFileOption::Default => {
let resp = reqwest::get(url).await?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Right(stream)), content_length)
}
}
}
Err(_) => {
info!("Reading file: {}", location);
Expand Down
40 changes: 31 additions & 9 deletions src/utils/net/download_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use std::{
};
use url::Url;

#[derive(Debug, Copy, Clone)]
pub enum DownloadFileOption {
/// Non-resumable
hanabi1224 marked this conversation as resolved.
Show resolved Hide resolved
Default,
hanabi1224 marked this conversation as resolved.
Show resolved Hide resolved
Resumable,
}

#[derive(Debug, Clone)]
pub struct DownloadFileResult {
pub path: PathBuf,
Expand All @@ -23,6 +30,7 @@ pub struct DownloadFileResult {
pub async fn download_file_with_cache(
url: &Url,
cache_dir: &Path,
option: DownloadFileOption,
) -> anyhow::Result<DownloadFileResult> {
let cache_file_path =
cache_dir.join(url.path().strip_prefix('/').unwrap_or_else(|| url.path()));
Expand All @@ -38,8 +46,8 @@ pub async fn download_file_with_cache(
if file_md5 == url_md5 {
true
} else {
println!(
"md5 hash mismatch, url: {url}, local: {}, remote: {}",
tracing::warn!(
"download again due to md5 hash mismatch, url: {url}, local cache: {}, remote: {}",
hex::encode(&file_md5),
hex::encode(&url_md5)
);
Expand All @@ -53,7 +61,9 @@ pub async fn download_file_with_cache(
None => false,
};

if !cache_hit {
if cache_hit {
tracing::debug!(%url, "loaded from cache");
} else {
download_file_with_retry(
url,
cache_file_path.parent().unwrap_or_else(|| Path::new(".")),
Expand All @@ -66,6 +76,7 @@ pub async fn download_file_with_cache(
cache_file_path.display()
)
})?,
option,
)
.await?;
}
Expand Down Expand Up @@ -118,14 +129,19 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result<Option<Vec<u8
}

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
pub async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
pub async fn download_http(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
tracing::info!(%url, %destination, "downloading snapshot");
let mut reader = crate::utils::net::reader(url.as_str()).await?;
let mut reader = crate::utils::net::reader(url.as_str(), option).await?;
let tmp_dst_path = {
// like `crdownload` for the chrome browser
const DOWNLOAD_EXTENSION: &str = "frdownload";
Expand Down Expand Up @@ -155,18 +171,23 @@ pub async fn download_file_with_retry(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
Ok(retry(
RetryArgs {
timeout: None,
..Default::default()
},
|| download_http(url, directory, filename),
|| download_http(url, directory, filename, option),
)
.await?)
}

pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
pub async fn download_to(
url: &Url,
destination: &Path,
option: DownloadFileOption,
) -> anyhow::Result<()> {
download_file_with_retry(
url,
destination.parent().with_context(|| {
Expand All @@ -179,6 +200,7 @@ pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
.file_name()
.and_then(OsStr::to_str)
.with_context(|| format!("Error getting the file name of {}", destination.display()))?,
option,
)
.await?;

Expand Down Expand Up @@ -213,11 +235,11 @@ mod test {
async fn test_download_file_with_cache() {
let temp_dir = tempfile::tempdir().unwrap();
let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap();
let result = download_file_with_cache(&url, temp_dir.path())
let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default)
.await
.unwrap();
assert!(!result.cache_hit);
let result = download_file_with_cache(&url, temp_dir.path())
let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default)
.await
.unwrap();
assert!(result.cache_hit);
Expand Down
Loading