Skip to content

Commit

Permalink
Squerge in #1370
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 961ea58
Author: Paul Hauner <[email protected]>
Date:   Tue Jul 21 09:21:13 2020 +1000

    Add batched deposits
  • Loading branch information
paulhauner committed Jul 25, 2020
1 parent 23a8f31 commit c065c52
Showing 1 changed file with 86 additions and 26 deletions.
112 changes: 86 additions & 26 deletions account_manager/src/validator/deposit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::VALIDATOR_DIR_FLAG;
use clap::{App, Arg, ArgMatches};
use deposit_contract::DEPOSIT_GAS;
use environment::Environment;
use futures::compat::Future01CompatExt;
use futures::{
compat::Future01CompatExt,
stream::{FuturesUnordered, StreamExt},
};
use slog::{info, Logger};
use std::path::PathBuf;
use tokio::time::{delay_until, Duration, Instant};
Expand All @@ -20,11 +23,15 @@ pub const VALIDATOR_FLAG: &str = "validator";
pub const ETH1_IPC_FLAG: &str = "eth1-ipc";
pub const ETH1_HTTP_FLAG: &str = "eth1-http";
pub const FROM_ADDRESS_FLAG: &str = "from-address";
pub const CONFIRMATION_COUNT_FLAG: &str = "confirmation-count";
pub const CONFIRMATION_BATCH_SIZE_FLAG: &str = "confirmation-batch-size";

const GWEI: u64 = 1_000_000_000;

const SYNCING_STATE_RETRY_DELAY: Duration = Duration::from_secs(2);

const CONFIRMATIONS_POLL_TIME: Duration = Duration::from_secs(2);

pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
App::new("deposit")
.about(
Expand Down Expand Up @@ -86,15 +93,40 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name(CONFIRMATION_COUNT_FLAG)
.long(CONFIRMATION_COUNT_FLAG)
.value_name("CONFIRMATION_COUNT")
.help(
"The number of Eth1 block confirmations required \
before a transaction is considered complete. Set to \
0 for no confirmations.",
)
.takes_value(true)
.default_value("1"),
)
.arg(
Arg::with_name(CONFIRMATION_BATCH_SIZE_FLAG)
.long(CONFIRMATION_BATCH_SIZE_FLAG)
.value_name("BATCH_SIZE")
.help(
"Perform BATCH_SIZE deposits and wait for confirmations \
in parallel. Useful for achieving faster bulk deposits.",
)
.takes_value(true)
.default_value("10"),
)
}

fn send_deposit_transactions<T1, T2: 'static>(
mut env: Environment<T1>,
log: Logger,
eth1_deposit_datas: Vec<(ValidatorDir, Eth1DepositData)>,
mut eth1_deposit_datas: Vec<(ValidatorDir, Eth1DepositData)>,
from_address: Address,
deposit_contract: Address,
transport: T2,
confirmation_count: usize,
confirmation_batch_size: usize,
) -> Result<(), String>
where
T1: EthSpec,
Expand All @@ -106,32 +138,53 @@ where
let deposits_fut = async {
poll_until_synced(web3.clone(), log.clone()).await?;

for (mut validator_dir, eth1_deposit_data) in eth1_deposit_datas {
let tx_hash = web3
.eth()
.send_transaction(TransactionRequest {
from: from_address,
to: Some(deposit_contract),
gas: Some(DEPOSIT_GAS.into()),
gas_price: None,
value: Some(from_gwei(eth1_deposit_data.deposit_data.amount)),
data: Some(eth1_deposit_data.rlp.into()),
nonce: None,
condition: None,
})
.compat()
.await
.map_err(|e| format!("Failed to send transaction: {:?}", e))?;
for chunk in eth1_deposit_datas.chunks_mut(confirmation_batch_size) {
let futures = FuturesUnordered::default();

info!(
log,
"Submitted deposit";
"tx_hash" => format!("{:?}", tx_hash),
);
for (ref mut validator_dir, eth1_deposit_data) in chunk.iter_mut() {
let web3 = web3.clone();
let log = log.clone();
futures.push(async move {
let tx_hash = web3
.send_transaction_with_confirmation(
TransactionRequest {
from: from_address,
to: Some(deposit_contract),
gas: Some(DEPOSIT_GAS.into()),
gas_price: None,
value: Some(from_gwei(eth1_deposit_data.deposit_data.amount)),
data: Some(eth1_deposit_data.rlp.clone().into()),
nonce: None,
condition: None,
},
CONFIRMATIONS_POLL_TIME,
confirmation_count,
)
.compat()
.await
.map_err(|e| format!("Failed to send transaction: {:?}", e))?;

validator_dir
.save_eth1_deposit_tx_hash(&format!("{:?}", tx_hash))
.map_err(|e| format!("Failed to save tx hash {:?} to disk: {:?}", tx_hash, e))?;
info!(
log,
"Submitted deposit";
"tx_hash" => format!("{:?}", tx_hash),
);

validator_dir
.save_eth1_deposit_tx_hash(&format!("{:?}", tx_hash))
.map_err(|e| {
format!("Failed to save tx hash {:?} to disk: {:?}", tx_hash, e)
})?;

Ok::<(), String>(())
});
}

futures
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<_, _>>()?;
}

Ok::<(), String>(())
Expand All @@ -157,6 +210,9 @@ pub fn cli_run<T: EthSpec>(
let eth1_ipc_path: Option<PathBuf> = clap_utils::parse_optional(matches, ETH1_IPC_FLAG)?;
let eth1_http_url: Option<String> = clap_utils::parse_optional(matches, ETH1_HTTP_FLAG)?;
let from_address: Address = clap_utils::parse_required(matches, FROM_ADDRESS_FLAG)?;
let confirmation_count: usize = clap_utils::parse_required(matches, CONFIRMATION_COUNT_FLAG)?;
let confirmation_batch_size: usize =
clap_utils::parse_required(matches, CONFIRMATION_BATCH_SIZE_FLAG)?;

let manager = ValidatorManager::open(&data_dir)
.map_err(|e| format!("Unable to read --{}: {:?}", VALIDATOR_DIR_FLAG, e))?;
Expand Down Expand Up @@ -250,6 +306,8 @@ pub fn cli_run<T: EthSpec>(
from_address,
deposit_contract,
ipc_transport,
confirmation_count,
confirmation_batch_size,
)
}
(None, Some(http_url)) => {
Expand All @@ -262,6 +320,8 @@ pub fn cli_run<T: EthSpec>(
from_address,
deposit_contract,
http_transport,
confirmation_count,
confirmation_batch_size,
)
}
}
Expand Down

0 comments on commit c065c52

Please sign in to comment.