From 6687a558be3ce7742cb162a59d245448c22226d1 Mon Sep 17 00:00:00 2001
From: Marko Petrlic
Date: Wed, 19 Feb 2025 11:49:32 +0100
Subject: [PATCH] Added Transaction State RPC
---
 Cargo.lock | 14 +
 Cargo.toml | 2 +
 node/Cargo.toml | 1 +
 node/src/cli.rs | 24 +
 node/src/command.rs | 26 +-
 node/src/lib.rs | 2 +
 node/src/main.rs | 1 +
 node/src/rpc.rs | 37 +-
 node/src/service.rs | 119 +++-
 node/src/transaction_state.rs | 652 ++++++++++++++++++++++
 pallets/system/rpc/runtime-api/Cargo.toml | 4 +-
 pallets/system/rpc/runtime-api/src/lib.rs | 13 +
 rpc/kate-rpc/src/lib.rs | 12 +
 rpc/transaction-rpc/Cargo.toml | 21 +
 rpc/transaction-rpc/src/lib.rs | 82 +++
 runtime/src/apis.rs | 40 +-
 runtime/src/version.rs | 2 +-
 17 files changed, 996 insertions(+), 56 deletions(-)
 create mode 100644 node/src/transaction_state.rs
 create mode 100644 rpc/transaction-rpc/Cargo.toml
 create mode 100644 rpc/transaction-rpc/src/lib.rs
diff --git a/Cargo.lock b/Cargo.lock
index 690bf96b0..924d5ea8d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1180,6 +1180,7 @@ dependencies = [
 "substrate-state-trie-migration-rpc",
 "tempfile",
 "testing-rpc",
+ "transaction-rpc",
 "try-runtime-cli",
 ]
@@ -4001,7 +4002,9 @@ name = "frame-system-rpc-runtime-api"
 version = "4.0.0-dev"
 dependencies = [
 "parity-scale-codec",
+ "scale-info",
 "sp-api",
+ "sp-std",
 ]
 [[package]]
@@ -13476,6 +13479,17 @@ dependencies = [
 "tracing-log 0.2.0",
 ]
+[[package]]
+name = "transaction-rpc"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "avail-base",
+ "jsonrpsee",
+ "serde",
+ "sp-core",
+]
+
 [[package]]
 name = "trie-db"
 version = "0.24.0"
diff --git a/Cargo.toml b/Cargo.toml
index bd39ca8c2..bde5dfae6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
 "runtime",
 "runtime/fuzz",
 "node",
+ "rpc/transaction-rpc"
 ]
 [workspace.package]
@@ -36,6 +37,7 @@ pallet-vector = { path = "pallets/vector", default-features = false }
 da-runtime = { path = "runtime", default-features = false }
 kate-rpc = { path = "rpc/kate-rpc" }
 testing-rpc = { path = "rpc/testing-rpc" }
+transaction-rpc = { path = "rpc/transaction-rpc", default-features = false }
 patricia-merkle-trie = { path = "patricia-merkle-trie", default-features = false }
 sc-basic-authorship = { path = "client/basic-authorship", default-features = false }
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 3f8194161..960bf0b5c 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -30,6 +30,7 @@ pallet-vector.workspace = true
 frame-system-rpc-runtime-api.workspace = true
 frame-system = { workspace = true, default-features = false }
 sc-basic-authorship.workspace = true
+transaction-rpc.workspace = true
 # 3rd-party
 codec = { package = "parity-scale-codec", version = "3" }
diff --git a/node/src/cli.rs b/node/src/cli.rs
index 079a55c86..baacb0518 100644
--- a/node/src/cli.rs
+++ b/node/src/cli.rs
@@ -64,6 +64,30 @@ pub struct Cli {
 /// This parameter can be used to update the network name and id of the `dev` and `dev_tri` chains.
 #[arg(long)]
 pub network_name: Option,
+
+ /// Enable Transaction State RPC. This allows querying the transaction state (success or failure)
+ /// using only a transaction hash.
+ #[clap(long = "enable-tx-state-rpc", default_value_t = false)]
+ pub tx_state_rpc_enabled: bool,
+
+ /// The maximum number of results the transaction state RPC will return for a transaction hash.
+ /// If a transaction hash appears in multiple blocks, the RPC will return only the top `X` transaction states.
+ /// In most cases, the transaction hash is unique, so this parameter is usually irrelevant. + #[clap(long, default_value_t = 10)] + pub tx_state_rpc_max_search_results: usize, + + /// The maximum number of blocks preserved and stored in the transaction state RPC database. + /// + /// The default is 31 days' worth of blocks. + #[clap(long, default_value_t = 133920)] + pub tx_state_rpc_max_stored_block_count: usize, + + /// Logging interval for transaction state, in milliseconds. + /// A lower value results in more frequent log updates. + /// + /// The default is 300_000 milliseconds (300 seconds). + #[clap(long, default_value_t = 300_000)] + pub tx_state_logging_interval: u64, } fn kate_max_cells_size_upper_bound(s: &str) -> Result { diff --git a/node/src/command.rs b/node/src/command.rs index 301efcfde..94a1c936c 100644 --- a/node/src/command.rs +++ b/node/src/command.rs @@ -32,6 +32,7 @@ use { use crate::{ cli::{Cli, Subcommand}, service::{self, new_partial, FullClient}, + transaction_state, }; use avail_node::NODE_VERSION; @@ -208,9 +209,8 @@ pub fn run() -> Result<()> { } = new_partial( &config, cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc::Deps::default(), + transaction_state::CliDeps::default(), )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -225,9 +225,8 @@ pub fn run() -> Result<()> { } = new_partial( &config, cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc::Deps::default(), + transaction_state::CliDeps::default(), )?; Ok((cmd.run(client, config.database), task_manager)) }) @@ -242,9 +241,8 @@ pub fn run() -> Result<()> { } = new_partial( &config, cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc::Deps::default(), + transaction_state::CliDeps::default(), )?; Ok((cmd.run(client, config.chain_spec), task_manager)) }) @@ -260,9 +258,8 @@ pub fn run() -> Result<()> { } = new_partial( &config, cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc::Deps::default(), + transaction_state::CliDeps::default(), )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -282,9 +279,8 @@ pub fn run() -> Result<()> { } = new_partial( &config, cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc::Deps::default(), + transaction_state::CliDeps::default(), )?; let aux_revert = Box::new(|client: Arc, backend, blocks| { sc_consensus_babe::revert(client.clone(), backend, blocks)?; diff --git a/node/src/lib.rs b/node/src/lib.rs index 6a742311d..cd32690a7 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -7,4 +7,6 @@ pub mod cli; pub mod rpc; pub mod service; +mod transaction_state; + pub const NODE_VERSION: &str = "2.2.1"; diff --git a/node/src/main.rs b/node/src/main.rs index 4205202d3..5c96480f2 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -9,6 +9,7 @@ mod cli; mod command; mod da_block_import; mod rpc; +mod transaction_state; fn main() -> sc_cli::Result<()> { command::run() diff --git a/node/src/rpc.rs b/node/src/rpc.rs index 558cff7bf..f6f84cd1a 100644 --- a/node/src/rpc.rs +++ b/node/src/rpc.rs @@ -94,14 +94,18 @@ pub struct FullDeps { pub babe: BabeDeps, /// GRANDPA specific dependencies. pub grandpa: GrandpaDeps, - /// The maximum number of cells that can be requested in one go. 
- pub kate_max_cells_size: usize, - /// Enable Kate RPCs - pub kate_rpc_enabled: bool, - /// Enable Kate RPCs Metrics + /// Kate RPC specific dependencies. /// - /// Should not be used unless unless you know what you're doing. - pub kate_rpc_metrics_enabled: bool, + /// Available configs: + /// - pub max_cells_size: usize, + /// - pub rpc_enabled: bool, + /// - pub rpc_metrics_enabled: bool, + pub kate_rpc_deps: kate_rpc::Deps, + /// Transaction RPC specific dependencies. + /// + /// Available configs: + /// - TxStateSender, + pub transaction_rpc_deps: Option, } /// Instantiate all Full RPC extensions. @@ -152,9 +156,8 @@ where deny_unsafe, babe, grandpa, - kate_max_cells_size, - kate_rpc_enabled, - kate_rpc_metrics_enabled, + kate_rpc_deps, + transaction_rpc_deps, } = deps; let BabeDeps { @@ -226,20 +229,26 @@ where io.merge(StateMigration::new(client.clone(), backend, deny_unsafe).into_rpc())?; - if is_dev_chain || kate_rpc_metrics_enabled { + if is_dev_chain || kate_rpc_deps.rpc_metrics_enabled { io.merge(KateApiMetricsServer::into_rpc(Kate::::new( client.clone(), - kate_max_cells_size, + kate_rpc_deps.max_cells_size, )))?; } - if is_dev_chain || kate_rpc_enabled || kate_rpc_metrics_enabled { + if is_dev_chain || kate_rpc_deps.rpc_enabled || kate_rpc_deps.rpc_metrics_enabled { io.merge(KateApiServer::into_rpc(Kate::::new( client, - kate_max_cells_size, + kate_rpc_deps.max_cells_size, )))?; } + if let Some(deps) = transaction_rpc_deps { + io.merge(transaction_rpc::TransactionStateServer::into_rpc( + transaction_rpc::System::new(deps), + ))?; + } + #[cfg(feature = "testing-environment")] io.merge(TestingApiServer::into_rpc(TestingEnv))?; diff --git a/node/src/service.rs b/node/src/service.rs index 1fffb2322..bb82fa438 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -19,6 +19,7 @@ //! Service and ServiceFactory implementation. Specialized wrapper over substrate service. 
#![allow(dead_code)] +use crate::transaction_state; use crate::{cli::Cli, rpc as node_rpc}; use avail_core::AppId; use da_runtime::{apis::RuntimeApi, NodeBlock as Block, Runtime}; @@ -26,6 +27,7 @@ use da_runtime::{apis::RuntimeApi, NodeBlock as Block, Runtime}; use codec::Encode; use frame_system_rpc_runtime_api::AccountNonceApi; use futures::prelude::*; +use jsonrpsee::tokio::sync::mpsc::channel; use pallet_transaction_payment::ChargeTransactionPayment; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_babe::{self, SlotProportion}; @@ -36,6 +38,7 @@ use sc_service::{ error::Error as ServiceError, Configuration, RpcHandlers, TaskManager, WarpSyncParams, }; use sc_telemetry::custom_telemetry::external::BlockIntervalFromNode; +use sc_telemetry::log; use sc_telemetry::{custom_telemetry::CustomTelemetryWorker, Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sp_api::ProvideRuntimeApi; @@ -170,9 +173,8 @@ pub fn create_extrinsic( pub fn new_partial( config: &Configuration, unsafe_da_sync: bool, - kate_max_cells_size: usize, - kate_rpc_enabled: bool, - kate_rpc_metrics_enabled: bool, + kate_rpc_deps: kate_rpc::Deps, + tx_state_cli_deps: transaction_state::CliDeps, ) -> Result< sc_service::PartialComponents< FullClient, @@ -192,6 +194,7 @@ pub fn new_partial( ), sc_consensus_grandpa::SharedVoterState, Option, + Option, ), >, ServiceError, @@ -292,6 +295,25 @@ pub fn new_partial( let import_setup = (da_block_import, grandpa_link, babe_link); + let mut transaction_rpc_deps = None; + let mut tx_state_deps = None; + if tx_state_cli_deps.enabled { + let (search_send, search_recv) = channel::(10_000); + let (block_send, block_recv) = channel::(50_000); + + let deps = transaction_state::Deps { + block_receiver: block_recv, + block_sender: block_send, + search_receiver: search_recv, + cli: tx_state_cli_deps, + }; + + transaction_rpc_deps = Some(transaction_rpc::Deps { + sender: search_send, + }); + tx_state_deps = Some(deps); + } + let (rpc_extensions_builder, rpc_setup) = { let (_, grandpa_link, _) = &import_setup; @@ -330,9 +352,8 @@ pub fn new_partial( subscription_executor, finality_provider: finality_proof_provider.clone(), }, - kate_max_cells_size, - kate_rpc_enabled, - kate_rpc_metrics_enabled, + kate_rpc_deps: kate_rpc_deps.clone(), + transaction_rpc_deps: transaction_rpc_deps.clone(), }; node_rpc::create_full(deps, rpc_backend.clone()).map_err(Into::into) @@ -349,7 +370,13 @@ pub fn new_partial( select_chain, import_queue, transaction_pool, - other: (rpc_extensions_builder, import_setup, rpc_setup, telemetry), + other: ( + rpc_extensions_builder, + import_setup, + rpc_setup, + telemetry, + tx_state_deps, + ), }) } @@ -376,9 +403,8 @@ pub fn new_full_base( disable_hardware_benchmarks: bool, with_startup_data: impl FnOnce(&BlockImport, &sc_consensus_babe::BabeLink), unsafe_da_sync: bool, - kate_max_cells_size: usize, - kate_rpc_enabled: bool, - kate_rpc_metrics_enabled: bool, + kate_rpc_deps: kate_rpc::Deps, + tx_state_cli_deps: transaction_state::CliDeps, ) -> Result { let hwbench = if !disable_hardware_benchmarks { config.database.path().map(|database_path| { @@ -397,14 +423,8 @@ pub fn new_full_base( keystore_container, select_chain, transaction_pool, - other: (rpc_builder, import_setup, rpc_setup, mut telemetry), - } = new_partial( - &config, - unsafe_da_sync, - kate_max_cells_size, - kate_rpc_enabled, - kate_rpc_metrics_enabled, - )?; + other: (rpc_builder, import_setup, rpc_setup, mut telemetry, tx_state_deps), + } = 
new_partial(&config, unsafe_da_sync, kate_rpc_deps, tx_state_cli_deps)?; let shared_voter_state = rpc_setup; let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; @@ -642,6 +662,53 @@ pub fn new_full_base( } network_starter.start_network(); + + // Spawning Transaction Info Workers + + if let Some(deps) = tx_state_deps { + log::info!("👾 Transaction State RPC is enabled."); + let worker_1 = transaction_state::IncludedWorker { + rpc_handlers: rpc_handlers.clone(), + client: client.clone(), + sender: deps.block_sender.clone(), + logger: transaction_state::WorkerLogging::new( + "Inclusion Worker".into(), + deps.cli.logging_interval, + ), + }; + + let worker_2 = transaction_state::FinalizedWorker { + rpc_handlers: rpc_handlers.clone(), + client: client.clone(), + sender: deps.block_sender.clone(), + max_stored_block_count: deps.cli.max_stored_block_count, + logger: transaction_state::WorkerLogging::new( + "Finalization Worker".into(), + deps.cli.logging_interval, + ), + }; + + let db = transaction_state::Database::new( + deps.block_receiver, + deps.search_receiver, + deps.cli.max_search_results, + deps.cli.max_stored_block_count, + deps.cli.logging_interval, + ); + + task_manager + .spawn_handle() + .spawn("tx-state-worker-i", None, worker_1.run()); + task_manager + .spawn_handle() + .spawn("tx-state-worker-f", None, worker_2.run()); + task_manager + .spawn_handle() + .spawn("tx-state-db", None, db.run()); + } else { + log::info!("👾 Transaction State RPC is disabled."); + } + Ok(NewFullBase { task_manager, client, @@ -655,14 +722,24 @@ pub fn new_full_base( /// Builds a new service for a full client. pub fn new_full(config: Configuration, cli: Cli) -> Result { let database_path = config.database.path().map(Path::to_path_buf); + let kate_rpc_deps = kate_rpc::Deps { + max_cells_size: cli.kate_max_cells_size, + rpc_enabled: cli.kate_rpc_enabled, + rpc_metrics_enabled: cli.kate_rpc_metrics_enabled, + }; + let tx_state_cli_deps = transaction_state::CliDeps { + max_search_results: cli.tx_state_rpc_max_search_results, + max_stored_block_count: cli.tx_state_rpc_max_stored_block_count, + enabled: cli.tx_state_rpc_enabled, + logging_interval: cli.tx_state_logging_interval, + }; let task_manager = new_full_base( config, cli.no_hardware_benchmarks, |_, _| (), cli.unsafe_da_sync, - cli.kate_max_cells_size, - cli.kate_rpc_enabled, - cli.kate_rpc_metrics_enabled, + kate_rpc_deps, + tx_state_cli_deps, ) .map(|NewFullBase { task_manager, .. 
}| task_manager)?; diff --git a/node/src/transaction_state.rs b/node/src/transaction_state.rs new file mode 100644 index 000000000..8d73ee336 --- /dev/null +++ b/node/src/transaction_state.rs @@ -0,0 +1,652 @@ +use std::collections::VecDeque; +use std::ops::Add; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use avail_core::OpaqueExtrinsic; +use codec::{decode_from_bytes, Encode}; +use da_runtime::UncheckedExtrinsic; +use frame_system_rpc_runtime_api::TransactionSuccessStatus; +use jsonrpsee::tokio; +use jsonrpsee::tokio::sync::mpsc::{Receiver, Sender}; +use sc_service::RpcHandlers; +use sc_telemetry::log; +use serde::{Deserialize, Serialize}; +use sp_core::{bytes::from_hex, Blake2Hasher, Hasher, H256}; +use sp_runtime::generic::BlockId; +use sp_runtime::traits::BlockIdTo; +use transaction_rpc::TxStateReceiver as SearchReceiver; +use transaction_rpc::{OneShotTxStateSender, TransactionState as RPCTransactionState}; + +use crate::service::FullClient; + +#[derive(Clone, Default)] +pub struct CliDeps { + pub max_search_results: usize, + pub max_stored_block_count: usize, + pub logging_interval: u64, + pub enabled: bool, +} + +pub struct Deps { + pub block_receiver: Receiver, + pub block_sender: Sender, + pub search_receiver: SearchReceiver, + pub cli: CliDeps, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlockDetails { + pub block_hash: H256, + pub block_height: u32, + pub finalized: bool, + pub transactions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransactionState { + pub tx_hash: H256, + pub tx_index: u32, + pub tx_success: bool, + pub pallet_index: u8, + pub call_index: u8, +} + +pub struct DatabaseLogging { + pub rpc_calls: Vec, + pub new_blocks: Vec, + pub timer: Instant, + timer_interval: Duration, +} + +impl DatabaseLogging { + pub fn new(logging_interval: u64) -> Self { + Self { + timer: Instant::now(), + rpc_calls: Default::default(), + new_blocks: Default::default(), + timer_interval: Duration::from_millis(logging_interval), + } + } + pub fn add_block(&mut self, duration: Duration) { + self.new_blocks.push(duration); + } + + pub fn add_rpc_call(&mut self, duration: Duration) { + self.rpc_calls.push(duration); + } + + pub fn log(&mut self, included_queue_size: usize, finalized_queue_size: usize) { + if self.timer.elapsed() < self.timer_interval { + return; + } + + let mut message = String::new(); + + if !self.rpc_calls.is_empty() { + let (count, total, min, median, max) = generate_duration_stats(&mut self.rpc_calls); + message = std::format!("RPC call count: {}, Total Duration: {:.02?}, Min Duration: {:.02?}, Median Duration: {:.02?}, Max Duration: {:.02?}. ", count, total, min, median, max); + } + + if !self.new_blocks.is_empty() { + let (count, total, min, median, max) = generate_duration_stats(&mut self.new_blocks); + message = std::format!("{}Block received count: {}, Total Duration: {:.02?}, Min Duration: {:.02?}, Median Duration: {:.02?}, Max Duration: {:.02?}. 
", message, count, total, min, median, max) + } + + if !message.is_empty() { + log::info!( + "👾 {}Included Block Queue Size: {}, Finalized Block Queue Size: {}", + message, + included_queue_size, + finalized_queue_size + ); + } + + self.rpc_calls.clear(); + self.rpc_calls.shrink_to(25_000); + self.new_blocks.clear(); + self.new_blocks.shrink_to(25_000); + + self.timer = Instant::now(); + } +} + +pub struct Database { + included_blocks: VecDeque, + finalized_blocks: VecDeque, + block_receiver: Receiver, + search_receiver: SearchReceiver, + max_search_results: usize, + max_stored_block_count: usize, + logger: DatabaseLogging, +} + +impl Database { + pub fn new( + block_receiver: Receiver, + search_receiver: SearchReceiver, + max_search_results: usize, + max_stored_block_count: usize, + logging_interval: u64, + ) -> Self { + Self { + included_blocks: VecDeque::new(), + finalized_blocks: VecDeque::new(), + block_receiver, + search_receiver, + max_search_results, + max_stored_block_count, + logger: DatabaseLogging::new(logging_interval), + } + } + + pub async fn run(mut self) { + log::info!("👾 Transaction State Running with following parameters: Max Search Result: {}, Max Stored Block Count: {}", self.max_search_results, self.max_stored_block_count); + + loop { + if !self.block_receiver.is_empty() { + while let Ok(new_block) = self.block_receiver.try_recv() { + let now = Instant::now(); + self.add_block(new_block); + self.logger.add_block(now.elapsed()); + } + } + + if !self.search_receiver.is_empty() { + while let Ok(details) = self.search_receiver.try_recv() { + let now = Instant::now(); + self.send_transaction_state(details); + self.logger.add_rpc_call(now.elapsed()); + } + } + + self.logger + .log(self.included_blocks.len(), self.finalized_blocks.len()); + + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + + fn search_transaction_status( + &self, + tx_hash: H256, + array: &VecDeque, + out: &mut Vec, + ) { + for block in array.iter().rev() { + for status in &block.transactions { + if status.tx_hash != tx_hash { + continue; + } + + out.push(RPCTransactionState { + block_hash: block.block_hash, + block_height: block.block_height, + tx_hash: status.tx_hash, + tx_index: status.tx_index, + tx_success: status.tx_success, + pallet_index: status.pallet_index, + call_index: status.call_index, + is_finalized: block.finalized, + }); + + if out.len() >= self.max_search_results { + return; + } + } + } + } + + fn send_transaction_state(&self, details: (H256, bool, OneShotTxStateSender)) { + let (tx_hash, is_finalized, oneshot) = details; + + let mut result: Vec = Vec::new(); + if !is_finalized { + self.search_transaction_status(tx_hash, &self.included_blocks, &mut result); + } + if result.len() < self.max_search_results { + self.search_transaction_status(tx_hash, &self.finalized_blocks, &mut result); + } + + _ = oneshot.send(result); + } + + fn push_new_finalized_block(&mut self, new_block: BlockDetails, index: usize) { + self.finalized_blocks.insert(index, new_block); + + while self.finalized_blocks.len() >= self.max_stored_block_count { + self.finalized_blocks.pop_front(); + } + } + + fn add_finalized_block(&mut self, new_block: BlockDetails) { + // Remove the same block height from included block vector + while let Some(pos) = self + .included_blocks + .iter() + .position(|b| b.block_height == new_block.block_height) + { + self.included_blocks.remove(pos); + } + + // If higher height push it to the back + if self + .finalized_blocks + .back() + .is_some_and(|b| new_block.block_height >= 
b.block_height)
+ {
+ self.push_new_finalized_block(new_block, self.finalized_blocks.len());
+ return;
+ }
+
+ // If lower height push it to the front
+ if self
+ .finalized_blocks
+ .front()
+ .is_some_and(|b| new_block.block_height <= b.block_height)
+ {
+ self.push_new_finalized_block(new_block, 0);
+ return;
+ }
+
+ // If somewhere in between push it there.
+ //
+ // It's unlikely that this code will be executed.
+ // During the sync phase new blocks are pushed to the front and during normal
+ // operations blocks are pushed to the back.
+ for (i, elem) in self.finalized_blocks.iter().enumerate().rev() {
+ if new_block.block_height >= elem.block_height {
+ self.push_new_finalized_block(new_block, i + 1);
+ return;
+ }
+ }
+
+ // If no block is present or if we didn't find a position for it, push it to the front.
+ self.push_new_finalized_block(new_block, 0);
+ }
+
+ fn add_block(&mut self, new_block: BlockDetails) {
+ match new_block.finalized {
+ true => self.add_finalized_block(new_block),
+ false => self.add_included_block(new_block),
+ }
+ }
+
+ fn add_included_block(&mut self, new_block: BlockDetails) {
+ if self.included_blocks.len() >= self.max_stored_block_count {
+ self.included_blocks.pop_front();
+ }
+
+ self.included_blocks.push_back(new_block);
+ }
+}
+
+pub struct IncludedWorker {
+ pub rpc_handlers: RpcHandlers,
+ pub client: Arc,
+ pub sender: Sender,
+ pub logger: WorkerLogging,
+}
+
+impl IncludedWorker {
+ pub async fn run(mut self) {
+ wait_for_sync(&self.rpc_handlers).await;
+
+ let mut current_block_hash = H256::default();
+ loop {
+ let block = self.fetch_next_block(&current_block_hash).await;
+ let block = build_block_details(block.0, block.1, block.2, block.3, false).await;
+ current_block_hash = block.block_hash.clone();
+
+ let ok = self.sender.send(block).await;
+ if ok.is_err() {
+ return;
+ }
+ }
+ }
+
+ async fn fetch_next_block(
+ &mut self,
+ current_block_hash: &H256,
+ ) -> (
+ Vec,
+ H256,
+ u32,
+ Vec,
+ ) {
+ loop {
+ let chain_info = self.client.chain_info();
+ let (block_hash, block_height) = (chain_info.best_hash, chain_info.best_number);
+
+ if (*current_block_hash).eq(&block_hash) {
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+ continue;
+ }
+
+ let now = Instant::now();
+
+ let Some(states) = fetch_execution_states(&self.rpc_handlers, &block_hash).await else {
+ tokio::time::sleep(Duration::from_millis(2500)).await;
+ continue;
+ };
+
+ let Ok(Some(extrinsics)) = self.client.body(block_hash) else {
+ tokio::time::sleep(Duration::from_millis(2500)).await;
+ continue;
+ };
+
+ self.logger.add_block_fetch(now.elapsed());
+ self.logger.log();
+
+ return (extrinsics, block_hash, block_height, states);
+ }
+ }
+}
+
+pub struct FinalizedWorker {
+ pub rpc_handlers: RpcHandlers,
+ pub client: Arc,
+ pub sender: Sender,
+ pub max_stored_block_count: usize,
+ pub logger: WorkerLogging,
+}
+
+impl FinalizedWorker {
+ pub async fn run(mut self) {
+ wait_for_sync(&self.rpc_handlers).await;
+ let mut height = self.index_old_blocks().await;
+
+ loop {
+ let block = self.fetch_next_block(&mut height).await;
+ let block = build_block_details(block.0, block.1, height, block.2, true).await;
+
+ let ok = self.sender.send(block).await;
+ if ok.is_err() {
+ return;
+ }
+
+ height += 1;
+ }
+ }
+
+ async fn index_old_blocks(&self) -> u32 {
+ let chain_info = self.client.chain_info();
+ if chain_info.finalized_number == 0 {
+ return chain_info.finalized_number;
+ }
+
+ let mut max_block_count = self.max_stored_block_count;
+ let mut height =
chain_info.finalized_number - 1;
+ loop {
+ // If we cannot fetch header, block details, or transaction states then we bail out.
+ //
+ // This most likely means that the pruning strategy removed the header and/or block body
+ // or the new runtime API is not there so there isn't much that we can do.
+ let Some(block) = self.fetch_block(height).await else {
+ break;
+ };
+
+ let block = build_block_details(block.0, block.1, height, block.2, true).await;
+
+ // Failure would mean that the other end of the channel is closed which means that we should bail out.
+ let ok = self.sender.send(block).await;
+ if ok.is_err() {
+ break;
+ }
+
+ if height == 0 || max_block_count == 0 {
+ break;
+ }
+
+ max_block_count -= 1;
+ height -= 1;
+ }
+
+ chain_info.finalized_number
+ }
+
+ async fn fetch_block(
+ &self,
+ block_height: u32,
+ ) -> Option<(Vec, H256, Vec)> {
+ let block_hash = self.client.to_hash(&BlockId::Number(block_height));
+
+ // If Err then bail out.
+ // If None then bail out as there is no header available.
+ let Ok(Some(block_hash)) = block_hash else {
+ return None;
+ };
+
+ // If Err then bail out.
+ // If None then bail out as there is no block to be found.
+ let Ok(Some(extrinsics)) = self.client.body(block_hash) else {
+ return None;
+ };
+
+ // If we cannot fetch the transaction execution statuses (success or failure) then we bail out.
+ //
+ // This most likely means that our new Runtime API is not available so there isn't much that we can do.
+ let Some(states) = fetch_execution_states(&self.rpc_handlers, &block_hash).await else {
+ return None;
+ };
+
+ return Some((extrinsics, block_hash, states));
+ }
+
+ async fn fetch_next_block(
+ &mut self,
+ height: &mut u32,
+ ) -> (Vec, H256, Vec) {
+ loop {
+ let chain_info = self.client.chain_info();
+ if *height > chain_info.finalized_number {
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+ continue;
+ }
+
+ let now = Instant::now();
+
+ let block_hash = self.client.to_hash(&BlockId::Number(*height));
+ let Ok(Some(block_hash)) = block_hash else {
+ *height = *height + 1;
+ continue;
+ };
+
+ let Ok(Some(extrinsics)) = self.client.body(block_hash) else {
+ *height = *height + 1;
+ continue;
+ };
+
+ let Some(states) = fetch_execution_states(&self.rpc_handlers, &block_hash).await else {
+ *height = *height + 1;
+ continue;
+ };
+
+ self.logger.add_block_fetch(now.elapsed());
+ self.logger.log();
+
+ return (extrinsics, block_hash, states);
+ }
+ }
+}
+
+async fn wait_for_sync(handler: &RpcHandlers) {
+ loop {
+ match fetch_sync_status(handler).await {
+ Some(true) => (),
+ Some(false) => return,
+ None => (),
+ }
+
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ }
+}
+
+async fn fetch_sync_status(handler: &RpcHandlers) -> Option {
+ let query = r#"{
+ "jsonrpc": "2.0",
+ "method": "system_health",
+ "params": [],
+ "id": 0
+ }"#;
+
+ let res = handler.rpc_query(&query).await.ok()?;
+ let json = serde_json::from_str::(&res.0).ok()?;
+ let result_json = json["result"].as_object()?;
+
+ result_json["isSyncing"].as_bool()
+}
+
+async fn fetch_execution_states(
+ handlers: &RpcHandlers,
+ block_hash: &H256,
+) -> Option> {
+ let query = format!(
+ r#"{{
+ "jsonrpc": "2.0",
+ "method": "state_call",
+ "params": ["SystemEventsApi_fetch_transaction_success_status", "0x", "{}"],
+ "id": 0
+ }}"#,
+ std::format!("{:?}", block_hash)
+ );
+
+ let (res, _) = handlers.rpc_query(&query).await.ok()?;
+ let json = serde_json::from_str::(&res).ok()?;
+
+ let result_json = json["result"].as_str()?;
+ let result =
from_hex(result_json).ok()?;
+ let res = decode_from_bytes::>(result.into()).ok()?;
+
+ Some(res)
+}
+
+async fn build_block_details(
+ extrinsics: Vec,
+ block_hash: H256,
+ block_height: u32,
+ execution_status: Vec,
+ finalized: bool,
+) -> BlockDetails {
+ let mut txs: Vec = Vec::with_capacity(extrinsics.len());
+ for (i, ext) in extrinsics.iter().enumerate() {
+ let unchecked_ext = match UncheckedExtrinsic::decode_no_vec_prefix(&mut ext.0.as_slice()) {
+ Ok(x) => x,
+ Err(err) => {
+ println!("Failed to convert OpaqueExtrinsic to UncheckedExtrinsic, {}", err);
+ continue;
+ },
+ };
+
+ let Some((pallet_index, call_index)) = read_pallet_call_index(&unchecked_ext) else {
+ continue;
+ };
+
+ let tx_hash = Blake2Hasher::hash(&unchecked_ext.encode());
+
+ let status = execution_status.iter().find(|x| x.tx_index == i as u32);
+ let Some(status) = status else { continue };
+ let info = TransactionState {
+ tx_hash,
+ tx_index: status.tx_index,
+ tx_success: status.tx_success,
+ pallet_index,
+ call_index,
+ };
+ txs.push(info);
+ }
+
+ let block = BlockDetails {
+ block_hash,
+ block_height,
+ finalized,
+ transactions: txs,
+ };
+
+ block
+}
+
+fn read_pallet_call_index(ext: &UncheckedExtrinsic) -> Option<(u8, u8)> {
+ let ext = ext.function.encode();
+ if ext.len() < 2 {
+ return None;
+ }
+ let pallet_index = ext[0];
+ let call_index = ext[1];
+
+ Some((pallet_index, call_index))
+}
+
+pub struct WorkerLogging {
+ pub block_fetch: Vec,
+ pub timer: Instant,
+ timer_interval: Duration,
+ name: String,
+}
+
+impl WorkerLogging {
+ pub fn new(name: String, logging_interval: u64) -> Self {
+ Self {
+ block_fetch: Default::default(),
+ timer: Instant::now(),
+ timer_interval: Duration::from_millis(logging_interval),
+ name,
+ }
+ }
+
+ pub fn add_block_fetch(&mut self, duration: Duration) {
+ self.block_fetch.push(duration);
+ }
+
+ pub fn log(&mut self) {
+ if self.timer.elapsed() < self.timer_interval {
+ return;
+ }
+
+ let mut message = String::new();
+
+ if !self.block_fetch.is_empty() {
+ let (count, total, min, median, max) = generate_duration_stats(&mut self.block_fetch);
+ message = std::format!("Block fetch count: {}, Total Duration: {:.02?}, Min Duration: {:.02?}, Median Duration: {:.02?}, Max Duration: {:.02?}. ", count, total, min, median, max);
+ }
+
+ if !message.is_empty() {
+ log::info!("👾 {}: {}", self.name, message,);
+ }
+
+ self.block_fetch.clear();
+ self.block_fetch.shrink_to(25_000);
+
+ self.timer = Instant::now();
+ }
+}
+
+fn generate_duration_stats(
+ array: &mut Vec,
+) -> (usize, Duration, Duration, Duration, Duration) {
+ array.sort_unstable();
+
+ let min = array
+ .first()
+ .cloned()
+ .unwrap_or_else(|| Duration::default());
+
+ let max = array.last().cloned().unwrap_or_else(|| Duration::default());
+
+ let count = array.len();
+ let total_duration = array.iter().fold(Duration::default(), |acc, x| acc.add(*x));
+ let median = if count % 2 != 0 {
+ array
+ .get(count / 2)
+ .cloned()
+ .unwrap_or_else(|| Duration::default())
+ } else {
+ if let (Some(left), Some(right)) = (array.get(count / 2), array.get(count / 2 - 1)) {
+ (left.add(*right)).div_f64(2.0)
+ } else {
+ Duration::default()
+ }
+ };
+
+ (count, total_duration, min, median, max)
+}
diff --git a/pallets/system/rpc/runtime-api/Cargo.toml b/pallets/system/rpc/runtime-api/Cargo.toml
index f3a555683..f95ba4f89 100644
--- a/pallets/system/rpc/runtime-api/Cargo.toml
+++ b/pallets/system/rpc/runtime-api/Cargo.toml
@@ -15,7 +15,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
sp-api = { workspace = true, default-features = false }
codec = { package = "parity-scale-codec", version = "3", default-features = false }
+sp-std = { workspace = true, default-features = false }
+scale-info = { workspace = true, default-features = false }
[features]
default = [ "std" ]
-std = [ "codec/std", "sp-api/std" ]
+std = [ "codec/std", "sp-api/std", "sp-std/std" ]
diff --git a/pallets/system/rpc/runtime-api/src/lib.rs b/pallets/system/rpc/runtime-api/src/lib.rs
index f59988d81..872470a8c 100644
--- a/pallets/system/rpc/runtime-api/src/lib.rs
+++ b/pallets/system/rpc/runtime-api/src/lib.rs
@@ -23,6 +23,8 @@
#![cfg_attr(not(feature = "std"), no_std)]
+use sp_std::vec::Vec;
+
sp_api::decl_runtime_apis! {
/// The API to query account nonce.
pub trait AccountNonceApi where
@@ -32,4 +34,15 @@ sp_api::decl_runtime_apis! {
/// Get current account nonce of given `AccountId`.
fn account_nonce(account: AccountId) -> Nonce;
}
+
+ #[api_version(1)]
+ pub trait SystemEventsApi {
+ fn fetch_transaction_success_status() -> Vec;
+ }
+}
+
+#[derive(Debug, Clone, Copy, scale_info::TypeInfo, codec::Decode, codec::Encode)]
+pub struct TransactionSuccessStatus {
+ pub tx_index: u32,
+ pub tx_success: bool,
}
diff --git a/rpc/kate-rpc/src/lib.rs b/rpc/kate-rpc/src/lib.rs
index 4ebad47e7..0b60341e7 100644
--- a/rpc/kate-rpc/src/lib.rs
+++ b/rpc/kate-rpc/src/lib.rs
@@ -30,6 +30,18 @@ pub type Cells = BoundedVec;
pub mod metrics;
+#[derive(Clone, Default)]
+pub struct Deps {
+ /// The maximum number of cells that can be requested in one go.
+ pub max_cells_size: usize,
+ /// Enable Kate RPCs
+ pub rpc_enabled: bool,
+ /// Enable Kate RPCs Metrics
+ ///
+ /// Should not be used unless you know what you're doing.
+ pub rpc_metrics_enabled: bool,
+}
+
/// # TODO
/// - [ ] Update type definitions for RPCs in our subxt & explorer.
#[rpc(client, server)] diff --git a/rpc/transaction-rpc/Cargo.toml b/rpc/transaction-rpc/Cargo.toml new file mode 100644 index 000000000..b50603809 --- /dev/null +++ b/rpc/transaction-rpc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "transaction-rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +avail-base.workspace = true +serde.workspace = true +jsonrpsee.workspace = true +sp-core.workspace = true +async-trait.workspace = true + +[features] +default = [ "std" ] +std = [ + "avail-base/std", + "serde/std", + "sp-core/std", +] diff --git a/rpc/transaction-rpc/src/lib.rs b/rpc/transaction-rpc/src/lib.rs new file mode 100644 index 000000000..bc018b654 --- /dev/null +++ b/rpc/transaction-rpc/src/lib.rs @@ -0,0 +1,82 @@ +use async_trait::async_trait; +use jsonrpsee::{ + core::RpcResult, + proc_macros::rpc, + tokio::sync::{ + mpsc::{Receiver, Sender}, + oneshot, + }, + types::ErrorObject, +}; +use serde::{Deserialize, Serialize}; +use sp_core::H256; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransactionState { + pub block_hash: H256, + pub block_height: u32, + pub tx_hash: H256, + pub tx_index: u32, + pub tx_success: bool, + pub pallet_index: u8, + pub call_index: u8, + pub is_finalized: bool, +} + +#[derive(Clone)] +pub struct Deps { + pub sender: TxStateSender, +} + +pub type OneShotTxStateSender = oneshot::Sender>; +pub type TxStateReceiver = Receiver<(H256, bool, OneShotTxStateSender)>; +pub type TxStateSender = Sender<(H256, bool, OneShotTxStateSender)>; +pub type TxStateChannel = (H256, bool, OneShotTxStateSender); + +#[rpc(client, server)] +pub trait TransactionState { + #[method(name = "transaction_state")] + async fn transaction_state( + &self, + txhash: H256, + is_finalized: Option, + ) -> RpcResult>; +} + +pub struct System { + sender: TxStateSender, +} + +impl System { + pub fn new(deps: Deps) -> Self { + Self { + sender: deps.sender, + } + } +} + +#[async_trait] +impl TransactionStateServer for System { + async fn transaction_state( + &self, + txhash: H256, + finalized: Option, + ) -> RpcResult> { + let (response_tx, response_rx) = oneshot::channel(); + + let finalized = finalized.unwrap_or(false); + let res = self.sender.send((txhash, finalized, response_tx)).await; + if let Err(e) = res { + return Err(internal_error(e.to_string())); + } + + match response_rx.await { + Ok(x) => Ok(x), + Err(e) => Err(internal_error(e.to_string())), + } + } +} + +fn internal_error<'a>(msg: String) -> ErrorObject<'a> { + ErrorObject::owned(0, msg, None::<()>) +} diff --git a/runtime/src/apis.rs b/runtime/src/apis.rs index 9c414c977..a00adf603 100644 --- a/runtime/src/apis.rs +++ b/runtime/src/apis.rs @@ -1,9 +1,11 @@ use super::kate::{Error as RTKateError, GDataProof, GRow}; use crate::{ - constants, mmr, version::VERSION, AccountId, AuthorityDiscovery, Babe, Block, BlockNumber, - EpochDuration, Executive, Grandpa, Historical, Index, InherentDataExt, Mmr, NominationPools, - OpaqueMetadata, Runtime, RuntimeCall, RuntimeGenesisConfig, SessionKeys, Staking, System, - TransactionPayment, LOG_TARGET, + constants::{self}, + mmr, + version::VERSION, + AccountId, AuthorityDiscovery, Babe, Block, BlockNumber, EpochDuration, Executive, Grandpa, + Historical, Index, InherentDataExt, Mmr, NominationPools, OpaqueMetadata, Runtime, RuntimeCall, + RuntimeGenesisConfig, SessionKeys, Staking, System, TransactionPayment, LOG_TARGET, }; use avail_base::{HeaderExtensionBuilderData, 
ProvidePostInherent}; use avail_core::{ @@ -237,6 +239,36 @@ impl_runtime_apis! { } + impl frame_system_rpc_runtime_api::SystemEventsApi for Runtime { + fn fetch_transaction_success_status() -> Vec { + use frame_system_rpc_runtime_api::TransactionSuccessStatus; + use frame_system::Event; + + let mut results: Vec = Vec::new(); + let event_records = System::read_events_no_consensus(); + for event_record in event_records { + let id = match &event_record.phase { + frame_system::Phase::ApplyExtrinsic(x) => *x, + _ => continue + }; + + let system_event = match &event_record.event { + crate::RuntimeEvent::System(x) => x, + _ => continue, + }; + + match system_event { + Event::::ExtrinsicSuccess{dispatch_info: _} => results.push(TransactionSuccessStatus {tx_index: id, tx_success: true}), + Event::::ExtrinsicFailed{dispatch_error: _, dispatch_info: _} => results.push(TransactionSuccessStatus {tx_index: id, tx_success: false}), + _ => continue, + } + } + + results + } + } + + impl pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi< Block, Balance, diff --git a/runtime/src/version.rs b/runtime/src/version.rs index fc862ab8c..a05c5b54e 100644 --- a/runtime/src/version.rs +++ b/runtime/src/version.rs @@ -17,7 +17,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // Per convention: if the runtime behavior changes, increment spec_version // and set impl_version to 0. This paramenter is typically incremented when // there's an update to the transaction_version. - spec_version: 41, + spec_version: 42, // The version of the implementation of the specification. Nodes can ignore this. It is only // used to indicate that the code is different. As long as the authoring_version and the // spec_version are the same, the code itself might have changed, but the native and Wasm
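Usage sketch (not part of the patch): one way the new `transaction_state` RPC defined in rpc/transaction-rpc/src/lib.rs could be exercised against a node started with --enable-tx-state-rpc. The endpoint URL/port and the client crates (jsonrpsee with the "http-client" feature, tokio, serde_json) are assumptions; only the method name, its (tx_hash, is_finalized) parameters, and the returned fields come from the code above.

// Hedged sketch: calls the `transaction_state` JSON-RPC method added by this patch.
// Assumes the node serves JSON-RPC over HTTP at 127.0.0.1:9944; adjust to your setup.
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:9944")?;

    // Placeholder transaction hash; replace with a real H256 hex string.
    let tx_hash = "0x0000000000000000000000000000000000000000000000000000000000000000";

    // The second parameter mirrors the optional `is_finalized` filter of the RPC;
    // `false` also returns states from blocks that are included but not yet finalized.
    let states: serde_json::Value = client
        .request("transaction_state", rpc_params![tx_hash, false])
        .await?;

    // Each entry carries block_hash, block_height, tx_index, tx_success,
    // pallet_index, call_index and is_finalized, per the TransactionState struct above.
    println!("{states:#}");
    Ok(())
}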