diff --git a/.github/workflows/db-version.yml b/.github/workflows/db-version.yml index 158b287c7..bddf509ff 100644 --- a/.github/workflows/db-version.yml +++ b/.github/workflows/db-version.yml @@ -6,6 +6,11 @@ on: workflow_call: jobs: + db-check: + runs-on: ubuntu-latest + steps: + - run: echo "Checking DB Version" + update-db-version: runs-on: ubuntu-latest if: contains(github.event.pull_request.labels.*.name, 'db-migration') diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index fc9c47c9f..18ebd2e49 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -27,7 +27,13 @@ jobs: toml-lint: runs-on: ubuntu-latest steps: + - name: Download taplo + run: | + curl -L https://github.com/tamasfe/taplo/releases/download/0.9.3/taplo-linux-x86_64.gz -o taplo.gz + gunzip taplo.gz + chmod +x taplo + mv taplo /usr/local/bin/taplo - name: Checkout toml files uses: actions/checkout@v4 - name: Run toml check - run: npx @taplo/cli@0.5.2 fmt --config ./taplo/taplo.toml --check + run: taplo fmt --config ./taplo/taplo.toml --check diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 55b510e86..1cf9ffecf 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -23,7 +23,6 @@ jobs: linters: name: Run linters needs: update_db_version - if: ${{ github.event.pull_request.draft == false && always() }} uses: ./.github/workflows/linters.yml rust_check: @@ -39,14 +38,12 @@ jobs: coverage: name: Run Coverage needs: update_db_version - if: ${{ github.event.pull_request.draft == false && always() }} secrets: inherit uses: ./.github/workflows/coverage.yml build: name: Build Madara needs: update_db_version - if: ${{ github.event.pull_request.draft == false && always() }} uses: ./.github/workflows/build.yml js_test: diff --git a/Cargo.lock b/Cargo.lock index 8e1c5c0b5..50ce03c38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5576,6 +5576,7 @@ dependencies = [ "rocksdb", "serde", "starknet-types-core 0.1.7 (git+https://github.com/kasarlabs/types-rs.git?branch=feat-deserialize-v0.1.7)", + "starknet-types-rpc", "starknet_api", "tempfile", "thiserror 2.0.3", diff --git a/README.md b/README.md index a6330d505..0859c9d28 100644 --- a/README.md +++ b/README.md @@ -400,7 +400,7 @@ Here is a list of all the supported methods with their current status: | ------ | ------------------------------------------------ | | ✅ | `starknet_unsubscribe` (v0.8.0) | | ✅ | `starknet_subscribeNewHeads` (v0.8.0) | -| ❌ | `starknet_subscribeEvents` (v0.8.0) | +| ✅ | `starknet_subscribeEvents` (v0.8.0) | | ❌ | `starknet_subscribeTransactionStatus` (v0.8.0) | | ❌ | `starknet_subscribePendingTransactions` (v0.8.0) | | ❌ | `starknet_subscriptionReorg` (v0.8.0) | diff --git a/crates/madara/client/db/Cargo.toml b/crates/madara/client/db/Cargo.toml index f78e62c54..0f9c30e2d 100644 --- a/crates/madara/client/db/Cargo.toml +++ b/crates/madara/client/db/Cargo.toml @@ -32,6 +32,7 @@ mp-utils = { workspace = true } blockifier = { workspace = true } bonsai-trie = { workspace = true } starknet-types-core = { workspace = true } +starknet-types-rpc = { workspace = true } starknet_api = { workspace = true } # Other diff --git a/crates/madara/client/db/src/block_db.rs b/crates/madara/client/db/src/block_db.rs index 382d73581..17480ca96 100644 --- a/crates/madara/client/db/src/block_db.rs +++ b/crates/madara/client/db/src/block_db.rs @@ -12,6 +12,7 @@ use mp_state_update::StateDiff; use rocksdb::WriteOptions; use starknet_api::core::ChainId; use starknet_types_core::felt::Felt; +use starknet_types_rpc::EmittedEvent; type Result = std::result::Result; @@ -313,6 +314,29 @@ impl MadaraBackend { tracing::debug!("Failed to send block info to subscribers: {e}"); } } + if self.sender_event.receiver_count() > 0 { + let block_number = block.info.header.block_number; + let block_hash = block.info.block_hash; + + block + .inner + .receipts + .iter() + .flat_map(|receipt| { + let tx_hash = receipt.transaction_hash(); + receipt.events().iter().map(move |event| (tx_hash, event)) + }) + .for_each(|(transaction_hash, event)| { + if let Err(e) = self.sender_event.publish(EmittedEvent { + event: event.clone().into(), + block_hash: Some(block_hash), + block_number: Some(block_number), + transaction_hash, + }) { + tracing::debug!("Failed to send event to subscribers: {e}"); + } + }); + } // clear pending tx.delete_cf(&meta, ROW_PENDING_INFO); @@ -400,6 +424,11 @@ impl MadaraBackend { self.sender_block_info.subscribe() } + #[tracing::instrument(skip(self), fields(module = "BlockDB"))] + pub fn subscribe_events(&self, from_address: Option) -> tokio::sync::broadcast::Receiver> { + self.sender_event.subscribe(from_address) + } + #[tracing::instrument(skip(self, id), fields(module = "BlockDB"))] pub fn get_block_inner(&self, id: &impl DbBlockIdResolvable) -> Result> { let Some(ty) = id.resolve_db_block_id(self)? else { return Ok(None) }; diff --git a/crates/madara/client/db/src/lib.rs b/crates/madara/client/db/src/lib.rs index 099dda789..33bc43c4e 100644 --- a/crates/madara/client/db/src/lib.rs +++ b/crates/madara/client/db/src/lib.rs @@ -13,7 +13,9 @@ use rocksdb::{ }; use rocksdb_options::rocksdb_global_options; use snapshots::Snapshots; +use starknet_types_core::felt::Felt; use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash}; +use starknet_types_rpc::EmittedEvent; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::{fmt, fs}; @@ -272,6 +274,141 @@ impl Default for TrieLogConfig { } } +/// EventChannels manages a highly efficient and scalable pub/sub system for events with 16 specific channels +/// plus one "all" channel. This architecture provides several key benefits: +/// +/// Benefits: +/// - Selective Subscription: Subscribers can choose between receiving all events or filtering for specific +/// senders, optimizing network and processing resources +/// - Memory Efficiency: The fixed number of channels (16) provides a good balance between granularity +/// and memory overhead +/// - Predictable Routing: The XOR-based hash function ensures consistent and fast mapping of sender +/// addresses to channels +/// +/// Events are distributed based on the sender's address in the event, where each sender address +/// is mapped to one of the 16 specific channels using a simple XOR-based hash function. +/// Subscribers can choose to either receive all events or only events from specific senders +/// by subscribing to the corresponding channel. +pub struct EventChannels { + /// Broadcast channel that receives all events regardless of their sender's address + all_channels: tokio::sync::broadcast::Sender>, + /// Array of 16 broadcast channels, each handling events from a subset of sender addresses + /// The target channel for an event is determined by the sender's address mapping + specific_channels: [tokio::sync::broadcast::Sender>; 16], +} + +impl EventChannels { + /// Creates a new EventChannels instance with the specified buffer capacity for each channel. + /// Each channel (both all_channels and specific channels) will be able to buffer up to + /// `capacity` events before older events are dropped. + /// + /// # Arguments + /// * `capacity` - The maximum number of events that can be buffered in each channel + /// + /// # Returns + /// A new EventChannels instance with initialized broadcast channels + pub fn new(capacity: usize) -> Self { + let (all_channels, _) = tokio::sync::broadcast::channel(capacity); + + let mut specific_channels = Vec::with_capacity(16); + for _ in 0..16 { + let (sender, _) = tokio::sync::broadcast::channel(capacity); + specific_channels.push(sender); + } + + Self { all_channels, specific_channels: specific_channels.try_into().unwrap() } + } + + /// Subscribes to events based on an optional sender address filter + /// + /// # Arguments + /// * `from_address` - Optional sender address to filter events: + /// * If `Some(address)`, subscribes only to events from senders whose addresses map + /// to the same channel as the provided address (address % 16) + /// * If `None`, subscribes to all events regardless of sender address + /// + /// # Returns + /// A broadcast::Receiver that will receive either: + /// * All events (if from_address is None) + /// * Only events from senders whose addresses map to the same channel as the provided address + /// + /// # Warning + /// This method only provides a coarse filtering mechanism based on address mapping. + /// You will still need to implement additional filtering in your receiver logic because: + /// * Multiple sender addresses map to the same channel + /// * You may want to match the exact sender address rather than just its channel mapping + /// + /// # Implementation Details + /// When a specific address is provided, the method: + /// 1. Calculates the channel index using the sender's address + /// 2. Subscribes to the corresponding specific channel + /// + /// This means you'll receive events from all senders whose addresses map to the same channel + pub fn subscribe(&self, from_address: Option) -> tokio::sync::broadcast::Receiver> { + match from_address { + Some(address) => { + let channel_index = self.calculate_channel_index(&address); + self.specific_channels[channel_index].subscribe() + } + None => self.all_channels.subscribe(), + } + } + + /// Publishes an event to both the all_channels and the specific channel determined by the sender's address. + /// The event will only be sent to channels that have active subscribers. + /// + /// # Arguments + /// * `event` - The event to publish, containing the sender's address that determines the target specific channel + /// + /// # Returns + /// * `Ok(usize)` - The sum of the number of subscribers that received the event across both channels + /// * `Ok(0)` - If no subscribers exist in any channel + /// * `Err` - If the event couldn't be sent + pub fn publish( + &self, + event: EmittedEvent, + ) -> Result>>> { + let channel_index = self.calculate_channel_index(&event.event.from_address); + let specific_channel = &self.specific_channels[channel_index]; + + let mut total = 0; + if self.all_channels.receiver_count() > 0 { + total += self.all_channels.send(event.clone())?; + } + if specific_channel.receiver_count() > 0 { + total += specific_channel.send(event)?; + } + Ok(total) + } + + pub fn receiver_count(&self) -> usize { + self.all_channels.receiver_count() + self.specific_channels.iter().map(|c| c.receiver_count()).sum::() + } + + /// Calculates the target channel index for a given sender's address + /// + /// # Arguments + /// * `address` - The Felt address of the event sender to calculate the channel index for + /// + /// # Returns + /// A channel index between 0 and 15, calculated by XORing the two highest limbs of the address + /// and taking the lowest 4 bits of the result. + /// + /// # Implementation Details + /// Rather than using the last byte of the address, this function: + /// 1. Gets the raw limbs representation of the address + /// 2. XORs limbs[0] and limbs[1] (the two lowest limbs) + /// 3. Uses the lowest 4 bits of the XOR result to determine the channel + /// + /// This provides a balanced distribution of addresses across channels by + /// incorporating entropy from the address + fn calculate_channel_index(&self, address: &Felt) -> usize { + let limbs = address.to_raw(); + let hash = limbs[0] ^ limbs[1]; + (hash & 0x0f) as usize + } +} + /// Madara client database backend singleton. pub struct MadaraBackend { backup_handle: Option>, @@ -281,6 +418,7 @@ pub struct MadaraBackend { snapshots: Arc, trie_log_config: TrieLogConfig, sender_block_info: tokio::sync::broadcast::Sender, + sender_event: EventChannels, write_opt_no_wal: WriteOptions, #[cfg(any(test, feature = "testing"))] _temp_dir: Option, @@ -386,6 +524,7 @@ impl MadaraBackend { snapshots, trie_log_config: Default::default(), sender_block_info: tokio::sync::broadcast::channel(100).0, + sender_event: EventChannels::new(100), write_opt_no_wal: make_write_opt_no_wal(), _temp_dir: Some(temp_dir), }) @@ -445,6 +584,7 @@ impl MadaraBackend { snapshots, trie_log_config, sender_block_info: tokio::sync::broadcast::channel(100).0, + sender_event: EventChannels::new(100), write_opt_no_wal: make_write_opt_no_wal(), #[cfg(any(test, feature = "testing"))] _temp_dir: None, diff --git a/crates/madara/client/gateway/client/Cargo.toml b/crates/madara/client/gateway/client/Cargo.toml index 4576fc86a..f6c636e35 100644 --- a/crates/madara/client/gateway/client/Cargo.toml +++ b/crates/madara/client/gateway/client/Cargo.toml @@ -30,8 +30,8 @@ starknet-types-rpc.workspace = true anyhow.workspace = true bytes.workspace = true futures.workspace = true -http-body-util.workspace = true http.workspace = true +http-body-util.workspace = true hyper = { workspace = true, features = ["full"] } hyper-tls.workspace = true hyper-util.workspace = true diff --git a/crates/madara/client/gateway/server/Cargo.toml b/crates/madara/client/gateway/server/Cargo.toml index e520a99f0..83a2492e4 100644 --- a/crates/madara/client/gateway/server/Cargo.toml +++ b/crates/madara/client/gateway/server/Cargo.toml @@ -37,8 +37,8 @@ hyper-util.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true -tokio-util.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/madara/client/mempool/Cargo.toml b/crates/madara/client/mempool/Cargo.toml index 79e4ac2e1..6e937af8c 100644 --- a/crates/madara/client/mempool/Cargo.toml +++ b/crates/madara/client/mempool/Cargo.toml @@ -64,8 +64,8 @@ mockall = { workspace = true, optional = true } reqwest.workspace = true serde.workspace = true thiserror.workspace = true -tokio-util.workspace = true tokio.workspace = true +tokio-util.workspace = true #Instrumentation opentelemetry = { workspace = true, features = ["metrics", "logs"] } diff --git a/crates/madara/client/rpc/src/utils/mod.rs b/crates/madara/client/rpc/src/utils/mod.rs index 82fbf6188..8878be332 100644 --- a/crates/madara/client/rpc/src/utils/mod.rs +++ b/crates/madara/client/rpc/src/utils/mod.rs @@ -1,5 +1,8 @@ use std::fmt; +use starknet_types_core::felt::Felt; +use starknet_types_rpc::Event; + use crate::StarknetRpcApiError; pub fn display_internal_server_error(err: impl fmt::Display) { @@ -110,3 +113,124 @@ impl OptionExt for Option { } } } + +/// Filters events based on the provided address and keys. +/// +/// This function checks if an event matches the given address and keys. +/// If an address is provided, the event must originate from that address. +/// The event's keys must match the provided keys pattern. +/// +/// # Arguments +/// +/// * `event` - A reference to the event to be filtered. +/// * `address` - An optional address that the event must originate from. +/// * `keys` - An optional slice of key patterns that the event's keys must match. +/// +/// # Returns +/// +/// * `true` if the event matches the address and keys pattern. +/// * `false` otherwise. +#[inline] +pub fn event_match_filter(event: &Event, address: Option<&Felt>, keys: Option<&[Vec]>) -> bool { + // Check if the event's address matches the provided address, if any. + if let Some(addr) = address { + if addr != &event.from_address { + return false; + } + } + + // If keys are not provided, return true. + if let Some(keys) = keys { + // Check if the number of keys in the event matches the number of provided key patterns. + if keys.len() > event.event_content.keys.len() { + return false; + } + + // Check if each key in the event matches the corresponding key pattern. + // Use iterators to traverse both keys and event.event_content.keys simultaneously. + for (pattern, key) in keys.iter().zip(event.event_content.keys.iter()) { + if !pattern.is_empty() && !pattern.contains(key) { + return false; + } + } + } + + true +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::*; + use starknet_types_rpc::EventContent; + + #[fixture] + fn base_event() -> Event { + Event { + from_address: Felt::from_hex_unchecked("0x1234"), + event_content: EventContent { + data: vec![Felt::from_hex_unchecked("0x5678")], + keys: vec![Felt::from_hex_unchecked("0x1"), Felt::from_hex_unchecked("0x2")], + }, + } + } + + #[fixture] + fn matching_address() -> Felt { + Felt::from_hex_unchecked("0x1234") + } + + #[fixture] + fn non_matching_address() -> Felt { + Felt::from_hex_unchecked("0x5678") + } + + #[fixture] + fn matching_keys() -> Vec> { + vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x2")]] + } + + #[fixture] + fn non_matching_keys() -> Vec> { + vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x3")]] + } + + #[rstest] + fn test_address_and_keys_match(base_event: Event, matching_address: Felt, matching_keys: Vec>) { + assert!(event_match_filter(&base_event, Some(&matching_address), Some(&matching_keys))); + } + + #[rstest] + fn test_address_does_not_match(base_event: Event, non_matching_address: Felt, matching_keys: Vec>) { + assert!(!event_match_filter(&base_event, Some(&non_matching_address), Some(&matching_keys))); + } + + #[rstest] + fn test_keys_do_not_match(base_event: Event, matching_address: Felt, non_matching_keys: Vec>) { + assert!(!event_match_filter(&base_event, Some(&matching_address), Some(&non_matching_keys))); + } + + #[rstest] + fn test_no_address_provided(base_event: Event, matching_keys: Vec>) { + assert!(event_match_filter(&base_event, None, Some(&matching_keys))); + } + + #[rstest] + fn test_no_keys_provided(base_event: Event, matching_address: Felt) { + assert!(event_match_filter(&base_event, Some(&matching_address), None)); + } + + #[rstest] + fn test_keys_with_pattern(base_event: Event, matching_address: Felt) { + // [0x1 | 0x2, 0x2] + let keys = vec![ + vec![Felt::from_hex_unchecked("0x1"), Felt::from_hex_unchecked("0x2")], + vec![Felt::from_hex_unchecked("0x2")], + ]; + assert!(event_match_filter(&base_event, Some(&matching_address), Some(&keys))); + + // [_, 0x3 | 0x2] + let keys = vec![vec![], vec![Felt::from_hex_unchecked("0x3"), Felt::from_hex_unchecked("0x2")]]; + assert!(event_match_filter(&base_event, Some(&matching_address), Some(&keys))); + } +} diff --git a/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs b/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs index 5c37a57cb..30a560e6d 100644 --- a/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs +++ b/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs @@ -5,6 +5,7 @@ use starknet_types_rpc::{EmittedEvent, Event, EventContent, EventFilterWithPageR use crate::constants::{MAX_EVENTS_CHUNK_SIZE, MAX_EVENTS_KEYS}; use crate::errors::{StarknetRpcApiError, StarknetRpcResult}; use crate::types::ContinuationToken; +use crate::utils::event_match_filter; use crate::Starknet; /// Returns all events matching the given filter. @@ -32,11 +33,13 @@ pub async fn get_events( filter: EventFilterWithPageRequest, ) -> StarknetRpcResult> { let from_address = filter.address; - let keys = filter.keys.unwrap_or_default(); + let keys = filter.keys; let chunk_size = filter.chunk_size; - if keys.len() > MAX_EVENTS_KEYS { - return Err(StarknetRpcApiError::TooManyKeysInFilter); + if let Some(keys) = &keys { + if keys.len() > MAX_EVENTS_KEYS { + return Err(StarknetRpcApiError::TooManyKeysInFilter); + } } if chunk_size > MAX_EVENTS_CHUNK_SIZE as u64 { return Err(StarknetRpcApiError::PageSizeTooBig); @@ -65,9 +68,8 @@ pub async fn get_events( (true, starknet.get_block(&BlockId::Tag(BlockTag::Pending))?) }; - let block_filtered_events: Vec> = get_block_events(starknet, &block) - .into_iter() - .filter(|event| event_match_filter(&event.event, from_address, &keys)) + let block_filtered_events: Vec> = drain_block_events(block) + .filter(|event| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref())) .collect(); if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { @@ -96,15 +98,6 @@ pub async fn get_events( Ok(EventsChunk { events: filtered_events, continuation_token: None }) } -#[inline] -fn event_match_filter(event: &Event, address: Option, keys: &[Vec]) -> bool { - let match_from_address = address.map_or(true, |addr| addr == event.from_address); - let match_keys = keys.iter().enumerate().all(|(i, keys)| { - event.event_content.keys.len() > i && (keys.is_empty() || keys.contains(&event.event_content.keys[i])) - }); - match_from_address && match_keys -} - fn block_range( starknet: &Starknet, from_block: Option, @@ -124,26 +117,40 @@ fn block_range( Ok((from_block_n, to_block_n, latest_block_n)) } -fn get_block_events(_starknet: &Starknet, block: &MadaraMaybePendingBlock) -> Vec> { +/// Extracts and iterates over all events emitted within a block. +/// +/// This function processes all transactions in a given block (whether pending or confirmed) +/// and returns an iterator over their emitted events. Each event is enriched with its +/// contextual information including block details and the transaction that generated it. +/// +/// # Arguments +/// +/// * `block` - A reference to either a pending or confirmed block (`MadaraMaybePendingBlock`) +/// +/// # Returns +/// +/// Returns an iterator yielding `EmittedEvent` items. Each item contains: +/// - The event data (from address, keys, and associated data) +/// - Block context (hash and number, if the block is confirmed) +/// - Transaction hash that generated the event +pub fn drain_block_events(block: MadaraMaybePendingBlock) -> impl Iterator> { let (block_hash, block_number) = match &block.info { MadaraMaybePendingBlockInfo::Pending(_) => (None, None), MadaraMaybePendingBlockInfo::NotPending(block) => (Some(block.block_hash), Some(block.header.block_number)), }; - let tx_hash_and_events = block.inner.receipts.iter().flat_map(|receipt| { + let tx_hash_and_events = block.inner.receipts.into_iter().flat_map(|receipt| { let tx_hash = receipt.transaction_hash(); - receipt.events().iter().map(move |events| (tx_hash, events)) + receipt.into_events().into_iter().map(move |events| (tx_hash, events)) }); - tx_hash_and_events - .map(|(transaction_hash, event)| EmittedEvent { - event: Event { - from_address: event.from_address, - event_content: EventContent { keys: event.keys.clone(), data: event.data.clone() }, - }, - block_hash, - block_number, - transaction_hash, - }) - .collect() + tx_hash_and_events.map(move |(transaction_hash, event)| EmittedEvent { + event: Event { + from_address: event.from_address, + event_content: EventContent { keys: event.keys, data: event.data }, + }, + block_hash, + block_number, + transaction_hash, + }) } diff --git a/crates/madara/client/rpc/src/versions/user/v0_8_0/api.rs b/crates/madara/client/rpc/src/versions/user/v0_8_0/api.rs index 799e22a34..d968744f3 100644 --- a/crates/madara/client/rpc/src/versions/user/v0_8_0/api.rs +++ b/crates/madara/client/rpc/src/versions/user/v0_8_0/api.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use starknet_types_core::felt::Felt; pub(crate) type NewHead = starknet_types_rpc::BlockHeader; +pub(crate) type EmittedEvent = starknet_types_rpc::EmittedEvent; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ContractStorageKeysItem { @@ -54,8 +55,16 @@ pub struct GetStorageProofResult { #[versioned_rpc("V0_8_0", "starknet")] pub trait StarknetWsRpcApi { - #[subscription(name = "subscribeNewHeads", unsubscribe = "unsubscribe", item = NewHead, param_kind = map)] - async fn subscribe_new_heads(&self, block_id: BlockId) -> jsonrpsee::core::SubscriptionResult; + #[subscription(name = "subscribeNewHeads", unsubscribe = "unsubscribeNewHeads", item = NewHead, param_kind = map)] + async fn subscribe_new_heads(&self, block: BlockId) -> jsonrpsee::core::SubscriptionResult; + + #[subscription(name = "subscribeEvents", unsubscribe = "unsubscribeEvents", item = EmittedEvent, param_kind = map)] + async fn subscribe_events( + &self, + from_address: Option, + keys: Option>>, + block: Option, + ) -> jsonrpsee::core::SubscriptionResult; } #[versioned_rpc("V0_8_0", "starknet")] diff --git a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/lib.rs b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/lib.rs index 4c79cc3c4..1c21d4353 100644 --- a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/lib.rs +++ b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/lib.rs @@ -1,377 +1,28 @@ -use mp_block::{BlockId, BlockTag}; +use mp_block::BlockId; +use starknet_types_core::felt::Felt; -use crate::{ - errors::{ErrorExtWs, OptionExtWs, StarknetWsApiError}, - versions::user::v0_8_0::StarknetWsRpcApiV0_8_0Server, -}; +use crate::versions::user::v0_8_0::StarknetWsRpcApiV0_8_0Server; -use super::BLOCK_PAST_LIMIT; +use super::subscribe_events::*; +use super::subscribe_new_heads::*; #[jsonrpsee::core::async_trait] impl StarknetWsRpcApiV0_8_0Server for crate::Starknet { async fn subscribe_new_heads( &self, subscription_sink: jsonrpsee::PendingSubscriptionSink, - block_id: BlockId, + block: BlockId, ) -> jsonrpsee::core::SubscriptionResult { - let sink = - subscription_sink.accept().await.or_internal_server_error("Failed to establish websocket connection")?; - - let mut block_n = match block_id { - BlockId::Number(block_n) => { - let err = || format!("Failed to retrieve block info for block {block_n}"); - let block_latest = self - .backend - .get_block_n(&BlockId::Tag(BlockTag::Latest)) - .or_else_internal_server_error(err)? - .ok_or(StarknetWsApiError::NoBlocks)?; - - if block_n < block_latest.saturating_sub(BLOCK_PAST_LIMIT) { - return Err(StarknetWsApiError::TooManyBlocksBack.into()); - } - - block_n - } - BlockId::Hash(block_hash) => { - let err = || format!("Failed to retrieve block info at hash {block_hash:#x}"); - let block_latest = self - .backend - .get_block_n(&BlockId::Tag(BlockTag::Latest)) - .or_else_internal_server_error(err)? - .ok_or(StarknetWsApiError::NoBlocks)?; - - let block_n = self - .backend - .get_block_n(&block_id) - .or_else_internal_server_error(err)? - .ok_or(StarknetWsApiError::BlockNotFound)?; - - if block_n < block_latest.saturating_sub(BLOCK_PAST_LIMIT) { - return Err(StarknetWsApiError::TooManyBlocksBack.into()); - } - - block_n - } - BlockId::Tag(BlockTag::Latest) => self - .backend - .get_latest_block_n() - .or_internal_server_error("Failed to retrieve block info for latest block")? - .ok_or(StarknetWsApiError::NoBlocks)?, - BlockId::Tag(BlockTag::Pending) => { - return Err(StarknetWsApiError::Pending.into()); - } - }; - - let mut rx = self.backend.subscribe_block_info(); - for n in block_n.. { - if sink.is_closed() { - return Ok(()); - } - - let block_info = match self.backend.get_block_info(&BlockId::Number(n)) { - Ok(Some(block_info)) => { - let err = || format!("Failed to retrieve block info for block {n}"); - block_info.as_nonpending_owned().ok_or_else_internal_server_error(err)? - } - Ok(None) => break, - Err(e) => { - let err = format!("Failed to retrieve block info for block {n}: {e}"); - return Err(StarknetWsApiError::internal_server_error(err).into()); - } - }; - - send_block_header(&sink, block_info, block_n).await?; - block_n = block_n.saturating_add(1); - } - - // Catching up with the backend - loop { - tokio::select! { - block_info = rx.recv() => { - let block_info = block_info.or_internal_server_error("Failed to retrieve block info")?; - if block_info.header.block_number == block_n { - break send_block_header(&sink, block_info, block_n).await?; - } - }, - _ = sink.closed() => { - return Ok(()) - } - } - } - - // New block headers - loop { - tokio::select! { - block_info = rx.recv() => { - let block_info = block_info.or_internal_server_error("Failed to retrieve block info")?; - if block_info.header.block_number == block_n + 1 { - send_block_header(&sink, block_info, block_n).await?; - } else { - let err = format!( - "Received non-sequential block {}, expected {}", - block_info.header.block_number, - block_n + 1 - ); - return Err(StarknetWsApiError::internal_server_error(err).into()); - } - block_n = block_n.saturating_add(1); - }, - _ = sink.closed() => { - return Ok(()) - } - } - } + Ok(subscribe_new_heads(self, subscription_sink, block).await?) } -} - -async fn send_block_header<'a>( - sink: &jsonrpsee::core::server::SubscriptionSink, - block_info: mp_block::MadaraBlockInfo, - block_n: u64, -) -> Result<(), StarknetWsApiError> { - let header = starknet_types_rpc::BlockHeader::from(block_info); - let msg = jsonrpsee::SubscriptionMessage::from_json(&header) - .or_else_internal_server_error(|| format!("Failed to create response message for block {block_n}"))?; - - sink.send(msg).await.or_internal_server_error("Failed to respond to websocket request")?; - - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - - use jsonrpsee::ws_client::WsClientBuilder; - use starknet_types_core::felt::Felt; - - use crate::{ - test_utils::rpc_test_setup, - versions::user::v0_8_0::{NewHead, StarknetWsRpcApiV0_8_0Client}, - Starknet, - }; - - fn block_generator(backend: &mc_db::MadaraBackend) -> impl Iterator + '_ { - (0..).map(|n| { - backend - .store_block( - mp_block::MadaraMaybePendingBlock { - info: mp_block::MadaraMaybePendingBlockInfo::NotPending(mp_block::MadaraBlockInfo { - header: mp_block::Header { - parent_block_hash: Felt::from(n), - block_number: n, - ..Default::default() - }, - block_hash: Felt::from(n), - tx_hashes: vec![], - }), - inner: mp_block::MadaraBlockInner { transactions: vec![], receipts: vec![] }, - }, - mp_state_update::StateDiff::default(), - vec![], - None, - None, - ) - .expect("Storing block"); - - let block_info = backend - .get_block_info(&BlockId::Number(n)) - .expect("Retrieving block info") - .expect("Retrieving block info") - .as_nonpending_owned() - .expect("Retrieving block info"); - - NewHead::from(block_info) - }) - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads(rpc_test_setup: (std::sync::Arc, Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - let mut generator = block_generator(&backend); - let expected = generator.next().expect("Retrieving block from backend"); - - let mut sub = - client.subscribe_new_heads(BlockId::Tag(BlockTag::Latest)).await.expect("starknet_subscribeNewHeads"); - - let next = sub.next().await; - let header = next.expect("Waiting for block header").expect("Waiting for block header"); - - assert_eq!( - header, - expected, - "actual: {}\nexpect: {}", - serde_json::to_string_pretty(&header).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ); - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_many(rpc_test_setup: (std::sync::Arc, Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - let generator = block_generator(&backend); - let expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize).collect(); - - let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); - - for e in expected { - let next = sub.next().await; - let header = next.expect("Waiting for block header").expect("Waiting for block header"); - assert_eq!( - header, - e, - "actual: {}\nexpect: {}", - serde_json::to_string_pretty(&header).unwrap_or_default(), - serde_json::to_string_pretty(&e).unwrap_or_default() - ); - } - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_disconnect(rpc_test_setup: (std::sync::Arc, Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - let mut generator = block_generator(&backend); - let expected = generator.next().expect("Retrieving block from backend"); - - let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); - - let next = sub.next().await; - let header = next.expect("Waiting for block header").expect("Waiting for block header"); - - assert_eq!( - header, - expected, - "actual: {}\nexpect: {}", - serde_json::to_string_pretty(&header).unwrap_or_default(), - serde_json::to_string_pretty(&expected).unwrap_or_default() - ); - - let next = sub.unsubscribe().await; - assert!(next.is_ok()); - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_future(rpc_test_setup: (std::sync::Arc, Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - let mut generator = block_generator(&backend); - let _block_0 = generator.next().expect("Retrieving block from backend"); - - let mut sub = client.subscribe_new_heads(BlockId::Number(1)).await.expect("starknet_subscribeNewHeads"); - - let block_1 = generator.next().expect("Retrieving block from backend"); - - let next = sub.next().await; - let header = next.expect("Waiting for block header").expect("Waiting for block header"); - - // Note that `sub` does not yield block 0. This is because it starts - // from block 1, ignoring any block before. This can server to notify - // when a block is ready - assert_eq!( - header, - block_1, - "actual: {}\nexpect: {}", - serde_json::to_string_pretty(&header).unwrap_or_default(), - serde_json::to_string_pretty(&block_1).unwrap_or_default() - ); - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_err_too_far_back_block_n( - rpc_test_setup: (std::sync::Arc, Starknet), - ) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - // We generate BLOCK_PAST_LIMIT + 2 because genesis is block 0 - let generator = block_generator(&backend); - let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); - - let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); - - // Jsonrsee seems to just close the connection and not return the error - // to the client so this is the best we can do :/ - let next = sub.next().await; - assert!(next.is_none()); - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_err_too_far_back_block_hash( - rpc_test_setup: (std::sync::Arc, Starknet), - ) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - // We generate BLOCK_PAST_LIMIT + 2 because genesis is block 0 - let generator = block_generator(&backend); - let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); - - let mut sub = - client.subscribe_new_heads(BlockId::Hash(Felt::from(0))).await.expect("starknet_subscribeNewHeads"); - - // Jsonrsee seems to just close the connection and not return the error - // to the client so this is the best we can do :/ - let next = sub.next().await; - assert!(next.is_none()); - } - - #[tokio::test] - #[rstest::rstest] - async fn subscribe_new_heads_err_pending(rpc_test_setup: (std::sync::Arc, Starknet)) { - let (backend, starknet) = rpc_test_setup; - let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); - let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); - // Server will be stopped once this is dropped - let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); - let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); - - let generator = block_generator(&backend); - let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); - - let mut sub = - client.subscribe_new_heads(BlockId::Tag(BlockTag::Pending)).await.expect("starknet_subscribeNewHeads"); - - // Jsonrsee seems to just close the connection and not return the error - // to the client so this is the best we can do :/ - let next = sub.next().await; - assert!(next.is_none()); + async fn subscribe_events( + &self, + subscription_sink: jsonrpsee::PendingSubscriptionSink, + from_address: Option, + keys: Option>>, + block: Option, + ) -> jsonrpsee::core::SubscriptionResult { + Ok(subscribe_events(self, subscription_sink, from_address, keys, block).await?) } } diff --git a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/mod.rs b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/mod.rs index f5444215c..ce3bf3b78 100644 --- a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/mod.rs +++ b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/mod.rs @@ -1,3 +1,5 @@ pub mod lib; +pub mod subscribe_events; +pub mod subscribe_new_heads; const BLOCK_PAST_LIMIT: u64 = 1024; diff --git a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_events.rs b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_events.rs new file mode 100644 index 000000000..77c4897d9 --- /dev/null +++ b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_events.rs @@ -0,0 +1,337 @@ +use mp_block::BlockId; +use starknet_types_core::felt::Felt; + +use crate::{ + errors::{ErrorExtWs, StarknetWsApiError}, + utils::event_match_filter, + versions::user::v0_7_1::methods::read::get_events::drain_block_events, +}; + +use super::BLOCK_PAST_LIMIT; + +pub async fn subscribe_events( + starknet: &crate::Starknet, + subscription_sink: jsonrpsee::PendingSubscriptionSink, + from_address: Option, + keys: Option>>, + block_id: Option, +) -> Result<(), StarknetWsApiError> { + let sink = subscription_sink.accept().await.or_internal_server_error("Failed to establish websocket connection")?; + + let mut rx = starknet.backend.subscribe_events(from_address); + + if let Some(block_id) = block_id { + let latest_block = starknet + .backend + .get_latest_block_n() + .or_internal_server_error("Failed to retrieve latest block")? + .ok_or(StarknetWsApiError::NoBlocks)?; + + let block_n = starknet + .backend + .resolve_block_id(&block_id) + .or_internal_server_error("Failed to resolve block id")? + .ok_or(StarknetWsApiError::BlockNotFound)? + .block_n() + .ok_or(StarknetWsApiError::Pending)?; + + if block_n < latest_block.saturating_sub(BLOCK_PAST_LIMIT) { + return Err(StarknetWsApiError::TooManyBlocksBack); + } + for block_number in block_n..=latest_block { + let block = starknet + .get_block(&BlockId::Number(block_number)) + .or_internal_server_error("Failed to retrieve block")?; + for event in drain_block_events(block) + .filter(|event| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref())) + { + let msg = jsonrpsee::SubscriptionMessage::from_json(&event) + .or_internal_server_error("Failed to create response message")?; + sink.send(msg).await.or_internal_server_error("Failed to respond to websocket request")?; + } + } + } + + loop { + tokio::select! { + event = rx.recv() => { + let event = event.or_internal_server_error("Failed to retrieve event")?; + if event_match_filter(&event.event, from_address.as_ref(), keys.as_deref()) { + let msg = jsonrpsee::SubscriptionMessage::from_json(&event) + .or_internal_server_error("Failed to create response message")?; + sink.send(msg).await.or_internal_server_error("Failed to respond to websocket request")?; + } + }, + _ = sink.closed() => { + return Ok(()) + } + } + } +} + +#[cfg(test)] +mod test { + use crate::{ + versions::user::v0_8_0::{StarknetWsRpcApiV0_8_0Client, StarknetWsRpcApiV0_8_0Server}, + Starknet, + }; + + use super::*; + use crate::test_utils::rpc_test_setup; + use jsonrpsee::ws_client::WsClientBuilder; + use mp_receipt::{InvokeTransactionReceipt, TransactionReceipt}; + use starknet_types_rpc::{EmittedEvent, Event, EventContent}; + + /// Generates a transaction receipt with predictable event values for testing purposes. + /// Values are generated using binary patterns for easy verification. + /// + /// # Values Pattern (in binary) + /// For a given base B: + /// - Transaction hash = B << 32 + /// - For each event i: + /// - from_address = (B << 32) | (i << 16) | 1 + /// - keys[j] = (B << 32) | (i << 16) | (2 + j) + /// + /// This means: + /// - Top 32 bits: base value + /// - Next 16 bits: event index + /// - Last 16 bits: value type (1 for address, 2+ for keys) + /// + /// # Arguments + /// * `base` - Base number used as prefix for all values + /// * `num_events` - Number of events to generate + /// * `keys_per_event` - Number of keys per event + + fn generate_receipt(base: u64, num_events: usize, keys_per_event: usize) -> TransactionReceipt { + // Transaction hash is just the base shifted + let tx_hash = Felt::from(base << 32); + + let events = (0..num_events) + .map(|event_idx| { + // Base pattern for this event: (base << 32) | (event_idx << 16) + let event_pattern = (base << 32) | ((event_idx as u64) << 16); + + // from_address adds 1 to the pattern + let from_address = Felt::from(event_pattern | 1); + + // Keys add 2+ to the pattern + let keys = + (0..keys_per_event).map(|key_idx| Felt::from(event_pattern | (2 + key_idx as u64))).collect(); + + mp_receipt::Event { from_address, keys, data: vec![] } + }) + .collect(); + + TransactionReceipt::Invoke(InvokeTransactionReceipt { transaction_hash: tx_hash, events, ..Default::default() }) + } + + // Generator function that produces a stream of blocks containing events + // Each block contains two receipts: + // 1. First receipt with 1 event and 1 key + // 2. Second receipt with 2 events and 2 keys + fn block_generator(backend: &mc_db::MadaraBackend) -> impl Iterator>> + '_ { + (0..).map(|n| { + let block_info = mp_block::MadaraBlockInfo { + header: mp_block::Header { parent_block_hash: Felt::from(n), block_number: n, ..Default::default() }, + block_hash: Felt::from(n), + tx_hashes: vec![], + }; + + let receipts = vec![generate_receipt(n * 2, 1, 1), generate_receipt(n * 2 + 1, 2, 2)]; + + let block_inner = mp_block::MadaraBlockInner { transactions: vec![], receipts }; + + backend + .store_block( + mp_block::MadaraMaybePendingBlock { + info: mp_block::MadaraMaybePendingBlockInfo::NotPending(block_info.clone()), + inner: block_inner.clone(), + }, + mp_state_update::StateDiff::default(), + vec![], + None, + None, + ) + .expect("Storing block"); + + block_inner + .receipts + .into_iter() + .flat_map(|receipt| { + let tx_hash = receipt.transaction_hash(); + receipt.into_events().into_iter().map(move |events| (tx_hash, events)) + }) + .map(|(transaction_hash, event)| EmittedEvent { + event: Event { + from_address: event.from_address, + event_content: EventContent { keys: event.keys, data: event.data }, + }, + block_hash: Some(block_info.block_hash), + block_number: Some(block_info.header.block_number), + transaction_hash, + }) + .collect() + }) + } + + // Test 1: Basic event subscription without any filters + // - Creates 10 blocks with events + // - Verifies that all 30 events are received (3 events per block * 10 blocks) + // - Events should arrive in the same order they were generated + #[tokio::test] + #[rstest::rstest] + async fn subscribe_events_no_filter(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + + let mut sub = client.subscribe_events(None, None, None).await.expect("Subscribing to events"); + + let mut nb_events = 0; + for _ in 0..10 { + let events = generator.next().expect("Retrieving block"); + for event in events { + let received = sub.next().await.expect("Subscribing closed").expect("Failed to retrieve event"); + assert_eq!(received, event); + nb_events += 1; + } + } + assert_eq!(nb_events, 30); + } + + // Test 2: Event subscription filtered by address + // - Creates blocks and filters events by a specific from_address + // - Only events from the specified address should be received + // - Verifies that at least some events match the filter + #[tokio::test] + #[rstest::rstest] + async fn subscribe_events_filter_address(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + + let from_address = Felt::from(0x300000001u64); + let mut sub = client.subscribe_events(Some(from_address), None, None).await.expect("Subscribing to events"); + + let mut nb_events = 0; + + for _ in 0..10 { + let events = generator.next().expect("Retrieving block"); + for event in events { + if event.event.from_address == from_address { + let received = sub.next().await.expect("Subscribing closed").expect("Failed to retrieve event"); + assert_eq!(received, event); + nb_events += 1; + } + } + } + assert_eq!(nb_events, 1); + } + + // Test 3: Event subscription filtered by keys + // - Creates blocks and filters events by specific key patterns + // - Only events with matching keys should be received + // - Verifies that exactly two specific events match the filter pattern + #[tokio::test] + #[rstest::rstest] + async fn subscribe_events_filter_keys(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + + let keys = vec![ + vec![Felt::from(0x300000002u64), Felt::from(0x500000002u64)], + vec![Felt::from(0x300000003u64), Felt::from(0x500000003u64)], + ]; + + let mut sub = client.subscribe_events(None, Some(keys.clone()), None).await.expect("Subscribing to events"); + + let expected_events = vec![ + EmittedEvent { + event: Event { + from_address: Felt::from(0x300000001u64), + event_content: EventContent { + keys: vec![Felt::from(0x300000002u64), Felt::from(0x300000003u64)], + data: vec![], + }, + }, + block_hash: Some(Felt::from(1u64)), + block_number: Some(1), + transaction_hash: Felt::from(0x300000000u64), + }, + EmittedEvent { + event: Event { + from_address: Felt::from(0x500000001u64), + event_content: EventContent { + keys: vec![Felt::from(0x500000002u64), Felt::from(0x500000003u64)], + data: vec![], + }, + }, + block_hash: Some(Felt::from(2u64)), + block_number: Some(2), + transaction_hash: Felt::from(0x500000000u64), + }, + ]; + + for _ in 0..10 { + let _ = generator.next().expect("Retrieving block"); + } + + for event in expected_events { + let received = sub.next().await.expect("Subscribing closed").expect("Failed to retrieve event"); + assert_eq!(received, event); + } + } + + // Test 4: Event subscription starting from a past block + // - Generates initial blocks (0-2) + // - Starts subscription from block 3 + // - Verifies that only events from blocks 3-9 are received + // - Events should arrive in the correct order + #[tokio::test] + #[rstest::rstest] + async fn subscribe_events_past_block(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + + // Generate first 3 blocks but ignore their events + for _ in 0..3 { + let _ = generator.next().expect("Retrieving block"); + } + + let mut expected_events = vec![]; + + // Collect events from blocks 3-9 + for _ in 3..10 { + let events = generator.next().expect("Retrieving block"); + for event in events { + expected_events.push(event); + } + } + + let block_id = BlockId::Number(3); + let mut sub = client.subscribe_events(None, None, Some(block_id)).await.expect("Subscribing to events"); + + for event in expected_events { + let received = sub.next().await.expect("Subscribing closed").expect("Failed to retrieve event"); + assert_eq!(received, event); + } + } +} diff --git a/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_new_heads.rs b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_new_heads.rs new file mode 100644 index 000000000..742981c66 --- /dev/null +++ b/crates/madara/client/rpc/src/versions/user/v0_8_0/methods/ws/subscribe_new_heads.rs @@ -0,0 +1,370 @@ +use mp_block::{BlockId, BlockTag}; + +use crate::errors::{ErrorExtWs, OptionExtWs, StarknetWsApiError}; + +use super::BLOCK_PAST_LIMIT; + +pub async fn subscribe_new_heads( + starknet: &crate::Starknet, + subscription_sink: jsonrpsee::PendingSubscriptionSink, + block_id: BlockId, +) -> Result<(), StarknetWsApiError> { + let sink = subscription_sink.accept().await.or_internal_server_error("Failed to establish websocket connection")?; + + let mut block_n = match block_id { + BlockId::Number(block_n) => { + let err = || format!("Failed to retrieve block info for block {block_n}"); + let block_latest = starknet + .backend + .get_block_n(&BlockId::Tag(BlockTag::Latest)) + .or_else_internal_server_error(err)? + .ok_or(StarknetWsApiError::NoBlocks)?; + + if block_n < block_latest.saturating_sub(BLOCK_PAST_LIMIT) { + return Err(StarknetWsApiError::TooManyBlocksBack); + } + + block_n + } + BlockId::Hash(block_hash) => { + let err = || format!("Failed to retrieve block info at hash {block_hash:#x}"); + let block_latest = starknet + .backend + .get_block_n(&BlockId::Tag(BlockTag::Latest)) + .or_else_internal_server_error(err)? + .ok_or(StarknetWsApiError::NoBlocks)?; + + let block_n = starknet + .backend + .get_block_n(&block_id) + .or_else_internal_server_error(err)? + .ok_or(StarknetWsApiError::BlockNotFound)?; + + if block_n < block_latest.saturating_sub(BLOCK_PAST_LIMIT) { + return Err(StarknetWsApiError::TooManyBlocksBack); + } + + block_n + } + BlockId::Tag(BlockTag::Latest) => starknet + .backend + .get_latest_block_n() + .or_internal_server_error("Failed to retrieve block info for latest block")? + .ok_or(StarknetWsApiError::NoBlocks)?, + BlockId::Tag(BlockTag::Pending) => { + return Err(StarknetWsApiError::Pending); + } + }; + + let mut rx = starknet.backend.subscribe_block_info(); + for n in block_n.. { + if sink.is_closed() { + return Ok(()); + } + + let block_info = match starknet.backend.get_block_info(&BlockId::Number(n)) { + Ok(Some(block_info)) => { + let err = || format!("Failed to retrieve block info for block {n}"); + block_info.as_nonpending_owned().ok_or_else_internal_server_error(err)? + } + Ok(None) => break, + Err(e) => { + let err = format!("Failed to retrieve block info for block {n}: {e}"); + return Err(StarknetWsApiError::internal_server_error(err)); + } + }; + + send_block_header(&sink, block_info, block_n).await?; + block_n = block_n.saturating_add(1); + } + + // Catching up with the backend + loop { + tokio::select! { + block_info = rx.recv() => { + let block_info = block_info.or_internal_server_error("Failed to retrieve block info")?; + if block_info.header.block_number == block_n { + break send_block_header(&sink, block_info, block_n).await?; + } + }, + _ = sink.closed() => { + return Ok(()) + } + } + } + + // New block headers + loop { + tokio::select! { + block_info = rx.recv() => { + let block_info = block_info.or_internal_server_error("Failed to retrieve block info")?; + if block_info.header.block_number == block_n + 1 { + send_block_header(&sink, block_info, block_n).await?; + } else { + let err = format!( + "Received non-sequential block {}, expected {}", + block_info.header.block_number, + block_n + 1 + ); + return Err(StarknetWsApiError::internal_server_error(err)); + } + block_n = block_n.saturating_add(1); + }, + _ = sink.closed() => { + return Ok(()) + } + } + } +} + +async fn send_block_header<'a>( + sink: &jsonrpsee::core::server::SubscriptionSink, + block_info: mp_block::MadaraBlockInfo, + block_n: u64, +) -> Result<(), StarknetWsApiError> { + let header = starknet_types_rpc::BlockHeader::from(block_info); + let msg = jsonrpsee::SubscriptionMessage::from_json(&header) + .or_else_internal_server_error(|| format!("Failed to create response message for block {block_n}"))?; + + sink.send(msg).await.or_internal_server_error("Failed to respond to websocket request")?; + + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + + use jsonrpsee::ws_client::WsClientBuilder; + use starknet_types_core::felt::Felt; + + use crate::{ + test_utils::rpc_test_setup, + versions::user::v0_8_0::{NewHead, StarknetWsRpcApiV0_8_0Client, StarknetWsRpcApiV0_8_0Server}, + Starknet, + }; + + fn block_generator(backend: &mc_db::MadaraBackend) -> impl Iterator + '_ { + (0..).map(|n| { + backend + .store_block( + mp_block::MadaraMaybePendingBlock { + info: mp_block::MadaraMaybePendingBlockInfo::NotPending(mp_block::MadaraBlockInfo { + header: mp_block::Header { + parent_block_hash: Felt::from(n), + block_number: n, + ..Default::default() + }, + block_hash: Felt::from(n), + tx_hashes: vec![], + }), + inner: mp_block::MadaraBlockInner { transactions: vec![], receipts: vec![] }, + }, + mp_state_update::StateDiff::default(), + vec![], + None, + None, + ) + .expect("Storing block"); + + let block_info = backend + .get_block_info(&BlockId::Number(n)) + .expect("Retrieving block info") + .expect("Retrieving block info") + .as_nonpending_owned() + .expect("Retrieving block info"); + + NewHead::from(block_info) + }) + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + let expected = generator.next().expect("Retrieving block from backend"); + + let mut sub = + client.subscribe_new_heads(BlockId::Tag(BlockTag::Latest)).await.expect("starknet_subscribeNewHeads"); + + let next = sub.next().await; + let header = next.expect("Waiting for block header").expect("Waiting for block header"); + + assert_eq!( + header, + expected, + "actual: {}\nexpect: {}", + serde_json::to_string_pretty(&header).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ); + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_many(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let generator = block_generator(&backend); + let expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize).collect(); + + let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); + + for e in expected { + let next = sub.next().await; + let header = next.expect("Waiting for block header").expect("Waiting for block header"); + + assert_eq!( + header, + e, + "actual: {}\nexpect: {}", + serde_json::to_string_pretty(&header).unwrap_or_default(), + serde_json::to_string_pretty(&e).unwrap_or_default() + ); + } + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_disconnect(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + let expected = generator.next().expect("Retrieving block from backend"); + + let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); + + let next = sub.next().await; + let header = next.expect("Waiting for block header").expect("Waiting for block header"); + + assert_eq!( + header, + expected, + "actual: {}\nexpect: {}", + serde_json::to_string_pretty(&header).unwrap_or_default(), + serde_json::to_string_pretty(&expected).unwrap_or_default() + ); + + let next = sub.unsubscribe().await; + assert!(next.is_ok()); + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_future(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let mut generator = block_generator(&backend); + let _block_0 = generator.next().expect("Retrieving block from backend"); + + let mut sub = client.subscribe_new_heads(BlockId::Number(1)).await.expect("starknet_subscribeNewHeads"); + + let block_1 = generator.next().expect("Retrieving block from backend"); + + let next = sub.next().await; + let header = next.expect("Waiting for block header").expect("Waiting for block header"); + + // Note that `sub` does not yield block 0. This is because it starts + // from block 1, ignoring any block before. This can server to notify + // when a block is ready + assert_eq!( + header, + block_1, + "actual: {}\nexpect: {}", + serde_json::to_string_pretty(&header).unwrap_or_default(), + serde_json::to_string_pretty(&block_1).unwrap_or_default() + ); + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_err_too_far_back_block_n( + rpc_test_setup: (std::sync::Arc, Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + // We generate BLOCK_PAST_LIMIT + 2 because genesis is block 0 + let generator = block_generator(&backend); + let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); + + let mut sub = client.subscribe_new_heads(BlockId::Number(0)).await.expect("starknet_subscribeNewHeads"); + + // Jsonrsee seems to just close the connection and not return the error + // to the client so this is the best we can do :/ + let next = sub.next().await; + assert!(next.is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_err_too_far_back_block_hash( + rpc_test_setup: (std::sync::Arc, Starknet), + ) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + // We generate BLOCK_PAST_LIMIT + 2 because genesis is block 0 + let generator = block_generator(&backend); + let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); + + let mut sub = + client.subscribe_new_heads(BlockId::Hash(Felt::from(0))).await.expect("starknet_subscribeNewHeads"); + + // Jsonrsee seems to just close the connection and not return the error + // to the client so this is the best we can do :/ + let next = sub.next().await; + assert!(next.is_none()); + } + + #[tokio::test] + #[rstest::rstest] + async fn subscribe_new_heads_err_pending(rpc_test_setup: (std::sync::Arc, Starknet)) { + let (backend, starknet) = rpc_test_setup; + let server = jsonrpsee::server::Server::builder().build("127.0.0.1:0").await.expect("Starting server"); + let server_url = format!("ws://{}", server.local_addr().expect("Retrieving server local address")); + // Server will be stopped once this is dropped + let _server_handle = server.start(StarknetWsRpcApiV0_8_0Server::into_rpc(starknet)); + let client = WsClientBuilder::default().build(&server_url).await.expect("Building client"); + + let generator = block_generator(&backend); + let _expected: Vec<_> = generator.take(BLOCK_PAST_LIMIT as usize + 2).collect(); + + let mut sub = + client.subscribe_new_heads(BlockId::Tag(BlockTag::Pending)).await.expect("starknet_subscribeNewHeads"); + + // Jsonrsee seems to just close the connection and not return the error + // to the client so this is the best we can do :/ + let next = sub.next().await; + assert!(next.is_none()); + } +} diff --git a/crates/madara/node/Cargo.toml b/crates/madara/node/Cargo.toml index 05240ceea..4b1e7c4bf 100644 --- a/crates/madara/node/Cargo.toml +++ b/crates/madara/node/Cargo.toml @@ -60,8 +60,8 @@ serde_json.workspace = true serde_yaml.workspace = true thiserror.workspace = true tokio.workspace = true -tower-http.workspace = true tower.workspace = true +tower-http.workspace = true url.workspace = true #Instrumentation diff --git a/crates/madara/primitives/receipt/src/lib.rs b/crates/madara/primitives/receipt/src/lib.rs index ea384360f..d785621a9 100644 --- a/crates/madara/primitives/receipt/src/lib.rs +++ b/crates/madara/primitives/receipt/src/lib.rs @@ -110,6 +110,16 @@ impl TransactionReceipt { } } + pub fn into_events(self) -> Vec { + match self { + TransactionReceipt::Invoke(receipt) => receipt.events, + TransactionReceipt::L1Handler(receipt) => receipt.events, + TransactionReceipt::Declare(receipt) => receipt.events, + TransactionReceipt::Deploy(receipt) => receipt.events, + TransactionReceipt::DeployAccount(receipt) => receipt.events, + } + } + pub fn execution_result(&self) -> ExecutionResult { match self { TransactionReceipt::Invoke(receipt) => receipt.execution_result.clone(), diff --git a/crates/madara/primitives/utils/Cargo.toml b/crates/madara/primitives/utils/Cargo.toml index aab8d40c2..183e8aa0e 100644 --- a/crates/madara/primitives/utils/Cargo.toml +++ b/crates/madara/primitives/utils/Cargo.toml @@ -32,8 +32,8 @@ rand.workspace = true rayon.workspace = true serde.workspace = true serde_yaml.workspace = true -tokio-util.workspace = true tokio.workspace = true +tokio-util.workspace = true url.workspace = true #Instrumentation diff --git a/crates/madara/tests/Cargo.toml b/crates/madara/tests/Cargo.toml index 1254ffd7b..8e1b4d1f1 100644 --- a/crates/madara/tests/Cargo.toml +++ b/crates/madara/tests/Cargo.toml @@ -26,5 +26,5 @@ starknet-providers.workspace = true starknet-types-core.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["rt", "macros"] } -tracing-subscriber.workspace = true tracing.workspace = true +tracing-subscriber.workspace = true diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 619461109..58748b24f 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] channel = "1.81" -components = ["rustfmt", "clippy", "rust-analyzer"] +components = ["rust-src", "rustfmt", "clippy", "rust-analyzer"] profile = "minimal" diff --git a/taplo/taplo.toml b/taplo/taplo.toml index e49b66281..339d907fa 100644 --- a/taplo/taplo.toml +++ b/taplo/taplo.toml @@ -1,5 +1,4 @@ include = ["**/*.toml"] -exclude = ["**/bad.toml"] [formatting] align_entries = false