diff --git a/.github/workflows/forester-tests.yml b/.github/workflows/forester-tests.yml index 7a0c5f83a..3213f3a83 100644 --- a/.github/workflows/forester-tests.yml +++ b/.github/workflows/forester-tests.yml @@ -1,57 +1,66 @@ +name: forester-tests + on: push: - branches: - - main + branches: [main] paths: - "forester/**" + - "forester-utils/**" - "photon-api/**" - ".github/workflows/forester-tests.yml" pull_request: - branches: - - "**" + branches: ["**"] paths: - "forester/**" + - "forester-utils/**" - "photon-api/**" - ".github/workflows/forester-tests.yml" - types: - - opened - - synchronize - - reopened - - ready_for_review - -name: forester-tests + types: [opened, synchronize, reopened, ready_for_review] concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +env: + RUSTFLAGS: "--cfg tokio_unstable -D warnings" + jobs: - forester-tests: - name: forester-tests - if: github.event.pull_request.draft == false + test: + strategy: + matrix: + test-name: [ + {name: "address-batched", command: "test_address_batched", timeout: 60, needs-test-program: true}, + {name: "state-batched", command: "test_state_batched", timeout: 60, needs-test-program: false}, + {name: "2-foresters", command: "test_epoch_monitor_with_2_foresters", timeout: 60, needs-test-program: false}, + {name: "double-registration", command: "test_epoch_double_registration", timeout: 30, needs-test-program: false} + ] + name: test-${{ matrix.test-name.name }} runs-on: ubuntu-latest - timeout-minutes: 120 - + timeout-minutes: ${{ matrix.test-name.timeout }} steps: - - name: Checkout sources - uses: actions/checkout@v4 - + - uses: actions/checkout@v4 - name: Setup and build uses: ./.github/actions/setup-and-build - - name: Clean build artifacts before tests shell: bash run: | cargo clean rm -rf target/debug/deps/* - - name: Check available disk space shell: bash run: | df -h / du -sh /home/runner/work/* | sort -hr | head -n 10 - - - name: Run forester tests + - name: Build CLI + run: | + source ./scripts/devenv.sh + npx nx build @lightprotocol/zk-compression-cli + - name: Build test program + if: ${{ matrix.test-name.needs-test-program }} + run: | + source ./scripts/devenv.sh + cargo test-sbf -p create-address-test-program + - name: Run ${{ matrix.test-name.name }} tests run: | source ./scripts/devenv.sh - npx nx test @lightprotocol/forester + cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 725fe0228..36a461b04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1272,26 +1272,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "config" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be" -dependencies = [ - "async-trait", - "convert_case", - "json5", - "lazy_static", - "nom", - "pathdiff", - "ron", - "rust-ini", - "serde", - "serde_json", - "toml 0.8.19", - "yaml-rust", -] - [[package]] name = "console" version = "0.15.8" @@ -1331,41 +1311,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" -[[package]] -name = "const-random" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" -dependencies = [ - "const-random-macro", -] - -[[package]] -name = "const-random-macro" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" -dependencies = [ - "getrandom 0.2.15", - "once_cell", - "tiny-keccak", -] - [[package]] name = "constant_time_eq" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" -[[package]] -name = "convert_case" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -1715,15 +1666,6 @@ dependencies = [ "syn 2.0.85", ] -[[package]] -name = "dlv-list" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" -dependencies = [ - "const-random", -] - [[package]] name = "downcast" version = "0.11.0" @@ -2003,12 +1945,11 @@ version = "1.1.0" dependencies = [ "account-compression", "anchor-lang", + "anyhow", "async-trait", - "bincode", "borsh 0.10.3", "bs58 0.5.1", "clap 4.5.23", - "config", "dashmap 6.1.0", "env_logger 0.11.5", "forester-utils", @@ -2025,8 +1966,6 @@ dependencies = [ "light-registry", "light-system-program", "light-test-utils", - "light-utils 1.1.0", - "light-verifier", "photon-api", "prometheus", "reqwest 0.11.27", @@ -2066,6 +2005,7 @@ dependencies = [ "light-registry", "light-system-program", "light-utils 1.1.0", + "light-verifier", "log", "num-bigint 0.4.6", "num-traits", @@ -2800,17 +2740,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json5" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" -dependencies = [ - "pest", - "pest_derive", - "serde", -] - [[package]] name = "jsonrpc-core" version = "18.0.0" @@ -3389,12 +3318,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3950,16 +3873,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "ordered-multimap" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" -dependencies = [ - "dlv-list", - "hashbrown 0.13.2", -] - [[package]] name = "os_str_bytes" version = "6.6.1" @@ -4035,12 +3948,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "pathdiff" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" - [[package]] name = "pbkdf2" version = "0.4.0" @@ -4083,51 +3990,6 @@ dependencies = [ "num", ] -[[package]] -name = "pest" -version = "2.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" -dependencies = [ - "memchr", - "thiserror", - "ucd-trie", -] - -[[package]] -name = "pest_derive" -version = "2.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" -dependencies = [ - "pest", - "pest_generator", -] - -[[package]] -name = "pest_generator" -version = "2.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" -dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn 2.0.85", -] - -[[package]] -name = "pest_meta" -version = "2.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" -dependencies = [ - "once_cell", - "pest", - "sha2 0.10.8", -] - [[package]] name = "photon-api" version = "0.45.0" @@ -4281,7 +4143,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785" dependencies = [ - "toml 0.5.11", + "toml", ] [[package]] @@ -4789,18 +4651,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "ron" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" -dependencies = [ - "base64 0.21.7", - "bitflags 2.6.0", - "serde", - "serde_derive", -] - [[package]] name = "rpassword" version = "7.3.1" @@ -4822,16 +4672,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rust-ini" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" -dependencies = [ - "cfg-if", - "ordered-multimap", -] - [[package]] name = "rustc-demangle" version = "0.1.24" @@ -5120,15 +4960,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_spanned" -version = "0.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb5b1b31579f3811bf615c144393417496f152e12ac8b7663bf664f4a815306d" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -7424,15 +7255,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tinyvec" version = "1.8.0" @@ -7607,26 +7429,11 @@ dependencies = [ "serde", ] -[[package]] -name = "toml" -version = "0.8.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.22.20", -] - [[package]] name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -7646,8 +7453,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ "indexmap 2.5.0", - "serde", - "serde_spanned", "toml_datetime", "winnow 0.6.18", ] @@ -7831,12 +7636,6 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "ucd-trie" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" - [[package]] name = "unicase" version = "2.7.0" @@ -8493,15 +8292,6 @@ dependencies = [ "tabled", ] -[[package]] -name = "yaml-rust" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "yasna" version = "0.5.2" diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index 4b50e00a5..9186a6c07 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -20,7 +20,7 @@ light-registry = { workspace = true } light-system-program = { workspace = true, features = ["cpi"] } light-utils = { workspace = true } light-batched-merkle-tree = { workspace = true } - +light-verifier = { workspace = true } photon-api = { workspace = true } light-client = { workspace = true } diff --git a/forester-utils/src/forester_epoch.rs b/forester-utils/src/forester_epoch.rs index 566f69226..81e48df3d 100644 --- a/forester-utils/src/forester_epoch.rs +++ b/forester-utils/src/forester_epoch.rs @@ -86,6 +86,7 @@ pub enum TreeType { Address, State, BatchedState, + BatchedAddress, } impl Display for TreeType { @@ -94,6 +95,7 @@ impl Display for TreeType { TreeType::Address => write!(f, "address"), TreeType::State => write!(f, "state"), TreeType::BatchedState => write!(f, "batched state"), + TreeType::BatchedAddress => write!(f, "batched address"), } } } diff --git a/forester-utils/src/indexer/mod.rs b/forester-utils/src/indexer/mod.rs index 91040afd7..854633fbf 100644 --- a/forester-utils/src/indexer/mod.rs +++ b/forester-utils/src/indexer/mod.rs @@ -258,6 +258,14 @@ pub trait Indexer: Sync + Send + Debug + 'static { ) { unimplemented!() } + + async fn finalize_batched_address_tree_update( + &mut self, + _rpc: &mut R, + _merkle_tree_pubkey: Pubkey, + ) { + unimplemented!() + } } #[derive(Debug, Clone)] diff --git a/forester-utils/src/instructions.rs b/forester-utils/src/instructions.rs new file mode 100644 index 000000000..c96d49f49 --- /dev/null +++ b/forester-utils/src/instructions.rs @@ -0,0 +1,450 @@ +use light_batched_merkle_tree::{ + constants::{DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, DEFAULT_BATCH_STATE_TREE_HEIGHT}, + merkle_tree::{ + AppendBatchProofInputsIx, BatchProofInputsIx, BatchedMerkleTreeAccount, + InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, + }, + queue::BatchedQueueAccount, +}; +use light_client::rpc::RpcConnection; +use light_hasher::{Hasher, Poseidon}; +use light_prover_client::{ + batch_address_append::get_batch_address_append_circuit_inputs, + batch_append_with_proofs::get_batch_append_with_proofs_inputs, + batch_update::get_batch_update_inputs, + gnark::{ + batch_address_append_json_formatter::to_json, + batch_append_with_proofs_json_formatter::BatchAppendWithProofsInputsJson, + batch_update_json_formatter::update_inputs_string, + constants::{PROVE_PATH, SERVER_ADDRESS}, + proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}, + }, +}; +use light_utils::bigint::bigint_to_be_bytes_array; +use light_verifier::CompressedProof; +use log::{error, info}; +use reqwest::Client; +use solana_sdk::pubkey::Pubkey; +use thiserror::Error; + +use crate::indexer::Indexer; + +#[derive(Error, Debug)] +pub enum ForesterUtilsError { + #[error("parse error: {0:?}")] + ParseError(String), + #[error("prover error: {0:?}")] + ProverError(String), + #[error("rpc error: {0:?}")] + RpcError(String), + #[error("indexer error: {0:?}")] + IndexerError(String), +} + +pub async fn create_batch_update_address_tree_instruction_data>( + rpc: &mut R, + indexer: &mut I, + merkle_tree_pubkey: Pubkey, +) -> Result<(InstructionDataBatchNullifyInputs, usize), ForesterUtilsError> { + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get account data from rpc: {:?}", + e + ); + ForesterUtilsError::RpcError("Failed to get account data".into()) + })? + .unwrap(); + + let (old_root_index, leaves_hashchain, start_index, current_root, batch_size, full_batch_index) = { + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + let old_root_index = merkle_tree.root_history.last_index(); + let full_batch_index = merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index; + let batch = &merkle_tree.batches[full_batch_index as usize]; + let zkp_batch_index = batch.get_num_inserted_zkps(); + let leaves_hashchain = + merkle_tree.hashchain_store[full_batch_index as usize][zkp_batch_index as usize]; + let start_index = merkle_tree.get_metadata().next_index; + let current_root = *merkle_tree.root_history.last().unwrap(); + let batch_size = batch.zkp_batch_size as usize; + + ( + old_root_index, + leaves_hashchain, + start_index, + current_root, + batch_size, + full_batch_index, + ) + }; + + let batch_start_index = indexer + .get_address_merkle_trees() + .iter() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap() + .merkle_tree + .merkle_tree + .rightmost_index; + + let addresses = indexer + .get_queue_elements( + merkle_tree_pubkey.to_bytes(), + full_batch_index, + 0, + batch_size as u64, + ) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get queue elements from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get queue elements".into()) + })?; + + let batch_size = addresses.len(); + + // Get proof info after addresses are retrieved + let non_inclusion_proofs = indexer + .get_multiple_new_address_proofs_full( + merkle_tree_pubkey.to_bytes(), + addresses.clone(), + ) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get get_multiple_new_address_proofs_full from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get get_multiple_new_address_proofs_full".into()) + })?; + + let mut low_element_values = Vec::new(); + let mut low_element_indices = Vec::new(); + let mut low_element_next_indices = Vec::new(); + let mut low_element_next_values = Vec::new(); + let mut low_element_proofs: Vec> = Vec::new(); + + for non_inclusion_proof in &non_inclusion_proofs { + low_element_values.push(non_inclusion_proof.low_address_value); + low_element_indices.push(non_inclusion_proof.low_address_index as usize); + low_element_next_indices.push(non_inclusion_proof.low_address_next_index as usize); + low_element_next_values.push(non_inclusion_proof.low_address_next_value); + low_element_proofs.push(non_inclusion_proof.low_address_proof.to_vec()); + } + + let subtrees = indexer + .get_subtrees(merkle_tree_pubkey.to_bytes()) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get subtrees from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get subtrees".into()) + })? + .try_into() + .unwrap(); + + let inputs = + get_batch_address_append_circuit_inputs::<{ DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }>( + start_index as usize, + current_root, + low_element_values, + low_element_next_values, + low_element_indices, + low_element_next_indices, + low_element_proofs, + addresses, + subtrees, + leaves_hashchain, + batch_start_index, + batch_size, + ) + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get circuit inputs: {:?}", + e + ); + ForesterUtilsError::ProverError("Failed to get circuit inputs".into()) + })?; + + let client = Client::new(); + let circuit_inputs_new_root = bigint_to_be_bytes_array::<32>(&inputs.new_root).unwrap(); + let inputs = to_json(&inputs); + + let response_result = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(inputs) + .send() + .await + .expect("Failed to execute request."); + + if response_result.status().is_success() { + let body = response_result.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + let instruction_data = InstructionDataBatchNullifyInputs { + public_inputs: BatchProofInputsIx { + new_root: circuit_inputs_new_root, + old_root_index: old_root_index as u16, + }, + compressed_proof: CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + }, + }; + Ok((instruction_data, batch_size)) + } else { + Err(ForesterUtilsError::ProverError( + "Prover failed to generate proof".to_string(), + )) + } +} + +pub async fn create_append_batch_ix_data>( + rpc: &mut R, + indexer: &mut I, + merkle_tree_pubkey: Pubkey, + output_queue_pubkey: Pubkey, +) -> Result { + let (merkle_tree_next_index, current_root) = { + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + ( + merkle_tree.get_metadata().next_index, + *merkle_tree.root_history.last().unwrap(), + ) + }; + + let (zkp_batch_size, full_batch_index, num_inserted_zkps, leaves_hashchain) = { + let mut output_queue_account = rpc.get_account(output_queue_pubkey).await.unwrap().unwrap(); + let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( + output_queue_account.data.as_mut_slice(), + ) + .unwrap(); + + let queue_metadata = output_queue.get_metadata(); + let full_batch_index = queue_metadata.batch_metadata.next_full_batch_index; + let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; + + let num_inserted_zkps = + output_queue.batches[full_batch_index as usize].get_num_inserted_zkps(); + + let leaves_hashchain = + output_queue.hashchain_store[full_batch_index as usize][num_inserted_zkps as usize]; + + ( + zkp_batch_size, + full_batch_index, + num_inserted_zkps, + leaves_hashchain, + ) + }; + let start = num_inserted_zkps as usize * zkp_batch_size as usize; + let end = start + zkp_batch_size as usize; + + let leaves = indexer + .get_queue_elements( + merkle_tree_pubkey.to_bytes(), + full_batch_index, + start as u64, + end as u64, + ) + .await + .unwrap(); + + info!("Leaves: {:?}", leaves); + + let (old_leaves, merkle_proofs) = { + let mut old_leaves = vec![]; + let mut merkle_proofs = vec![]; + let indices = + (merkle_tree_next_index..merkle_tree_next_index + zkp_batch_size).collect::>(); + let proofs = indexer.get_proofs_by_indices(merkle_tree_pubkey, &indices); + proofs.iter().for_each(|proof| { + old_leaves.push(proof.leaf); + merkle_proofs.push(proof.proof.clone()); + }); + + (old_leaves, merkle_proofs) + }; + + info!("Old leaves: {:?}", old_leaves); + + let (proof, new_root) = { + let circuit_inputs = + get_batch_append_with_proofs_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( + current_root, + merkle_tree_next_index as u32, + leaves, + leaves_hashchain, + old_leaves, + merkle_proofs, + zkp_batch_size as u32, + ) + .unwrap(); + + let client = Client::new(); + let inputs_json = BatchAppendWithProofsInputsJson::from_inputs(&circuit_inputs).to_string(); + + let response = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(inputs_json) + .send() + .await + .expect("Failed to execute request."); + + if response.status().is_success() { + let body = response.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + ( + CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + }, + bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()) + .unwrap(), + ) + } else { + error!( + "create_append_batch_ix_data: prover server respond: {:?}", + response.text().await + ); + return Err(ForesterUtilsError::ProverError( + "Prover response failed".to_string(), + )); + } + }; + + Ok(InstructionDataBatchAppendInputs { + public_inputs: AppendBatchProofInputsIx { new_root }, + compressed_proof: proof, + }) +} + +pub async fn create_nullify_batch_ix_data>( + rpc: &mut R, + indexer: &mut I, + merkle_tree_pubkey: Pubkey, +) -> Result { + let (zkp_batch_size, old_root, old_root_index, leaves_hashchain) = { + let mut account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + let merkle_tree = + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) + .unwrap(); + let metadata = merkle_tree.get_metadata(); + let batch_idx = metadata.queue_metadata.next_full_batch_index as usize; + let zkp_size = metadata.queue_metadata.zkp_batch_size; + let batch = &merkle_tree.batches[batch_idx]; + let zkp_idx = batch.get_num_inserted_zkps(); + let hashchain = merkle_tree.hashchain_store[batch_idx][zkp_idx as usize]; + let root_idx = merkle_tree.root_history.last_index(); + let root = *merkle_tree.root_history.last().unwrap(); + (zkp_size, root, root_idx, hashchain) + }; + + let leaf_indices_tx_hashes = + indexer.get_leaf_indices_tx_hashes(merkle_tree_pubkey, zkp_batch_size as usize); + + let mut leaves = Vec::new(); + let mut tx_hashes = Vec::new(); + let mut old_leaves = Vec::new(); + let mut path_indices = Vec::new(); + let mut merkle_proofs = Vec::new(); + let mut nullifiers = Vec::new(); + + let proofs = indexer.get_proofs_by_indices( + merkle_tree_pubkey, + &leaf_indices_tx_hashes + .iter() + .map(|(index, _, _)| *index as u64) + .collect::>(), + ); + + for ((index, leaf, tx_hash), proof) in leaf_indices_tx_hashes.iter().zip(proofs.iter()) { + path_indices.push(*index); + leaves.push(*leaf); + old_leaves.push(proof.leaf); + merkle_proofs.push(proof.proof.clone()); + tx_hashes.push(*tx_hash); + let index_bytes = index.to_be_bytes(); + let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap(); + nullifiers.push(nullifier); + } + + let inputs = get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( + old_root, + tx_hashes, + leaves.to_vec(), + leaves_hashchain, + old_leaves, + merkle_proofs, + path_indices, + zkp_batch_size as u32, + ) + .unwrap(); + + let new_root = bigint_to_be_bytes_array::<32>(&inputs.new_root.to_biguint().unwrap()).unwrap(); + + let client = Client::new(); + let response = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(update_inputs_string(&inputs)) + .send() + .await + .map_err(|e| { + error!( + "get_batched_nullify_ix_data: failed to send proof to server: {:?}", + e + ); + ForesterUtilsError::ProverError("Failed to send proof to server".into()) + })?; + + let proof = if response.status().is_success() { + let body = response.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + } + } else { + error!( + "get_batched_nullify_ix_data: failed to get proof from server: {:?}", + response.text().await + ); + return Err(ForesterUtilsError::ProverError( + "Failed to get proof from server".into(), + )); + }; + + Ok(InstructionDataBatchNullifyInputs { + public_inputs: BatchProofInputsIx { + new_root, + old_root_index: old_root_index as u16, + }, + compressed_proof: proof, + }) +} diff --git a/forester-utils/src/lib.rs b/forester-utils/src/lib.rs index fd66d2aff..d8beb6159 100644 --- a/forester-utils/src/lib.rs +++ b/forester-utils/src/lib.rs @@ -17,6 +17,7 @@ use solana_sdk::{ pub mod address_merkle_tree_config; pub mod forester_epoch; pub mod indexer; +pub mod instructions; pub mod registry; pub fn create_account_instruction( diff --git a/forester/Cargo.toml b/forester/Cargo.toml index dc7ce01d8..b807fd176 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -4,7 +4,6 @@ version = "1.1.0" edition = "2021" [dependencies] -config = "0.14.0" anchor-lang = { workspace = true } clap = {version = "4.5.23", features = ["derive", "env"]} solana-sdk = { workspace = true } @@ -13,14 +12,16 @@ solana-account-decoder = { workspace = true } solana-program = { workspace = true } account-compression = { workspace = true } light-batched-merkle-tree = { workspace = true } -light-prover-client = { workspace = true } light-system-program = { workspace = true, features = ["cpi"] } light-hash-set = { workspace = true, features = ["solana"] } light-hasher = { workspace = true } light-merkle-tree-reference = { workspace = true } light-registry = { workspace = true} -light-utils = { workspace = true } -light-verifier = { workspace = true } +photon-api = { workspace = true } +forester-utils = { workspace = true } +light-client = { workspace = true } +light-merkle-tree-metadata = { workspace = true } + serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["full"] } @@ -29,9 +30,6 @@ futures = "0.3.31" thiserror = "1" borsh = "0.10.3" bs58 = "0.5.1" -photon-api = { workspace=true } -bincode = "1.3" -forester-utils = { workspace=true } env_logger = "0.11" async-trait = "0.1.81" tracing = "0.1.40" @@ -42,13 +40,10 @@ lazy_static = "1.4" warp = "0.3" dashmap = "6.1.0" scopeguard = "1.2.0" -light-client = { workspace = true } -light-merkle-tree-metadata = { workspace = true } +anyhow = "1.0.94" [dev-dependencies] serial_test = "3.2.0" light-prover-client = { workspace = true } -light-merkle-tree-reference = { workspace = true} -light-hasher = { workspace = true} light-test-utils = { workspace = true } light-program-test = { workspace = true, features = ["devenv"] } diff --git a/forester/package.json b/forester/package.json index e3528ac31..424e52577 100644 --- a/forester/package.json +++ b/forester/package.json @@ -4,8 +4,9 @@ "license": "GPL-3.0", "scripts": { "build": "cargo build", - "test": "RUSTFLAGS=\"--cfg tokio_unstable -D warnings\" cargo test --package forester -- --test-threads=1 --nocapture", - "test-state-batch": "cargo test --package forester test_batched -- --test-threads=1 --nocapture", + "test": "RUSTFLAGS=\"--cfg tokio_unstable -D warnings\" cargo test --package forester -- --nocapture", + "test-state-batched": "cargo test --package forester test_state_batched -- --nocapture", + "test-address-batched": "cargo test --package forester test_address_batched -- --nocapture", "docker:build": "docker build --tag forester -f Dockerfile .." }, "devDependencies": { diff --git a/forester/src/batch_processor/address.rs b/forester/src/batch_processor/address.rs new file mode 100644 index 000000000..2a02f1a0e --- /dev/null +++ b/forester/src/batch_processor/address.rs @@ -0,0 +1,62 @@ +use borsh::BorshSerialize; +use forester_utils::{ + indexer::Indexer, instructions::create_batch_update_address_tree_instruction_data, +}; +use light_batched_merkle_tree::event::BatchNullifyEvent; +use light_client::rpc::RpcConnection; +use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction; +use solana_sdk::signer::Signer; +use tracing::{info, instrument}; + +use super::common::BatchContext; +use crate::batch_processor::error::{BatchProcessError, Result}; + +#[instrument(level = "debug", skip(context), fields(tree = %context.merkle_tree))] +pub(crate) async fn process_batch>( + context: &BatchContext, +) -> Result { + info!("Processing address batch operation"); + let mut rpc = context.rpc_pool.get_connection().await?; + + // Create instruction data and get batch size + let (instruction_data, batch_size) = create_batch_update_address_tree_instruction_data( + &mut *rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + ) + .await + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + // Create the instruction + let instruction = create_batch_update_address_tree_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.epoch, + instruction_data.try_to_vec().map_err(|e| { + BatchProcessError::InstructionData(format!( + "Failed to serialize instruction data: {}", + e + )) + })?, + ); + + rpc.create_and_send_transaction_with_event::( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + None, + ) + .await?; + + let mut indexer = context.indexer.lock().await; + indexer + .finalize_batched_address_tree_update(&mut *rpc, context.merkle_tree) + .await; + + info!( + "Address batch processing completed successfully. Batch size: {}", + batch_size + ); + Ok(batch_size) +} diff --git a/forester/src/batch_processor/common.rs b/forester/src/batch_processor/common.rs new file mode 100644 index 000000000..dda238d9b --- /dev/null +++ b/forester/src/batch_processor/common.rs @@ -0,0 +1,244 @@ +use std::sync::Arc; + +use forester_utils::{forester_epoch::TreeType, indexer::Indexer}; +use light_batched_merkle_tree::{ + batch::{Batch, BatchState}, + merkle_tree::BatchedMerkleTreeAccount, + queue::BatchedQueueAccount, +}; +use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; +use solana_program::pubkey::Pubkey; +use solana_sdk::signature::Keypair; +use tokio::sync::Mutex; +use tracing::info; + +use super::{address, error::Result, state, BatchProcessError}; + +#[derive(Debug)] +pub struct BatchContext> { + pub rpc_pool: Arc>, + pub indexer: Arc>, + pub authority: Keypair, + pub derivation: Pubkey, + pub epoch: u64, + pub merkle_tree: Pubkey, + pub output_queue: Pubkey, +} + +#[derive(Debug)] +pub enum BatchReadyState { + NotReady, + ReadyForAppend, + ReadyForNullify, +} + +#[derive(Debug)] +pub struct BatchProcessor> { + context: BatchContext, + tree_type: TreeType, +} + +impl> BatchProcessor { + pub fn new(context: BatchContext, tree_type: TreeType) -> Self { + Self { context, tree_type } + } + + pub async fn process(&self) -> Result { + match self.verify_batch_ready().await { + BatchReadyState::ReadyForAppend => match self.tree_type { + TreeType::BatchedAddress => address::process_batch(&self.context).await, + TreeType::BatchedState => self.process_state_append().await, + _ => Err(BatchProcessError::UnsupportedTreeType(self.tree_type)), + }, + BatchReadyState::ReadyForNullify => self.process_state_nullify().await, + BatchReadyState::NotReady => Ok(0), + } + } + + async fn verify_batch_ready(&self) -> BatchReadyState { + let mut rpc = match self.context.rpc_pool.get_connection().await { + Ok(rpc) => rpc, + Err(_) => return BatchReadyState::NotReady, + }; + + let input_ready = self.verify_input_queue_batch_ready(&mut rpc).await; + let output_ready = if self.tree_type == TreeType::BatchedState { + self.verify_output_queue_batch_ready(&mut rpc).await + } else { + false + }; + + if self.tree_type == TreeType::BatchedAddress { + return if input_ready { + BatchReadyState::ReadyForAppend + } else { + BatchReadyState::NotReady + }; + } + + // For State tree type, we need to balance between append and nullify + // operations based on the queue states + match (input_ready, output_ready) { + (true, true) => { + // If both queues are ready, check their fill levels + let input_fill = self.get_input_queue_completion(&mut rpc).await; + let output_fill = self.get_output_queue_completion(&mut rpc).await; + + info!( + "Input queue fill: {:.2}, Output queue fill: {:.2}", + input_fill, output_fill + ); + // Prioritize the queue that is more full + if input_fill > output_fill { + BatchReadyState::ReadyForNullify + } else { + BatchReadyState::ReadyForAppend + } + } + (true, false) => BatchReadyState::ReadyForNullify, + (false, true) => BatchReadyState::ReadyForAppend, + (false, false) => BatchReadyState::NotReady, + } + } + async fn get_input_queue_completion(&self, rpc: &mut R) -> f64 { + let mut account = match rpc.get_account(self.context.merkle_tree).await { + Ok(Some(account)) => account, + _ => return 0.0, + }; + + Self::calculate_completion_from_tree(account.data.as_mut_slice()) + } + + async fn get_output_queue_completion(&self, rpc: &mut R) -> f64 { + let mut account = match rpc.get_account(self.context.output_queue).await { + Ok(Some(account)) => account, + _ => return 0.0, + }; + + Self::calculate_completion_from_queue(account.data.as_mut_slice()) + } + + fn calculate_completion_from_tree(data: &mut [u8]) -> f64 { + let tree = match BatchedMerkleTreeAccount::state_tree_from_bytes_mut(data) { + Ok(tree) => tree, + Err(_) => return 0.0, + }; + + let batch_index = tree.get_metadata().queue_metadata.next_full_batch_index; + match tree.batches.get(batch_index as usize) { + Some(batch) => Self::calculate_completion(batch), + None => 0.0, + } + } + + fn calculate_completion_from_queue(data: &mut [u8]) -> f64 { + let queue = match BatchedQueueAccount::output_queue_from_bytes_mut(data) { + Ok(queue) => queue, + Err(_) => return 0.0, + }; + + let batch_index = queue.get_metadata().batch_metadata.next_full_batch_index; + match queue.batches.get(batch_index as usize) { + Some(batch) => Self::calculate_completion(batch), + None => 0.0, + } + } + + fn calculate_completion(batch: &Batch) -> f64 { + let total = batch.get_num_zkp_batches(); + if total == 0 { + return 0.0; + } + + let remaining = total - batch.get_num_inserted_zkps(); + remaining as f64 / total as f64 + } + + async fn process_state_append(&self) -> Result { + let mut rpc = self.context.rpc_pool.get_connection().await?; + let (num_inserted_zkps, zkp_batch_size) = self.get_num_inserted_zkps(&mut rpc).await?; + state::perform_append(&self.context, &mut rpc, num_inserted_zkps).await?; + Ok(zkp_batch_size) + } + + async fn process_state_nullify(&self) -> Result { + let mut rpc = self.context.rpc_pool.get_connection().await?; + let (_, zkp_batch_size) = self.get_num_inserted_zkps(&mut rpc).await?; + state::perform_nullify(&self.context, &mut rpc).await?; + Ok(zkp_batch_size) + } + + async fn get_num_inserted_zkps(&self, rpc: &mut R) -> Result<(u64, usize)> { + let (num_inserted_zkps, zkp_batch_size) = { + let mut output_queue_account = + rpc.get_account(self.context.output_queue).await?.unwrap(); + let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( + output_queue_account.data.as_mut_slice(), + ) + .map_err(|e| BatchProcessError::QueueParsing(e.to_string()))?; + + let batch_index = output_queue + .get_metadata() + .batch_metadata + .next_full_batch_index; + let zkp_batch_size = output_queue.get_metadata().batch_metadata.zkp_batch_size; + + ( + output_queue.batches[batch_index as usize].get_num_inserted_zkps(), + zkp_batch_size as usize, + ) + }; + Ok((num_inserted_zkps, zkp_batch_size)) + } + + async fn verify_input_queue_batch_ready(&self, rpc: &mut R) -> bool { + let mut account = match rpc.get_account(self.context.merkle_tree).await { + Ok(Some(account)) => account, + _ => return false, + }; + + let merkle_tree = match self.tree_type { + TreeType::BatchedAddress => { + BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) + } + TreeType::BatchedState => { + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) + } + _ => return false, + }; + + if let Ok(tree) = merkle_tree { + let batch_index = tree.get_metadata().queue_metadata.next_full_batch_index; + let full_batch = tree.batches.get(batch_index as usize).unwrap(); + + full_batch.get_state() != BatchState::Inserted + && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() + } else { + false + } + } + + async fn verify_output_queue_batch_ready(&self, rpc: &mut R) -> bool { + let mut account = match rpc.get_account(self.context.output_queue).await { + Ok(Some(account)) => account, + _ => return false, + }; + + let output_queue = match self.tree_type { + TreeType::BatchedState => { + BatchedQueueAccount::output_queue_from_bytes_mut(account.data.as_mut_slice()) + } + _ => return false, + }; + + if let Ok(queue) = output_queue { + let batch_index = queue.get_metadata().batch_metadata.next_full_batch_index; + let full_batch = queue.batches.get(batch_index as usize).unwrap(); + + full_batch.get_state() != BatchState::Inserted + && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() + } else { + false + } + } +} diff --git a/forester/src/batch_processor/error.rs b/forester/src/batch_processor/error.rs new file mode 100644 index 000000000..9d031e560 --- /dev/null +++ b/forester/src/batch_processor/error.rs @@ -0,0 +1,54 @@ +use forester_utils::forester_epoch::TreeType; +use light_client::rpc_pool::PoolError; +use solana_client::rpc_request::RpcError; +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum BatchProcessError { + #[error("Failed to parse queue account: {0}")] + QueueParsing(String), + + #[error("Failed to parse merkle tree account: {0}")] + MerkleTreeParsing(String), + + #[error("Failed to create instruction data: {0}")] + InstructionData(String), + + #[error("Transaction failed: {0}")] + Transaction(String), + + #[error("RPC error: {0}")] + Rpc(String), + + #[error("Pool error: {0}")] + Pool(String), + + #[error("Indexer error: {0}")] + Indexer(String), + + #[error("Unsupported tree type: {0:?}")] + UnsupportedTreeType(TreeType), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for BatchProcessError { + fn from(e: light_client::rpc::RpcError) -> Self { + Self::Rpc(e.to_string()) + } +} + +impl From for BatchProcessError { + fn from(e: RpcError) -> Self { + Self::Rpc(e.to_string()) + } +} + +impl From for BatchProcessError { + fn from(e: PoolError) -> Self { + Self::Pool(e.to_string()) + } +} diff --git a/forester/src/batch_processor/mod.rs b/forester/src/batch_processor/mod.rs new file mode 100644 index 000000000..f4013bef7 --- /dev/null +++ b/forester/src/batch_processor/mod.rs @@ -0,0 +1,30 @@ +mod address; +mod common; +mod error; +mod state; + +use common::BatchProcessor; +use error::Result; +use forester_utils::{forester_epoch::TreeType, indexer::Indexer}; +use light_client::rpc::RpcConnection; +use tracing::{info, instrument}; + +#[instrument( + level = "debug", + fields( + epoch = context.epoch, + tree = %context.merkle_tree, + tree_type = ?tree_type + ) +)] +pub async fn process_batched_operations>( + context: BatchContext, + tree_type: TreeType, +) -> Result { + info!("process_batched_operations"); + let processor = BatchProcessor::new(context, tree_type); + processor.process().await +} + +pub use common::BatchContext; +pub use error::BatchProcessError; diff --git a/forester/src/batch_processor/state.rs b/forester/src/batch_processor/state.rs new file mode 100644 index 000000000..a3c38d381 --- /dev/null +++ b/forester/src/batch_processor/state.rs @@ -0,0 +1,114 @@ +use borsh::BorshSerialize; +use forester_utils::{ + indexer::Indexer, + instructions::{create_append_batch_ix_data, create_nullify_batch_ix_data}, +}; +use light_batched_merkle_tree::event::{BatchAppendEvent, BatchNullifyEvent}; +use light_client::rpc::RpcConnection; +use light_registry::account_compression_cpi::sdk::{ + create_batch_append_instruction, create_batch_nullify_instruction, +}; +use solana_sdk::signer::Signer; + +use super::common::BatchContext; +use crate::batch_processor::error::{BatchProcessError, Result}; + +pub(crate) async fn perform_append>( + context: &BatchContext, + rpc: &mut R, + num_inserted_zkps: u64, +) -> Result<()> { + let instruction_data = create_append_batch_ix_data( + rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + context.output_queue, + ) + .await + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + let instruction = create_batch_append_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.output_queue, + context.epoch, + instruction_data + .try_to_vec() + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?, + ); + + rpc.create_and_send_transaction_with_event::( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + None, + ) + .await?; + + let mut indexer = context.indexer.lock().await; + indexer + .update_test_indexer_after_append( + rpc, + context.merkle_tree, + context.output_queue, + num_inserted_zkps, + ) + .await; + + Ok(()) +} + +pub(crate) async fn perform_nullify>( + context: &BatchContext, + rpc: &mut R, +) -> Result<()> { + let batch_index = get_batch_index(context, rpc).await?; + + let instruction_data = + create_nullify_batch_ix_data(rpc, &mut *context.indexer.lock().await, context.merkle_tree) + .await + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + let instruction = create_batch_nullify_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.epoch, + instruction_data + .try_to_vec() + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?, + ); + + rpc.create_and_send_transaction_with_event::( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + None, + ) + .await?; + + context + .indexer + .lock() + .await + .update_test_indexer_after_nullification(rpc, context.merkle_tree, batch_index) + .await; + + Ok(()) +} + +async fn get_batch_index>( + context: &BatchContext, + rpc: &mut R, +) -> Result { + let mut account = rpc.get_account(context.merkle_tree).await?.unwrap(); + let merkle_tree = light_batched_merkle_tree::merkle_tree::BatchedMerkleTreeAccount::state_tree_from_bytes_mut( + account.data.as_mut_slice(), + ).map_err(|e| BatchProcessError::MerkleTreeParsing(e.to_string()))?; + + Ok(merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index as usize) +} diff --git a/forester/src/batched_ops.rs b/forester/src/batched_ops.rs deleted file mode 100644 index a75d1558c..000000000 --- a/forester/src/batched_ops.rs +++ /dev/null @@ -1,427 +0,0 @@ -use std::sync::Arc; - -use borsh::BorshSerialize; -use forester_utils::indexer::Indexer; -use light_batched_merkle_tree::{ - batch::BatchState, - constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, - event::{BatchAppendEvent, BatchNullifyEvent}, - merkle_tree::{ - AppendBatchProofInputsIx, BatchProofInputsIx, BatchedMerkleTreeAccount, - InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, - }, - queue::BatchedQueueAccount, -}; -use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; -use light_hasher::{Hasher, Poseidon}; -use light_prover_client::{ - batch_append_with_proofs::get_batch_append_with_proofs_inputs, - batch_update::get_batch_update_inputs, - gnark::{ - batch_append_with_proofs_json_formatter::BatchAppendWithProofsInputsJson, - batch_update_json_formatter::update_inputs_string, - constants::{PROVE_PATH, SERVER_ADDRESS}, - proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}, - }, -}; -use light_registry::account_compression_cpi::sdk::{ - create_batch_append_instruction, create_batch_nullify_instruction, -}; -use light_utils::bigint::bigint_to_be_bytes_array; -use light_verifier::CompressedProof; -use reqwest::Client; -use solana_program::pubkey::Pubkey; -use solana_sdk::{signature::Keypair, signer::Signer}; -use tokio::sync::Mutex; -use tracing::error; - -use crate::{errors::ForesterError, Result}; - -pub struct BatchedOperations> { - pub rpc_pool: Arc>, - pub indexer: Arc>, - pub authority: Keypair, - pub derivation: Pubkey, - pub epoch: u64, - pub merkle_tree: Pubkey, - pub output_queue: Pubkey, -} -impl> BatchedOperations { - async fn is_batch_ready(&self) -> bool { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - let is_batch_ready = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - let batch_index = merkle_tree - .get_metadata() - .queue_metadata - .next_full_batch_index; - let full_batch = merkle_tree.batches.get(batch_index as usize).unwrap(); - - full_batch.get_state() != BatchState::Inserted - && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() - }; - is_batch_ready - } - - pub async fn perform_batch_append(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await?; - - let (num_inserted_zkps, batch_size) = { - let mut output_queue_account = - rpc.get_account(self.output_queue).await.unwrap().unwrap(); - let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( - output_queue_account.data.as_mut_slice(), - ) - .unwrap(); - let queue_metadata = output_queue.get_metadata(); - let batch_index = queue_metadata.batch_metadata.next_full_batch_index; - let num_inserted_zkps = - output_queue.batches[batch_index as usize].get_num_inserted_zkps(); - let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; - - (num_inserted_zkps, zkp_batch_size) - }; - - let instruction_data = self.create_append_batch_ix_data().await; - let instruction = create_batch_append_instruction( - self.authority.pubkey(), - self.derivation, - self.merkle_tree, - self.output_queue, - self.epoch, - instruction_data?.try_to_vec()?, - ); - - rpc.create_and_send_transaction_with_event::( - &[instruction], - &self.authority.pubkey(), - &[&self.authority], - None, - ) - .await?; - - self.indexer - .lock() - .await - .update_test_indexer_after_append( - &mut rpc, - self.merkle_tree, - self.output_queue, - num_inserted_zkps, - ) - .await; - Ok(batch_size as usize) - } - - pub async fn perform_batch_nullify(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await?; - - let instruction_data = self.get_batched_nullify_ix_data().await?; - - let instruction = create_batch_nullify_instruction( - self.authority.pubkey(), - self.derivation, - self.merkle_tree, - self.epoch, - instruction_data.try_to_vec()?, - ); - - rpc.create_and_send_transaction_with_event::( - &[instruction], - &self.authority.pubkey(), - &[&self.authority], - None, - ) - .await?; - - let (batch_index, batch_size) = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - ( - merkle_tree - .get_metadata() - .queue_metadata - .next_full_batch_index, - merkle_tree.get_metadata().queue_metadata.zkp_batch_size, - ) - }; - - self.indexer - .lock() - .await - .update_test_indexer_after_nullification( - &mut rpc, - self.merkle_tree, - batch_index as usize, - ) - .await; - Ok(batch_size as usize) - } - - async fn create_append_batch_ix_data(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - - let (merkle_tree_next_index, current_root) = { - let mut merkle_tree_account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( - merkle_tree_account.data.as_mut_slice(), - ) - .unwrap(); - ( - merkle_tree.get_metadata().next_index, - *merkle_tree.root_history.last().unwrap(), - ) - }; - - let (zkp_batch_size, full_batch_index, num_inserted_zkps, leaves_hashchain) = { - let mut output_queue_account = - rpc.get_account(self.output_queue).await.unwrap().unwrap(); - let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( - output_queue_account.data.as_mut_slice(), - ) - .unwrap(); - - let queue_metadata = output_queue.get_metadata(); - let full_batch_index = queue_metadata.batch_metadata.next_full_batch_index; - let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; - - let num_inserted_zkps = - output_queue.batches[full_batch_index as usize].get_num_inserted_zkps(); - - let leaves_hashchain = - output_queue.hashchain_store[full_batch_index as usize][num_inserted_zkps as usize]; - - ( - zkp_batch_size, - full_batch_index, - num_inserted_zkps, - leaves_hashchain, - ) - }; - let start = num_inserted_zkps as usize * zkp_batch_size as usize; - let end = start + zkp_batch_size as usize; - - let leaves = self - .indexer - .lock() - .await - .get_queue_elements( - self.merkle_tree.to_bytes(), - full_batch_index, - start as u64, - end as u64, - ) - .await - .unwrap(); - - let (old_leaves, merkle_proofs) = { - let mut old_leaves = vec![]; - let mut merkle_proofs = vec![]; - let indices = (merkle_tree_next_index..merkle_tree_next_index + zkp_batch_size) - .collect::>(); - let proofs = self - .indexer - .lock() - .await - .get_proofs_by_indices(self.merkle_tree, &indices); - proofs.iter().for_each(|proof| { - old_leaves.push(proof.leaf); - merkle_proofs.push(proof.proof.clone()); - }); - - (old_leaves, merkle_proofs) - }; - - let (proof, new_root) = { - let circuit_inputs = get_batch_append_with_proofs_inputs::< - { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, - >( - current_root, - merkle_tree_next_index as u32, - leaves, - leaves_hashchain, - old_leaves, - merkle_proofs, - zkp_batch_size as u32, - ) - .unwrap(); - - let client = Client::new(); - let inputs_json = - BatchAppendWithProofsInputsJson::from_inputs(&circuit_inputs).to_string(); - - let response = client - .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) - .header("Content-Type", "text/plain; charset=utf-8") - .body(inputs_json) - .send() - .await - .expect("Failed to execute request."); - - if response.status().is_success() { - let body = response.text().await.unwrap(); - let proof_json = deserialize_gnark_proof_json(&body).unwrap(); - let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); - let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); - ( - CompressedProof { - a: proof_a, - b: proof_b, - c: proof_c, - }, - bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()) - .unwrap(), - ) - } else { - error!( - "create_append_batch_ix_data: failed to get proof from server: {:?}", - response.text().await - ); - return Err(ForesterError::Custom( - "Failed to get proof from server".into(), - )); - } - }; - - Ok(InstructionDataBatchAppendInputs { - public_inputs: AppendBatchProofInputsIx { new_root }, - compressed_proof: proof, - }) - } - - async fn get_batched_nullify_ix_data(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - - let (zkp_batch_size, old_root, old_root_index, leaves_hashchain) = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - let metadata = merkle_tree.get_metadata(); - let batch_idx = metadata.queue_metadata.next_full_batch_index as usize; - let zkp_size = metadata.queue_metadata.zkp_batch_size; - let batch = &merkle_tree.batches[batch_idx]; - let zkp_idx = batch.get_num_inserted_zkps(); - let hashchain = merkle_tree.hashchain_store[batch_idx][zkp_idx as usize]; - let root_idx = merkle_tree.root_history.last_index(); - let root = *merkle_tree.root_history.last().unwrap(); - (zkp_size, root, root_idx, hashchain) - }; - - let leaf_indices_tx_hashes = self - .indexer - .lock() - .await - .get_leaf_indices_tx_hashes(self.merkle_tree, zkp_batch_size as usize); - - let mut leaves = Vec::new(); - let mut tx_hashes = Vec::new(); - let mut old_leaves = Vec::new(); - let mut path_indices = Vec::new(); - let mut merkle_proofs = Vec::new(); - let mut nullifiers = Vec::new(); - - let proofs = self.indexer.lock().await.get_proofs_by_indices( - self.merkle_tree, - &leaf_indices_tx_hashes - .iter() - .map(|(index, _, _)| *index as u64) - .collect::>(), - ); - - for ((index, leaf, tx_hash), proof) in leaf_indices_tx_hashes.iter().zip(proofs.iter()) { - path_indices.push(*index); - leaves.push(*leaf); - old_leaves.push(proof.leaf); - merkle_proofs.push(proof.proof.clone()); - tx_hashes.push(*tx_hash); - let index_bytes = index.to_be_bytes(); - let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap(); - nullifiers.push(nullifier); - } - - let inputs = get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( - old_root, - tx_hashes, - leaves.to_vec(), - leaves_hashchain, - old_leaves, - merkle_proofs, - path_indices, - zkp_batch_size as u32, - ) - .unwrap(); - - let new_root = - bigint_to_be_bytes_array::<32>(&inputs.new_root.to_biguint().unwrap()).unwrap(); - - let client = Client::new(); - let response = client - .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) - .header("Content-Type", "text/plain; charset=utf-8") - .body(update_inputs_string(&inputs)) - .send() - .await?; - - let proof = if response.status().is_success() { - let body = response.text().await.unwrap(); - let proof_json = deserialize_gnark_proof_json(&body).unwrap(); - let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); - let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); - CompressedProof { - a: proof_a, - b: proof_b, - c: proof_c, - } - } else { - error!( - "get_batched_nullify_ix_data: failed to get proof from server: {:?}", - response.text().await - ); - return Err(ForesterError::Custom( - "Failed to get proof from server".into(), - )); - }; - - Ok(InstructionDataBatchNullifyInputs { - public_inputs: BatchProofInputsIx { - new_root, - old_root_index: old_root_index as u16, - }, - compressed_proof: proof, - }) - } -} - -pub async fn process_batched_operations>( - rpc_pool: Arc>, - indexer: Arc>, - authority: Keypair, - derivation: Pubkey, - epoch: u64, - merkle_tree: Pubkey, - output_queue: Pubkey, -) -> Result { - let ops = BatchedOperations { - rpc_pool, - indexer, - authority, - derivation, - epoch, - merkle_tree, - output_queue, - }; - - if ops.is_batch_ready().await { - let processed_appends_count = ops.perform_batch_append().await?; - let processed_nullifications_count = ops.perform_batch_nullify().await?; - Ok(processed_appends_count + processed_nullifications_count) - } else { - Ok(0) - } -} diff --git a/forester/src/config.rs b/forester/src/config.rs index d28ba1fbd..460229f7d 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -12,7 +12,8 @@ use solana_sdk::signature::Keypair; use crate::{ cli::{StartArgs, StatusArgs}, - errors::ForesterError, + errors::ConfigError, + Result, }; #[derive(Debug)] @@ -100,35 +101,46 @@ impl Default for TransactionConfig { } } impl ForesterConfig { - pub fn new_for_start(args: &StartArgs) -> Result { + pub fn new_for_start(args: &StartArgs) -> Result { let registry_pubkey = light_registry::program::LightRegistry::id().to_string(); let payer: Vec = match &args.payer { - Some(payer_str) => serde_json::from_str(payer_str) - .map_err(|e| ForesterError::ConfigError(e.to_string()))?, - None => return Err(ForesterError::ConfigError("Payer is required".to_string())), + Some(payer_str) => { + serde_json::from_str(payer_str).map_err(|e| ConfigError::JsonParse { + field: "payer", + error: e.to_string(), + })? + } + None => return Err(ConfigError::MissingField { field: "payer" })?, }; let payer = - Keypair::from_bytes(&payer).map_err(|e| ForesterError::ConfigError(e.to_string()))?; + Keypair::from_bytes(&payer).map_err(|e| ConfigError::InvalidKeypair(e.to_string()))?; let derivation: Vec = match &args.derivation { - Some(derivation_str) => serde_json::from_str(derivation_str) - .map_err(|e| ForesterError::ConfigError(e.to_string()))?, + Some(derivation_str) => { + serde_json::from_str(derivation_str).map_err(|e| ConfigError::JsonParse { + field: "derivation", + error: e.to_string(), + })? + } None => { - return Err(ForesterError::ConfigError( - "Derivation is required".to_string(), - )) + return Err(ConfigError::MissingField { + field: "derivation", + })? } }; - let derivation_array: [u8; 32] = derivation - .try_into() - .map_err(|_| ForesterError::ConfigError("Derivation must be 32 bytes".to_string()))?; + let derivation_array: [u8; 32] = + derivation + .try_into() + .map_err(|_| ConfigError::InvalidDerivation { + reason: "must be exactly 32 bytes".to_string(), + })?; let derivation = Pubkey::from(derivation_array); let rpc_url = args .rpc_url .clone() - .ok_or_else(|| ForesterError::ConfigError("RPC URL is required".to_string()))?; + .ok_or(ConfigError::MissingField { field: "rpc_url" })?; Ok(Self { external_services: ExternalServicesConfig { @@ -166,8 +178,12 @@ impl ForesterConfig { tree_discovery_interval_seconds: args.tree_discovery_interval_seconds, enable_metrics: args.enable_metrics(), }, - registry_pubkey: Pubkey::from_str(®istry_pubkey) - .map_err(|e| ForesterError::ConfigError(e.to_string()))?, + registry_pubkey: Pubkey::from_str(®istry_pubkey).map_err(|e| { + ConfigError::InvalidPubkey { + field: "registry_pubkey", + error: e.to_string(), + } + })?, payer_keypair: payer, derivation_pubkey: derivation, address_tree_data: vec![], @@ -175,7 +191,7 @@ impl ForesterConfig { }) } - pub fn new_for_status(args: &StatusArgs) -> Result { + pub fn new_for_status(args: &StatusArgs) -> Result { let rpc_url = args.rpc_url.clone(); Ok(Self { diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index e342073e1..566622c09 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use anyhow::Context; use dashmap::DashMap; use forester_utils::{ forester_epoch::{get_epoch_phases, Epoch, TreeAccounts, TreeForesterSchedule, TreeType}, @@ -18,7 +19,6 @@ use light_client::{ rpc_pool::SolanaRpcPool, }; use light_registry::{ - errors::RegistryError, protocol_config::state::ProtocolConfig, sdk::{create_finalize_registration_instruction, create_report_work_instruction}, utils::{get_epoch_pda_address, get_forester_epoch_pda_from_authority}, @@ -34,8 +34,11 @@ use tokio::{ use tracing::{debug, error, info, info_span, instrument, warn}; use crate::{ - batched_ops::process_batched_operations, - errors::ForesterError, + batch_processor::{process_batched_operations, BatchContext}, + errors::{ + ChannelError, ConfigurationError, ForesterError, InitializationError, RegistrationError, + WorkReportError, + }, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, queue_helpers::QueueItemData, @@ -52,7 +55,7 @@ use crate::{ ForesterConfig, ForesterEpochInfo, Result, }; -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub struct WorkReport { pub epoch: u64, pub processed_items: usize, @@ -298,9 +301,7 @@ impl> EpochManager { debug!("New epoch detected: {}", current_epoch); let phases = get_epoch_phases(&self.protocol_config, current_epoch); if slot < phases.registration.end { - tx.send(current_epoch).await.map_err(|e| { - ForesterError::Custom(format!("Failed to send new epoch: {}", e)) - })?; + tx.send(current_epoch).await?; last_epoch = Some(current_epoch); } } @@ -352,13 +353,16 @@ impl> EpochManager { .get_anchor_account::(&forester_epoch_pda_pubkey) .await?; - match existing_pda { - Some(pda) => { + existing_pda + .map(|pda| async move { self.recover_registration_info_internal(epoch, forester_epoch_pda_pubkey, pda) .await - } - None => Err(ForesterError::ForesterEpochPdaNotFound), - } + }) + .ok_or(RegistrationError::ForesterEpochPdaNotFound { + epoch, + pda_address: forester_epoch_pda_pubkey, + })? + .await } #[instrument(level = "debug", skip(self))] @@ -370,16 +374,12 @@ impl> EpochManager { // Process previous epoch if still in active or later phase if slot > current_phases.registration.start { debug!("Processing previous epoch: {}", previous_epoch); - tx.send(previous_epoch).await.map_err(|e| { - ForesterError::Custom(format!("Failed to send previous epoch: {}", e)) - })?; + tx.send(previous_epoch).await?; } // Process current epoch debug!("Processing current epoch: {}", current_epoch); - tx.send(current_epoch) - .await - .map_err(|e| ForesterError::Custom(format!("Failed to send current epoch: {}", e)))?; + tx.send(current_epoch).await?; debug!("Finished processing current and previous epochs"); Ok(()) @@ -466,10 +466,12 @@ impl> EpochManager { // Check if it's already too late to register if slot >= phases.registration.end { - return Err(ForesterError::Custom(format!( - "Too late to register for epoch {}. Current slot: {}, Registration end: {}", - epoch, slot, phases.registration.end - ))); + return Err(RegistrationError::RegistrationPhaseEnded { + epoch, + current_slot: slot, + registration_end: phases.registration.end, + } + .into()); } for attempt in 0..max_retries { @@ -508,10 +510,11 @@ impl> EpochManager { } } } - Err(ForesterError::Custom(format!( - "Failed to register for epoch {} after {} attempts", - epoch, max_retries - ))) + Err(RegistrationError::MaxRetriesExceeded { + epoch, + attempts: max_retries, + } + .into()) } #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch @@ -554,57 +557,46 @@ impl> EpochManager { &self.config.derivation_pubkey, ) .await - { - Ok(Some(epoch)) => { + .with_context(|| { + format!("Failed to execute epoch registration for epoch {}", epoch) + })? { + Some(epoch) => { debug!("Registered epoch: {:?}", epoch); epoch } - Ok(None) => { - return Err(ForesterError::Custom( - "Epoch::register returned None".into(), - )) - } - Err(e) => { - return Err(ForesterError::Custom(format!( - "Epoch::register failed: {:?}", - e - ))) + None => { + return Err(RegistrationError::EmptyRegistration.into()); } }; - let forester_epoch_pda = match rpc + let forester_epoch_pda = rpc .get_anchor_account::(®istered_epoch.forester_epoch_pda) .await - { - Ok(Some(pda)) => { - debug!("ForesterEpochPda: {:?}", pda); - pda - } - Ok(None) => { - return Err(ForesterError::Custom( - "Failed to get ForesterEpochPda: returned None".into(), - )) - } - Err(e) => { - return Err(ForesterError::Custom(format!( - "Failed to get ForesterEpochPda: {:?}", - e - ))) - } - }; + .with_context(|| { + format!( + "Failed to fetch ForesterEpochPda from RPC for address {}", + registered_epoch.forester_epoch_pda + ) + })? + .ok_or(RegistrationError::ForesterEpochPdaNotFound { + epoch, + pda_address: registered_epoch.forester_epoch_pda, + })?; let epoch_pda_address = get_epoch_pda_address(epoch); - let epoch_pda = match rpc + let epoch_pda = rpc .get_anchor_account::(&epoch_pda_address) - .await? - { - Some(pda) => pda, - None => { - return Err(ForesterError::Custom( - "Failed to get EpochPda: returned None".into(), - )) - } - }; + .await + .with_context(|| { + format!( + "Failed to fetch EpochPda from RPC for address {}", + epoch_pda_address + ) + })? + .ok_or(RegistrationError::EpochPdaNotFound { + epoch, + pda_address: epoch_pda_address, + })?; ForesterEpochInfo { epoch: registered_epoch, @@ -621,9 +613,12 @@ impl> EpochManager { "Too late to register for epoch {}. Current slot: {}, Registration end: {}", epoch, slot, phases.registration.end ); - Err(ForesterError::Custom( - "Too late to register for epoch".into(), - )) + Err(RegistrationError::RegistrationPhaseEnded { + epoch, + current_slot: slot, + registration_end: phases.registration.end, + } + .into()) } } @@ -640,17 +635,14 @@ impl> EpochManager { let state = phases.get_current_epoch_state(slot); let epoch_pda_address = get_epoch_pda_address(epoch); - let epoch_pda = match rpc + let epoch_pda = rpc .get_anchor_account::(&epoch_pda_address) - .await? - { - Some(pda) => pda, - None => { - return Err(ForesterError::Custom( - "Failed to get EpochPda: returned None".into(), - )) - } - }; + .await + .with_context(|| format!("Failed to fetch EpochPda for epoch {}", epoch))? + .ok_or(RegistrationError::EpochPdaNotFound { + epoch, + pda_address: epoch_pda_address, + })?; let epoch_info = Epoch { epoch, @@ -712,17 +704,27 @@ impl> EpochManager { let mut epoch_info = (*epoch_info).clone(); epoch_info.forester_epoch_pda = rpc .get_anchor_account::(&epoch_info.epoch.forester_epoch_pda) - .await? - .ok_or_else(|| ForesterError::Custom("Failed to get ForesterEpochPda".to_string()))?; + .await + .with_context(|| { + format!( + "Failed to fetch ForesterEpochPda for epoch {} at address {}", + epoch_info.epoch.epoch, epoch_info.epoch.forester_epoch_pda + ) + })? + .ok_or(RegistrationError::ForesterEpochPdaNotFound { + epoch: epoch_info.epoch.epoch, + pda_address: epoch_info.epoch.forester_epoch_pda, + })?; let slot = rpc.get_slot().await?; let trees = self.trees.lock().await; + info!("Adding schedule for trees: {:?}", *trees); epoch_info.add_trees_with_schedule(&trees, slot); info!("Finished waiting for active phase"); Ok(epoch_info) } - // TODO: add receiver for new tree discoverd -> spawn new task to process this tree derive schedule etc. + // TODO: add receiver for new tree discovered -> spawn new task to process this tree derive schedule etc. // TODO: optimize active phase startup time #[instrument( level = "debug", @@ -747,12 +749,15 @@ impl> EpochManager { let mut handles: Vec>> = Vec::new(); - debug!( + info!( "Creating threads for tree processing. Trees: {:?}", epoch_info.trees ); for tree in epoch_info.trees.iter() { - info!("Creating thread for queue {}", tree.tree_accounts.queue); + info!( + "Creating thread for tree {}", + tree.tree_accounts.merkle_tree + ); let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); let tree = tree.clone(); @@ -811,8 +816,8 @@ impl> EpochManager { epoch_pda: &ForesterEpochPda, mut tree: TreeForesterSchedule, ) -> Result<()> { - debug!("enter process_queue"); - debug!("Tree schedule slots: {:?}", tree.slots); + info!("enter process_queue"); + info!("Tree schedule slots: {:?}", tree.slots); // TODO: sync at some point let mut estimated_slot = self.slot_tracker.estimated_current_slot(); @@ -830,7 +835,11 @@ impl> EpochManager { .find(|(_, slot)| slot.is_some()); if let Some((index, forester_slot)) = index_and_forester_slot { - debug!("Found eligible slot"); + info!( + "Found eligible slot, index: {}, tree: {}", + index, + tree.tree_accounts.merkle_tree.to_string() + ); let forester_slot = forester_slot.as_ref().unwrap().clone(); tree.slots.remove(index); @@ -845,41 +854,59 @@ impl> EpochManager { let light_slot_timeout = { let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length) - .map_err(|_| ForesterError::Custom("Slot length overflow".into()))?; - - slot_duration() - .checked_mul(slot_length_u32) - .ok_or_else(|| { - ForesterError::Custom("Timeout calculation overflow".into()) - })? + .map_err(|_| ConfigurationError::SlotLengthOverflow { + value: epoch_pda.protocol_config.slot_length, + })?; + + let duration = slot_duration(); + duration.checked_mul(slot_length_u32).ok_or( + ConfigurationError::TimeoutCalculationOverflow { + slot_duration: duration, + slot_length: slot_length_u32, + }, + )? }; - if tree.tree_accounts.tree_type == TreeType::BatchedState { + if tree.tree_accounts.tree_type == TreeType::BatchedState + || tree.tree_accounts.tree_type == TreeType::BatchedAddress + { + let batch_context = BatchContext { + rpc_pool: self.rpc_pool.clone(), + indexer: self.indexer.clone(), + authority: self.config.payer_keypair.insecure_clone(), + derivation: self.config.derivation_pubkey, + epoch: epoch_info.epoch, + merkle_tree: tree.tree_accounts.merkle_tree, + output_queue: tree.tree_accounts.queue, + }; + let start_time = Instant::now(); - info!("Processing batched state operations"); - - let rpc_pool = self.rpc_pool.clone(); - let indexer = self.indexer.clone(); - let payer = self.config.payer_keypair.insecure_clone(); - let derivation = self.config.derivation_pubkey; - let merkle_tree = tree.tree_accounts.merkle_tree; - let queue = tree.tree_accounts.queue; - - // TODO: measure & spawn child task for processing batched state operations - let processed_count = process_batched_operations( - rpc_pool, - indexer, - payer, - derivation, - epoch_info.epoch, - merkle_tree, - queue, - ) - .await?; - info!("Processed {} batched state operations", processed_count); - queue_metric_update(epoch_info.epoch, 1, start_time.elapsed()).await; - self.increment_processed_items_count(epoch_info.epoch, processed_count) - .await; + + match process_batched_operations(batch_context, tree.tree_accounts.tree_type) + .await + { + Ok(processed_count) => { + info!( + "Processed {} operations for tree type {:?}", + processed_count, tree.tree_accounts.tree_type + ); + queue_metric_update( + epoch_info.epoch, + processed_count, + start_time.elapsed(), + ) + .await; + self.increment_processed_items_count(epoch_info.epoch, processed_count) + .await; + } + Err(e) => { + error!( + "Failed to process batched operations for tree {:?}: {:?}", + tree.tree_accounts.merkle_tree, e + ); + return Err(e.into()); + } + } } else { // TODO: measure accuracy // Optional replace with shutdown signal for all child processes @@ -1024,6 +1051,7 @@ impl> EpochManager { &self.config.derivation_pubkey, epoch_info.epoch.epoch, ); + match rpc .create_and_send_transaction( &[ix], @@ -1042,23 +1070,16 @@ impl> EpochManager { InstructionError::Custom(error_code), )) = client_error.get_transaction_error() { - let reported_work_code = RegistryError::ForesterAlreadyReportedWork as u32; - let not_in_report_work_phase_code = - RegistryError::NotInReportWorkPhase as u32; - - if error_code == reported_work_code { - info!("Work already reported for this epoch. Skipping."); - return Ok(()); - } else if error_code == not_in_report_work_phase_code { - warn!("Not in report work phase. Skipping report."); - return Ok(()); - } else { - // Log other registry errors but still return an Err - warn!("Registry error encountered: {:?}", client_error); - } + return WorkReportError::from_registry_error( + error_code, + epoch_info.epoch.epoch, + ) + .map_err(|e| anyhow::Error::from(ForesterError::from(e))); } } - return Err(ForesterError::from(e)); + return Err(anyhow::Error::from(WorkReportError::Transaction(Box::new( + e, + )))); } } @@ -1070,7 +1091,10 @@ impl> EpochManager { self.work_report_sender .send(report) .await - .map_err(|e| ForesterError::Custom(format!("Failed to send work report: {}", e)))?; + .map_err(|e| ChannelError::WorkReportSend { + epoch: report.epoch, + error: e.to_string(), + })?; info!("Work reported"); Ok(()) @@ -1213,18 +1237,20 @@ pub async fn run_service>( config.retry_config.max_retries, start_time.elapsed() ); - return Err(ForesterError::Custom(format!( - "Failed to start forester after {} attempts: {:?}", - config.retry_config.max_retries, e - ))); + return Err(InitializationError::MaxRetriesExceeded { + attempts: config.retry_config.max_retries, + error: e.to_string(), + } + .into()); } } } } - Err(ForesterError::Custom( - "Unexpected error: Retry loop exited without returning".to_string(), - )) + Err( + InitializationError::Unexpected("Retry loop exited without returning".to_string()) + .into(), + ) }) .await } diff --git a/forester/src/errors.rs b/forester/src/errors.rs index 8d6fefe4f..079550527 100644 --- a/forester/src/errors.rs +++ b/forester/src/errors.rs @@ -1,128 +1,228 @@ -use account_compression::initialize_address_merkle_tree::Error as AccountCompressionError; -use config::ConfigError; -use forester_utils::indexer::IndexerError; +use std::time::Duration; + use light_client::{rpc::errors::RpcError, rpc_pool::PoolError}; -use light_hash_set::HashSetError; +use light_registry::errors::RegistryError; use photon_api::apis::{default_api::GetCompressedAccountProofPostError, Error as PhotonApiError}; -use prometheus::Error as PrometheusError; -use reqwest::Error as ReqwestError; -use solana_client::pubsub_client::PubsubClientError; +use solana_program::{program_error::ProgramError, pubkey::Pubkey}; use thiserror::Error; -use tokio::{ - sync::{mpsc::error::SendError, oneshot::error::RecvError}, - task::JoinError, -}; +use tracing::{info, warn}; + +use crate::batch_processor::BatchProcessError; #[derive(Error, Debug)] pub enum ForesterError { #[error("Element is not eligible for foresting")] NotEligible, - #[error("RPC Error: {0}")] - RpcError(#[from] RpcError), - #[error("failed to deserialize account data")] - DeserializeError(#[from] solana_sdk::program_error::ProgramError), - #[error("failed to copy merkle tree")] - CopyMerkleTreeError(#[from] std::io::Error), - #[error(transparent)] - AccountCompressionError(#[from] AccountCompressionError), - #[error(transparent)] - HashSetError(#[from] HashSetError), + + #[error("Registration error: {0}")] + Registration(#[from] RegistrationError), + + #[error("Configuration error: {0}")] + Configuration(#[from] ConfigurationError), + + #[error("Work report error: {0}")] + WorkReport(#[from] WorkReportError), + + #[error("Epoch registration returned no result")] + EmptyRegistration, + + #[error("Failed to register epoch {epoch}: {error}")] + RegistrationFailed { epoch: u64, error: String }, + + #[error("Batch processing error: {0}")] + BatchProcessing(#[from] BatchProcessError), + + #[error("RPC error: {0}")] + Rpc(#[from] RpcError), + + #[error("Pool error: {0}")] + Pool(#[from] PoolError), + + #[error("Program error: {0}")] + Program(#[from] ProgramError), + + #[error("Indexer error: {0}")] + Indexer(#[from] IndexerError), + + #[error("Channel error: {0}")] + Channel(#[from] ChannelError), + + #[error("Subscription error: {0}")] + Subscription(String), + + #[error("Initialization error: {0}")] + Initialization(#[from] InitializationError), + + #[error("Account deserialization error: {0}")] + AccountDeserialization(#[from] AccountDeserializationError), + + #[error("Forester error: {error}")] + General { error: String }, + #[error(transparent)] - PhotonApiError(PhotonApiErrorWrapper), - #[error("bincode error")] - BincodeError(#[from] Box), - #[error("Indexer can't find any proofs")] - NoProofsFound, - #[error("Max retries reached")] - MaxRetriesReached, - #[error("error: {0:?}")] - SendError(String), - #[error("error: {0:?}")] - IndexerError(String), - #[error("Recv error: {0}")] - RecvError(#[from] RecvError), - #[error("error: {0:?}")] - JoinError(String), - #[error("Solana pubsub client error: {0}")] - PubsubClientError(#[from] PubsubClientError), - #[error("Channel disconnected")] - ChannelDisconnected, - #[error("Subscription timeout")] - SubscriptionTimeout, - #[error("Unexpected message: {0}")] - UnexpectedMessage(String), - #[error("Config error: {0:?}")] - ConfigError(String), - #[error("error: {0:?}")] - PrometheusError(PrometheusError), - #[error("error: {0:?}")] - ReqwestError(ReqwestError), - #[error("error: {0:?}")] - Custom(String), - #[error("unknown error")] - Unknown, - #[error("ForesterEpochPda not found")] - ForesterEpochPdaNotFound, + Other(#[from] anyhow::Error), } #[derive(Error, Debug)] -pub enum PhotonApiErrorWrapper { - #[error(transparent)] - GetCompressedAccountProofPostError(#[from] PhotonApiError), +pub enum RegistrationError { + #[error("Too late to register for epoch {epoch}. Current slot: {current_slot}, Registration end: {registration_end}")] + RegistrationPhaseEnded { + epoch: u64, + current_slot: u64, + registration_end: u64, + }, + + #[error("Epoch registration returned no result")] + EmptyRegistration, + + #[error("Failed to register epoch {epoch}: {error}")] + RegistrationFailed { epoch: u64, error: String }, + + #[error("Failed to register for epoch {epoch} after {attempts} attempts")] + MaxRetriesExceeded { epoch: u64, attempts: u32 }, + + #[error("Failed to register forester: {0}")] + ForesterRegistration(String), + + #[error("ForesterEpochPda not found for address {pda_address}")] + ForesterEpochPdaNotFound { epoch: u64, pda_address: Pubkey }, + + #[error("Failed to fetch ForesterEpochPda for address {pda_address}: {error}")] + ForesterEpochPdaFetchFailed { pda_address: Pubkey, error: String }, + + #[error("EpochPda not found for address {pda_address}")] + EpochPdaNotFound { epoch: u64, pda_address: Pubkey }, } -impl From> for ForesterError { - fn from(err: PhotonApiError) -> Self { - ForesterError::PhotonApiError(PhotonApiErrorWrapper::GetCompressedAccountProofPostError( - err, - )) - } +#[derive(Error, Debug)] +pub enum ConfigError { + #[error("Missing required field: {field}")] + MissingField { field: &'static str }, + + #[error("Invalid keypair data: {0}")] + InvalidKeypair(String), + + #[error("Invalid pubkey: {field} - {error}")] + InvalidPubkey { field: &'static str, error: String }, + + #[error("Invalid derivation: {reason}")] + InvalidDerivation { reason: String }, + + #[error("JSON parsing error: {field} - {error}")] + JsonParse { field: &'static str, error: String }, } -impl From for ForesterError { - fn from(err: IndexerError) -> Self { - ForesterError::IndexerError(err.to_string()) - } +#[derive(Error, Debug)] +pub enum AccountDeserializationError { + #[error("Failed to deserialize batch state tree account: {error}")] + BatchStateMerkleTree { error: String }, + + #[error("Failed to deserialize batch address tree account: {error}")] + BatchAddressMerkleTree { error: String }, } -impl From> for ForesterError { - fn from(err: SendError) -> Self { - ForesterError::SendError(err.to_string()) - } +#[derive(Error, Debug)] +pub enum IndexerError { + #[error("Indexer error: {error}")] + General { error: String }, } -impl From for ForesterError { - fn from(err: JoinError) -> Self { - ForesterError::JoinError(err.to_string()) - } +#[derive(Error, Debug)] +pub enum ChannelError { + #[error("Failed to send work report for epoch {epoch}: {error}")] + WorkReportSend { epoch: u64, error: String }, + + #[error("Channel error: {error}")] + General { error: String }, +} + +#[derive(Error, Debug)] +pub enum ConfigurationError { + #[error("Slot length overflow: value {value} cannot fit in u32")] + SlotLengthOverflow { value: u64 }, + + #[error( + "Timeout calculation overflow: slot_duration {slot_duration:?} * slot_length {slot_length}" + )] + TimeoutCalculationOverflow { + slot_duration: Duration, + slot_length: u32, + }, +} + +#[derive(Error, Debug)] +pub enum InitializationError { + #[error("Failed to start forester after {attempts} attempts. Last error: {error}")] + MaxRetriesExceeded { attempts: u32, error: String }, + + #[error("Unexpected initialization error: {0}")] + Unexpected(String), +} + +#[derive(Error, Debug)] +pub enum WorkReportError { + #[error("Not in report work phase for epoch {epoch}")] + NotInReportPhase { epoch: u64 }, + + #[error("Work already reported for epoch {epoch}")] + AlreadyReported { epoch: u64 }, + + #[error("Registry instruction error: {error_code}")] + RegistryInstruction { error_code: u32 }, + + #[error("Transaction failed: {0}")] + Transaction(#[from] Box), } -impl From for ForesterError { - fn from(err: PoolError) -> Self { - ForesterError::Custom(err.to_string()) +impl WorkReportError { + pub(crate) fn from_registry_error(error_code: u32, epoch: u64) -> Result<(), Self> { + match error_code { + code if code == RegistryError::ForesterAlreadyReportedWork as u32 => { + info!("Work already reported for epoch {}. Skipping.", epoch); + Ok(()) + } + code if code == RegistryError::NotInReportWorkPhase as u32 => { + warn!("Not in report work phase for epoch {}. Skipping.", epoch); + Ok(()) + } + code => Err(Self::RegistryInstruction { error_code: code }), + } } } -impl From for ForesterError { - fn from(err: ConfigError) -> Self { - ForesterError::Custom(err.to_string()) +#[derive(Error, Debug)] +pub enum PhotonApiErrorWrapper { + #[error(transparent)] + GetCompressedAccountProofPostError(#[from] PhotonApiError), +} +impl ForesterError { + pub fn indexer(error: E) -> Self { + Self::Indexer(IndexerError::General { + error: error.to_string(), + }) + } + + pub fn channel(error: E) -> Self { + Self::Channel(ChannelError::General { + error: error.to_string(), + }) } } -impl From for ForesterError { - fn from(err: PrometheusError) -> ForesterError { - ForesterError::PrometheusError(err) +impl From> for ForesterError { + fn from(err: tokio::sync::mpsc::error::SendError) -> Self { + Self::channel(err) } } -impl From for ForesterError { - fn from(err: ReqwestError) -> ForesterError { - ForesterError::ReqwestError(err) +impl From for ForesterError { + fn from(err: tokio::sync::oneshot::error::RecvError) -> Self { + Self::channel(err) } } -impl From for ForesterError { - fn from(err: String) -> ForesterError { - ForesterError::Custom(err) +impl From for ForesterError { + fn from(err: tokio::task::JoinError) -> Self { + Self::Other(err.into()) } } diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 60622e787..28adbc9c0 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -164,6 +164,7 @@ pub async fn fetch_forester_status(args: &StatusArgs) { TreeType::State => "State", TreeType::Address => "Address", TreeType::BatchedState => "BatchedState", + TreeType::BatchedAddress => "BatchedAddress", } ); let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type) diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 4c7c88880..d99a5790f 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -1,6 +1,6 @@ -pub type Result = std::result::Result; +pub type Result = anyhow::Result; -pub mod batched_ops; +mod batch_processor; pub mod cli; pub mod config; pub mod epoch_manager; @@ -37,7 +37,6 @@ use tracing::debug; use crate::{ epoch_manager::{run_service, WorkReport}, - errors::ForesterError, metrics::QUEUE_LENGTH, queue_helpers::fetch_queue_item_data, slot_tracker::SlotTracker, @@ -92,8 +91,7 @@ pub async fn run_pipeline>( CommitmentConfig::confirmed(), config.general_config.rpc_pool_size as u32, ) - .await - .map_err(|e| ForesterError::Custom(e.to_string()))?; + .await?; let protocol_config = { let mut rpc = rpc_pool.get_connection().await?; diff --git a/forester/src/metrics.rs b/forester/src/metrics.rs index 9b0b35de2..f7e9a658a 100644 --- a/forester/src/metrics.rs +++ b/forester/src/metrics.rs @@ -169,7 +169,7 @@ pub async fn push_metrics(url: &Option) -> Result<()> { res.status(), res.text().await? ); - Err(error_message.into()) + Err(anyhow::anyhow!(error_message)) } } diff --git a/forester/src/pubsub_client.rs b/forester/src/pubsub_client.rs index 809d2b931..de28d3d53 100644 --- a/forester/src/pubsub_client.rs +++ b/forester/src/pubsub_client.rs @@ -11,20 +11,26 @@ use solana_sdk::commitment_config::CommitmentConfig; use tokio::{runtime::Builder, sync::mpsc}; use tracing::{debug, error}; -use crate::{errors::ForesterError, queue_helpers::QueueUpdate, ForesterConfig, Result}; +use crate::{ + errors::{ChannelError, ForesterError}, + queue_helpers::QueueUpdate, + ForesterConfig, Result, +}; pub async fn setup_pubsub_client( config: &ForesterConfig, queue_pubkeys: std::collections::HashSet, ) -> Result<(mpsc::Receiver, mpsc::Sender<()>)> { - let ws_url = match &config.external_services.ws_rpc_url { - Some(url) => url.clone(), - None => { - return Err(ForesterError::Custom( - "PubSub client requires a WebSocket URL".to_string(), - )) - } - }; + let ws_url = config + .external_services + .ws_rpc_url + .as_ref() + .ok_or_else(|| { + ForesterError::Channel(ChannelError::General { + error: "PubSub client requires a WebSocket URL".to_string(), + }) + })? + .clone(); debug!( "Setting up pubsub client for {} queues", @@ -61,13 +67,19 @@ fn spawn_pubsub_client( let rt = Builder::new_current_thread() .enable_all() .build() - .map_err(|e| ForesterError::Custom(format!("Failed to build runtime: {}", e)))?; + .map_err(|e| { + ForesterError::Channel(ChannelError::General { + error: format!("Failed to build runtime: {}", e), + }) + })?; rt.block_on(async { debug!("Connecting to PubSub at {}", ws_url); - let pubsub_client = PubsubClient::new(&ws_url).await.map_err(|e| { - ForesterError::Custom(format!("Failed to create PubsubClient: {}", e)) - })?; + let pubsub_client = PubsubClient::new(&ws_url) + .await + .map_err(|e| ForesterError::Channel(ChannelError::General { + error: format!("Failed to create PubsubClient: {}", e) + }))?; debug!("PubSub connection established"); @@ -86,9 +98,9 @@ fn spawn_pubsub_client( }), ) .await - .map_err(|e| { - ForesterError::Custom(format!("Failed to subscribe to program: {}", e)) - })?; + .map_err(|e| ForesterError::Channel(ChannelError::General { + error: format!("Failed to subscribe to program: {}", e) + }))?; loop { tokio::select! { diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index c42c1fad1..98b0606cc 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -5,7 +5,7 @@ use light_client::rpc::RpcConnection; use light_hash_set::HashSet; use tracing::debug; -use crate::{errors::ForesterError, Result}; +use crate::Result; #[derive(Debug, Clone)] pub struct QueueItemData { @@ -21,10 +21,7 @@ pub async fn fetch_queue_item_data( queue_length: u16, ) -> Result> { debug!("Fetching queue data for {:?}", queue_pubkey); - let mut account = rpc - .get_account(*queue_pubkey) - .await? - .ok_or_else(|| ForesterError::Custom("Queue account not found".to_string()))?; + let mut account = rpc.get_account(*queue_pubkey).await?.unwrap(); let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[8 + mem::size_of::()..])? }; diff --git a/forester/src/rollover/operations.rs b/forester/src/rollover/operations.rs index d9647dcd5..f3bfd5e70 100644 --- a/forester/src/rollover/operations.rs +++ b/forester/src/rollover/operations.rs @@ -167,6 +167,68 @@ pub async fn get_tree_fullness( threshold, }) } + + TreeType::BatchedAddress => { + let mut account = rpc.get_account(tree_pubkey).await?.unwrap(); + let merkle_tree = + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(&mut account.data).unwrap(); + println!( + "merkle_tree.get_account().queue.batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.batch_size + ); + + println!( + "queue currently_processing_batch_index: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .currently_processing_batch_index as usize + ); + + println!( + "queue batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.batch_size + ); + println!( + "queue zkp_batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.zkp_batch_size + ); + println!( + "queue next_full_batch_index: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index + ); + println!( + "queue bloom_filter_capacity: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .bloom_filter_capacity + ); + println!( + "queue num_batches: {:?}", + merkle_tree.get_metadata().queue_metadata.num_batches + ); + + println!( + "tree next_index: {:?}", + merkle_tree.get_metadata().next_index + ); + println!("tree height: {:?}", merkle_tree.get_metadata().height); + + // TODO: implement + let threshold = 0; + let next_index = 0; + let fullness = 0.0; + + Ok(TreeInfo { + fullness, + next_index, + threshold, + }) + } } } diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 8f60279a3..054ea2d55 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -355,7 +355,9 @@ pub async fn fetch_proofs_and_create_instructions(rpc: &R) -> Result> { let program_id = account_compression::id(); @@ -27,6 +27,7 @@ fn process_account(pubkey: Pubkey, mut account: Account) -> Option process_state_account(&account, pubkey) .or_else(|_| process_batch_state_account(&mut account, pubkey)) .or_else(|_| process_address_account(&account, pubkey)) + .or_else(|_| process_batch_address_account(&mut account, pubkey)) .ok() } @@ -40,10 +41,20 @@ fn process_state_account(account: &Account, pubkey: Pubkey) -> Result Result { + check_discriminator::(&account.data)?; + let tree_account = AddressMerkleTreeAccount::deserialize(&mut &account.data[8..])?; + Ok(create_tree_accounts( + pubkey, + &tree_account.metadata, + TreeType::Address, + )) +} + fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result { let tree_account = BatchedMerkleTreeAccount::state_tree_from_bytes_mut(&mut account.data) - .map_err(|e| { - ForesterError::Custom(format!("Failed to deserialize state tree account: {:?}", e)) + .map_err(|e| AccountDeserializationError::BatchStateMerkleTree { + error: e.to_string(), })?; Ok(create_tree_accounts( pubkey, @@ -52,13 +63,15 @@ fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result< )) } -fn process_address_account(account: &Account, pubkey: Pubkey) -> Result { - check_discriminator::(&account.data)?; - let tree_account = AddressMerkleTreeAccount::deserialize(&mut &account.data[8..])?; +fn process_batch_address_account(account: &mut Account, pubkey: Pubkey) -> Result { + let tree_account = BatchedMerkleTreeAccount::address_tree_from_bytes_mut(&mut account.data) + .map_err(|e| AccountDeserializationError::BatchAddressMerkleTree { + error: e.to_string(), + })?; Ok(create_tree_accounts( pubkey, - &tree_account.metadata, - TreeType::Address, + &tree_account.get_metadata().metadata, + TreeType::BatchedAddress, )) } diff --git a/forester/tests/batched_address_test.rs b/forester/tests/batched_address_test.rs new file mode 100644 index 000000000..ba4ff36fe --- /dev/null +++ b/forester/tests/batched_address_test.rs @@ -0,0 +1,324 @@ +use std::{sync::Arc, time::Duration}; + +use forester::run_pipeline; +use forester_utils::{ + indexer::AddressMerkleTreeAccounts, + registry::{register_test_forester, update_test_forester}, +}; +use light_batched_merkle_tree::{ + batch::BatchState, initialize_address_tree::InitAddressTreeAccountsInstructionData, + merkle_tree::BatchedMerkleTreeAccount, +}; +use light_client::{ + rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection}, + rpc_pool::SolanaRpcPool, +}; +use light_program_test::test_env::EnvAccounts; +use light_prover_client::gnark::helpers::{LightValidatorConfig, ProverConfig, ProverMode}; +use light_test_utils::{ + create_address_test_program_sdk::perform_create_pda_with_event_rnd, e2e_test_env::E2ETestEnv, + indexer::TestIndexer, +}; +use serial_test::serial; +use solana_program::native_token::LAMPORTS_PER_SOL; +use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, signer::Signer}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + time::{sleep, timeout}, +}; +use tracing::log::info; + +use crate::test_utils::{forester_config, general_action_config, init, keypair_action_config}; + +mod test_utils; + +#[tokio::test(flavor = "multi_thread", worker_threads = 32)] +#[serial] +async fn test_address_batched() { + init(Some(LightValidatorConfig { + enable_indexer: false, + wait_time: 60, + prover_config: Some(ProverConfig { + run_mode: Some(ProverMode::ForesterTest), + circuits: vec![], + }), + sbf_programs: vec![( + "FNt7byTHev1k5x2cXZLBr8TdWiC3zoP5vcnZR4P682Uy".to_string(), + "../target/deploy/create_address_test_program.so".to_string(), + )], + })) + .await; + + let tree_params = InitAddressTreeAccountsInstructionData::test_default(); + + let forester_keypair = Keypair::new(); + let mut env_accounts = EnvAccounts::get_local_test_validator_accounts(); + env_accounts.forester = forester_keypair.insecure_clone(); + + let mut config = forester_config(); + config.payer_keypair = forester_keypair.insecure_clone(); + + let pool = SolanaRpcPool::::new( + config.external_services.rpc_url.to_string(), + CommitmentConfig::processed(), + config.general_config.rpc_pool_size as u32, + ) + .await + .unwrap(); + + let commitment_config = CommitmentConfig::confirmed(); + let mut rpc = SolanaRpcConnection::new(SolanaRpcUrl::Localnet, Some(commitment_config)); + rpc.payer = forester_keypair.insecure_clone(); + + rpc.airdrop_lamports(&forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000) + .await + .unwrap(); + + rpc.airdrop_lamports( + &env_accounts.governance_authority.pubkey(), + LAMPORTS_PER_SOL * 100_000, + ) + .await + .unwrap(); + + register_test_forester( + &mut rpc, + &env_accounts.governance_authority, + &forester_keypair.pubkey(), + light_registry::ForesterConfig::default(), + ) + .await + .unwrap(); + + let new_forester_keypair = Keypair::new(); + rpc.airdrop_lamports(&new_forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000) + .await + .unwrap(); + + update_test_forester( + &mut rpc, + &forester_keypair, + &forester_keypair.pubkey(), + Some(&new_forester_keypair), + light_registry::ForesterConfig::default(), + ) + .await + .unwrap(); + + config.derivation_pubkey = forester_keypair.pubkey(); + config.payer_keypair = new_forester_keypair.insecure_clone(); + + let config = Arc::new(config); + + let indexer: TestIndexer = + TestIndexer::init_from_env(&config.payer_keypair, &env_accounts, None).await; + + let mut env = E2ETestEnv::>::new( + rpc, + indexer, + &env_accounts, + keypair_action_config(), + general_action_config(), + 0, + Some(0), + ) + .await; + + let address_trees: Vec = env + .indexer + .address_merkle_trees + .iter() + .map(|x| x.accounts) + .collect(); + + println!("Address trees: {:?}", address_trees); + for tree in address_trees { + let is_v2 = tree.merkle_tree == tree.queue; + println!("Tree {:?} is_v2: {}", tree, is_v2); + } + + println!("Removing trees..."); + env.indexer.address_merkle_trees.clear(); + + println!("Creating new address batch tree..."); + + let merkle_tree_keypair = Keypair::new(); + env.indexer + .add_address_merkle_tree( + &mut env.rpc, + &merkle_tree_keypair, + &merkle_tree_keypair, + None, + 2, + ) + .await; + env_accounts.batch_address_merkle_tree = merkle_tree_keypair.pubkey(); + + let address_trees: Vec = env + .indexer + .address_merkle_trees + .iter() + .map(|x| x.accounts) + .collect(); + + println!("New address trees: {:?}", address_trees); + for tree in address_trees { + let is_v2 = tree.merkle_tree == tree.queue; + println!("Tree {:?} is_v2: {}", tree, is_v2); + } + + let mut merkle_tree_account = env + .rpc + .get_account(merkle_tree_keypair.pubkey()) + .await + .unwrap() + .unwrap(); + let merkle_tree = + BatchedMerkleTreeAccount::address_tree_from_bytes_mut(&mut merkle_tree_account.data) + .unwrap(); + + for i in 0..merkle_tree.get_metadata().queue_metadata.batch_size { + println!("===================== tx {} =====================", i); + + perform_create_pda_with_event_rnd( + &mut env.indexer, + &mut env.rpc, + &env_accounts, + &env.payer, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + } + + let merkle_tree_pubkey = env.indexer.address_merkle_trees[0].accounts.merkle_tree; + + let zkp_batches = tree_params.input_queue_batch_size / tree_params.input_queue_zkp_batch_size; + + println!("zkp_batches: {}", zkp_batches); + + let (initial_next_index, initial_sequence_number, pre_root) = { + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + let initial_next_index = merkle_tree.get_metadata().next_index; + let initial_sequence_number = merkle_tree.get_metadata().sequence_number; + + ( + initial_next_index, + initial_sequence_number, + merkle_tree.get_root().unwrap(), + ) + }; + + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (work_report_sender, mut work_report_receiver) = mpsc::channel(100); + + let service_handle = tokio::spawn(run_pipeline( + config.clone(), + Arc::new(Mutex::new(env.indexer)), + shutdown_receiver, + work_report_sender, + )); + + let timeout_duration = Duration::from_secs(60 * 10); + match timeout(timeout_duration, work_report_receiver.recv()).await { + Ok(Some(report)) => { + info!("Received work report: {:?}", report); + assert!(report.processed_items > 0, "No items were processed"); + } + Ok(None) => panic!("Work report channel closed unexpectedly"), + Err(_) => panic!("Test timed out after {:?}", timeout_duration), + } + + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + assert!( + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index + > 0, + "No batches were processed" + ); + + { + let mut rpc = pool.get_connection().await.unwrap(); + + let mut merkle_tree_account = rpc + .get_account(merkle_tree_keypair.pubkey()) + .await + .unwrap() + .unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + let final_metadata = merkle_tree.get_metadata(); + + let batch_size = merkle_tree.get_metadata().queue_metadata.batch_size; + let zkp_batch_size = merkle_tree.get_metadata().queue_metadata.zkp_batch_size; + let num_zkp_batches = batch_size / zkp_batch_size; + + let mut completed_items = 0; + for batch_idx in 0..merkle_tree.batches.len() { + let batch = merkle_tree.batches.get(batch_idx).unwrap(); + if batch.get_state() == BatchState::Inserted { + completed_items += batch_size; + } + } + + assert_eq!( + final_metadata.next_index, + initial_next_index + completed_items, + "Merkle tree next_index did not advance by expected amount", + ); + + assert_eq!( + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index, + 1 + ); + + const UPDATES_PER_BATCH: u64 = 1; + + let expected_sequence_number = + initial_sequence_number + (num_zkp_batches * UPDATES_PER_BATCH); + let expected_root_history_len = (expected_sequence_number + 1) as usize; + + assert_eq!(final_metadata.sequence_number, expected_sequence_number); + + assert_eq!(merkle_tree.root_history.len(), expected_root_history_len); + + assert_ne!( + pre_root, + merkle_tree.get_root().unwrap(), + "Root should have changed" + ); + assert!( + merkle_tree.root_history.len() > 1, + "Root history should contain multiple roots" + ); + } + + shutdown_sender + .send(()) + .expect("Failed to send shutdown signal"); + service_handle.await.unwrap().unwrap(); +} diff --git a/forester/tests/batched_ops_test.rs b/forester/tests/batched_state_test.rs similarity index 62% rename from forester/tests/batched_ops_test.rs rename to forester/tests/batched_state_test.rs index c3a93a27a..d19aa0898 100644 --- a/forester/tests/batched_ops_test.rs +++ b/forester/tests/batched_state_test.rs @@ -3,8 +3,8 @@ use std::{sync::Arc, time::Duration}; use forester::run_pipeline; use forester_utils::registry::{register_test_forester, update_test_forester}; use light_batched_merkle_tree::{ - initialize_state_tree::InitStateTreeAccountsInstructionData, - merkle_tree::BatchedMerkleTreeAccount, + batch::BatchState, initialize_state_tree::InitStateTreeAccountsInstructionData, + merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, }; use light_client::{ rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection}, @@ -16,6 +16,7 @@ use light_test_utils::{ e2e_test_env::{init_program_test_env, E2ETestEnv}, indexer::TestIndexer, }; +use serial_test::serial; use solana_program::native_token::LAMPORTS_PER_SOL; use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair, signer::Signer, @@ -31,7 +32,8 @@ use crate::test_utils::{forester_config, init}; mod test_utils; #[tokio::test(flavor = "multi_thread", worker_threads = 32)] -async fn test_batched() { +#[serial] +async fn test_state_batched() { let devnet = false; let tree_params = if devnet { InitStateTreeAccountsInstructionData::default() @@ -41,8 +43,9 @@ async fn test_batched() { init(Some(LightValidatorConfig { enable_indexer: false, - wait_time: 15, + wait_time: 10, prover_config: None, + sbf_programs: vec![], })) .await; @@ -132,6 +135,40 @@ async fn test_batched() { .unwrap(); let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut(&mut merkle_tree_account.data).unwrap(); + + let (initial_next_index, initial_sequence_number, pre_root) = { + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc + .get_account(merkle_tree_keypair.pubkey()) + .await + .unwrap() + .unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + let initial_next_index = merkle_tree.get_metadata().next_index; + let initial_sequence_number = merkle_tree.get_metadata().sequence_number; + + ( + initial_next_index, + initial_sequence_number, + merkle_tree.get_root().unwrap(), + ) + }; + + info!( + "Initial state: + next_index: {} + sequence_number: {} + batch_size: {}", + initial_next_index, + initial_sequence_number, + merkle_tree.get_metadata().queue_metadata.batch_size + ); + for i in 0..merkle_tree.get_metadata().queue_metadata.batch_size { println!("\ntx {}", i); @@ -165,21 +202,6 @@ async fn test_batched() { println!("num_output_zkp_batches: {}", num_output_zkp_batches); - let pre_root = { - let mut rpc = pool.get_connection().await.unwrap(); - let mut merkle_tree_account = rpc - .get_account(merkle_tree_keypair.pubkey()) - .await - .unwrap() - .unwrap(); - - let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( - merkle_tree_account.data.as_mut_slice(), - ) - .unwrap(); - merkle_tree.get_root().unwrap() - }; - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let (work_report_sender, mut work_report_receiver) = mpsc::channel(100); @@ -194,7 +216,25 @@ async fn test_batched() { match timeout(timeout_duration, work_report_receiver.recv()).await { Ok(Some(report)) => { info!("Received work report: {:?}", report); + info!( + "Work report debug: + reported_items: {} + batch_size: {} + complete_batches: {}", + report.processed_items, + tree_params.input_queue_batch_size, + report.processed_items / tree_params.input_queue_batch_size as usize, + ); assert!(report.processed_items > 0, "No items were processed"); + + let batch_size = tree_params.input_queue_batch_size; + assert_eq!( + report.processed_items % batch_size as usize, + 0, + "Processed items {} should be a multiple of batch size {}", + report.processed_items, + batch_size + ); } Ok(None) => panic!("Work report channel closed unexpectedly"), Err(_) => panic!("Test timed out after {:?}", timeout_duration), @@ -221,8 +261,9 @@ async fn test_batched() { "No batches were processed" ); - let post_root = { + { let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc .get_account(merkle_tree_keypair.pubkey()) .await @@ -233,10 +274,88 @@ async fn test_batched() { merkle_tree_account.data.as_mut_slice(), ) .unwrap(); - merkle_tree.get_root().unwrap() - }; - assert_ne!(pre_root, post_root, "Roots are the same"); + let final_metadata = merkle_tree.get_metadata(); + + let mut output_queue_account = rpc + .get_account(nullifier_queue_keypair.pubkey()) + .await + .unwrap() + .unwrap(); + + let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( + output_queue_account.data.as_mut_slice(), + ) + .unwrap(); + + let batch_size = merkle_tree.get_metadata().queue_metadata.batch_size; + let zkp_batch_size = merkle_tree.get_metadata().queue_metadata.zkp_batch_size; + let num_zkp_batches = batch_size / zkp_batch_size; + + let mut completed_items = 0; + for batch_idx in 0..output_queue.batches.len() { + let batch = output_queue.batches.get(batch_idx).unwrap(); + if batch.get_state() == BatchState::Inserted { + completed_items += batch_size; + } + } + info!( + "initial_next_index: {} + final_next_index: {} + batch_size: {} + zkp_batch_size: {} + num_zkp_batches per full batch: {} + completed_items from batch states: {} + input_queue_metadata: {:?} + output_queue_metadata: {:?}", + initial_next_index, + final_metadata.next_index, + batch_size, + zkp_batch_size, + num_zkp_batches, + completed_items, + final_metadata.queue_metadata, + output_queue.get_metadata().batch_metadata + ); + + assert_eq!( + final_metadata.next_index, + initial_next_index + completed_items, + "Merkle tree next_index did not advance by expected amount", + ); + + assert_eq!( + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index, + 1 + ); + + assert!( + final_metadata.sequence_number > initial_sequence_number, + "Sequence number should have increased" + ); + + // compress_sol_deterministic creates 1 output + // transfer_sol_deterministic invalidates 1 input and creates 1 output + // 1 + 1 + 1 = 3 + const UPDATES_PER_BATCH: u64 = 3; + + let expected_sequence_number = + initial_sequence_number + (num_zkp_batches * UPDATES_PER_BATCH); + let expected_root_history_len = (expected_sequence_number + 1) as usize; + + assert_eq!(final_metadata.sequence_number, expected_sequence_number); + + assert_eq!(merkle_tree.root_history.len(), expected_root_history_len); + + assert_ne!( + pre_root, + merkle_tree.get_root().unwrap(), + "Root should have changed" + ); + } shutdown_sender .send(()) diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index a8357821b..f30045dd1 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -51,6 +51,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() { enable_indexer: false, wait_time: 10, prover_config: None, + sbf_programs: vec![], })) .await; @@ -293,6 +294,7 @@ async fn test_epoch_monitor_with_2_foresters() { enable_indexer: false, wait_time: 40, prover_config: None, + sbf_programs: vec![], })) .await; let forester_keypair1 = Keypair::new(); @@ -641,6 +643,7 @@ async fn test_epoch_double_registration() { enable_indexer: false, wait_time: 10, prover_config: None, + sbf_programs: vec![], })) .await; diff --git a/program-tests/registry-test/tests/tests.rs b/program-tests/registry-test/tests/tests.rs index 82a5039c8..6b167afc8 100644 --- a/program-tests/registry-test/tests/tests.rs +++ b/program-tests/registry-test/tests/tests.rs @@ -1892,18 +1892,9 @@ async fn test_batch_address_tree() { ) .await .unwrap(); - let mut account = rpc - .get_account(env.batch_address_merkle_tree) - .await - .unwrap() - .unwrap(); - let zero_copy_account = - BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - test_indexer.finalize_batched_address_tree_update( - env.batch_address_merkle_tree, - &zero_copy_account, - ); + test_indexer + .finalize_batched_address_tree_update(&mut rpc, env.batch_address_merkle_tree) + .await; } } @@ -1921,18 +1912,9 @@ async fn test_batch_address_tree() { .await .unwrap(); } - let mut account = rpc - .get_account(env.batch_address_merkle_tree) - .await - .unwrap() - .unwrap(); - let zero_copy_account = - BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - test_indexer.finalize_batched_address_tree_update( - env.batch_address_merkle_tree, - &zero_copy_account, - ); + test_indexer + .finalize_batched_address_tree_update(&mut rpc, env.batch_address_merkle_tree) + .await; } // Non eligible forester. @@ -1970,15 +1952,9 @@ async fn test_batch_address_tree() { .await .unwrap(); } - let mut account = rpc - .get_account(env.batch_address_merkle_tree) - .await - .unwrap() - .unwrap(); - let zero_copy_account = - BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()).unwrap(); test_indexer - .finalize_batched_address_tree_update(env.batch_address_merkle_tree, &zero_copy_account); + .finalize_batched_address_tree_update(&mut rpc, env.batch_address_merkle_tree) + .await; } pub async fn perform_batch_address_merkle_tree_update( diff --git a/program-tests/system-cpi-test/tests/test.rs b/program-tests/system-cpi-test/tests/test.rs index 12e96d727..96d869ddb 100644 --- a/program-tests/system-cpi-test/tests/test.rs +++ b/program-tests/system-cpi-test/tests/test.rs @@ -2,10 +2,7 @@ use account_compression::errors::AccountCompressionErrorCode; use anchor_lang::{AnchorDeserialize, AnchorSerialize}; -use light_batched_merkle_tree::{ - initialize_state_tree::InitStateTreeAccountsInstructionData, - merkle_tree::BatchedMerkleTreeAccount, -}; +use light_batched_merkle_tree::initialize_state_tree::InitStateTreeAccountsInstructionData; use light_compressed_token::{ process_transfer::InputTokenDataWithContext, token_data::AccountState, }; @@ -147,7 +144,7 @@ async fn test_read_only_accounts() { // fails because of invalid leaves hashchain in some iteration let instruction_data = create_batch_update_address_tree_instruction_data_with_proof( &mut e2e_env.rpc, - &mut e2e_env.indexer, + &e2e_env.indexer, env.batch_address_merkle_tree, ) .await @@ -169,20 +166,13 @@ async fn test_read_only_accounts() { ) .await .unwrap(); - let mut account = e2e_env - .rpc - .get_account(env.batch_address_merkle_tree) - .await - .unwrap() - .unwrap() - .data; - let onchain_account = - BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.as_mut_slice()) - .unwrap(); - e2e_env.indexer.finalize_batched_address_tree_update( - env.batch_address_merkle_tree, - &onchain_account, - ); + e2e_env + .indexer + .finalize_batched_address_tree_update( + &mut e2e_env.rpc, + env.batch_address_merkle_tree, + ) + .await; } for i in 0..params.output_queue_zkp_batch_size { @@ -705,12 +695,7 @@ async fn only_test_create_pda() { CreatePdaMode::InvalidReadOnlyAddress, ) .await; - assert_rpc_error( - result, - 0, - light_verifier::VerifierError::ProofVerificationFailed.into(), - ) - .unwrap(); + assert_rpc_error(result, 0, VerifierError::ProofVerificationFailed.into()).unwrap(); let result = perform_create_pda_with_event( &mut test_indexer, @@ -746,12 +731,7 @@ async fn only_test_create_pda() { CreatePdaMode::InvalidReadOnlyRootIndex, ) .await; - assert_rpc_error( - result, - 0, - light_verifier::VerifierError::ProofVerificationFailed.into(), - ) - .unwrap(); + assert_rpc_error(result, 0, VerifierError::ProofVerificationFailed.into()).unwrap(); let result = perform_create_pda_with_event( &mut test_indexer, @@ -1315,6 +1295,7 @@ async fn test_create_pda_in_program_owned_merkle_trees() { &program_owned_address_merkle_tree_keypair, &program_owned_address_queue_keypair, Some(light_compressed_token::ID), + 1, ) .await; let env_with_program_owned_address_merkle_tree = EnvAccounts { @@ -1425,6 +1406,7 @@ async fn test_create_pda_in_program_owned_merkle_trees() { &program_owned_address_merkle_tree_keypair, &program_owned_address_queue_keypair, Some(ID), + 1, ) .await; let env_with_program_owned_state_merkle_tree = EnvAccounts { diff --git a/program-tests/utils/src/create_address_test_program_sdk.rs b/program-tests/utils/src/create_address_test_program_sdk.rs index 55de65797..b9b962a51 100644 --- a/program-tests/utils/src/create_address_test_program_sdk.rs +++ b/program-tests/utils/src/create_address_test_program_sdk.rs @@ -132,7 +132,11 @@ pub async fn perform_create_pda_with_event( registered_program_pda: &env.registered_program_pda, }; let instruction = create_pda_instruction(create_ix_inputs); - let pre_test_indexer_queue_len = test_indexer.address_merkle_trees[1].queue_elements.len(); + let pre_test_indexer_queue_len = test_indexer + .get_address_merkle_tree(env.batch_address_merkle_tree) + .unwrap() + .queue_elements + .len(); let event = rpc .create_and_send_transaction_with_event(&[instruction], &payer.pubkey(), &[payer], None) .await? @@ -140,7 +144,11 @@ pub async fn perform_create_pda_with_event( let slot: u64 = rpc.get_slot().await.unwrap(); test_indexer.add_compressed_accounts_with_token_data(slot, &event.0); assert_eq!( - test_indexer.address_merkle_trees[1].queue_elements.len(), + test_indexer + .get_address_merkle_tree(env.batch_address_merkle_tree) + .unwrap() + .queue_elements + .len(), pre_test_indexer_queue_len + 1 ); Ok(()) diff --git a/program-tests/utils/src/indexer/test_indexer.rs b/program-tests/utils/src/indexer/test_indexer.rs index 1f9b45d31..f057264ae 100644 --- a/program-tests/utils/src/indexer/test_indexer.rs +++ b/program-tests/utils/src/indexer/test_indexer.rs @@ -24,6 +24,7 @@ use forester_utils::{ use light_batched_merkle_tree::{ batch::BatchState, constants::{DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, DEFAULT_BATCH_STATE_TREE_HEIGHT}, + initialize_address_tree::InitAddressTreeAccountsInstructionData, initialize_state_tree::InitStateTreeAccountsInstructionData, merkle_tree::BatchedMerkleTreeAccount, queue::{BatchedQueueAccount, BatchedQueueMetadata}, @@ -41,7 +42,7 @@ use light_indexed_merkle_tree::{array::IndexedArray, reference::IndexedMerkleTre use light_macros::pubkey; use light_merkle_tree_reference::MerkleTree; use light_program_test::{ - test_batch_forester::create_batched_state_merkle_tree, + test_batch_forester::{create_batch_address_merkle_tree, create_batched_state_merkle_tree}, test_env::{ create_address_merkle_tree_and_queue_account, create_state_merkle_tree_and_queue_account, EnvAccounts, BATCHED_OUTPUT_QUEUE_TEST_KEYPAIR, @@ -142,6 +143,60 @@ impl Indexer for TestIndexer { Err(IndexerError::Custom("Merkle tree not found".to_string())) } + fn get_proof_by_index(&mut self, merkle_tree_pubkey: Pubkey, index: u64) -> ProofOfLeaf { + let mut bundle = self + .state_merkle_trees + .iter_mut() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap(); + + while bundle.merkle_tree.leaves().len() <= index as usize { + bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + } + + let leaf = match bundle.merkle_tree.get_leaf(index as usize) { + Ok(leaf) => leaf, + Err(_) => { + bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + bundle.merkle_tree.get_leaf(index as usize).unwrap() + } + }; + + let proof = bundle + .merkle_tree + .get_proof_of_leaf(index as usize, true) + .unwrap() + .to_vec(); + + ProofOfLeaf { leaf, proof } + } + + fn get_proofs_by_indices( + &mut self, + merkle_tree_pubkey: Pubkey, + indices: &[u64], + ) -> Vec { + indices + .iter() + .map(|&index| self.get_proof_by_index(merkle_tree_pubkey, index)) + .collect() + } + + /// leaf index, leaf, tx hash + fn get_leaf_indices_tx_hashes( + &mut self, + merkle_tree_pubkey: Pubkey, + zkp_batch_size: usize, + ) -> Vec<(u32, [u8; 32], [u8; 32])> { + let mut state_merkle_tree_bundle = self + .state_merkle_trees + .iter_mut() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap(); + + state_merkle_tree_bundle.input_leaf_indices[..zkp_batch_size].to_vec() + } + async fn get_subtrees( &self, merkle_tree_pubkey: [u8; 32], @@ -346,121 +401,6 @@ impl Indexer for TestIndexer { &self.group_pda } - /// leaf index, leaf, tx hash - fn get_leaf_indices_tx_hashes( - &mut self, - merkle_tree_pubkey: Pubkey, - zkp_batch_size: usize, - ) -> Vec<(u32, [u8; 32], [u8; 32])> { - let mut state_merkle_tree_bundle = self - .state_merkle_trees - .iter_mut() - .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) - .unwrap(); - - state_merkle_tree_bundle.input_leaf_indices[..zkp_batch_size].to_vec() - } - - async fn create_proof_for_compressed_accounts2( - &mut self, - compressed_accounts: Option>, - state_merkle_tree_pubkeys: Option>, - new_addresses: Option<&[[u8; 32]]>, - address_merkle_tree_pubkeys: Option>, - rpc: &mut R, - ) -> BatchedTreeProofRpcResult { - let mut indices_to_remove = Vec::new(); - - // for all accounts in batched trees, check whether values are in tree or queue - let (compressed_accounts, state_merkle_tree_pubkeys) = - if let Some((compressed_accounts, state_merkle_tree_pubkeys)) = - compressed_accounts.zip(state_merkle_tree_pubkeys) - { - for (i, (compressed_account, state_merkle_tree_pubkey)) in compressed_accounts - .iter() - .zip(state_merkle_tree_pubkeys.iter()) - .enumerate() - { - let accounts = self.state_merkle_trees.iter().find(|x| { - x.accounts.merkle_tree == *state_merkle_tree_pubkey && x.version == 2 - }); - if let Some(accounts) = accounts { - let output_queue_pubkey = accounts.accounts.nullifier_queue; - let mut queue = - AccountZeroCopy::::new(rpc, output_queue_pubkey) - .await; - let queue_zero_copy = BatchedQueueAccount::output_queue_from_bytes_mut( - queue.account.data.as_mut_slice(), - ) - .unwrap(); - for value_array in queue_zero_copy.value_vecs.iter() { - let index = value_array.iter().position(|x| *x == *compressed_account); - if index.is_some() { - indices_to_remove.push(i); - } - } - } - } - let compress_accounts = compressed_accounts - .iter() - .enumerate() - .filter(|(i, _)| !indices_to_remove.contains(i)) - .map(|(_, x)| *x) - .collect::>(); - let state_merkle_tree_pubkeys = state_merkle_tree_pubkeys - .iter() - .enumerate() - .filter(|(i, _)| !indices_to_remove.contains(i)) - .map(|(_, x)| *x) - .collect::>(); - if compress_accounts.is_empty() { - (None, None) - } else { - (Some(compress_accounts), Some(state_merkle_tree_pubkeys)) - } - } else { - (None, None) - }; - let rpc_result = if (compressed_accounts.is_some() - && !compressed_accounts.as_ref().unwrap().is_empty()) - || address_merkle_tree_pubkeys.is_some() - { - Some( - self.create_proof_for_compressed_accounts( - compressed_accounts, - state_merkle_tree_pubkeys, - new_addresses, - address_merkle_tree_pubkeys, - rpc, - ) - .await, - ) - } else { - None - }; - let address_root_indices = if let Some(rpc_result) = rpc_result.as_ref() { - rpc_result.address_root_indices.clone() - } else { - Vec::new() - }; - let root_indices = { - let mut root_indices = if let Some(rpc_result) = rpc_result.as_ref() { - rpc_result.root_indices.clone() - } else { - Vec::new() - }; - for index in indices_to_remove { - root_indices.insert(index, None); - } - root_indices - }; - BatchedTreeProofRpcResult { - proof: rpc_result.map(|x| x.proof), - root_indices, - address_root_indices, - } - } - async fn create_proof_for_compressed_accounts( &mut self, compressed_accounts: Option>, @@ -642,6 +582,106 @@ impl Indexer for TestIndexer { panic!("Failed to get proof from server"); } + async fn create_proof_for_compressed_accounts2( + &mut self, + compressed_accounts: Option>, + state_merkle_tree_pubkeys: Option>, + new_addresses: Option<&[[u8; 32]]>, + address_merkle_tree_pubkeys: Option>, + rpc: &mut R, + ) -> BatchedTreeProofRpcResult { + let mut indices_to_remove = Vec::new(); + + // for all accounts in batched trees, check whether values are in tree or queue + let (compressed_accounts, state_merkle_tree_pubkeys) = + if let Some((compressed_accounts, state_merkle_tree_pubkeys)) = + compressed_accounts.zip(state_merkle_tree_pubkeys) + { + for (i, (compressed_account, state_merkle_tree_pubkey)) in compressed_accounts + .iter() + .zip(state_merkle_tree_pubkeys.iter()) + .enumerate() + { + let accounts = self.state_merkle_trees.iter().find(|x| { + x.accounts.merkle_tree == *state_merkle_tree_pubkey && x.version == 2 + }); + if let Some(accounts) = accounts { + let output_queue_pubkey = accounts.accounts.nullifier_queue; + let mut queue = + AccountZeroCopy::::new(rpc, output_queue_pubkey) + .await; + let queue_zero_copy = BatchedQueueAccount::output_queue_from_bytes_mut( + queue.account.data.as_mut_slice(), + ) + .unwrap(); + for value_array in queue_zero_copy.value_vecs.iter() { + let index = value_array.iter().position(|x| *x == *compressed_account); + if index.is_some() { + indices_to_remove.push(i); + } + } + } + } + let compress_accounts = compressed_accounts + .iter() + .enumerate() + .filter(|(i, _)| !indices_to_remove.contains(i)) + .map(|(_, x)| *x) + .collect::>(); + let state_merkle_tree_pubkeys = state_merkle_tree_pubkeys + .iter() + .enumerate() + .filter(|(i, _)| !indices_to_remove.contains(i)) + .map(|(_, x)| *x) + .collect::>(); + if compress_accounts.is_empty() { + (None, None) + } else { + (Some(compress_accounts), Some(state_merkle_tree_pubkeys)) + } + } else { + (None, None) + }; + let rpc_result = if (compressed_accounts.is_some() + && !compressed_accounts.as_ref().unwrap().is_empty()) + || address_merkle_tree_pubkeys.is_some() + { + Some( + self.create_proof_for_compressed_accounts( + compressed_accounts, + state_merkle_tree_pubkeys, + new_addresses, + address_merkle_tree_pubkeys, + rpc, + ) + .await, + ) + } else { + None + }; + let address_root_indices = if let Some(rpc_result) = rpc_result.as_ref() { + rpc_result.address_root_indices.clone() + } else { + Vec::new() + }; + let root_indices = { + let mut root_indices = if let Some(rpc_result) = rpc_result.as_ref() { + rpc_result.root_indices.clone() + } else { + Vec::new() + }; + for index in indices_to_remove { + root_indices.insert(index, None); + } + root_indices + }; + BatchedTreeProofRpcResult { + proof: rpc_result.map(|x| x.proof), + root_indices, + address_root_indices, + } + } + fn add_address_merkle_tree_accounts( &mut self, merkle_tree_keypair: &Keypair, @@ -757,9 +797,11 @@ impl Indexer for TestIndexer { state_merkle_tree_bundle.merkle_tree.root(), "update indexer after append root invalid" ); + + let num_inserted_zkps = num_inserted_zkps + 1; // check can we get rid of this and use the data from the merkle tree if num_inserted_zkps == max_num_zkp_updates { - for _ in 0..zkp_batch_size { + for _ in 0..zkp_batch_size * max_num_zkp_updates { state_merkle_tree_bundle.output_queue_elements.remove(0); } } @@ -804,43 +846,42 @@ impl Indexer for TestIndexer { } } - fn get_proofs_by_indices( + async fn finalize_batched_address_tree_update( &mut self, + rpc: &mut R, merkle_tree_pubkey: Pubkey, - indices: &[u64], - ) -> Vec { - indices - .iter() - .map(|&index| self.get_proof_by_index(merkle_tree_pubkey, index)) - .collect() - } - - fn get_proof_by_index(&mut self, merkle_tree_pubkey: Pubkey, index: u64) -> ProofOfLeaf { - let mut bundle = self - .state_merkle_trees + ) { + let mut account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + let onchain_account = + BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) + .unwrap(); + let address_tree = self + .address_merkle_trees .iter_mut() .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) .unwrap(); + let address_tree_index = address_tree.merkle_tree.merkle_tree.rightmost_index; + let onchain_next_index = onchain_account.get_metadata().next_index; + let diff_onchain_indexer = onchain_next_index - address_tree_index as u64; + let addresses = address_tree.queue_elements[0..diff_onchain_indexer as usize].to_vec(); - while bundle.merkle_tree.leaves().len() <= index as usize { - bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + for _ in 0..diff_onchain_indexer { + address_tree.queue_elements.remove(0); + } + for new_element_value in &addresses { + address_tree + .merkle_tree + .append( + &BigUint::from_bytes_be(new_element_value), + &mut address_tree.indexed_array, + ) + .unwrap(); } - let leaf = match bundle.merkle_tree.get_leaf(index as usize) { - Ok(leaf) => leaf, - Err(_) => { - bundle.merkle_tree.append(&[0u8; 32]).unwrap(); - bundle.merkle_tree.get_leaf(index as usize).unwrap() - } - }; - - let proof = bundle - .merkle_tree - .get_proof_of_leaf(index as usize, true) - .unwrap() - .to_vec(); - - ProofOfLeaf { leaf, proof } + let onchain_root = onchain_account.root_history.last().unwrap(); + let new_root = address_tree.merkle_tree.root(); + assert_eq!(*onchain_root, new_root); + println!("finalized batched address tree update"); } } @@ -1051,7 +1092,7 @@ impl TestIndexer { } } - pub async fn add_address_merkle_tree( + async fn add_address_merkle_tree_v1( &mut self, rpc: &mut R, merkle_tree_keypair: &Keypair, @@ -1075,6 +1116,67 @@ impl TestIndexer { self.add_address_merkle_tree_accounts(merkle_tree_keypair, queue_keypair, owning_program_id) } + async fn add_address_merkle_tree_v2( + &mut self, + rpc: &mut R, + merkle_tree_keypair: &Keypair, + queue_keypair: &Keypair, + owning_program_id: Option, + ) -> AddressMerkleTreeAccounts { + info!( + "Adding address merkle tree accounts v2 {:?}", + merkle_tree_keypair.pubkey() + ); + + let params = InitAddressTreeAccountsInstructionData::test_default(); + + info!( + "Creating batched address merkle tree {:?}", + merkle_tree_keypair.pubkey() + ); + create_batch_address_merkle_tree(rpc, &self.payer, merkle_tree_keypair, params) + .await + .unwrap(); + info!( + "Batched address merkle tree created {:?}", + merkle_tree_keypair.pubkey() + ); + + self.add_address_merkle_tree_accounts(merkle_tree_keypair, queue_keypair, owning_program_id) + } + + pub async fn add_address_merkle_tree( + &mut self, + rpc: &mut R, + merkle_tree_keypair: &Keypair, + queue_keypair: &Keypair, + owning_program_id: Option, + version: u64, + ) -> AddressMerkleTreeAccounts { + if version == 1 { + self.add_address_merkle_tree_v1( + rpc, + merkle_tree_keypair, + queue_keypair, + owning_program_id, + ) + .await + } else if version == 2 { + self.add_address_merkle_tree_v2( + rpc, + merkle_tree_keypair, + queue_keypair, + owning_program_id, + ) + .await + } else { + panic!( + "add_address_merkle_tree: Version not supported, {}. Versions: 1, 2", + version + ) + } + } + #[allow(clippy::too_many_arguments)] pub async fn add_state_merkle_tree( &mut self, @@ -1666,37 +1768,12 @@ impl TestIndexer { } } - pub fn finalize_batched_address_tree_update( - &mut self, + pub(crate) fn get_address_merkle_tree( + &self, merkle_tree_pubkey: Pubkey, - onchain_account: &BatchedMerkleTreeAccount, - ) { - let address_tree = self - .address_merkle_trees - .iter_mut() + ) -> Option<&AddressMerkleTreeBundle> { + self.address_merkle_trees + .iter() .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) - .unwrap(); - let address_tree_index = address_tree.merkle_tree.merkle_tree.rightmost_index; - let onchain_next_index = onchain_account.get_metadata().next_index; - let diff_onchain_indexer = onchain_next_index - address_tree_index as u64; - let addresses = address_tree.queue_elements[0..diff_onchain_indexer as usize].to_vec(); - - for _ in 0..diff_onchain_indexer { - address_tree.queue_elements.remove(0); - } - for new_element_value in &addresses { - address_tree - .merkle_tree - .append( - &BigUint::from_bytes_be(new_element_value), - &mut address_tree.indexed_array, - ) - .unwrap(); - } - - let onchain_root = onchain_account.root_history.last().unwrap(); - let new_root = address_tree.merkle_tree.root(); - assert_eq!(*onchain_root, new_root); - println!("finalized batched address tree update"); } } diff --git a/prover/client/src/gnark/helpers.rs b/prover/client/src/gnark/helpers.rs index dd6516b7c..3658c082b 100644 --- a/prover/client/src/gnark/helpers.rs +++ b/prover/client/src/gnark/helpers.rs @@ -272,6 +272,7 @@ pub struct LightValidatorConfig { pub enable_indexer: bool, pub prover_config: Option, pub wait_time: u64, + pub sbf_programs: Vec<(String, String)>, } impl Default for LightValidatorConfig { @@ -280,6 +281,7 @@ impl Default for LightValidatorConfig { enable_indexer: false, prover_config: None, wait_time: 35, + sbf_programs: vec![], } } } @@ -291,6 +293,14 @@ pub async fn spawn_validator(config: LightValidatorConfig) { if !config.enable_indexer { path.push_str(" --skip-indexer"); } + + for sbf_program in config.sbf_programs.iter() { + path.push_str(&format!( + " --sbf-program {} {}", + sbf_program.0, sbf_program.1 + )); + } + if let Some(prover_config) = config.prover_config { prover_config.circuits.iter().for_each(|circuit| { path.push_str(&format!(" --circuit {}", circuit)); diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/sdk-libs/client/src/rpc/solana_rpc.rs index ffb2c0a71..ce8e0ed38 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/sdk-libs/client/src/rpc/solana_rpc.rs @@ -62,7 +62,7 @@ pub struct RetryConfig { impl Default for RetryConfig { fn default() -> Self { RetryConfig { - max_retries: 20, + max_retries: 30, retry_delay: Duration::from_secs(1), timeout: Duration::from_secs(60), } diff --git a/sdk-libs/client/tests/rpc_client.rs b/sdk-libs/client/tests/rpc_client.rs index 2b1c659d8..f4982341f 100644 --- a/sdk-libs/client/tests/rpc_client.rs +++ b/sdk-libs/client/tests/rpc_client.rs @@ -42,6 +42,7 @@ async fn test_all_endpoints() { circuits: vec![ProofType::Combined], }), wait_time: 20, + sbf_programs: vec![], }; spawn_validator(config).await;