diff --git a/Cargo.lock b/Cargo.lock index 182d8f6bacad..02d7da8f7657 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11086,6 +11086,7 @@ name = "node-bench" version = "0.9.0-dev" dependencies = [ "array-bytes", + "async-trait", "clap 4.5.13", "derive_more 0.99.17", "fs_extra", @@ -23245,6 +23246,7 @@ version = "0.34.0" dependencies = [ "array-bytes", "assert_matches", + "async-trait", "futures", "futures-util", "hex", diff --git a/prdoc/pr_6528.prdoc b/prdoc/pr_6528.prdoc new file mode 100644 index 000000000000..477ad76c947f --- /dev/null +++ b/prdoc/pr_6528.prdoc @@ -0,0 +1,18 @@ +title: 'TransactionPool API uses async_trait' +doc: +- audience: Node Dev + description: |- + This PR refactors `TransactionPool` API to use `async_trait`, replacing the` Pin>` pattern. This should improve readability and maintainability. + + The change is not altering any functionality. +crates: +- name: sc-rpc-spec-v2 + bump: minor +- name: sc-service + bump: minor +- name: sc-transaction-pool-api + bump: major +- name: sc-transaction-pool + bump: major +- name: sc-rpc + bump: minor diff --git a/substrate/bin/node/bench/Cargo.toml b/substrate/bin/node/bench/Cargo.toml index 8c6556da682c..447f947107c1 100644 --- a/substrate/bin/node/bench/Cargo.toml +++ b/substrate/bin/node/bench/Cargo.toml @@ -15,6 +15,7 @@ workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { workspace = true } array-bytes = { workspace = true, default-features = true } clap = { features = ["derive"], workspace = true } log = { workspace = true, default-features = true } diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs index bed6e3d914c2..22129c6a1d69 100644 --- a/substrate/bin/node/bench/src/construct.rs +++ b/substrate/bin/node/bench/src/construct.rs @@ -24,14 +24,14 @@ //! DO NOT depend on user input). Thus transaction generation should be //! based on randomized data. -use futures::Future; use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc}; +use async_trait::async_trait; use node_primitives::Block; use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes}; use sc_transaction_pool_api::{ - ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, - TransactionSource, TransactionStatusStreamFor, TxHash, + ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionSource, + TransactionStatusStreamFor, TxHash, }; use sp_consensus::{Environment, Proposer}; use sp_inherents::InherentDataProvider; @@ -224,54 +224,47 @@ impl ReadyTransactions for TransactionsIterator { fn report_invalid(&mut self, _tx: &Self::Item) {} } +#[async_trait] impl sc_transaction_pool_api::TransactionPool for Transactions { type Block = Block; type Hash = node_primitives::Hash; type InPoolTransaction = PoolTransaction; type Error = sc_transaction_pool_api::error::Error; - /// Returns a future that imports a bunch of unverified transactions to the pool. - fn submit_at( + /// Asynchronously imports a bunch of unverified transactions to the pool. + async fn submit_at( &self, _at: Self::Hash, _source: TransactionSource, _xts: Vec>, - ) -> PoolFuture>, Self::Error> { + ) -> Result>, Self::Error> { unimplemented!() } - /// Returns a future that imports one unverified transaction to the pool. - fn submit_one( + /// Asynchronously imports one unverified transaction to the pool. + async fn submit_one( &self, _at: Self::Hash, _source: TransactionSource, _xt: TransactionFor, - ) -> PoolFuture, Self::Error> { + ) -> Result, Self::Error> { unimplemented!() } - fn submit_and_watch( + async fn submit_and_watch( &self, _at: Self::Hash, _source: TransactionSource, _xt: TransactionFor, - ) -> PoolFuture>>, Self::Error> { + ) -> Result>>, Self::Error> { unimplemented!() } - fn ready_at( + async fn ready_at( &self, _at: Self::Hash, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send, - >, - > { - let iter: Box> + Send> = - Box::new(TransactionsIterator(self.0.clone().into_iter())); - Box::pin(futures::future::ready(iter)) + ) -> Box> + Send> { + Box::new(TransactionsIterator(self.0.clone().into_iter())) } fn ready(&self) -> Box> + Send> { @@ -306,18 +299,11 @@ impl sc_transaction_pool_api::TransactionPool for Transactions { unimplemented!() } - fn ready_at_with_timeout( + async fn ready_at_with_timeout( &self, _at: Self::Hash, _timeout: std::time::Duration, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send - + '_, - >, - > { + ) -> Box> + Send> { unimplemented!() } } diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 58dd8b830beb..daa805912fb9 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -44,6 +44,7 @@ rand = { workspace = true, default-features = true } schnellru = { workspace = true } [dev-dependencies] +async-trait = { workspace = true } jsonrpsee = { workspace = true, features = ["server", "ws-client"] } serde_json = { workspace = true, default-features = true } tokio = { features = ["macros"], workspace = true, default-features = true } diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs index adcc987f9c39..a543969a89b8 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs @@ -16,16 +16,16 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use async_trait::async_trait; use codec::Encode; -use futures::Future; use sc_transaction_pool::BasicPool; use sc_transaction_pool_api::{ - ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, - TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, + ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, + TransactionSource, TransactionStatusStreamFor, TxHash, }; use crate::hex_string; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use sp_runtime::traits::Block as BlockT; use std::{collections::HashMap, pin::Pin, sync::Arc}; @@ -77,67 +77,64 @@ impl MiddlewarePool { } } +#[async_trait] impl TransactionPool for MiddlewarePool { type Block = as TransactionPool>::Block; type Hash = as TransactionPool>::Hash; type InPoolTransaction = as TransactionPool>::InPoolTransaction; type Error = as TransactionPool>::Error; - fn submit_at( + async fn submit_at( &self, at: ::Hash, source: TransactionSource, xts: Vec>, - ) -> PoolFuture, Self::Error>>, Self::Error> { - self.inner_pool.submit_at(at, source, xts) + ) -> Result, Self::Error>>, Self::Error> { + self.inner_pool.submit_at(at, source, xts).await } - fn submit_one( + async fn submit_one( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture, Self::Error> { - self.inner_pool.submit_one(at, source, xt) + ) -> Result, Self::Error> { + self.inner_pool.submit_one(at, source, xt).await } - fn submit_and_watch( + async fn submit_and_watch( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture>>, Self::Error> { - let pool = self.inner_pool.clone(); - let sender = self.sender.clone(); + ) -> Result>>, Self::Error> { let transaction = hex_string(&xt.encode()); + let sender = self.sender.clone(); - async move { - let watcher = match pool.submit_and_watch(at, source, xt).await { - Ok(watcher) => watcher, - Err(err) => { - let _ = sender.send(MiddlewarePoolEvent::PoolError { - transaction: transaction.clone(), - err: err.to_string(), - }); - return Err(err); - }, - }; - - let watcher = watcher.map(move |status| { - let sender = sender.clone(); - let transaction = transaction.clone(); - - let _ = sender.send(MiddlewarePoolEvent::TransactionStatus { - transaction, - status: status.clone(), + let watcher = match self.inner_pool.submit_and_watch(at, source, xt).await { + Ok(watcher) => watcher, + Err(err) => { + let _ = sender.send(MiddlewarePoolEvent::PoolError { + transaction: transaction.clone(), + err: err.to_string(), }); + return Err(err); + }, + }; + + let watcher = watcher.map(move |status| { + let sender = sender.clone(); + let transaction = transaction.clone(); - status + let _ = sender.send(MiddlewarePoolEvent::TransactionStatus { + transaction, + status: status.clone(), }); - Ok(watcher.boxed()) - } - .boxed() + status + }); + + Ok(watcher.boxed()) } fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { @@ -164,17 +161,11 @@ impl TransactionPool for MiddlewarePool { self.inner_pool.ready_transaction(hash) } - fn ready_at( + async fn ready_at( &self, at: ::Hash, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send, - >, - > { - self.inner_pool.ready_at(at) + ) -> Box> + Send> { + self.inner_pool.ready_at(at).await } fn ready(&self) -> Box> + Send> { @@ -185,18 +176,11 @@ impl TransactionPool for MiddlewarePool { self.inner_pool.futures() } - fn ready_at_with_timeout( + async fn ready_at_with_timeout( &self, at: ::Hash, _timeout: std::time::Duration, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send - + '_, - >, - > { - self.inner_pool.ready_at(at) + ) -> Box> + Send> { + self.inner_pool.ready_at(at).await } } diff --git a/substrate/client/rpc/src/author/mod.rs b/substrate/client/rpc/src/author/mod.rs index 731f4df2f6f3..6afc871e565a 100644 --- a/substrate/client/rpc/src/author/mod.rs +++ b/substrate/client/rpc/src/author/mod.rs @@ -29,7 +29,6 @@ use crate::{ }; use codec::{Decode, Encode}; -use futures::TryFutureExt; use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink}; use sc_rpc_api::check_if_safe; use sc_transaction_pool_api::{ @@ -191,14 +190,16 @@ where }, }; - let submit = self.pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).map_err(|e| { - e.into_pool_error() - .map(error::Error::from) - .unwrap_or_else(|e| error::Error::Verification(Box::new(e))) - }); - + let pool = self.pool.clone(); let fut = async move { - let stream = match submit.await { + let submit = + pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).await.map_err(|e| { + e.into_pool_error() + .map(error::Error::from) + .unwrap_or_else(|e| error::Error::Verification(Box::new(e))) + }); + + let stream = match submit { Ok(stream) => stream, Err(err) => { let _ = pending.reject(ErrorObject::from(err)).await; diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 5cfd80cef910..9c01d7288a81 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -528,13 +528,17 @@ where }; let start = std::time::Instant::now(); - let import_future = self.pool.submit_one( - self.client.info().best_hash, - sc_transaction_pool_api::TransactionSource::External, - uxt, - ); + let pool = self.pool.clone(); + let client = self.client.clone(); Box::pin(async move { - match import_future.await { + match pool + .submit_one( + client.info().best_hash, + sc_transaction_pool_api::TransactionSource::External, + uxt, + ) + .await + { Ok(_) => { let elapsed = start.elapsed(); debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}"); diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index 3ac1a79a0c28..6f771e9479bd 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -23,7 +23,7 @@ pub mod error; use async_trait::async_trait; use codec::Codec; -use futures::{Future, Stream}; +use futures::Stream; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sp_core::offchain::TransactionPoolExt; use sp_runtime::traits::{Block as BlockT, Member}; @@ -208,9 +208,6 @@ pub type LocalTransactionFor

= <

::Block as BlockT> /// Transaction's index within the block in which it was included. pub type TxIndex = usize; -/// Typical future type used in transaction pool api. -pub type PoolFuture = std::pin::Pin> + Send>>; - /// In-pool transaction interface. /// /// The pool is container of transactions that are implementing this trait. @@ -238,6 +235,7 @@ pub trait InPoolTransaction { } /// Transaction pool interface. +#[async_trait] pub trait TransactionPool: Send + Sync { /// Block type. type Block: BlockT; @@ -253,46 +251,40 @@ pub trait TransactionPool: Send + Sync { // *** RPC - /// Returns a future that imports a bunch of unverified transactions to the pool. - fn submit_at( + /// Asynchronously imports a bunch of unverified transactions to the pool. + async fn submit_at( &self, at: ::Hash, source: TransactionSource, xts: Vec>, - ) -> PoolFuture, Self::Error>>, Self::Error>; + ) -> Result, Self::Error>>, Self::Error>; - /// Returns a future that imports one unverified transaction to the pool. - fn submit_one( + /// Asynchronously imports one unverified transaction to the pool. + async fn submit_one( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture, Self::Error>; + ) -> Result, Self::Error>; - /// Returns a future that imports a single transaction and starts to watch their progress in the + /// Asynchronously imports a single transaction and starts to watch their progress in the /// pool. - fn submit_and_watch( + async fn submit_and_watch( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture>>, Self::Error>; + ) -> Result>>, Self::Error>; // *** Block production / Networking /// Get an iterator for ready transactions ordered by priority. /// - /// Guarantees to return only when transaction pool got updated at `at` block. - /// Guarantees to return immediately when `None` is passed. - fn ready_at( + /// Guaranteed to resolve only when transaction pool got updated at `at` block. + /// Guaranteed to resolve immediately when `None` is passed. + async fn ready_at( &self, at: ::Hash, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send, - >, - >; + ) -> Box> + Send>; /// Get an iterator for ready transactions ordered by priority. fn ready(&self) -> Box> + Send>; @@ -322,22 +314,15 @@ pub trait TransactionPool: Send + Sync { /// Return specific ready transaction by hash, if there is one. fn ready_transaction(&self, hash: &TxHash) -> Option>; - /// Returns set of ready transaction at given block within given timeout. + /// Asynchronously returns a set of ready transaction at given block within given timeout. /// - /// If the timeout is hit during method execution then the best effort set of ready transactions - /// for given block, without executing full maintain process is returned. - fn ready_at_with_timeout( + /// If the timeout is hit during method execution, then the best effort (without executing full + /// maintain process) set of ready transactions for given block is returned. + async fn ready_at_with_timeout( &self, at: ::Hash, timeout: std::time::Duration, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send - + '_, - >, - >; + ) -> Box> + Send>; } /// An iterator of ready transactions. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index a342d35b2844..065d0cb3a274 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -33,7 +33,7 @@ use crate::{ enactment_state::{EnactmentAction, EnactmentState}, fork_aware_txpool::revalidation_worker, graph::{self, base_pool::Transaction, ExtrinsicFor, ExtrinsicHash, IsValidator, Options}, - PolledIterator, ReadyIteratorFor, LOG_TARGET, + ReadyIteratorFor, LOG_TARGET, }; use async_trait::async_trait; use futures::{ @@ -45,8 +45,8 @@ use futures::{ use parking_lot::Mutex; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ - ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus, - TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, + ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor, + TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_blockchain::{HashAndNumber, TreeRoute}; use sp_core::traits::SpawnEssentialNamed; @@ -375,14 +375,13 @@ where /// /// Pruning is just rebuilding the underlying transactions graph, no validations are executed, /// so this process shall be fast. - pub fn ready_at_light(&self, at: Block::Hash) -> PolledIterator { + pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor { let start = Instant::now(); let api = self.api.clone(); log::trace!(target: LOG_TARGET, "fatp::ready_at_light {:?}", at); let Ok(block_number) = self.api.resolve_block_number(at) else { - let empty: ReadyIteratorFor = Box::new(std::iter::empty()); - return Box::pin(async { empty }) + return Box::new(std::iter::empty()) }; let best_result = { @@ -401,57 +400,53 @@ where ) }; - Box::pin(async move { - if let Ok((Some(best_tree_route), Some(best_view))) = best_result { - let tmp_view: View = View::new_from_other( - &best_view, - &HashAndNumber { hash: at, number: block_number }, - ); - - let mut all_extrinsics = vec![]; + if let Ok((Some(best_tree_route), Some(best_view))) = best_result { + let tmp_view: View = + View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number }); - for h in best_tree_route.enacted() { - let extrinsics = api - .block_body(h.hash) - .await - .unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e); - None - }) - .unwrap_or_default() - .into_iter() - .map(|t| api.hash_and_length(&t).0); - all_extrinsics.extend(extrinsics); - } + let mut all_extrinsics = vec![]; - let before_count = tmp_view.pool.validated_pool().status().ready; - let tags = tmp_view - .pool - .validated_pool() - .extrinsics_tags(&all_extrinsics) + for h in best_tree_route.enacted() { + let extrinsics = api + .block_body(h.hash) + .await + .unwrap_or_else(|e| { + log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e); + None + }) + .unwrap_or_default() .into_iter() - .flatten() - .flatten() - .collect::>(); - let _ = tmp_view.pool.validated_pool().prune_tags(tags); - - let after_count = tmp_view.pool.validated_pool().status().ready; - log::debug!(target: LOG_TARGET, - "fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}", - at, - best_view.at.hash, - before_count, - all_extrinsics.len(), - after_count, - start.elapsed() - ); - Box::new(tmp_view.pool.validated_pool().ready()) - } else { - let empty: ReadyIteratorFor = Box::new(std::iter::empty()); - log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed()); - empty + .map(|t| api.hash_and_length(&t).0); + all_extrinsics.extend(extrinsics); } - }) + + let before_count = tmp_view.pool.validated_pool().status().ready; + let tags = tmp_view + .pool + .validated_pool() + .extrinsics_tags(&all_extrinsics) + .into_iter() + .flatten() + .flatten() + .collect::>(); + let _ = tmp_view.pool.validated_pool().prune_tags(tags); + + let after_count = tmp_view.pool.validated_pool().status().ready; + log::debug!(target: LOG_TARGET, + "fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}", + at, + best_view.at.hash, + before_count, + all_extrinsics.len(), + after_count, + start.elapsed() + ); + Box::new(tmp_view.pool.validated_pool().ready()) + } else { + let empty: ReadyIteratorFor = Box::new(std::iter::empty()); + log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed()); + empty + } } /// Waits for the set of ready transactions for a given block up to a specified timeout. @@ -464,18 +459,18 @@ where /// maintain. /// /// Returns a future resolving to a ready iterator of transactions. - fn ready_at_with_timeout_internal( + async fn ready_at_with_timeout_internal( &self, at: Block::Hash, timeout: std::time::Duration, - ) -> PolledIterator { + ) -> ReadyIteratorFor { log::debug!(target: LOG_TARGET, "fatp::ready_at_with_timeout at {:?} allowed delay: {:?}", at, timeout); let timeout = futures_timer::Delay::new(timeout); let (view_already_exists, ready_at) = self.ready_at_internal(at); if view_already_exists { - return ready_at; + return ready_at.await; } let maybe_ready = async move { @@ -493,18 +488,19 @@ where }; let fall_back_ready = self.ready_at_light(at); - Box::pin(async { - let (maybe_ready, fall_back_ready) = - futures::future::join(maybe_ready.boxed(), fall_back_ready.boxed()).await; - maybe_ready.unwrap_or(fall_back_ready) - }) + let (maybe_ready, fall_back_ready) = + futures::future::join(maybe_ready, fall_back_ready).await; + maybe_ready.unwrap_or(fall_back_ready) } - fn ready_at_internal(&self, at: Block::Hash) -> (bool, PolledIterator) { + fn ready_at_internal( + &self, + at: Block::Hash, + ) -> (bool, Pin> + Send>>) { let mut ready_poll = self.ready_poll.lock(); if let Some((view, inactive)) = self.view_store.get_view_at(at, true) { - log::debug!(target: LOG_TARGET, "fatp::ready_at {at:?} (inactive:{inactive:?})"); + log::debug!(target: LOG_TARGET, "fatp::ready_at_internal {at:?} (inactive:{inactive:?})"); let iterator: ReadyIteratorFor = Box::new(view.pool.validated_pool().ready()); return (true, async move { iterator }.boxed()); } @@ -519,7 +515,7 @@ where }) .boxed(); log::debug!(target: LOG_TARGET, - "fatp::ready_at {at:?} pending keys: {:?}", + "fatp::ready_at_internal {at:?} pending keys: {:?}", ready_poll.pollers.keys() ); (false, pending) @@ -573,6 +569,7 @@ fn reduce_multiview_result(input: HashMap>>) -> Vec TransactionPool for ForkAwareTxPool where Block: BlockT, @@ -590,12 +587,12 @@ where /// /// The internal limits of the pool are checked. The results of submissions to individual views /// are reduced to single result. Refer to `reduce_multiview_result` for more details. - fn submit_at( + async fn submit_at( &self, _: ::Hash, source: TransactionSource, xts: Vec>, - ) -> PoolFuture, Self::Error>>, Self::Error> { + ) -> Result, Self::Error>>, Self::Error> { let view_store = self.view_store.clone(); log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count()); log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at"); @@ -603,7 +600,7 @@ where let mempool_results = self.mempool.extend_unwatched(source, &xts); if view_store.is_empty() { - return future::ready(Ok(mempool_results)).boxed() + return Ok(mempool_results) } let to_be_submitted = mempool_results @@ -616,11 +613,10 @@ where .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _)); let mempool = self.mempool.clone(); - async move { - let results_map = view_store.submit(source, to_be_submitted.into_iter()).await; - let mut submission_results = reduce_multiview_result(results_map).into_iter(); + let results_map = view_store.submit(source, to_be_submitted.into_iter()).await; + let mut submission_results = reduce_multiview_result(results_map).into_iter(); - Ok(mempool_results + Ok(mempool_results .into_iter() .map(|result| { result.and_then(|xt_hash| { @@ -633,60 +629,50 @@ where }) }) .collect::>()) - } - .boxed() } /// Submits a single transaction and returns a future resolving to the submission results. /// /// Actual transaction submission process is delegated to the `submit_at` function. - fn submit_one( + async fn submit_one( &self, _at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture, Self::Error> { + ) -> Result, Self::Error> { log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_one views:{}", self.tx_hash(&xt), self.active_views_count()); - let result_future = self.submit_at(_at, source, vec![xt]); - async move { - let result = result_future.await; - match result { - Ok(mut v) => - v.pop().expect("There is exactly one element in result of submit_at. qed."), - Err(e) => Err(e), - } + match self.submit_at(_at, source, vec![xt]).await { + Ok(mut v) => + v.pop().expect("There is exactly one element in result of submit_at. qed."), + Err(e) => Err(e), } - .boxed() } /// Submits a transaction and starts to watch its progress in the pool, returning a stream of /// status updates. /// /// Actual transaction submission process is delegated to the `ViewStore` internal instance. - fn submit_and_watch( + async fn submit_and_watch( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture>>, Self::Error> { + ) -> Result>>, Self::Error> { log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count()); let xt = Arc::from(xt); let xt_hash = match self.mempool.push_watched(source, xt.clone()) { Ok(xt_hash) => xt_hash, - Err(e) => return future::ready(Err(e)).boxed(), + Err(e) => return Err(e), }; self.metrics.report(|metrics| metrics.submitted_transactions.inc()); let view_store = self.view_store.clone(); let mempool = self.mempool.clone(); - async move { - view_store - .submit_and_watch(at, source, xt) - .await - .inspect_err(|_| mempool.remove(xt_hash)) - } - .boxed() + view_store + .submit_and_watch(at, source, xt) + .await + .inspect_err(|_| mempool.remove(xt_hash)) } /// Intended to remove transactions identified by the given hashes, and any dependent @@ -757,9 +743,9 @@ where } /// Returns an iterator for ready transactions at a specific block, ordered by priority. - fn ready_at(&self, at: ::Hash) -> PolledIterator { + async fn ready_at(&self, at: ::Hash) -> ReadyIteratorFor { let (_, result) = self.ready_at_internal(at); - result + result.await } /// Returns an iterator for ready transactions, ordered by priority. @@ -782,12 +768,12 @@ where /// /// If the timeout expires before the maintain process is accomplished, a best-effort /// set of transactions is returned (refer to `ready_at_light`). - fn ready_at_with_timeout( + async fn ready_at_with_timeout( &self, at: ::Hash, timeout: std::time::Duration, - ) -> PolledIterator { - self.ready_at_with_timeout_internal(at, timeout) + ) -> ReadyIteratorFor { + self.ready_at_with_timeout_internal(at, timeout).await } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs index 9f979e216b6d..5f7294a24fd7 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs @@ -201,12 +201,12 @@ //! required to accomplish it. //! //! ### Providing ready transactions: `ready_at` -//! The [`ready_at`] function returns a [future][`crate::PolledIterator`] that resolves to the -//! [ready transactions iterator][`ReadyTransactions`]. The block builder shall wait either for the -//! future to be resolved or for timeout to be hit. To avoid building empty blocks in case of -//! timeout, the waiting for timeout functionality was moved into the transaction pool, and new API -//! function was added: [`ready_at_with_timeout`]. This function also provides a fall back ready -//! iterator which is result of [light maintain](#light-maintain). +//! The asynchronous [`ready_at`] function resolves to the [ready transactions +//! iterator][`ReadyTransactions`]. The block builder shall wait either for the future to be +//! resolved or for timeout to be hit. To avoid building empty blocks in case of timeout, the +//! waiting for timeout functionality was moved into the transaction pool, and new API function was +//! added: [`ready_at_with_timeout`]. This function also provides a fall back ready iterator which +//! is result of [light maintain](#light-maintain). //! //! New function internally waits either for [maintain](#maintain) process triggered for requested //! block to be accomplished or for the timeout. If timeout hits then the result of [light diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index 888d25d3a0d2..3d3d596c291f 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -30,7 +30,7 @@ mod single_state_txpool; mod transaction_pool_wrapper; use common::{api, enactment_state}; -use std::{future::Future, pin::Pin, sync::Arc}; +use std::sync::Arc; pub use api::FullChainApi; pub use builder::{Builder, TransactionPoolHandle, TransactionPoolOptions, TransactionPoolType}; @@ -50,8 +50,6 @@ type BoxedReadyIterator = Box< type ReadyIteratorFor = BoxedReadyIterator, graph::ExtrinsicFor>; -type PolledIterator = Pin> + Send>>; - /// Log target for transaction pool. /// /// It can be used by other components for logging functionality strictly related to txpool (e.g. diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index 0826b95cf070..b29630b563bb 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -29,9 +29,8 @@ use crate::{ error, log_xt::log_xt_trace, }, - graph, - graph::{ExtrinsicHash, IsValidator}, - PolledIterator, ReadyIteratorFor, LOG_TARGET, + graph::{self, ExtrinsicHash, IsValidator}, + ReadyIteratorFor, LOG_TARGET, }; use async_trait::async_trait; use futures::{channel::oneshot, future, prelude::*, Future, FutureExt}; @@ -39,8 +38,8 @@ use parking_lot::Mutex; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, - PoolFuture, PoolStatus, TransactionFor, TransactionPool, TransactionSource, - TransactionStatusStreamFor, TxHash, + PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, + TxHash, }; use sp_blockchain::{HashAndNumber, TreeRoute}; use sp_core::traits::SpawnEssentialNamed; @@ -224,26 +223,19 @@ where &self.api } - fn ready_at_with_timeout_internal( + async fn ready_at_with_timeout_internal( &self, at: Block::Hash, timeout: std::time::Duration, - ) -> PolledIterator { - let timeout = futures_timer::Delay::new(timeout); - let ready_maintained = self.ready_at(at); - let ready_current = self.ready(); - - let ready = async { - select! { - ready = ready_maintained => ready, - _ = timeout => ready_current - } - }; - - Box::pin(ready) + ) -> ReadyIteratorFor { + select! { + ready = self.ready_at(at)=> ready, + _ = futures_timer::Delay::new(timeout)=> self.ready() + } } } +#[async_trait] impl TransactionPool for BasicPool where Block: BlockT, @@ -255,12 +247,12 @@ where graph::base_pool::Transaction, graph::ExtrinsicFor>; type Error = PoolApi::Error; - fn submit_at( + async fn submit_at( &self, at: ::Hash, source: TransactionSource, xts: Vec>, - ) -> PoolFuture, Self::Error>>, Self::Error> { + ) -> Result, Self::Error>>, Self::Error> { let pool = self.pool.clone(); let xts = xts.into_iter().map(Arc::from).collect::>(); @@ -268,38 +260,32 @@ where .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64)); let number = self.api.resolve_block_number(at); - async move { - let at = HashAndNumber { hash: at, number: number? }; - Ok(pool.submit_at(&at, source, xts).await) - } - .boxed() + let at = HashAndNumber { hash: at, number: number? }; + Ok(pool.submit_at(&at, source, xts).await) } - fn submit_one( + async fn submit_one( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture, Self::Error> { + ) -> Result, Self::Error> { let pool = self.pool.clone(); let xt = Arc::from(xt); self.metrics.report(|metrics| metrics.submitted_transactions.inc()); let number = self.api.resolve_block_number(at); - async move { - let at = HashAndNumber { hash: at, number: number? }; - pool.submit_one(&at, source, xt).await - } - .boxed() + let at = HashAndNumber { hash: at, number: number? }; + pool.submit_one(&at, source, xt).await } - fn submit_and_watch( + async fn submit_and_watch( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture>>, Self::Error> { + ) -> Result>>, Self::Error> { let pool = self.pool.clone(); let xt = Arc::from(xt); @@ -307,13 +293,10 @@ where let number = self.api.resolve_block_number(at); - async move { - let at = HashAndNumber { hash: at, number: number? }; - let watcher = pool.submit_and_watch(&at, source, xt).await?; + let at = HashAndNumber { hash: at, number: number? }; + let watcher = pool.submit_and_watch(&at, source, xt).await?; - Ok(watcher.into_stream().boxed()) - } - .boxed() + Ok(watcher.into_stream().boxed()) } fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { @@ -343,9 +326,9 @@ where self.pool.validated_pool().ready_by_hash(hash) } - fn ready_at(&self, at: ::Hash) -> PolledIterator { + async fn ready_at(&self, at: ::Hash) -> ReadyIteratorFor { let Ok(at) = self.api.resolve_block_number(at) else { - return async { Box::new(std::iter::empty()) as Box<_> }.boxed() + return Box::new(std::iter::empty()) as Box<_> }; let status = self.status(); @@ -354,25 +337,23 @@ where // There could be transaction being added because of some re-org happening at the relevant // block, but this is relative unlikely. if status.ready == 0 && status.future == 0 { - return async { Box::new(std::iter::empty()) as Box<_> }.boxed() + return Box::new(std::iter::empty()) as Box<_> } if self.ready_poll.lock().updated_at() >= at { log::trace!(target: LOG_TARGET, "Transaction pool already processed block #{}", at); let iterator: ReadyIteratorFor = Box::new(self.pool.validated_pool().ready()); - return async move { iterator }.boxed() + return iterator } - self.ready_poll - .lock() - .add(at) - .map(|received| { - received.unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Error receiving pending set: {:?}", e); - Box::new(std::iter::empty()) - }) + let result = self.ready_poll.lock().add(at).map(|received| { + received.unwrap_or_else(|e| { + log::warn!(target: LOG_TARGET, "Error receiving pending set: {:?}", e); + Box::new(std::iter::empty()) }) - .boxed() + }); + + result.await } fn ready(&self) -> ReadyIteratorFor { @@ -384,12 +365,12 @@ where pool.futures().cloned().collect::>() } - fn ready_at_with_timeout( + async fn ready_at_with_timeout( &self, at: ::Hash, timeout: std::time::Duration, - ) -> PolledIterator { - self.ready_at_with_timeout_internal(at, timeout) + ) -> ReadyIteratorFor { + self.ready_at_with_timeout_internal(at, timeout).await } } diff --git a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs index 4e1b53833b8f..e373c0278d80 100644 --- a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs +++ b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs @@ -22,16 +22,16 @@ use crate::{ builder::FullClientTransactionPool, graph::{base_pool::Transaction, ExtrinsicFor, ExtrinsicHash}, - ChainApi, FullChainApi, + ChainApi, FullChainApi, ReadyIteratorFor, }; use async_trait::async_trait; use sc_transaction_pool_api::{ ChainEvent, ImportNotificationStream, LocalTransactionFor, LocalTransactionPool, - MaintainedTransactionPool, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, - TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, + MaintainedTransactionPool, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, + TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; /// The wrapper for actual object providing implementation of TransactionPool. /// @@ -49,6 +49,7 @@ where + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue; +#[async_trait] impl TransactionPool for TransactionPoolWrapper where Block: BlockT, @@ -68,44 +69,38 @@ where >; type Error = as ChainApi>::Error; - fn submit_at( + async fn submit_at( &self, at: ::Hash, source: TransactionSource, xts: Vec>, - ) -> PoolFuture, Self::Error>>, Self::Error> { - self.0.submit_at(at, source, xts) + ) -> Result, Self::Error>>, Self::Error> { + self.0.submit_at(at, source, xts).await } - fn submit_one( + async fn submit_one( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture, Self::Error> { - self.0.submit_one(at, source, xt) + ) -> Result, Self::Error> { + self.0.submit_one(at, source, xt).await } - fn submit_and_watch( + async fn submit_and_watch( &self, at: ::Hash, source: TransactionSource, xt: TransactionFor, - ) -> PoolFuture>>, Self::Error> { - self.0.submit_and_watch(at, source, xt) + ) -> Result>>, Self::Error> { + self.0.submit_and_watch(at, source, xt).await } - fn ready_at( + async fn ready_at( &self, at: ::Hash, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send, - >, - > { - self.0.ready_at(at) + ) -> ReadyIteratorFor> { + self.0.ready_at(at).await } fn ready(&self) -> Box> + Send> { @@ -140,19 +135,12 @@ where self.0.ready_transaction(hash) } - fn ready_at_with_timeout( + async fn ready_at_with_timeout( &self, at: ::Hash, timeout: std::time::Duration, - ) -> Pin< - Box< - dyn Future< - Output = Box> + Send>, - > + Send - + '_, - >, - > { - self.0.ready_at_with_timeout(at, timeout) + ) -> ReadyIteratorFor> { + self.0.ready_at_with_timeout(at, timeout).await } }