Skip to content

Commit

Permalink
Cache downloaded buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmcculloch committed Jun 11, 2024
1 parent 27df088 commit 4072f18
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 34 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/soroban-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ futures = "0.3.30"
home = "0.5.9"
flate2 = "1.0.30"
bytesize = "1.3.0"
io_tee = "0.1.1"

# For hyper-tls
[target.'cfg(unix)'.dependencies]
Expand Down
6 changes: 6 additions & 0 deletions cmd/soroban-cli/src/commands/config/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub fn spec_dir() -> Result<std::path::PathBuf, Error> {
Ok(dir)
}

/// Returns the path of the `bucket` directory located under the local data
/// directory, creating it (and any missing parents) when it does not exist.
///
/// # Errors
///
/// Propagates any error from `data_local_dir` and any I/O error raised while
/// creating the directory.
pub fn bucket_dir() -> Result<std::path::PathBuf, Error> {
    let base = data_local_dir()?;
    let bucket_path = base.join("bucket");
    // Make sure the directory exists before handing the path to the caller.
    std::fs::create_dir_all(bucket_path.as_path())?;
    Ok(bucket_path)
}

pub fn write(action: Action, rpc_url: &Uri) -> Result<ulid::Ulid, Error> {
let data = Data {
action,
Expand Down
94 changes: 60 additions & 34 deletions cmd/soroban-cli/src/commands/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use clap::{arg, Parser};
use flate2::bufread::GzDecoder;
use futures::TryStreamExt;
use http::Uri;
use io_tee::TeeReader;
use soroban_ledger_snapshot::LedgerSnapshot;
use std::{
collections::HashSet,
io::{self, BufReader},
fs::OpenOptions,
io::{self, BufReader, Read},
path::PathBuf,
str::FromStr,
};
Expand All @@ -24,7 +26,7 @@ use super::{
config::{self, locator},
network,
};
use crate::rpc;
use crate::{commands::config::data, rpc};

fn default_out_path() -> PathBuf {
PathBuf::new().join("snapshot.json")
Expand All @@ -48,8 +50,8 @@ pub struct Cmd {
/// WASM hashes to filter by.
#[arg(long = "wasm-hash", help_heading = "FILTERS")]
wasm_hashes: Vec<String>,
// #[command(flatten)]
// locator: locator::Args,
#[command(flatten)]
locator: locator::Args,
// #[command(flatten)]
// network: network::Args,
}
Expand Down Expand Up @@ -158,47 +160,71 @@ impl Cmd {
};

for (i, bucket) in buckets.iter().enumerate() {
let bucket_0 = &bucket[0..=1];
let bucket_1 = &bucket[2..=3];
let bucket_2 = &bucket[4..=5];
let bucket_url = format!(
"{BASE_URL}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz"
);
print!("🪣 Downloading bucket {i} {bucket_url}");
let bucket_url = Uri::from_str(&bucket_url).unwrap();
let cache_path = data::bucket_dir()
.unwrap()
.join(format!("bucket-{bucket}.xdr"));

let (read, gz): (Box<dyn Read + Sync + Send>, bool) = if cache_path.exists() {
println!("🪣 Loading cached bucket {i} {bucket}");
let file = OpenOptions::new().read(true).open(&cache_path).unwrap();
(Box::new(file), false)
} else {
let bucket_0 = &bucket[0..=1];
let bucket_1 = &bucket[2..=3];
let bucket_2 = &bucket[4..=5];
let bucket_url = format!(
"{BASE_URL}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz"
);
print!("🪣 Downloading bucket {i} {bucket_url}");
let bucket_url = Uri::from_str(&bucket_url).unwrap();

let https = hyper_tls::HttpsConnector::new();
let response = hyper::Client::builder()
.build::<_, hyper::Body>(https)
.get(bucket_url)
.await
.unwrap();
let https = hyper_tls::HttpsConnector::new();
let response = hyper::Client::builder()
.build::<_, hyper::Body>(https)
.get(bucket_url)
.await
.unwrap();

if let Some(val) = response.headers().get("Content-Length") {
if let Ok(str) = val.to_str() {
if let Ok(len) = str.parse::<u64>() {
print!(" ({})", ByteSize(len));
if let Some(val) = response.headers().get("Content-Length") {
if let Ok(str) = val.to_str() {
if let Ok(len) = str.parse::<u64>() {
print!(" ({})", ByteSize(len));
}
}
}
}
println!();
println!();

let read = tokio_util::io::SyncIoBridge::new(
response
.into_body()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.into_async_read()
.compat(),
);
let read = tokio_util::io::SyncIoBridge::new(
response
.into_body()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.into_async_read()
.compat(),
);
(Box::new(read), true)
};

let cache_path = cache_path.clone();
let account_ids = self.account_ids.clone();
let contract_ids = self.contract_ids.clone();
let wasm_hashes = self.wasm_hashes.clone();
(seen, snapshot) = tokio::task::spawn_blocking(move || {
let buf = BufReader::new(read);
let gz = GzDecoder::new(buf);
let buf = BufReader::new(gz);
let limited = &mut Limited::new(buf, Limits::none());
let read: Box<dyn Read + Sync + Send> = if gz {
let gz = GzDecoder::new(buf);
let buf = BufReader::new(gz);
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&cache_path)
.unwrap();
let tee = TeeReader::new(buf, file);
Box::new(tee)
} else {
Box::new(buf)
};
let limited = &mut Limited::new(read, Limits::none());
let sz = Frame::<BucketEntry>::read_xdr_iter(limited);
let mut count_saved = 0;
for entry in sz {
Expand Down

0 comments on commit 4072f18

Please sign in to comment.