Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a command to fetch-all contracts from a network #1601

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions FULL_HELP_DOCS.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Tools for smart contract developers
* `extend` — Extend the time to live ledger of a contract-data ledger entry
* `deploy` — Deploy a wasm contract
* `fetch` — Fetch a contract's Wasm binary
* `fetch-all` — Fetch all contract Wasms
* `id` — Generate the contract id for a given contract or asset
* `info` — Access info about contracts
* `init` — Initialize a Soroban project with an example contract
Expand Down Expand Up @@ -427,6 +428,24 @@ Fetch a contract's Wasm binary



## `stellar contract fetch-all`

Fetch all contract Wasms

**Usage:** `stellar contract fetch-all [OPTIONS]`

###### **Options:**

* `--ledger <LEDGER>` — The ledger sequence number to download from. Defaults to latest history archived ledger
* `--global` — Use global config
* `--config-dir <CONFIG_DIR>` — Location of config directory, default is "."
* `--rpc-url <RPC_URL>` — RPC server endpoint
* `--network-passphrase <NETWORK_PASSPHRASE>` — Network passphrase to sign the transaction sent to the rpc server
* `--network <NETWORK>` — Name of network to use from config
* `--archive-url <ARCHIVE_URL>` — Archive URL



## `stellar contract id`

Generate the contract id for a given contract or asset
Expand Down
324 changes: 324 additions & 0 deletions cmd/soroban-cli/src/commands/contract/fetch_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
use async_compression::tokio::bufread::GzipDecoder;
use bytesize::ByteSize;
use clap::{arg, Parser};
use futures::{StreamExt, TryStreamExt};
use http::Uri;
use humantime::format_duration;
use sha2::{Digest, Sha256};
use std::{
fs,
io::{self},
path::PathBuf,
str::FromStr,
time::{Duration, Instant},
};
use stellar_xdr::curr::{
self as xdr, BucketEntry, Frame, LedgerEntryData, Limited, Limits, ReadXdr,
};
use tokio::fs::OpenOptions;

use crate::{
commands::{config::data, global, HEADING_RPC},
config::{self, locator, network::passphrase},
print,
};

/// Fetch all contract wasms from a network to disk.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
#[command(arg_required_else_help = true)]
pub struct Cmd {
/// The ledger sequence number to download from. Defaults to latest history archived ledger.
#[arg(long)]
ledger: Option<u32>,
#[command(flatten)]
locator: locator::Args,
#[command(flatten)]
network: config::network::Args,
/// Archive URL
#[arg(long, help_heading = HEADING_RPC, env = "STELLAR_ARCHIVE_URL")]
archive_url: Option<Uri>,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("downloading history: {0}")]
DownloadingHistory(hyper::Error),
#[error("downloading history: got status code {0}")]
DownloadingHistoryGotStatusCode(hyper::StatusCode),
#[error("json decoding history: {0}")]
JsonDecodingHistory(serde_json::Error),
#[error("opening cached bucket to read: {0}")]
ReadOpeningCachedBucket(io::Error),
#[error("parsing bucket url: {0}")]
ParsingBucketUrl(http::uri::InvalidUri),
#[error("getting bucket: {0}")]
GettingBucket(hyper::Error),
#[error("getting bucket: got status code {0}")]
GettingBucketGotStatusCode(hyper::StatusCode),
#[error("opening cached bucket to write: {0}")]
WriteOpeningCachedBucket(io::Error),
#[error("streaming bucket: {0}")]
StreamingBucket(io::Error),
#[error("read XDR frame bucket entry: {0}")]
ReadXdrFrameBucketEntry(xdr::Error),
#[error("renaming temporary downloaded file to final destination: {0}")]
RenameDownloadFile(io::Error),
#[error("getting bucket directory: {0}")]
GetBucketDir(data::Error),
#[error("reading history http stream: {0}")]
ReadHistoryHttpStream(hyper::Error),
#[error(transparent)]
Network(#[from] config::network::Error),
#[error(transparent)]
Locator(#[from] locator::Error),
#[error(transparent)]
Config(#[from] config::Error),
#[error("archive url not configured")]
ArchiveUrlNotConfigured,
}

/// Checkpoint frequency is usually 64 ledgers, but in local test nets it'll
/// often by 8. There's no way to simply detect what frequency to expect ledgers
/// at, so it is hardcoded at 64, and this value is used only to help the user
/// select good ledger numbers when they select one that doesn't exist.
const CHECKPOINT_FREQUENCY: u32 = 64;

impl Cmd {
#[allow(clippy::too_many_lines)]
pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
let print = print::Print::new(global_args.quiet);
let start = Instant::now();

let archive_url = self.archive_url()?;
let history = get_history(&print, &archive_url, self.ledger).await?;

let ledger = history.current_ledger;
let network_passphrase = &history.network_passphrase;
let network_id = Sha256::digest(network_passphrase);

print.infoln(format!("Ledger: {ledger}"));
print.infoln(format!("Network Passphrase: {network_passphrase}"));
print.infoln(format!("Network id: {}", hex::encode(network_id)));

// Prepare a flat list of buckets to read. They'll be ordered by their
// level so that they can iterated higher level to lower level.
let buckets = history
.current_buckets
.iter()
.flat_map(|h| [h.curr.clone(), h.snap.clone()])
.filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
.collect::<Vec<_>>();

// Pre-cache the buckets.
for (i, bucket) in buckets.iter().enumerate() {
cache_bucket(&print, &archive_url, i, bucket).await?;
}

{
print.infoln(format!("Searching for wasms"));

for (i, bucket) in buckets.iter().enumerate() {
// Defined where the bucket will be read from, either from cache on
// disk, or streamed from the archive.
let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
let file = std::fs::OpenOptions::new()
.read(true)
.open(&cache_path)
.map_err(Error::ReadOpeningCachedBucket)?;

let message = format!("Searching bucket {i} {bucket}");
print.search(format!("{message}…"));

if let Ok(metadata) = file.metadata() {
print.clear_line();
print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
}

// Stream the bucket entries from the bucket, identifying
// entries that match the filters, and including only the
// entries that match in the snapshot.
let limited = &mut Limited::new(file, Limits::none());
let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
for entry in entries {
let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
let val = match entry {
BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => l,
BucketEntry::Deadentry(_) | BucketEntry::Metaentry(_) => continue,
};
if let LedgerEntryData::ContractCode(c) = val.data {
let hash = c.hash.to_string();
let path = format!("{hash}.wasm");
let code = c.code.to_vec();
print.infoln(format!("Found {hash} ({})", ByteSize(code.len() as u64)));
fs::write(path, code).unwrap();
};
}
}
}

let duration = Duration::from_secs(start.elapsed().as_secs());
print.checkln(format!("Completed in {}", format_duration(duration)));

Ok(())
}

fn archive_url(&self) -> Result<http::Uri, Error> {
// Return the configured archive URL, or if one is not configured, guess
// at an appropriate archive URL given the network passphrase.
self.archive_url
.clone()
.or_else(|| {
self.network.get(&self.locator).ok().and_then(|network| {
match network.network_passphrase.as_str() {
passphrase::MAINNET => {
Some("https://history.stellar.org/prd/core-live/core_live_001")
}
passphrase::TESTNET => {
Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
}
passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
passphrase::LOCAL => Some("http://localhost:8000/archive"),
_ => None,
}
.map(|s| Uri::from_str(s).expect("archive url valid"))
})
})
.ok_or(Error::ArchiveUrlNotConfigured)
}
}

async fn get_history(
print: &print::Print,
archive_url: &Uri,
ledger: Option<u32>,
) -> Result<History, Error> {
let archive_url = archive_url.to_string();
let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
let history_url = if let Some(ledger) = ledger {
let ledger_hex = format!("{ledger:08x}");
let ledger_hex_0 = &ledger_hex[0..=1];
let ledger_hex_1 = &ledger_hex[2..=3];
let ledger_hex_2 = &ledger_hex[4..=5];
format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
} else {
format!("{archive_url}/.well-known/stellar-history.json")
};
let history_url = Uri::from_str(&history_url).unwrap();

print.globe(format!("Downloading history {history_url}"));

let https = hyper_tls::HttpsConnector::new();
let response = hyper::Client::builder()
.build::<_, hyper::Body>(https)
.get(history_url.clone())
.await
.map_err(Error::DownloadingHistory)?;

if !response.status().is_success() {
// Check ledger is a checkpoint ledger and available in archives.
if let Some(ledger) = ledger {
let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;

if ledger_offset != 0 {
print.println("");
print.errorln(format!(
"Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
ledger - ledger_offset,
ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
));
}
}
return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
}

let body = hyper::body::to_bytes(response.into_body())
.await
.map_err(Error::ReadHistoryHttpStream)?;

print.clear_line();
print.globeln(format!("Downloaded history {}", &history_url));

serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
}

async fn cache_bucket(
print: &print::Print,
archive_url: &Uri,
bucket_index: usize,
bucket: &str,
) -> Result<PathBuf, Error> {
let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
if !cache_path.exists() {
let bucket_0 = &bucket[0..=1];
let bucket_1 = &bucket[2..=3];
let bucket_2 = &bucket[4..=5];
let bucket_url =
format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");

print.globe(format!("Downloading bucket {bucket_index} {bucket}…"));

let bucket_url = Uri::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
let https = hyper_tls::HttpsConnector::new();
let response = hyper::Client::builder()
.build::<_, hyper::Body>(https)
.get(bucket_url)
.await
.map_err(Error::GettingBucket)?;

if !response.status().is_success() {
print.println("");
return Err(Error::GettingBucketGotStatusCode(response.status()));
}

if let Some(val) = response.headers().get("Content-Length") {
if let Ok(str) = val.to_str() {
if let Ok(len) = str.parse::<u64>() {
print.clear_line();
print.globe(format!(
"Downloaded bucket {bucket_index} {bucket} ({})",
ByteSize(len)
));
}
}
}

print.println("");

let read = response
.into_body()
.map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.into_async_read();
let read = tokio_util::compat::FuturesAsyncReadCompatExt::compat(read);
let mut read = GzipDecoder::new(read);
let dl_path = cache_path.with_extension("dl");
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&dl_path)
.await
.map_err(Error::WriteOpeningCachedBucket)?;
tokio::io::copy(&mut read, &mut file)
.await
.map_err(Error::StreamingBucket)?;
fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
}
Ok(cache_path)
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
current_ledger: u32,
current_buckets: Vec<HistoryBucket>,
network_passphrase: String,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
curr: String,
snap: String,
}
8 changes: 8 additions & 0 deletions cmd/soroban-cli/src/commands/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod build;
pub mod deploy;
pub mod extend;
pub mod fetch;
pub mod fetch_all;
pub mod id;
pub mod info;
pub mod init;
Expand Down Expand Up @@ -45,6 +46,9 @@ pub enum Cmd {
/// Fetch a contract's Wasm binary
Fetch(fetch::Cmd),

/// Fetch all contract Wasms
FetchAll(fetch_all::Cmd),

/// Generate the contract id for a given contract or asset
#[command(subcommand)]
Id(id::Cmd),
Expand Down Expand Up @@ -107,6 +111,9 @@ pub enum Error {
#[error(transparent)]
Fetch(#[from] fetch::Error),

#[error(transparent)]
FetchAll(#[from] fetch_all::Error),

#[error(transparent)]
Init(#[from] init::Error),

Expand Down Expand Up @@ -152,6 +159,7 @@ impl Cmd {
Cmd::Invoke(invoke) => invoke.run(global_args).await?,
Cmd::Optimize(optimize) => optimize.run()?,
Cmd::Fetch(fetch) => fetch.run().await?,
Cmd::FetchAll(fetch_all) => fetch_all.run(global_args).await?,
Cmd::Read(read) => read.run().await?,
Cmd::Restore(restore) => restore.run().await?,
}
Expand Down
Loading