From 3ff8bf8927182923f01b239d2ced84280f48bb3a Mon Sep 17 00:00:00 2001 From: Samuel Rufinatscha Date: Thu, 28 Nov 2024 14:33:46 +0100 Subject: [PATCH] [iota-faucet] Enable the use of batched mode on faucet (#4110) * fix: Pass at least one coin to the faucet batch queue * fix: Clippy * fix: Rework split point * fix: Enable `batch_enabled` flag for existing batch tests * fix: Add test_faucet_batch_concurrent_requests test * fix: Fix comment * fix: Remove wrong comment --- .../iota-faucet/src/faucet/simple_faucet.rs | 36 ++- crates/iota/tests/cli_tests.rs | 270 +++++++++++++++++- 2 files changed, 293 insertions(+), 13 deletions(-) diff --git a/crates/iota-faucet/src/faucet/simple_faucet.rs b/crates/iota-faucet/src/faucet/simple_faucet.rs index 1a168c35c48..eafb50d4a4a 100644 --- a/crates/iota-faucet/src/faucet/simple_faucet.rs +++ b/crates/iota-faucet/src/faucet/simple_faucet.rs @@ -119,6 +119,11 @@ impl SimpleFaucet { .map(|q| GasCoin::try_from(&q.1).unwrap()) .filter(|coin| coin.0.balance.value() >= (config.amount * config.num_coins as u64)) .collect::>(); + + if coins.is_empty() { + return Err(FaucetError::NoGasCoinAvailable); + } + let metrics = FaucetMetrics::new(prometheus_registry); let wal = WriteAheadLog::open(wal_path); @@ -131,16 +136,19 @@ impl SimpleFaucet { config.max_request_queue_length as usize, ); - // This is to handle the case where there is only 1 coin, we want it to go to - // the normal queue - let split_point = if coins.len() > 10 { - coins.len() / 2 + // Split the coins eventually into two pools: one for the gas pool and one for + // the batch pool. The batch pool will only be populated if the batch feature is + // enabled. + let split_point = if config.batch_enabled { + if coins.len() > 1 { + 1 // At least one coin goes to the gas pool the rest to the batch pool + } else { + 0 // Only one coin available, all coins go to the batch pool. This is safe as we have already checked above that `coins` is not empty. + } } else { - coins.len() + coins.len() // All coins go to the gas pool if batch is disabled }; - // Put half of the coins in the old faucet impl queue, and put half in the other - // queue for batch coins. In the test cases we create an account with 5 - // coins so we just let this run with a minimum of 5 coins + for (coins_processed, coin) in coins.iter().enumerate() { let coin_id = *coin.id(); if let Some(write_ahead_log::Entry { @@ -946,6 +954,7 @@ impl Faucet for SimpleFaucet { { return Err(FaucetError::BatchSendQueueFull); } + let mut task_map = self.task_id_cache.lock().await; task_map.insert( id, @@ -1035,6 +1044,7 @@ pub async fn batch_transfer_gases( "Batch transfer attempted of size: {:?}", total_requests ); let total_iota_needed: u64 = requests.iter().flat_map(|(_, _, amounts)| amounts).sum(); + // This loop is utilized to grab a coin that is large enough for the request loop { let gas_coin_response = faucet @@ -1292,7 +1302,10 @@ mod tests { #[tokio::test] async fn test_batch_transfer_interface() { let test_cluster = TestClusterBuilder::new().build().await; - let config: FaucetConfig = Default::default(); + let config: FaucetConfig = FaucetConfig { + batch_enabled: true, + ..Default::default() + }; let coin_amount = config.amount; let prom_registry = Registry::new(); let tmp = tempfile::tempdir().unwrap(); @@ -1892,7 +1905,10 @@ mod tests { #[tokio::test] async fn test_amounts_transferred_on_batch() { let test_cluster = TestClusterBuilder::new().build().await; - let config: FaucetConfig = Default::default(); + let config: FaucetConfig = FaucetConfig { + batch_enabled: true, + ..Default::default() + }; let address = test_cluster.get_address_0(); let mut context = test_cluster.wallet; let gas_coins = context diff --git a/crates/iota/tests/cli_tests.rs b/crates/iota/tests/cli_tests.rs index 3f75f606082..aaded6cecdb 100644 --- a/crates/iota/tests/cli_tests.rs +++ b/crates/iota/tests/cli_tests.rs @@ -55,8 +55,8 @@ use iota_test_transaction_builder::batch_make_transfer_transactions; use iota_types::{ base_types::{IotaAddress, ObjectID}, crypto::{ - Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner, Secp256k1IotaSignature, - SignatureScheme, get_key_pair, + AccountKeyPair, Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner, + Secp256k1IotaSignature, SignatureScheme, get_key_pair, }, error::IotaObjectResponseError, gas_coin::GasCoin, @@ -4168,8 +4168,10 @@ async fn test_faucet() -> Result<(), anyhow::Error> { let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG); let mut context = WalletContext::new(&wallet_config, None, None)?; + let (address, _): (_, AccountKeyPair) = get_key_pair(); + let faucet_result = IotaClientCommands::Faucet { - address: None, + address: Some(KeyIdentity::Address(address)), url: Some("http://127.0.0.1:5003/gas".to_string()), } .execute(&mut context) @@ -4180,6 +4182,268 @@ async fn test_faucet() -> Result<(), anyhow::Error> { unreachable!("Invalid response"); }; + sleep(Duration::from_secs(5)).await; + + let gas_objects_after = context + .get_gas_objects_owned_by_address(address, None) + .await + .unwrap() + .len(); + assert_eq!(gas_objects_after, 1); + + Ok(()) +} + +#[sim_test] +async fn test_faucet_batch() -> Result<(), anyhow::Error> { + let test_cluster = TestClusterBuilder::new() + .with_fullnode_rpc_port(9000) + .build() + .await; + + let context = test_cluster.wallet; + + let tmp = tempfile::tempdir().unwrap(); + let prom_registry = prometheus::Registry::new(); + let config = iota_faucet::FaucetConfig { + batch_enabled: true, + ..Default::default() + }; + + let prometheus_registry = prometheus::Registry::new(); + let app_state = std::sync::Arc::new(iota_faucet::AppState { + faucet: iota_faucet::SimpleFaucet::new( + context, + &prometheus_registry, + &tmp.path().join("faucet.wal"), + config.clone(), + ) + .await + .unwrap(), + config, + }); + tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await }); + + // Wait for the faucet to be up + sleep(Duration::from_secs(1)).await; + let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG); + let mut context = WalletContext::new(&wallet_config, None, None)?; + + let (address_1, _): (_, AccountKeyPair) = get_key_pair(); + let (address_2, _): (_, AccountKeyPair) = get_key_pair(); + let (address_3, _): (_, AccountKeyPair) = get_key_pair(); + + assert_ne!(address_1, address_2); + assert_ne!(address_1, address_3); + assert_ne!(address_2, address_3); + + for address in [address_1, address_2, address_3].iter() { + let gas_objects_after = context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(); + assert_eq!(gas_objects_after, 0); + } + + for address in [address_1, address_2, address_3].iter() { + let faucet_result = IotaClientCommands::Faucet { + address: Some(KeyIdentity::Address(*address)), + url: Some("http://127.0.0.1:5003/v1/gas".to_string()), + } + .execute(&mut context) + .await?; + + if let IotaClientCommandResult::NoOutput = faucet_result { + } else { + unreachable!("Invalid response"); + }; + } + + // we need to wait a minimum of 10 seconds for gathering the batch + some time + // for transaction to be sequenced + sleep(Duration::from_secs(15)).await; + + for address in [address_1, address_2, address_3].iter() { + let gas_objects_after = context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(); + assert_eq!(gas_objects_after, 1); + } + + // try with a new batch + let (address_4, _): (_, AccountKeyPair) = get_key_pair(); + let (address_5, _): (_, AccountKeyPair) = get_key_pair(); + let (address_6, _): (_, AccountKeyPair) = get_key_pair(); + + assert_ne!(address_4, address_5); + assert_ne!(address_4, address_6); + assert_ne!(address_5, address_6); + + for address in [address_4, address_5, address_6].iter() { + let gas_objects_after = context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(); + assert_eq!(gas_objects_after, 0); + } + + for address in [address_4, address_5, address_6].iter() { + let faucet_result = IotaClientCommands::Faucet { + address: Some(KeyIdentity::Address(*address)), + url: Some("http://127.0.0.1:5003/v1/gas".to_string()), + } + .execute(&mut context) + .await?; + + if let IotaClientCommandResult::NoOutput = faucet_result { + } else { + unreachable!("Invalid response"); + }; + } + + // we need to wait a minimum of 10 seconds for gathering the batch + some time + // for transaction to be sequenced + sleep(Duration::from_secs(15)).await; + + for address in [address_4, address_5, address_6].iter() { + let gas_objects_after = context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(); + assert_eq!(gas_objects_after, 1); + } + + Ok(()) +} + +#[sim_test] +async fn test_faucet_batch_concurrent_requests() -> Result<(), anyhow::Error> { + let test_cluster = TestClusterBuilder::new() + .with_fullnode_rpc_port(9000) + .build() + .await; + + let context = test_cluster.wallet; + + let tmp = tempfile::tempdir().unwrap(); + let prom_registry = prometheus::Registry::new(); + let config = iota_faucet::FaucetConfig { + batch_enabled: true, + ..Default::default() + }; + + let prometheus_registry = prometheus::Registry::new(); + let app_state = std::sync::Arc::new(iota_faucet::AppState { + faucet: iota_faucet::SimpleFaucet::new( + context, + &prometheus_registry, + &tmp.path().join("faucet.wal"), + config.clone(), + ) + .await + .unwrap(), + config, + }); + tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await }); + + // Wait for the faucet to be up + sleep(Duration::from_secs(1)).await; + + let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG); + let context = WalletContext::new(&wallet_config, None, None)?; // Use immutable context + + // Generate multiple addresses + let addresses: Vec<_> = (0..6) + .map(|_| get_key_pair::().0) + .collect::>(); + + // Ensure all addresses have zero gas objects initially + for address in &addresses { + assert_eq!( + context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(), + 0 + ); + } + + // First batch: send faucet requests concurrently for all addresses + let first_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| { + let wallet_config = wallet_config.clone(); + async move { + let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context (for faucet requests) + IotaClientCommands::Faucet { + address: Some(KeyIdentity::Address(*address)), + url: Some("http://127.0.0.1:5003/v1/gas".to_string()), + } + .execute(&mut context) + .await + } + })) + .await; + + // Ensure all results are `NoOutput` indicating requests were batched + for result in first_batch_results { + assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput))); + } + + // Wait for the first batch to complete + sleep(Duration::from_secs(15)).await; + + // Validate gas objects after the first batch + for address in &addresses { + assert_eq!( + context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(), + 1 + ); + } + + // Second batch: send faucet requests again for all addresses + let second_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| { + let wallet_config = wallet_config.clone(); + async move { + let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context + IotaClientCommands::Faucet { + address: Some(KeyIdentity::Address(*address)), + url: Some("http://127.0.0.1:5003/v1/gas".to_string()), + } + .execute(&mut context) + .await + } + })) + .await; + + // Ensure all results are `NoOutput` for the second batch + for result in second_batch_results { + assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput))); + } + + // Wait for the second batch to complete + sleep(Duration::from_secs(15)).await; + + // Validate gas objects after the second batch + for address in &addresses { + assert_eq!( + context + .get_gas_objects_owned_by_address(*address, None) + .await + .unwrap() + .len(), + 2 + ); + } + Ok(()) }