Skip to content

Commit

Permalink
introduce DownloadFileOption
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Feb 13, 2025
1 parent 1fa175e commit 5020fce
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/cli_shared/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
str::FromStr,
};

use crate::networks::NetworkChain;
use crate::{cli_shared::snapshot::parse::ParsedFilename, utils::net::download_file_with_retry};
use crate::{networks::NetworkChain, utils::net::DownloadFileOption};
use anyhow::{bail, Context as _};
use chrono::NaiveDate;
use url::Url;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub async fn fetch(
.date_and_height_and_forest();
let filename = filename(vendor, chain, date, height, forest_format);

download_file_with_retry(&url, directory, &filename).await
download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await
}

/// Returns
Expand Down
6 changes: 3 additions & 3 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::db::PersistentStore;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use crate::{
networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
utils::db::car_stream::{CarBlock, CarStream},
Expand Down Expand Up @@ -105,12 +105,12 @@ pub async fn load_actor_bundles_from_server(
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await
{
response
} else {
warn!("failed to download bundle {network}-{version} from primary URL, trying alternative URL");
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await?
};

let bytes = std::fs::read(&result.path)?;
Expand Down
9 changes: 7 additions & 2 deletions src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::networks::Height;
use crate::state_manager::StateManager;
use crate::utils::db::car_stream::CarStream;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use anyhow::{bail, Context};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -108,7 +108,12 @@ pub async fn import_chain_as_forest_car(
let downloaded_car_temp_path =
tempfile::NamedTempFile::new_in(forest_car_db_dir)?.into_temp_path();
if let Ok(url) = Url::parse(&from_path.display().to_string()) {
download_to(&url, &downloaded_car_temp_path).await?;
download_to(
&url,
&downloaded_car_temp_path,
DownloadFileOption::Resumable,
)
.await?;
} else {
move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
}
Expand Down
6 changes: 3 additions & 3 deletions src/networks/actors_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tracing::warn;
use crate::daemon::bundle::{load_actor_bundles_from_server, ACTOR_BUNDLE_CACHE_DIR};
use crate::shim::machine::BuiltinActorManifest;
use crate::utils::db::car_stream::{CarStream, CarWriter};
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};

use std::str::FromStr;

Expand Down Expand Up @@ -174,14 +174,14 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> {
version,
}| async move {
let result = if let Ok(response) =
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR).await
download_file_with_cache(url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await
{
response
} else {
warn!(
"failed to download bundle {network}-{version} from primary URL, trying alternative URL"
);
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR).await?
download_file_with_cache(alt_url, &ACTOR_BUNDLE_CACHE_DIR, DownloadFileOption::Default).await?
};

let bytes = std::fs::read(&result.path)?;
Expand Down
9 changes: 7 additions & 2 deletions src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::{start_rpc, RPCState};
use crate::shim::address::{CurrentNetwork, Network};
use crate::state_manager::StateManager;
use crate::utils::net::download_to;
use crate::utils::net::{download_to, DownloadFileOption};
use crate::JWT_IDENTIFIER;
use anyhow::Context as _;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -223,7 +223,12 @@ async fn handle_snapshots(
indicatif::HumanBytes(num_bytes)
);
let downloaded_snapshot_path = std::env::current_dir()?.join(path);
download_to(&snapshot_url, &downloaded_snapshot_path).await?;
download_to(
&snapshot_url,
&downloaded_snapshot_path,
DownloadFileOption::Resumable,
)
.await?;
info!("Snapshot downloaded");
Ok(vec![downloaded_snapshot_path])
}
Expand Down
7 changes: 5 additions & 2 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn ctx(
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::net::download_file_with_cache;
use crate::utils::net::{download_file_with_cache, DownloadFileOption};
use directories::ProjectDirs;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools as _;
Expand Down Expand Up @@ -175,7 +175,10 @@ mod tests {
let mut tasks = FuturesUnordered::from_iter(urls.into_iter().map(|(filename, url)| {
let cache_dir = cache_dir.clone();
async move {
let result = download_file_with_cache(&url, &cache_dir).await.unwrap();
let result =
download_file_with_cache(&url, &cache_dir, DownloadFileOption::Default)
.await
.unwrap();
(filename, result.path)
}
}));
Expand Down
39 changes: 28 additions & 11 deletions src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub async fn download_ipfs_file_trustlessly(
tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
{
let mut reader = reader(url.as_str()).await?.compat();
let mut reader = reader(url.as_str(), DownloadFileOption::Resumable)
.await?
.compat();
let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
let cid_v10 = crate::utils::cid::cid_11_to_10(cid);
rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(&cid_v10))
Expand All @@ -63,22 +65,37 @@ pub async fn download_ipfs_file_trustlessly(
/// - uncompressed
///
/// This function returns a reader of uncompressed data.
pub async fn reader(location: &str) -> anyhow::Result<impl AsyncBufRead> {
pub async fn reader(
location: &str,
option: DownloadFileOption,
) -> anyhow::Result<impl AsyncBufRead> {
// This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
// malformed it'll end up trying to treat it as a local filepath. If that fails - an error
// is thrown.
let (stream, content_length) = match Url::parse(location) {
Ok(url) => {
info!("Downloading file: {}", url);
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);

(Left(stream), content_length)
match option {
DownloadFileOption::Resumable => {
let resume_resp = reqwest_resume::get(url).await?;
let resp = resume_resp.response().error_for_status_ref()?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resume_resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Left(stream)), content_length)
}
DownloadFileOption::Default => {
let resp = reqwest::get(url).await?;
let content_length = resp.content_length().unwrap_or_default();
let stream = resp
.bytes_stream()
.map_err(std::io::Error::other)
.pipe(tokio_util::io::StreamReader::new);
(Left(Right(stream)), content_length)
}
}
}
Err(_) => {
info!("Reading file: {}", location);
Expand Down
32 changes: 26 additions & 6 deletions src/utils/net/download_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use std::{
};
use url::Url;

/// Controls how the `reader`/`download_*` helpers fetch a remote file.
///
/// Derives `Default` (the non-resumable `Default` variant) so call sites that
/// don't care about resumption can use `DownloadFileOption::default()`, and
/// `PartialEq`/`Eq` so options can be compared and matched in tests.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum DownloadFileOption {
    /// Non-resumable: a plain one-shot HTTP GET; an interrupted transfer
    /// starts over from the beginning.
    #[default]
    Default,
    /// Resumable: downloads via `reqwest_resume` so an interrupted transfer
    /// can be continued (used for large snapshot/CAR downloads).
    Resumable,
}

#[derive(Debug, Clone)]
pub struct DownloadFileResult {
pub path: PathBuf,
Expand All @@ -23,6 +30,7 @@ pub struct DownloadFileResult {
pub async fn download_file_with_cache(
url: &Url,
cache_dir: &Path,
option: DownloadFileOption,
) -> anyhow::Result<DownloadFileResult> {
let cache_file_path =
cache_dir.join(url.path().strip_prefix('/').unwrap_or_else(|| url.path()));
Expand Down Expand Up @@ -68,6 +76,7 @@ pub async fn download_file_with_cache(
cache_file_path.display()
)
})?,
option,
)
.await?;
}
Expand Down Expand Up @@ -120,14 +129,19 @@ async fn get_content_md5_hash_from_url(url: Url) -> anyhow::Result<Option<Vec<u8
}

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
pub async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
pub async fn download_http(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
tracing::info!(%url, %destination, "downloading snapshot");
let mut reader = crate::utils::net::reader(url.as_str()).await?;
let mut reader = crate::utils::net::reader(url.as_str(), option).await?;
let tmp_dst_path = {
// like `crdownload` for the chrome browser
const DOWNLOAD_EXTENSION: &str = "frdownload";
Expand Down Expand Up @@ -157,18 +171,23 @@ pub async fn download_file_with_retry(
url: &Url,
directory: &Path,
filename: &str,
option: DownloadFileOption,
) -> anyhow::Result<PathBuf> {
Ok(retry(
RetryArgs {
timeout: None,
..Default::default()
},
|| download_http(url, directory, filename),
|| download_http(url, directory, filename, option),
)
.await?)
}

pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
pub async fn download_to(
url: &Url,
destination: &Path,
option: DownloadFileOption,
) -> anyhow::Result<()> {
download_file_with_retry(
url,
destination.parent().with_context(|| {
Expand All @@ -181,6 +200,7 @@ pub async fn download_to(url: &Url, destination: &Path) -> anyhow::Result<()> {
.file_name()
.and_then(OsStr::to_str)
.with_context(|| format!("Error getting the file name of {}", destination.display()))?,
option,
)
.await?;

Expand Down Expand Up @@ -215,11 +235,11 @@ mod test {
async fn test_download_file_with_cache() {
let temp_dir = tempfile::tempdir().unwrap();
let url = "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst".try_into().unwrap();
let result = download_file_with_cache(&url, temp_dir.path())
let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default)
.await
.unwrap();
assert!(!result.cache_hit);
let result = download_file_with_cache(&url, temp_dir.path())
let result = download_file_with_cache(&url, temp_dir.path(), DownloadFileOption::Default)
.await
.unwrap();
assert!(result.cache_hit);
Expand Down

0 comments on commit 5020fce

Please sign in to comment.