Skip to content

Commit

Permalink
Collect wasm with contract id and make ledger optional
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmcculloch committed Jun 12, 2024
1 parent 379cf5d commit 4490360
Showing 1 changed file with 141 additions and 103 deletions.
244 changes: 141 additions & 103 deletions cmd/soroban-cli/src/commands/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures::TryStreamExt;
use http::Uri;
use humantime::format_duration;
use io_tee::TeeReader;
use sha2::{Digest, Sha256};
use soroban_ledger_snapshot::LedgerSnapshot;
use std::{
collections::HashSet,
Expand All @@ -15,10 +16,11 @@ use std::{
time::{Duration, Instant},
};
use stellar_xdr::curr::{
BucketEntry, ConfigSettingEntry, ConfigSettingId, Frame, LedgerEntry, LedgerEntryData,
LedgerKey, LedgerKeyAccount, LedgerKeyClaimableBalance, LedgerKeyConfigSetting,
LedgerKeyContractCode, LedgerKeyContractData, LedgerKeyData, LedgerKeyLiquidityPool,
LedgerKeyOffer, LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits, ReadXdr,
BucketEntry, ConfigSettingEntry, ConfigSettingId, ContractExecutable, Frame, Hash, LedgerEntry,
LedgerEntryData, LedgerKey, LedgerKeyAccount, LedgerKeyClaimableBalance,
LedgerKeyConfigSetting, LedgerKeyContractCode, LedgerKeyContractData, LedgerKeyData,
LedgerKeyLiquidityPool, LedgerKeyOffer, LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits,
ReadXdr, ScContractInstance, ScVal,
};
use tokio_util::compat::FuturesAsyncReadCompatExt as _;

Expand All @@ -37,9 +39,9 @@ fn default_out_path() -> PathBuf {
#[derive(Parser, Debug, Clone)]
#[group(skip)]
pub struct Cmd {
/// The ledger sequence number to snapshot.
/// The ledger sequence number to snapshot. Defaults to latest history archived ledger.
#[arg(long)]
ledger: u32,
ledger: Option<u32>,
/// The out path that the snapshot is written to.
#[arg(long, default_value=default_out_path().into_os_string())]
out: PathBuf,
Expand Down Expand Up @@ -111,27 +113,31 @@ const CHECKPOINT_FREQUENCY: u32 = 64;
impl Cmd {
pub async fn run(&self) -> Result<(), Error> {
const BASE_URL: &str = "http://history.stellar.org/prd/core-live/core_live_001";
let ledger = self.ledger;

let start = Instant::now();

// Check ledger is a checkpoint ledger and available in archives.
let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
if ledger_offset != 0 {
println!(
"ledger {ledger} not a checkpoint ledger, use {} or {}",
ledger - ledger_offset,
ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
);
return Ok(());
}
let history_url = if let Some(ledger) = self.ledger {
// Check ledger is a checkpoint ledger and available in archives.
let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
if ledger_offset != 0 {
println!(
"ledger {ledger} not a checkpoint ledger, use {} or {}",
ledger - ledger_offset,
ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
);
return Ok(());
}

// Download history JSON file.
let ledger_hex = format!("{ledger:08x}");
let ledger_hex_0 = &ledger_hex[0..=1];
let ledger_hex_1 = &ledger_hex[2..=3];
let ledger_hex_2 = &ledger_hex[4..=5];
format!("{BASE_URL}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
} else {
format!("{BASE_URL}/.well-known/stellar-history.json")
};

// Download history JSON file.
let ledger_hex = format!("{ledger:08x}");
let ledger_hex_0 = &ledger_hex[0..=1];
let ledger_hex_1 = &ledger_hex[2..=3];
let ledger_hex_2 = &ledger_hex[4..=5];
let history_url = format!("{BASE_URL}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json");
let history_url = Uri::from_str(&history_url).unwrap();
println!("🌎 Downloading history {history_url}");
let https = hyper_tls::HttpsConnector::new();
Expand All @@ -143,6 +149,13 @@ impl Cmd {
let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let history = serde_json::from_slice::<History>(&body).unwrap();

let ledger = history.current_ledger;
let network_passphrase = &history.network_passphrase;
let network_id = Sha256::digest(network_passphrase);
println!("ℹ️ Ledger: {ledger}");
println!("ℹ️ Network Passphrase: {network_passphrase}");
println!("ℹ️ Network ID: {}", hex::encode(network_id));

// Prepare a flat list of buckets to read. They'll be ordered by their
// level so that they can iterated higher level to lower level.
let buckets = history
Expand All @@ -165,14 +178,17 @@ impl Cmd {
protocol_version: 0,
sequence_number: ledger,
timestamp: 0,
network_id: [0u8; 32],
network_id: network_id.into(),
base_reserve: 1,
min_persistent_entry_ttl: 0,
min_temp_entry_ttl: 0,
max_entry_ttl: 0,
ledger_entries: Vec::new(),
};

let mut account_ids = self.account_ids.clone();
let mut contract_ids = self.contract_ids.clone();
let mut wasm_hashes = self.wasm_hashes.clone();
for (i, bucket) in buckets.iter().enumerate() {
// Defined where the bucket will be read from, either from cache on
// disk, or streamed from the archive.
Expand Down Expand Up @@ -217,91 +233,111 @@ impl Cmd {
};

let cache_path = cache_path.clone();
let account_ids = self.account_ids.clone();
let contract_ids = self.contract_ids.clone();
let wasm_hashes = self.wasm_hashes.clone();
(seen, snapshot) = tokio::task::spawn_blocking(move || {
let dl_path = cache_path.with_extension("dl");
let buf = BufReader::new(read);
let read: Box<dyn Read + Sync + Send> = if stream {
// When streamed from the archive the bucket will be
// uncompressed, and also be streamed to cache.
let gz = GzDecoder::new(buf);
let buf = BufReader::new(gz);
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&dl_path)
.unwrap();
let tee = TeeReader::new(buf, file);
Box::new(tee)
} else {
Box::new(buf)
};
// Stream the bucket entries from the bucket, identifying
// entries that match the filters, and including only the
// entries that match in the snapshot.
let limited = &mut Limited::new(read, Limits::none());
let sz = Frame::<BucketEntry>::read_xdr_iter(limited);
let mut count_saved = 0;
for entry in sz {
let Frame(entry) = entry.unwrap();
let (key, val) = match entry {
BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
let k = data_into_key(&l);
(k, Some(l))
}
BucketEntry::Deadentry(k) => (k, None),
BucketEntry::Metaentry(m) => {
snapshot.protocol_version = m.ledger_version;
continue;
}
(seen, snapshot, account_ids, contract_ids, wasm_hashes) =
tokio::task::spawn_blocking(move || {
let dl_path = cache_path.with_extension("dl");
let buf = BufReader::new(read);
let read: Box<dyn Read + Sync + Send> = if stream {
// When streamed from the archive the bucket will be
// uncompressed, and also be streamed to cache.
let gz = GzDecoder::new(buf);
let buf = BufReader::new(gz);
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&dl_path)
.unwrap();
let tee = TeeReader::new(buf, file);
Box::new(tee)
} else {
Box::new(buf)
};
if seen.contains(&key) {
continue;
}
if let Some(val) = val {
let keep = match &val.data {
LedgerEntryData::Account(e) => {
account_ids.contains(&e.account_id.to_string())
}
LedgerEntryData::Trustline(e) => {
account_ids.contains(&e.account_id.to_string())
}
LedgerEntryData::ContractData(e) => {
contract_ids.contains(&e.contract.to_string())
// Stream the bucket entries from the bucket, identifying
// entries that match the filters, and including only the
// entries that match in the snapshot.
let limited = &mut Limited::new(read, Limits::none());
let sz = Frame::<BucketEntry>::read_xdr_iter(limited);
let mut count_saved = 0;
for entry in sz {
let Frame(entry) = entry.unwrap();
let (key, val) = match entry {
BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
let k = data_into_key(&l);
(k, Some(l))
}
LedgerEntryData::ContractCode(e) => {
let hash = hex::encode(e.hash.0);
wasm_hashes.contains(&hash)
BucketEntry::Deadentry(k) => (k, None),
BucketEntry::Metaentry(m) => {
snapshot.protocol_version = m.ledger_version;
continue;
}
LedgerEntryData::Offer(_)
| LedgerEntryData::Data(_)
| LedgerEntryData::ClaimableBalance(_)
| LedgerEntryData::LiquidityPool(_)
| LedgerEntryData::ConfigSetting(_)
| LedgerEntryData::Ttl(_) => false,
};
seen.insert(key.clone());
if keep {
snapshot
.ledger_entries
.push((Box::new(key), (Box::new(val), None)));
count_saved += 1;
if seen.contains(&key) {
continue;
}
if let Some(val) = val {
let keep = match &val.data {
LedgerEntryData::Account(e) => {
account_ids.contains(&e.account_id.to_string())
}
LedgerEntryData::Trustline(e) => {
account_ids.contains(&e.account_id.to_string())
}
LedgerEntryData::ContractData(e) => {
let keep = contract_ids.contains(&e.contract.to_string());
// If a contract instance references
// contract executable stored in another
// ledger entry, add that ledger entry to
// the filter so that Wasm for any filtered
// contract is collected too. TODO: Change
// this to support Wasm ledger entries
// appearing in earlier buckets after state
// archival is rolled out.
if keep && e.key == ScVal::LedgerKeyContractInstance {
if let ScVal::ContractInstance(ScContractInstance {
executable: ContractExecutable::Wasm(Hash(hash)),
..
}) = e.val
{
let hash = hex::encode(hash);
wasm_hashes.push(hash);
}
}
keep
}
LedgerEntryData::ContractCode(e) => {
let hash = hex::encode(e.hash.0);
wasm_hashes.contains(&hash)
}
LedgerEntryData::Offer(_)
| LedgerEntryData::Data(_)
| LedgerEntryData::ClaimableBalance(_)
| LedgerEntryData::LiquidityPool(_)
| LedgerEntryData::ConfigSetting(_)
| LedgerEntryData::Ttl(_) => false,
};
seen.insert(key.clone());
if keep {
// Store the found ledger entry in the snapshot with
// a max u32 expiry. TODO: Change the expiry to come
// from the corresponding TTL ledger entry.
snapshot
.ledger_entries
.push((Box::new(key), (Box::new(val), Some(u32::MAX))));
count_saved += 1;
}
}
}
}
if stream {
fs::rename(&dl_path, &cache_path).unwrap();
}
if count_saved > 0 {
println!("🔎 Found {count_saved} entries");
}
(seen, snapshot)
})
.await
.unwrap();
if stream {
fs::rename(&dl_path, &cache_path).unwrap();
}
if count_saved > 0 {
println!("🔎 Found {count_saved} entries");
}
(seen, snapshot, account_ids, contract_ids, wasm_hashes)
})
.await
.unwrap();
}

snapshot.write_file(&self.out).unwrap();
Expand All @@ -321,7 +357,9 @@ impl Cmd {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
current_ledger: u32,
current_buckets: Vec<HistoryBucket>,
network_passphrase: String,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
Expand Down

0 comments on commit 4490360

Please sign in to comment.