Skip to content

Commit

Permalink
[iota-faucet] Enable the use of batched mode on faucet (#4110)
Browse files Browse the repository at this point in the history
* fix: Pass at least one coin to the faucet batch queue

* fix: Clippy

* fix: Rework split point

* fix: Enable `batch_enabled` flag for existing batch tests

* fix: Add test_faucet_batch_concurrent_requests test

* fix: Fix comment

* fix: Remove wrong comment
  • Loading branch information
samuel-rufi authored Nov 28, 2024
1 parent f9aa341 commit 3ff8bf8
Show file tree
Hide file tree
Showing 2 changed files with 293 additions and 13 deletions.
36 changes: 26 additions & 10 deletions crates/iota-faucet/src/faucet/simple_faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ impl SimpleFaucet {
.map(|q| GasCoin::try_from(&q.1).unwrap())
.filter(|coin| coin.0.balance.value() >= (config.amount * config.num_coins as u64))
.collect::<Vec<GasCoin>>();

if coins.is_empty() {
return Err(FaucetError::NoGasCoinAvailable);
}

let metrics = FaucetMetrics::new(prometheus_registry);

let wal = WriteAheadLog::open(wal_path);
Expand All @@ -131,16 +136,19 @@ impl SimpleFaucet {
config.max_request_queue_length as usize,
);

// This is to handle the case where there is only 1 coin, we want it to go to
// the normal queue
let split_point = if coins.len() > 10 {
coins.len() / 2
// Split the coins eventually into two pools: one for the gas pool and one for
// the batch pool. The batch pool will only be populated if the batch feature is
// enabled.
let split_point = if config.batch_enabled {
if coins.len() > 1 {
1 // At least one coin goes to the gas pool the rest to the batch pool
} else {
0 // Only one coin available, all coins go to the batch pool. This is safe as we have already checked above that `coins` is not empty.
}
} else {
coins.len()
coins.len() // All coins go to the gas pool if batch is disabled
};
// Put half of the coins in the old faucet impl queue, and put half in the other
// queue for batch coins. In the test cases we create an account with 5
// coins so we just let this run with a minimum of 5 coins

for (coins_processed, coin) in coins.iter().enumerate() {
let coin_id = *coin.id();
if let Some(write_ahead_log::Entry {
Expand Down Expand Up @@ -946,6 +954,7 @@ impl Faucet for SimpleFaucet {
{
return Err(FaucetError::BatchSendQueueFull);
}

let mut task_map = self.task_id_cache.lock().await;
task_map.insert(
id,
Expand Down Expand Up @@ -1035,6 +1044,7 @@ pub async fn batch_transfer_gases(
"Batch transfer attempted of size: {:?}", total_requests
);
let total_iota_needed: u64 = requests.iter().flat_map(|(_, _, amounts)| amounts).sum();

// This loop is utilized to grab a coin that is large enough for the request
loop {
let gas_coin_response = faucet
Expand Down Expand Up @@ -1292,7 +1302,10 @@ mod tests {
#[tokio::test]
async fn test_batch_transfer_interface() {
let test_cluster = TestClusterBuilder::new().build().await;
let config: FaucetConfig = Default::default();
let config: FaucetConfig = FaucetConfig {
batch_enabled: true,
..Default::default()
};
let coin_amount = config.amount;
let prom_registry = Registry::new();
let tmp = tempfile::tempdir().unwrap();
Expand Down Expand Up @@ -1892,7 +1905,10 @@ mod tests {
#[tokio::test]
async fn test_amounts_transferred_on_batch() {
let test_cluster = TestClusterBuilder::new().build().await;
let config: FaucetConfig = Default::default();
let config: FaucetConfig = FaucetConfig {
batch_enabled: true,
..Default::default()
};
let address = test_cluster.get_address_0();
let mut context = test_cluster.wallet;
let gas_coins = context
Expand Down
270 changes: 267 additions & 3 deletions crates/iota/tests/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use iota_test_transaction_builder::batch_make_transfer_transactions;
use iota_types::{
base_types::{IotaAddress, ObjectID},
crypto::{
Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner, Secp256k1IotaSignature,
SignatureScheme, get_key_pair,
AccountKeyPair, Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner,
Secp256k1IotaSignature, SignatureScheme, get_key_pair,
},
error::IotaObjectResponseError,
gas_coin::GasCoin,
Expand Down Expand Up @@ -4168,8 +4168,10 @@ async fn test_faucet() -> Result<(), anyhow::Error> {
let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let mut context = WalletContext::new(&wallet_config, None, None)?;

let (address, _): (_, AccountKeyPair) = get_key_pair();

let faucet_result = IotaClientCommands::Faucet {
address: None,
address: Some(KeyIdentity::Address(address)),
url: Some("http://127.0.0.1:5003/gas".to_string()),
}
.execute(&mut context)
Expand All @@ -4180,6 +4182,268 @@ async fn test_faucet() -> Result<(), anyhow::Error> {
unreachable!("Invalid response");
};

sleep(Duration::from_secs(5)).await;

let gas_objects_after = context
.get_gas_objects_owned_by_address(address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);

Ok(())
}

#[sim_test]
async fn test_faucet_batch() -> Result<(), anyhow::Error> {
let test_cluster = TestClusterBuilder::new()
.with_fullnode_rpc_port(9000)
.build()
.await;

let context = test_cluster.wallet;

let tmp = tempfile::tempdir().unwrap();
let prom_registry = prometheus::Registry::new();
let config = iota_faucet::FaucetConfig {
batch_enabled: true,
..Default::default()
};

let prometheus_registry = prometheus::Registry::new();
let app_state = std::sync::Arc::new(iota_faucet::AppState {
faucet: iota_faucet::SimpleFaucet::new(
context,
&prometheus_registry,
&tmp.path().join("faucet.wal"),
config.clone(),
)
.await
.unwrap(),
config,
});
tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await });

// Wait for the faucet to be up
sleep(Duration::from_secs(1)).await;
let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let mut context = WalletContext::new(&wallet_config, None, None)?;

let (address_1, _): (_, AccountKeyPair) = get_key_pair();
let (address_2, _): (_, AccountKeyPair) = get_key_pair();
let (address_3, _): (_, AccountKeyPair) = get_key_pair();

assert_ne!(address_1, address_2);
assert_ne!(address_1, address_3);
assert_ne!(address_2, address_3);

for address in [address_1, address_2, address_3].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 0);
}

for address in [address_1, address_2, address_3].iter() {
let faucet_result = IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await?;

if let IotaClientCommandResult::NoOutput = faucet_result {
} else {
unreachable!("Invalid response");
};
}

// we need to wait a minimum of 10 seconds for gathering the batch + some time
// for transaction to be sequenced
sleep(Duration::from_secs(15)).await;

for address in [address_1, address_2, address_3].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);
}

// try with a new batch
let (address_4, _): (_, AccountKeyPair) = get_key_pair();
let (address_5, _): (_, AccountKeyPair) = get_key_pair();
let (address_6, _): (_, AccountKeyPair) = get_key_pair();

assert_ne!(address_4, address_5);
assert_ne!(address_4, address_6);
assert_ne!(address_5, address_6);

for address in [address_4, address_5, address_6].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 0);
}

for address in [address_4, address_5, address_6].iter() {
let faucet_result = IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await?;

if let IotaClientCommandResult::NoOutput = faucet_result {
} else {
unreachable!("Invalid response");
};
}

// we need to wait a minimum of 10 seconds for gathering the batch + some time
// for transaction to be sequenced
sleep(Duration::from_secs(15)).await;

for address in [address_4, address_5, address_6].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);
}

Ok(())
}

#[sim_test]
async fn test_faucet_batch_concurrent_requests() -> Result<(), anyhow::Error> {
let test_cluster = TestClusterBuilder::new()
.with_fullnode_rpc_port(9000)
.build()
.await;

let context = test_cluster.wallet;

let tmp = tempfile::tempdir().unwrap();
let prom_registry = prometheus::Registry::new();
let config = iota_faucet::FaucetConfig {
batch_enabled: true,
..Default::default()
};

let prometheus_registry = prometheus::Registry::new();
let app_state = std::sync::Arc::new(iota_faucet::AppState {
faucet: iota_faucet::SimpleFaucet::new(
context,
&prometheus_registry,
&tmp.path().join("faucet.wal"),
config.clone(),
)
.await
.unwrap(),
config,
});
tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await });

// Wait for the faucet to be up
sleep(Duration::from_secs(1)).await;

let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let context = WalletContext::new(&wallet_config, None, None)?; // Use immutable context

// Generate multiple addresses
let addresses: Vec<_> = (0..6)
.map(|_| get_key_pair::<AccountKeyPair>().0)
.collect::<Vec<IotaAddress>>();

// Ensure all addresses have zero gas objects initially
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
0
);
}

// First batch: send faucet requests concurrently for all addresses
let first_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| {
let wallet_config = wallet_config.clone();
async move {
let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context (for faucet requests)
IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await
}
}))
.await;

// Ensure all results are `NoOutput` indicating requests were batched
for result in first_batch_results {
assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput)));
}

// Wait for the first batch to complete
sleep(Duration::from_secs(15)).await;

// Validate gas objects after the first batch
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
1
);
}

// Second batch: send faucet requests again for all addresses
let second_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| {
let wallet_config = wallet_config.clone();
async move {
let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context
IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await
}
}))
.await;

// Ensure all results are `NoOutput` for the second batch
for result in second_batch_results {
assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput)));
}

// Wait for the second batch to complete
sleep(Duration::from_secs(15)).await;

// Validate gas objects after the second batch
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
2
);
}

Ok(())
}

Expand Down

0 comments on commit 3ff8bf8

Please sign in to comment.