Skip to content

Commit

Permalink
Refactor and reformat Forester codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Jun 25, 2024
1 parent 143ff82 commit 0857c37
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 152 deletions.
2 changes: 1 addition & 1 deletion forester/src/external_services_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ impl ExternalServicesConfig {
prover_url: "https://zk-testnet.helius.dev:3001".to_string(),
}
}
}
}
2 changes: 1 addition & 1 deletion forester/src/indexer/photon_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use log::info;
use crate::utils::decode_hash;
use account_compression::initialize_address_merkle_tree::Pubkey;
use light_test_utils::indexer::{
Indexer, IndexerError, MerkleProof, MerkleProofWithAddressContext, NewAddressProofWithContext,
};
use log::info;
use solana_sdk::bs58;

use photon_api::apis::configuration::Configuration;
Expand Down
2 changes: 1 addition & 1 deletion forester/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod cli;
pub mod external_services_config;
pub mod errors;
pub mod external_services_config;
pub mod indexer;
pub mod nqmt;
pub mod nullifier;
Expand Down
77 changes: 44 additions & 33 deletions forester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ use anchor_lang::prelude::Pubkey;
use clap::Parser;
use config::Config;
use env_logger::Env;
use forester::nqmt::reindex_and_store;
use log::info;
use solana_sdk::signature::{Keypair, Signer};
use std::str::FromStr;
use serde_json::Result;
use forester::cli::{Cli, Commands};
use forester::external_services_config::ExternalServicesConfig;
use forester::indexer::PhotonIndexer;
use forester::nullifier::{Config as ForesterConfig, empty_address_queue};
use forester::nqmt::reindex_and_store;
use forester::nullifier::{empty_address_queue, Config as ForesterConfig};
use forester::nullifier::{nullify, subscribe_nullify};
use forester::settings::SettingsKey;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use light_test_utils::rpc::SolanaRpcConnection;
use log::info;
use serde_json::Result;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::signature::{Keypair, Signer};
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use tokio::try_join;
use light_test_utils::rpc::rpc_connection::RpcConnection;

fn locate_config_file() -> String {
let file_name = "forester.toml";
Expand Down Expand Up @@ -61,7 +61,9 @@ fn init_config() -> ForesterConfig {
let registry_pubkey = settings
.get_string(&SettingsKey::RegistryPubkey.to_string())
.unwrap();
let payer = settings.get_string(&SettingsKey::Payer.to_string()).unwrap();
let payer = settings
.get_string(&SettingsKey::Payer.to_string())
.unwrap();
let payer: Vec<u8> = convert(&payer).unwrap();

ForesterConfig {
Expand Down Expand Up @@ -101,18 +103,13 @@ async fn main() {
}
Some(Commands::Nullify) => {
let config_clone = config.clone();
let task_clear_addresses = tokio::spawn({
async move { nullify_addresses(config_clone).await }
});
let task_clear_addresses =
tokio::spawn(async move { nullify_addresses(config_clone).await });

let config_clone = config.clone();
let task_clear_state = tokio::spawn({
async move { nullify_state(config_clone).await }
});

try_join!(task_clear_addresses, task_clear_state)
.expect("Failed to join tasks");
let task_clear_state = tokio::spawn(async move { nullify_state(config_clone).await });

try_join!(task_clear_addresses, task_clear_state).expect("Failed to join tasks");
}
Some(Commands::Index) => {
info!("Reindex merkle tree & nullifier queue accounts");
Expand All @@ -129,40 +126,54 @@ async fn main() {
}

async fn nullify_state(config: Arc<ForesterConfig>) {
info!("Run state tree nullifier. Queue: {}. Merkle tree: {}", config.nullifier_queue_pubkey, config.state_merkle_tree_pubkey);
info!(
"Run state tree nullifier. Queue: {}. Merkle tree: {}",
config.nullifier_queue_pubkey, config.state_merkle_tree_pubkey
);
let rpc = init_rpc(&config).await;
let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new(config.external_services.rpc_url.to_string())));
let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new(
config.external_services.rpc_url.to_string(),
)));
let rpc = Arc::new(tokio::sync::Mutex::new(rpc));
let config = config.clone();
let result = nullify(indexer, rpc, config).await;
info!("State nullifier result: {:?}", result);
}

async fn nullify_addresses(config: Arc<ForesterConfig>) {
info!("Run address tree nullifier. Queue: {}. Merkle tree: {}", config.address_merkle_tree_queue_pubkey, config.address_merkle_tree_pubkey);

info!(
"Run address tree nullifier. Queue: {}. Merkle tree: {}",
config.address_merkle_tree_queue_pubkey, config.address_merkle_tree_pubkey
);

let result = tokio::task::spawn_blocking(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new(config.external_services.rpc_url.to_string())));
let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new(
config.external_services.rpc_url.to_string(),
)));
let rpc = init_rpc(&config).await;
let rpc = Arc::new(tokio::sync::Mutex::new(rpc));
empty_address_queue(indexer, rpc, config).await
})
}).await.unwrap();
})
.await
.unwrap();

info!("Address nullifier result: {:?}", result);
}

async fn init_rpc(config: &Arc<ForesterConfig>) -> SolanaRpcConnection {
let mut rpc = SolanaRpcConnection::new(config.external_services.rpc_url.clone(), Some(CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
}));

rpc.airdrop_lamports(
&config.payer_keypair.pubkey(),
10_000_000_000,
).await.unwrap();
let mut rpc = SolanaRpcConnection::new(
config.external_services.rpc_url.clone(),
Some(CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
}),
);

rpc.airdrop_lamports(&config.payer_keypair.pubkey(), 10_000_000_000)
.await
.unwrap();

rpc
}
}
5 changes: 4 additions & 1 deletion forester/src/nqmt/reindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use std::collections::LinkedList;
const INVALID_MT_PUBKEY: &str = "11111111111111111111111111111111";

pub fn reindex_and_store(config: &Config) -> Result<(), ForesterError> {
match reindex(&config.state_merkle_tree_pubkey, &config.external_services.rpc_url) {
match reindex(
&config.state_merkle_tree_pubkey,
&config.external_services.rpc_url,
) {
Ok(list) => {
info!("Indexed {} merkle trees", list.len());
serialize_indexed_mt(list)?;
Expand Down
101 changes: 51 additions & 50 deletions forester/src/nullifier/address_queue_nullifier.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::errors::ForesterError;
use crate::nullifier::Config;
use account_compression::initialize_address_merkle_tree::Pubkey;
use account_compression::{AddressMerkleTreeAccount, QueueAccount};
use light_hash_set::HashSet;
Expand All @@ -9,15 +11,13 @@ use light_test_utils::get_indexed_merkle_tree;
use light_test_utils::indexer::Indexer;
use light_test_utils::rpc::errors::RpcError;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use log::{info};
use log::info;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::Transaction;
use std::mem;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use crate::errors::ForesterError;
use crate::nullifier::Config;

pub async fn empty_address_queue<I: Indexer, R: RpcConnection>(
indexer: Arc<Mutex<I>>,
Expand All @@ -40,16 +40,17 @@ pub async fn empty_address_queue<I: Indexer, R: RpcConnection>(

async fn fetch_address_queue_data<R: RpcConnection>(
config: &Arc<Config>,
rpc: &Arc<Mutex<R>>
rpc: &Arc<Mutex<R>>,
) -> Result<Vec<crate::nullifier::queue_data::Account>, ForesterError> {
// let address_merkle_tree_pubkey = config.address_merkle_tree_pubkey;
let address_queue_pubkey = config.address_merkle_tree_queue_pubkey;

let mut account = (*rpc.lock().await).get_account(address_queue_pubkey).await?.unwrap();
let mut account = (*rpc.lock().await)
.get_account(address_queue_pubkey)
.await?
.unwrap();
let address_queue: HashSet = unsafe {
HashSet::from_bytes_copy(
&mut account.data[8 + mem::size_of::<QueueAccount>()..],
)?
HashSet::from_bytes_copy(&mut account.data[8 + mem::size_of::<QueueAccount>()..])?
};
let mut address_queue_vec = Vec::new();
let address = address_queue.first_no_seq().unwrap();
Expand All @@ -71,7 +72,7 @@ async fn process_address_queue<I: Indexer, R: RpcConnection>(
queue_data: Vec<crate::nullifier::queue_data::Account>,
indexer: &Arc<Mutex<I>>,
rpc: &Arc<Mutex<R>>,
config: &Arc<Config>
config: &Arc<Config>,
) {
for account in queue_data {
let address_merkle_tree_pubkey = config.address_merkle_tree_pubkey;
Expand All @@ -81,50 +82,50 @@ async fn process_address_queue<I: Indexer, R: RpcConnection>(
let address_hashset_index = account.index;
let mut update_errors: Vec<RpcError> = Vec::new();
let rpc_clone = Arc::clone(rpc);
let rpc_clone = &mut *rpc_clone.lock().await;
let rpc_clone = &mut *rpc_clone.lock().await;

let merkle_tree = get_indexed_merkle_tree::<AddressMerkleTreeAccount, R, Poseidon, usize, 26>(
rpc_clone,
address_merkle_tree_pubkey,
).await;
let merkle_tree =
get_indexed_merkle_tree::<AddressMerkleTreeAccount, R, Poseidon, usize, 26>(
rpc_clone,
address_merkle_tree_pubkey,
)
.await;
info!("address merkle_tree root: {:?}", merkle_tree.root());
let proof = (*indexer.lock().await)
.get_address_tree_proof(address_merkle_tree_pubkey.to_bytes(), address)
.await
.unwrap();
info!("proof: {:?}", proof);
info!("updating merkle tree...");


let update_successful = match update_merkle_tree(
rpc,
&config.payer_keypair,
address_queue_pubkey,
address_merkle_tree_pubkey,
address_hashset_index as u16,
proof.low_address_index,
proof.low_address_value,
proof.low_address_next_index,
proof.low_address_next_value,
proof.low_address_proof,
)
.await
{
Ok(event) => {
info!("event: {:?}", event);
true
}
Err(e) => {
update_errors.push(e);
break;
}
};
info!("update_successful: {:?}", update_successful);
if update_successful {
(*indexer.lock().await).address_tree_updated(address_merkle_tree_pubkey.to_bytes(), proof)
}

let proof = (*indexer.lock().await)
.get_address_tree_proof(address_merkle_tree_pubkey.to_bytes(), address)
.await
.unwrap();
info!("proof: {:?}", proof);
info!("updating merkle tree...");

let update_successful = match update_merkle_tree(
rpc,
&config.payer_keypair,
address_queue_pubkey,
address_merkle_tree_pubkey,
address_hashset_index as u16,
proof.low_address_index,
proof.low_address_value,
proof.low_address_next_index,
proof.low_address_next_value,
proof.low_address_proof,
)
.await
{
Ok(event) => {
info!("event: {:?}", event);
true
}
Err(e) => {
update_errors.push(e);
break;
}
};
info!("update_successful: {:?}", update_successful);
if update_successful {
(*indexer.lock().await)
.address_tree_updated(address_merkle_tree_pubkey.to_bytes(), proof)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions forester/src/nullifier/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::external_services_config::ExternalServicesConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use crate::external_services_config::ExternalServicesConfig;

pub struct Config {
pub external_services: ExternalServicesConfig,
Expand Down Expand Up @@ -29,7 +29,7 @@ impl Clone for Config {
concurrency_limit: self.concurrency_limit,
batch_size: self.batch_size,
max_retries: self.max_retries,
max_concurrent_batches: self.max_concurrent_batches
max_concurrent_batches: self.max_concurrent_batches,
}
}
}
Loading

0 comments on commit 0857c37

Please sign in to comment.