diff --git a/Cargo.lock b/Cargo.lock index 330c2563d976..c79adee6f38e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4680,6 +4680,7 @@ dependencies = [ "cumulus-relay-chain-interface", "cumulus-relay-chain-minimal-node", "futures", + "futures-timer", "polkadot-primitives 7.0.0", "sc-client-api", "sc-consensus", diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index 8e9e41ca89dc..0a77b465d96a 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -11,6 +11,7 @@ workspace = true [dependencies] futures = { workspace = true } +futures-timer = { workspace = true } # Substrate sc-client-api = { workspace = true, default-features = true } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 68ac94539df8..ac9371a8941b 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -25,7 +25,7 @@ use crate::{ start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter, }; -use futures::{future::ready, FutureExt, StreamExt}; +use futures::{select, FutureExt, StreamExt}; use jsonrpsee::RpcModule; use log::info; use prometheus_endpoint::Registry; @@ -90,7 +90,11 @@ use sp_consensus::block_validation::{ use sp_core::traits::{CodeExecutor, SpawnNamed}; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; -use std::{str::FromStr, sync::Arc, time::SystemTime}; +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; /// Full client type. pub type TFullClient = @@ -577,22 +581,42 @@ pub async fn propagate_transaction_notifications( Block: BlockT, ExPool: MaintainedTransactionPool::Hash>, { + const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1); + // transaction notifications - transaction_pool - .import_notification_stream() - .for_each(move |hash| { - tx_handler_controller.propagate_transaction(hash); - let status = transaction_pool.status(); - telemetry!( - telemetry; - SUBSTRATE_INFO; - "txpool.import"; - "ready" => status.ready, - "future" => status.future, - ); - ready(()) - }) - .await; + let mut notifications = transaction_pool.import_notification_stream().fuse(); + let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse(); + let mut tx_imported = false; + + loop { + select! { + notification = notifications.next() => { + let Some(hash) = notification else { return }; + + tx_handler_controller.propagate_transaction(hash); + + tx_imported = true; + }, + _ = timer => { + timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse(); + + if !tx_imported { + continue; + } + + tx_imported = false; + let status = transaction_pool.status(); + + telemetry!( + telemetry; + SUBSTRATE_INFO; + "txpool.import"; + "ready" => status.ready, + "future" => status.future, + ); + } + } + } } /// Initialize telemetry with provided configuration and return telemetry handle