Skip to content

Commit

Permalink
fix: use non-resumable downloading for actor bundles to avoid CI test…
Browse files Browse the repository at this point in the history
… flakiness (#5286)

Co-authored-by: Hubert <[email protected]>
  • Loading branch information
hanabi1224 and LesnyRumcajs authored Feb 13, 2025
1 parent 3f49d3f commit e28fe8f
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/cli_shared/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
str::FromStr,
};

use crate::networks::NetworkChain;
use crate::{cli_shared::snapshot::parse::ParsedFilename, utils::net::download_file_with_retry};
use crate::{networks::NetworkChain, utils::net::DownloadFileOption};
use anyhow::{bail, Context as _};
use chrono::NaiveDate;
use url::Url;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub async fn fetch(
.date_and_height_and_forest();
let filename = filename(vendor, chain, date, height, forest_format);

download_file_with_retry(&url, directory, &filename).await
download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await
}

/// Returns
Expand Down
44 changes: 25 additions & 19 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::db::PersistentStore;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use crate::{
networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
utils::db::car_stream::{CarBlock, CarStream},
Expand All @@ -14,8 +14,9 @@ use futures::{stream::FuturesUnordered, TryStreamExt};
use once_cell::sync::Lazy;
use std::mem::discriminant;
use std::path::PathBuf;
use std::sync::Arc;
use std::{io::Cursor, path::Path};
use tokio::io::BufReader;
use tokio::sync::Semaphore;
use tracing::{info, warn};

/// Tries to load the missing actor bundles to the blockstore. If the bundle is
Expand Down Expand Up @@ -88,6 +89,7 @@ pub async fn load_actor_bundles_from_server(
network: &NetworkChain,
bundles: &[ActorBundleInfo],
) -> anyhow::Result<Vec<Cid>> {
let semaphore = Arc::new(Semaphore::new(4));
FuturesUnordered::from_iter(
bundles
.iter()
Expand All @@ -104,25 +106,29 @@ pub async fn load_actor_bundles_from_server(
alt_url,
network,
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
{
response
} else {
warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
};
}| {
let semaphore = semaphore.clone();
async move {
let _permit = semaphore.acquire().await?;
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await
{
response
} else {
warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await?
};

let bytes = std::fs::read(&result.path)?;
let mut stream = CarStream::new(BufReader::new(Cursor::new(bytes))).await?;
while let Some(block) = stream.try_next().await? {
db.put_keyed_persistent(&block.cid, &block.data)?;
let bytes = std::fs::read(&result.path)?;
let mut stream = CarStream::new(Cursor::new(bytes)).await?;
while let Some(block) = stream.try_next().await? {
db.put_keyed_persistent(&block.cid, &block.data)?;
}
let header = stream.header_v1;
anyhow::ensure!(header.roots.len() == 1);
anyhow::ensure!(header.roots.first() == root);
Ok(*root)
}
let header = stream.header_v1;
anyhow::ensure!(header.roots.len() == 1);
anyhow::ensure!(header.roots.first() == root);
Ok(*header.roots.first())
}
),
)
Expand Down
9 changes: 7 additions & 2 deletions src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::networks::Height;
use crate::state_manager::StateManager;
use crate::utils::db::car_stream::CarStream;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use anyhow::{bail, Context};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -108,7 +108,12 @@ pub async fn import_chain_as_forest_car(
let downloaded_car_temp_path =
tempfile::NamedTempFile::new_in(forest_car_db_dir)?.into_temp_path();
if let Ok(url) = Url::parse(&from_path.display().to_string()) {
download_to(&url, &downloaded_car_temp_path).await?;
download_to(
&url,
&downloaded_car_temp_path,
DownloadFileOption::Resumable,
)
.await?;
} else {
move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
}
Expand Down
7 changes: 3 additions & 4 deletions src/networks/actors_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tracing::warn;
use crate::daemon::bundle::{load_actor_bundles_from_server, ACTOR_BUNDLE_CACHE_DIR};
use crate::shim::machine::BuiltinActorManifest;
use crate::utils::db::car_stream::{CarStream, CarWriter};
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};

use std::str::FromStr;

Expand Down Expand Up @@ -174,19 +174,18 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> {
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await
{
response
} else {
warn!(
"failed to download bundle {network}-{version} from primary URL, trying alternative URL"
);
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::NonResumable).await?
};

let bytes = std::fs::read(&result.path)?;
let car = CarStream::new(Cursor::new(bytes)).await?;
ensure!(car.header_v1.version == 1);
ensure!(car.header_v1.roots.len() == 1);
ensure!(car.header_v1.roots.first() == root);
anyhow::Ok((*root, car.try_collect::<Vec<_>>().await?))
Expand Down
9 changes: 7 additions & 2 deletions src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::{start_rpc, RPCState};
use crate::shim::address::{CurrentNetwork, Network};
use crate::state_manager::StateManager;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use crate::JWT_IDENTIFIER;
use anyhow::Context as _;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -223,7 +223,12 @@ async fn handle_snapshots(
indicatif::HumanBytes(num_bytes)
);
let downloaded_snapshot_path = std::env::current_dir()?.join(path);
download_to(&snapshot_url, &downloaded_snapshot_path).await?;
download_to(
&snapshot_url,
&downloaded_snapshot_path,
DownloadFileOption::Resumable,
)
.await?;
info!("Snapshot downloaded");
Ok(vec![downloaded_snapshot_path])
}
Expand Down
11 changes: 9 additions & 2 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ async fn ctx(
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use directories::ProjectDirs;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools as _;
use tokio::sync::Semaphore;
use url::Url;

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -172,10 +173,16 @@ mod tests {
.collect_vec();
let project_dir = ProjectDirs::from("com", "ChainSafe", "Forest").unwrap();
let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots");
let semaphore = Arc::new(Semaphore::new(4));
let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| {
let cache_dir = cache_dir.clone();
let semaphore = semaphore.clone();
async move {
let result = download_file_with_cache(&url, &cache_dir).await.unwrap();
let _permit = semaphore.acquire().await.unwrap();
let result =
download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable)
.await
.unwrap();
(filename, result.path)
}
}));
Expand Down
1 change: 0 additions & 1 deletion src/tool/subcommands/state_migration_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ impl StateMigrationCommands {
let metadata = get_actor_bundles_metadata().await?;
let metadata_json = serde_json::to_string_pretty(&metadata)?;
println!("{}", metadata_json);

Ok(())
}
}
Expand Down
39 changes: 28 additions & 11 deletions src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub async fn download_ipfs_file_trustlessly(
tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
{
let mut reader = reader(url.as_str()).await?.compat();
let mut reader = reader(url.as_str(), DownloadFileOption::Resumable)
.await?
.compat();
let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
let cid_v10 = crate::utils::cid::cid_11_to_10(cid);
rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(&cid_v10))
Expand All @@ -63,22 +65,37 @@ pub async fn download_ipfs_file_trustlessly(
/// - uncompressed
///
/// This function returns a reader of uncompressed data.
pub async fn reader(location: &str) -> anyhow::Result<impl AsyncBufRead> {
pub async fn reader(
location: &str,
option: DownloadFileOption,
) -> anyhow::Result<impl AsyncBufRead> {
// This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
// malformed it'll end up trying to treat it as a local filepath. If that fails - an error
// is thrown.
let (stream, content_length) = match Url::parse(location) {
Ok(url) => {
info!("Downloading file: {}", url);
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);

(Left(stream), content_length)
match option {
DownloadFileOption::Resumable => {
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Left(stream)), content_length)
}
DownloadFileOption::NonResumable => {
let resp = reqwest::get(url).await?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Right(stream)), content_length)
}
}
}
Err(_) => {
info!("Reading file: {}", location);
Expand Down
49 changes: 36 additions & 13 deletions src/utils/net/download_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ use std::{
};
use url::Url;

/// Selects how a remote file is fetched over HTTP.
///
/// The option is passed through the download helpers
/// (`download_file_with_cache`, `download_file_with_retry`, `download_http`,
/// `download_to`) down to `crate::utils::net::reader`, which chooses the HTTP
/// client based on the variant.
///
/// `PartialEq`/`Eq` are derived so that callers and tests can compare options
/// directly (e.g. with `==` or `assert_eq!`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DownloadFileOption {
    /// Fetch the URL with a plain `reqwest::get` request.
    NonResumable,
    /// Fetch the URL with `reqwest_resume::get`.
    Resumable,
}

#[derive(Debug, Clone)]
pub struct DownloadFileResult {
pub path: PathBuf,
Expand All @@ -23,6 +29,7 @@ pub struct DownloadFileResult {
pub async fn download_file_with_cache(
url: &Url,
cache_dir: &Path,
option: DownloadFileOption,
) -> anyhow::Result<DownloadFileResult> {
let cache_file_path =
cache_dir.join(url.path().strip_prefix('/').unwrap_or_else(|| url.path()));
Expand All @@ -38,8 +45,8 @@ pub async fn download_file_with_cache(
if file_md5 == url_md5 {
true
} else {
println!(
"md5 hash mismatch, url: {url}, local: {}, remote: {}",
tracing::warn!(
"download again due to md5 hash mismatch, url: {url}, local cache: {}, remote: {}",
hex::encode(&file_md5),
hex::encode(&url_md5)
);
Expand All @@ -53,7 +60,9 @@ pub async fn download_file_with_cache(
None => false,
};

if !cache_hit {
if cache_hit {
tracing::debug!(%url, "loaded from cache");
} else {
download_file_with_retry(
url,
cache_file_path.parent().unwrap_or_else(|| Path::new(".")),
Expand All @@ -66,6 +75,7 @@ pub async fn download_file_with_cache(
cache_file_path.display()
)
})?,
option,
)
.await?;
}
Expand Down Expand Up @@ -118,14 +128,19 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result<Option<Vec<u8
}

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
pub async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
pub async fn download_http(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
tracing::info!(%url, %destination, "downloading snapshot");
let mut reader = crate::utils::net::reader(url.as_str()).await?;
let mut reader = crate::utils::net::reader(url.as_str(), option).await?;
let tmp_dst_path = {
// like `crdownload` for the chrome browser
const DOWNLOAD_EXTENSION: &str = "frdownload";
Expand Down Expand Up @@ -155,18 +170,23 @@ pub async fn download_file_with_retry(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
Ok(retry(
RetryArgs {
timeout: None,
..Default::default()
},
|| download_http(url, directory, filename),
|| download_http(url, directory, filename, option),
)
.await?)
}

pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
pub async fn download_to(
url: &Url,
destination: &Path,
option: DownloadFileOption,
) -> anyhow::Result<()> {
download_file_with_retry(
url,
destination.parent().with_context(|| {
Expand All @@ -179,6 +199,7 @@ pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
.file_name()
.and_then(OsStr::to_str)
.with_context(|| format!("Error getting the file name of {}", destination.display()))?,
option,
)
.await?;

Expand Down Expand Up @@ -213,13 +234,15 @@ mod test {
async fn test_download_file_with_cache() {
let temp_dir = tempfile::tempdir().unwrap();
let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap();
let result = download_file_with_cache(&url, temp_dir.path())
.await
.unwrap();
let result =
download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable)
.await
.unwrap();
assert!(!result.cache_hit);
let result = download_file_with_cache(&url, temp_dir.path())
.await
.unwrap();
let result =
download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::NonResumable)
.await
.unwrap();
assert!(result.cache_hit);
}
}

0 comments on commit e28fe8f

Please sign in to comment.