From c537a518dba8edaf63ffb6b0fcb9e75ab2e918b5 Mon Sep 17 00:00:00 2001 From: Nicholas Rodrigues Lordello Date: Mon, 6 Jan 2020 15:00:04 +0100 Subject: [PATCH] implemented confirmations --- src/errors.rs | 4 + src/transaction/confirm.rs | 162 ++++++++++++++++++++++++++++--------- 2 files changed, 130 insertions(+), 36 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 4027c95b..54f1316d 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -78,6 +78,10 @@ pub enum ExecutionError { /// A contract call executed an invalid opcode. #[error("contract call executed an invalid opcode")] InvalidOpcode, + + /// A contract transaction failed to confirm within the block timeout limit. + #[error("contract transaction timed-out")] + ConfirmTimeout, } impl From for ExecutionError { diff --git a/src/transaction/confirm.rs b/src/transaction/confirm.rs index 9e119983..3b9d82fd 100644 --- a/src/transaction/confirm.rs +++ b/src/transaction/confirm.rs @@ -6,72 +6,162 @@ //! just sent to the mem-pool but does not wait for it to get mined. Hopefully //! some of this can move upstream into the `web3` crate. -#![allow(missing_docs)] - use crate::errors::ExecutionError; -use crate::future::CompatCallFuture; -use futures::future::{Either, TryJoin}; +use crate::future::{CompatCallFuture, Web3Unpin}; +use futures::compat::{Compat01As03, Future01CompatExt}; +use futures::future::{self, TryJoin}; use futures::ready; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::num::NonZeroUsize; -use web3::api::Web3; +use std::time::Duration; +use web3::api::{CreateFilter, FilterStream}; +use web3::futures::stream::{Skip as Skip01, StreamFuture as StreamFuture01}; +use web3::futures::Stream as Stream01; use web3::types::{TransactionReceipt, H256, U256}; use web3::Transport; +/// A struct with the confirmation parameters. +#[derive(Clone, Debug)] +pub struct ConfirmParams { + /// The number of blocks to confirm the transaction with. This is the number + /// of blocks mined on top of the block where the transaction was mined. + /// This means that, for example, to just wait for the transaction to be + /// mined, then the number of confirmations should be 0. Positive non-zero + /// values indicate that extra blocks should be waited for on top of the + /// block where the transaction was mined. + pub confirmations: usize, + /// The polling interval. This is used as the interval between consecutive + /// `eth_getFilterChanges` calls to get filter updates, or the interval to + /// wait between confirmation checks in case filters are not supported by + /// the node (for example when using Infura over HTTP(S)). + pub poll_interval: Duration, + /// The maximum number of blocks to wait for a transaction to get confirmed. + pub block_timeout: usize, +} + +/// A future that resolves once a transaction is confirmed. pub struct ConfirmFuture { - web3: Web3, + web3: Web3Unpin, tx: H256, - confirmations: NonZeroUsize, + params: ConfirmParams, + starting_block_num: Option, state: ConfirmState, } -impl ConfirmFuture { - pub fn new(web3: Web3, tx: H256, confirmations: usize) -> Either< -} - -impl ConfirmFuture { - fn state_mut(self: Pin<&mut Self>) -> &mut ConfirmState { - // NOTE: this is safe as the `state` field does not need to be pinned - unsafe { &mut self.get_unchecked_mut().state } - } +/// The state of the confirmation future. +enum ConfirmState { + /// The future is in the state where it needs to setup the checking future + /// to see if the confirmation is complete. This is used as a intermediate + /// state that doesn't actually wait for anything and immediately proceeds + /// to the `Checking` state. + Check, + /// The future is waiting for the block number and transaction receipt to + /// make sure that enough blocks have passed since the transaction was + /// mined. Note that the transaction receipt is retrieved everytime in case + /// of ommered blocks. + Checking(CheckFuture), + /// The future is waiting for the block filter to be created so that it can + /// wait for blocks to go by. + CreatingFilter(CompatCreateFilter, u64), + /// The future is waiting for new blocks to be mined and added to the chain + /// so that the transaction can be confirmed the desired number of blocks. + WaitingForBlocks(CompatFilterFuture), + /// The future is waiting for a poll timeout. This state happens when the + /// node does not support block filters for the given transport (like Infura + /// over HTTPS) so we need to fallback to polling. + WaitingForPollTimeout, } impl Future for ConfirmFuture { - type Output = Result; + type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let unpinned = Pin::into_inner(self); loop { - match self.as_mut().state_mut() { + unpinned.state = match &mut unpinned.state { + ConfirmState::Check => ConfirmState::Checking(future::try_join( + unpinned.web3.eth().block_number().compat(), + unpinned + .web3 + .eth() + .transaction_receipt(unpinned.tx) + .compat(), + )), ConfirmState::Checking(ref mut check) => { let (block_num, tx) = match ready!(Pin::new(check).poll(cx)) { Ok(result) => result, Err(err) => return Poll::Ready(Err(err.into())), }; - let tx_block_num = tx.and_then(|tx| tx.block_number).unwrap_or(block_num); - if block_num + 1 >= tx_block_num + self.confirmations.get() { - return Poll::Ready + // NOTE: If the transaction hasn't been mined, then assume + // it will be picked up in the next block. + let tx_block_num = tx + .as_ref() + .and_then(|tx| tx.block_number) + .unwrap_or(block_num + 1); + + let target_block_num = tx_block_num + unpinned.params.confirmations; + let remaining_confirmations = target_block_num.saturating_sub(block_num); + + if remaining_confirmations.is_zero() { + // NOTE: It is safe to unwrap here since if tx is `None` + // then the `remaining_confirmations` will always be + // positive since `tx_block_num` will be a future + // block. + return Poll::Ready(Ok(tx.unwrap())); + } + + let starting_block_num = *unpinned.starting_block_num.get_or_insert(block_num); + if block_num.saturating_sub(starting_block_num) + > U256::from(unpinned.params.block_timeout) + { + return Poll::Ready(Err(ExecutionError::ConfirmTimeout)); + } + + ConfirmState::CreatingFilter( + unpinned.web3.eth_filter().create_blocks_filter().compat(), + remaining_confirmations.as_u64(), + ) + } + ConfirmState::CreatingFilter(ref mut create_filter, count) => { + match ready!(Pin::new(create_filter).poll(cx)) { + Ok(filter) => ConfirmState::WaitingForBlocks( + filter + .stream(unpinned.params.poll_interval) + .skip(*count) + .into_future() + .compat(), + ), + Err(_) => { + // NOTE: In the case we fail to create a filter + // (usually because the node doesn't support pub/ + // sub) then fall back to polling. + ConfirmState::WaitingForPollTimeout + } + } + } + ConfirmState::WaitingForBlocks(ref mut wait) => { + match ready!(Pin::new(wait).poll(cx)) { + Ok(_) => ConfirmState::Check, + Err((err, _)) => return Poll::Ready(Err(err.into())), } - todo!() } - ConfirmState::WaitingForBlocks(ref mut wait) => todo!(), + ConfirmState::WaitingForPollTimeout => todo!("polling is currently not supported"), } } } } -enum ConfirmState { - Checking(CheckFuture), - WaitingForBlocks(WaitForBlocksFuture), -} +/// A type alias for a joined `eth_blockNumber` and `eth_getTransactionReceipt` +/// calls. Used when checking that the transaction has been confirmed by enough +/// blocks. +type CheckFuture = + TryJoin, CompatCallFuture>>; -type CheckFuture = TryJoin, CompatCallFuture>; +/// A type alias for a future creating a `eth_newBlockFilter` filter. +type CompatCreateFilter = Compat01As03>; -struct WaitForBlocksFuture(T); - -pub enum TransactionResult { - Hash(H256), - Receipt(TransactionReceipt), -} +/// A type alias for a future that resolves once the block filter has received +/// a certain number of blocks. +type CompatFilterFuture = Compat01As03>>>;