From 449036004d02e408a3eda555ddf28c692ca9ffb1 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Wed, 12 Jun 2024 23:33:24 +1000 Subject: [PATCH] Collect wasm with contract id and make ledger optional --- cmd/soroban-cli/src/commands/snapshot.rs | 244 +++++++++++++---------- 1 file changed, 141 insertions(+), 103 deletions(-) diff --git a/cmd/soroban-cli/src/commands/snapshot.rs b/cmd/soroban-cli/src/commands/snapshot.rs index c1893e2e4..b8e2fc38b 100644 --- a/cmd/soroban-cli/src/commands/snapshot.rs +++ b/cmd/soroban-cli/src/commands/snapshot.rs @@ -5,6 +5,7 @@ use futures::TryStreamExt; use http::Uri; use humantime::format_duration; use io_tee::TeeReader; +use sha2::{Digest, Sha256}; use soroban_ledger_snapshot::LedgerSnapshot; use std::{ collections::HashSet, @@ -15,10 +16,11 @@ use std::{ time::{Duration, Instant}, }; use stellar_xdr::curr::{ - BucketEntry, ConfigSettingEntry, ConfigSettingId, Frame, LedgerEntry, LedgerEntryData, - LedgerKey, LedgerKeyAccount, LedgerKeyClaimableBalance, LedgerKeyConfigSetting, - LedgerKeyContractCode, LedgerKeyContractData, LedgerKeyData, LedgerKeyLiquidityPool, - LedgerKeyOffer, LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits, ReadXdr, + BucketEntry, ConfigSettingEntry, ConfigSettingId, ContractExecutable, Frame, Hash, LedgerEntry, + LedgerEntryData, LedgerKey, LedgerKeyAccount, LedgerKeyClaimableBalance, + LedgerKeyConfigSetting, LedgerKeyContractCode, LedgerKeyContractData, LedgerKeyData, + LedgerKeyLiquidityPool, LedgerKeyOffer, LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits, + ReadXdr, ScContractInstance, ScVal, }; use tokio_util::compat::FuturesAsyncReadCompatExt as _; @@ -37,9 +39,9 @@ fn default_out_path() -> PathBuf { #[derive(Parser, Debug, Clone)] #[group(skip)] pub struct Cmd { - /// The ledger sequence number to snapshot. + /// The ledger sequence number to snapshot. Defaults to latest history archived ledger. 
#[arg(long)] - ledger: u32, + ledger: Option, /// The out path that the snapshot is written to. #[arg(long, default_value=default_out_path().into_os_string())] out: PathBuf, @@ -111,27 +113,31 @@ const CHECKPOINT_FREQUENCY: u32 = 64; impl Cmd { pub async fn run(&self) -> Result<(), Error> { const BASE_URL: &str = "http://history.stellar.org/prd/core-live/core_live_001"; - let ledger = self.ledger; let start = Instant::now(); - // Check ledger is a checkpoint ledger and available in archives. - let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY; - if ledger_offset != 0 { - println!( - "ledger {ledger} not a checkpoint ledger, use {} or {}", - ledger - ledger_offset, - ledger + (CHECKPOINT_FREQUENCY - ledger_offset), - ); - return Ok(()); - } + let history_url = if let Some(ledger) = self.ledger { + // Check ledger is a checkpoint ledger and available in archives. + let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY; + if ledger_offset != 0 { + println!( + "ledger {ledger} not a checkpoint ledger, use {} or {}", + ledger - ledger_offset, + ledger + (CHECKPOINT_FREQUENCY - ledger_offset), + ); + return Ok(()); + } + + // Download history JSON file. + let ledger_hex = format!("{ledger:08x}"); + let ledger_hex_0 = &ledger_hex[0..=1]; + let ledger_hex_1 = &ledger_hex[2..=3]; + let ledger_hex_2 = &ledger_hex[4..=5]; + format!("{BASE_URL}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json") + } else { + format!("{BASE_URL}/.well-known/stellar-history.json") + }; - // Download history JSON file. 
- let ledger_hex = format!("{ledger:08x}"); - let ledger_hex_0 = &ledger_hex[0..=1]; - let ledger_hex_1 = &ledger_hex[2..=3]; - let ledger_hex_2 = &ledger_hex[4..=5]; - let history_url = format!("{BASE_URL}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json"); let history_url = Uri::from_str(&history_url).unwrap(); println!("🌎 Downloading history {history_url}"); let https = hyper_tls::HttpsConnector::new(); @@ -143,6 +149,13 @@ impl Cmd { let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); let history = serde_json::from_slice::(&body).unwrap(); + let ledger = history.current_ledger; + let network_passphrase = &history.network_passphrase; + let network_id = Sha256::digest(network_passphrase); + println!("ℹ️ Ledger: {ledger}"); + println!("ℹ️ Network Passphrase: {network_passphrase}"); + println!("ℹ️ Network ID: {}", hex::encode(network_id)); + // Prepare a flat list of buckets to read. They'll be ordered by their // level so that they can iterated higher level to lower level. let buckets = history @@ -165,7 +178,7 @@ impl Cmd { protocol_version: 0, sequence_number: ledger, timestamp: 0, - network_id: [0u8; 32], + network_id: network_id.into(), base_reserve: 1, min_persistent_entry_ttl: 0, min_temp_entry_ttl: 0, @@ -173,6 +186,9 @@ impl Cmd { ledger_entries: Vec::new(), }; + let mut account_ids = self.account_ids.clone(); + let mut contract_ids = self.contract_ids.clone(); + let mut wasm_hashes = self.wasm_hashes.clone(); for (i, bucket) in buckets.iter().enumerate() { // Defined where the bucket will be read from, either from cache on // disk, or streamed from the archive. 
@@ -217,91 +233,111 @@ impl Cmd { }; let cache_path = cache_path.clone(); - let account_ids = self.account_ids.clone(); - let contract_ids = self.contract_ids.clone(); - let wasm_hashes = self.wasm_hashes.clone(); - (seen, snapshot) = tokio::task::spawn_blocking(move || { - let dl_path = cache_path.with_extension("dl"); - let buf = BufReader::new(read); - let read: Box = if stream { - // When streamed from the archive the bucket will be - // uncompressed, and also be streamed to cache. - let gz = GzDecoder::new(buf); - let buf = BufReader::new(gz); - let file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&dl_path) - .unwrap(); - let tee = TeeReader::new(buf, file); - Box::new(tee) - } else { - Box::new(buf) - }; - // Stream the bucket entries from the bucket, identifying - // entries that match the filters, and including only the - // entries that match in the snapshot. - let limited = &mut Limited::new(read, Limits::none()); - let sz = Frame::::read_xdr_iter(limited); - let mut count_saved = 0; - for entry in sz { - let Frame(entry) = entry.unwrap(); - let (key, val) = match entry { - BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => { - let k = data_into_key(&l); - (k, Some(l)) - } - BucketEntry::Deadentry(k) => (k, None), - BucketEntry::Metaentry(m) => { - snapshot.protocol_version = m.ledger_version; - continue; - } + (seen, snapshot, account_ids, contract_ids, wasm_hashes) = + tokio::task::spawn_blocking(move || { + let dl_path = cache_path.with_extension("dl"); + let buf = BufReader::new(read); + let read: Box = if stream { + // When streamed from the archive the bucket will be + // uncompressed, and also be streamed to cache. 
+ let gz = GzDecoder::new(buf); + let buf = BufReader::new(gz); + let file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&dl_path) + .unwrap(); + let tee = TeeReader::new(buf, file); + Box::new(tee) + } else { + Box::new(buf) }; - if seen.contains(&key) { - continue; - } - if let Some(val) = val { - let keep = match &val.data { - LedgerEntryData::Account(e) => { - account_ids.contains(&e.account_id.to_string()) - } - LedgerEntryData::Trustline(e) => { - account_ids.contains(&e.account_id.to_string()) - } - LedgerEntryData::ContractData(e) => { - contract_ids.contains(&e.contract.to_string()) + // Stream the bucket entries from the bucket, identifying + // entries that match the filters, and including only the + // entries that match in the snapshot. + let limited = &mut Limited::new(read, Limits::none()); + let sz = Frame::::read_xdr_iter(limited); + let mut count_saved = 0; + for entry in sz { + let Frame(entry) = entry.unwrap(); + let (key, val) = match entry { + BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => { + let k = data_into_key(&l); + (k, Some(l)) } - LedgerEntryData::ContractCode(e) => { - let hash = hex::encode(e.hash.0); - wasm_hashes.contains(&hash) + BucketEntry::Deadentry(k) => (k, None), + BucketEntry::Metaentry(m) => { + snapshot.protocol_version = m.ledger_version; + continue; } - LedgerEntryData::Offer(_) - | LedgerEntryData::Data(_) - | LedgerEntryData::ClaimableBalance(_) - | LedgerEntryData::LiquidityPool(_) - | LedgerEntryData::ConfigSetting(_) - | LedgerEntryData::Ttl(_) => false, }; - seen.insert(key.clone()); - if keep { - snapshot - .ledger_entries - .push((Box::new(key), (Box::new(val), None))); - count_saved += 1; + if seen.contains(&key) { + continue; + } + if let Some(val) = val { + let keep = match &val.data { + LedgerEntryData::Account(e) => { + account_ids.contains(&e.account_id.to_string()) + } + LedgerEntryData::Trustline(e) => { + account_ids.contains(&e.account_id.to_string()) + } + 
LedgerEntryData::ContractData(e) => { + let keep = contract_ids.contains(&e.contract.to_string()); + // If a contract instance references + // contract executable stored in another + // ledger entry, add that ledger entry to + // the filter so that Wasm for any filtered + // contract is collected too. TODO: Change + // this to support Wasm ledger entries + // appearing in earlier buckets after state + // archival is rolled out. + if keep && e.key == ScVal::LedgerKeyContractInstance { + if let ScVal::ContractInstance(ScContractInstance { + executable: ContractExecutable::Wasm(Hash(hash)), + .. + }) = e.val + { + let hash = hex::encode(hash); + wasm_hashes.push(hash); + } + } + keep + } + LedgerEntryData::ContractCode(e) => { + let hash = hex::encode(e.hash.0); + wasm_hashes.contains(&hash) + } + LedgerEntryData::Offer(_) + | LedgerEntryData::Data(_) + | LedgerEntryData::ClaimableBalance(_) + | LedgerEntryData::LiquidityPool(_) + | LedgerEntryData::ConfigSetting(_) + | LedgerEntryData::Ttl(_) => false, + }; + seen.insert(key.clone()); + if keep { + // Store the found ledger entry in the snapshot with + // a max u32 expiry. TODO: Change the expiry to come + // from the corresponding TTL ledger entry. 
+ snapshot + .ledger_entries + .push((Box::new(key), (Box::new(val), Some(u32::MAX)))); + count_saved += 1; + } } } - } - if stream { - fs::rename(&dl_path, &cache_path).unwrap(); - } - if count_saved > 0 { - println!("🔎 Found {count_saved} entries"); - } - (seen, snapshot) - }) - .await - .unwrap(); + if stream { + fs::rename(&dl_path, &cache_path).unwrap(); + } + if count_saved > 0 { + println!("🔎 Found {count_saved} entries"); + } + (seen, snapshot, account_ids, contract_ids, wasm_hashes) + }) + .await + .unwrap(); } snapshot.write_file(&self.out).unwrap(); @@ -321,7 +357,9 @@ impl Cmd { #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)] #[serde(rename_all = "camelCase")] struct History { + current_ledger: u32, current_buckets: Vec, + network_passphrase: String, } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]