Skip to content

Commit

Permalink
feat(rpc): implement starknet_subscribeEvents WebSocket endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jbcaron committed Jan 15, 2025
1 parent 4d1c4b3 commit 008a6c2
Show file tree
Hide file tree
Showing 20 changed files with 1,038 additions and 391 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ Here is a list of all the supported methods with their current status:
| ------ | ------------------------------------------------ |
|| `starknet_unsubscribe` (v0.8.0) |
|| `starknet_subscribeNewHeads` (v0.8.0) |
| | `starknet_subscribeEvents` (v0.8.0) |
| | `starknet_subscribeEvents` (v0.8.0) |
|| `starknet_subscribeTransactionStatus` (v0.8.0) |
|| `starknet_subscribePendingTransactions` (v0.8.0) |
|| `starknet_subscriptionReorg` (v0.8.0) |
Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mp-utils = { workspace = true }
blockifier = { workspace = true }
bonsai-trie = { workspace = true }
starknet-types-core = { workspace = true }
starknet-types-rpc = { workspace = true }
starknet_api = { workspace = true }

# Other
Expand Down
29 changes: 29 additions & 0 deletions crates/madara/client/db/src/block_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use mp_state_update::StateDiff;
use rocksdb::WriteOptions;
use starknet_api::core::ChainId;
use starknet_types_core::felt::Felt;
use starknet_types_rpc::EmittedEvent;

type Result<T, E = MadaraStorageError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -313,6 +314,29 @@ impl MadaraBackend {
tracing::debug!("Failed to send block info to subscribers: {e}");
}
}
if self.sender_event.receiver_count() > 0 {
let block_number = block.info.header.block_number;
let block_hash = block.info.block_hash;

block
.inner
.receipts
.iter()
.flat_map(|receipt| {
let tx_hash = receipt.transaction_hash();
receipt.events().iter().map(move |event| (tx_hash, event))
})
.for_each(|(transaction_hash, event)| {
if let Err(e) = self.sender_event.publish(EmittedEvent {
event: event.clone().into(),
block_hash: Some(block_hash),
block_number: Some(block_number),
transaction_hash,
}) {
tracing::debug!("Failed to send event to subscribers: {e}");
}
});
}

// clear pending
tx.delete_cf(&meta, ROW_PENDING_INFO);
Expand Down Expand Up @@ -400,6 +424,11 @@ impl MadaraBackend {
self.sender_block_info.subscribe()
}

#[tracing::instrument(skip(self), fields(module = "BlockDB"))]
pub fn subscribe_events(&self, from_address: Option<Felt>) -> tokio::sync::broadcast::Receiver<EmittedEvent<Felt>> {
self.sender_event.subscribe(from_address)
}

#[tracing::instrument(skip(self, id), fields(module = "BlockDB"))]
pub fn get_block_inner(&self, id: &impl DbBlockIdResolvable) -> Result<Option<MadaraBlockInner>> {
let Some(ty) = id.resolve_db_block_id(self)? else { return Ok(None) };
Expand Down
123 changes: 123 additions & 0 deletions crates/madara/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use rocksdb::{
};
use rocksdb_options::rocksdb_global_options;
use snapshots::Snapshots;
use starknet_types_core::felt::Felt;
use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash};
use starknet_types_rpc::EmittedEvent;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fmt, fs};
Expand Down Expand Up @@ -272,6 +274,124 @@ impl Default for TrieLogConfig {
}
}

/// EventChannels manages a pub/sub system for events with 16 specific channels plus one "all" channel.
/// Events are distributed based on the sender's address in the event, where each sender address
/// is mapped to one of the 16 specific channels using a modulo operation on the address bytes.
/// Subscribers can choose to either receive all events or only events from specific senders
/// by subscribing to the corresponding channel.
pub struct EventChannels {
/// Broadcast channel that receives all events regardless of their sender's address
all_channels: tokio::sync::broadcast::Sender<EmittedEvent<Felt>>,
/// Array of 16 broadcast channels, each handling events from a subset of sender addresses
/// The target channel for an event is determined by: sender_address % 16
specific_channels: [tokio::sync::broadcast::Sender<EmittedEvent<Felt>>; 16],
}

impl EventChannels {
/// Creates a new EventChannels instance with the specified buffer capacity for each channel.
/// Each channel (both all_channels and specific channels) will be able to buffer up to
/// `capacity` events before older events are dropped.
///
/// # Arguments
/// * `capacity` - The maximum number of events that can be buffered in each channel
///
/// # Returns
/// A new EventChannels instance with initialized broadcast channels
pub fn new(capacity: usize) -> Self {
let (all_channels, _) = tokio::sync::broadcast::channel(capacity);

let mut specific_channels = Vec::with_capacity(16);
for _ in 0..16 {
let (sender, _) = tokio::sync::broadcast::channel(capacity);
specific_channels.push(sender);
}

Self { all_channels, specific_channels: specific_channels.try_into().unwrap() }
}

/// Subscribes to events based on an optional sender address filter
///
/// # Arguments
/// * `from_address` - Optional sender address to filter events:
/// * If `Some(address)`, subscribes only to events from senders whose addresses map
/// to the same channel as the provided address (address % 16)
/// * If `None`, subscribes to all events regardless of sender address
///
/// # Returns
/// A broadcast::Receiver that will receive either:
/// * All events (if from_address is None)
/// * Only events from senders whose addresses map to the same channel as the provided address
///
/// # Warning
/// This method only provides a coarse filtering mechanism based on address mapping (address % 16).
/// You will still need to implement additional filtering in your receiver logic because:
/// * Multiple sender addresses map to the same channel
/// * You may want to match the exact sender address rather than just its channel mapping
///
/// # Implementation Details
/// When a specific address is provided, the method:
/// 1. Calculates the channel index using (address % 16)
/// 2. Subscribes to the corresponding specific channel
///
/// This means you'll receive events from all senders whose addresses map to the same channel
pub fn subscribe(&self, from_address: Option<Felt>) -> tokio::sync::broadcast::Receiver<EmittedEvent<Felt>> {
match from_address {
Some(address) => {
let channel_index = self.calculate_channel_index(&address);
self.specific_channels[channel_index].subscribe()
}
None => self.all_channels.subscribe(),
}
}

/// Publishes an event to both the all_channels and the specific channel determined by the sender's address.
/// The event will only be sent to channels that have active subscribers.
///
/// # Arguments
/// * `event` - The event to publish, containing the sender's address that determines the target specific channel
///
/// # Returns
/// * `Ok(usize)` - The sum of the number of subscribers that received the event across both channels
/// * `Ok(0)` - If no subscribers exist in any channel
/// * `Err` - If the event couldn't be sent
pub fn publish(
&self,
event: EmittedEvent<Felt>,
) -> Result<usize, Box<tokio::sync::broadcast::error::SendError<EmittedEvent<Felt>>>> {
let channel_index = self.calculate_channel_index(&event.event.from_address);
let specific_channel = &self.specific_channels[channel_index];

let mut total = 0;
if self.all_channels.receiver_count() > 0 {
total += self.all_channels.send(event.clone())?;
}
if specific_channel.receiver_count() > 0 {
total += specific_channel.send(event)?;
}
Ok(total)
}

pub fn receiver_count(&self) -> usize {
self.all_channels.receiver_count() + self.specific_channels.iter().map(|c| c.receiver_count()).sum::<usize>()
}

/// Calculates the target channel index for a given sender's address
///
/// # Arguments
/// * `address` - The Felt address of the event sender to calculate the channel index for
///
/// # Returns
/// A channel index between 0 and 15, calculated by taking the last byte of the
/// sender's address modulo 16
///
/// # Implementation Details
/// Uses the last byte of the big-endian representation of the sender's address to determine
/// the channel, ensuring an even distribution of sender addresses across channels
fn calculate_channel_index(&self, address: &Felt) -> usize {
(address.to_bytes_be()[31] % 16) as usize
}
}

/// Madara client database backend singleton.
pub struct MadaraBackend {
backup_handle: Option<mpsc::Sender<BackupRequest>>,
Expand All @@ -281,6 +401,7 @@ pub struct MadaraBackend {
snapshots: Arc<Snapshots>,
trie_log_config: TrieLogConfig,
sender_block_info: tokio::sync::broadcast::Sender<mp_block::MadaraBlockInfo>,
sender_event: EventChannels,
write_opt_no_wal: WriteOptions,
#[cfg(any(test, feature = "testing"))]
_temp_dir: Option<tempfile::TempDir>,
Expand Down Expand Up @@ -386,6 +507,7 @@ impl MadaraBackend {
snapshots,
trie_log_config: Default::default(),
sender_block_info: tokio::sync::broadcast::channel(100).0,
sender_event: EventChannels::new(100),
write_opt_no_wal: make_write_opt_no_wal(),
_temp_dir: Some(temp_dir),
})
Expand Down Expand Up @@ -445,6 +567,7 @@ impl MadaraBackend {
snapshots,
trie_log_config,
sender_block_info: tokio::sync::broadcast::channel(100).0,
sender_event: EventChannels::new(100),
write_opt_no_wal: make_write_opt_no_wal(),
#[cfg(any(test, feature = "testing"))]
_temp_dir: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/gateway/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ starknet-types-rpc.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
http-body-util.workspace = true
http.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["full"] }
hyper-tls.workspace = true
hyper-util.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/gateway/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ hyper-util.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ mockall = { workspace = true, optional = true }
reqwest.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true

#Instrumentation
opentelemetry = { workspace = true, features = ["metrics", "logs"] }
Expand Down
121 changes: 121 additions & 0 deletions crates/madara/client/rpc/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::fmt;

use starknet_types_core::felt::Felt;
use starknet_types_rpc::Event;

use crate::StarknetRpcApiError;

pub fn display_internal_server_error(err: impl fmt::Display) {
Expand Down Expand Up @@ -110,3 +113,121 @@ impl<T> OptionExt<T> for Option<T> {
}
}
}

/// Filters events based on the provided address and keys.
///
/// This function checks if an event matches the given address and keys.
/// If an address is provided, the event must originate from that address.
/// The event's keys must match the provided keys pattern.
///
/// # Arguments
///
/// * `event` - A reference to the event to be filtered.
/// * `address` - An optional address that the event must originate from.
/// * `keys` - An optional slice of key patterns that the event's keys must match.
///
/// # Returns
///
/// * `true` if the event matches the address and keys pattern.
/// * `false` otherwise.
#[inline]
pub fn event_match_filter(event: &Event<Felt>, address: Option<Felt>, keys: &Option<Vec<Vec<Felt>>>) -> bool {
// Check if the event's address matches the provided address, if any.
if let Some(addr) = address {
if addr != event.from_address {
return false;
}
}

// If keys are not provided, return true.
if let Some(keys) = keys {
// Check if the number of keys in the event matches the number of provided key patterns.
if keys.len() > event.event_content.keys.len() {
return false;
}

// Check if each key in the event matches the corresponding key pattern.
// Use iterators to traverse both keys and event.event_content.keys simultaneously.
for (pattern, key) in keys.iter().zip(event.event_content.keys.iter()) {
if !pattern.is_empty() && !pattern.contains(key) {
return false;
}
}
}

true
}

#[cfg(test)]
mod tests {
use starknet_types_rpc::EventContent;

use super::*;

fn event() -> Event<Felt> {
Event {
from_address: Felt::from_hex_unchecked("0x1234"),
event_content: EventContent {
data: vec![Felt::from_hex_unchecked("0x5678")],
keys: vec![Felt::from_hex_unchecked("0x1"), Felt::from_hex_unchecked("0x2")],
},
}
}

#[test]
fn test_address_and_keys_match() {
let event = event();
let address = Some(Felt::from_hex_unchecked("0x1234"));
let keys = Some(vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x2")]]);
assert!(event_match_filter(&event, address, &keys));
}

#[test]
fn test_address_does_not_match() {
let event = event();
let address = Some(Felt::from_hex_unchecked("0x5678"));
let keys = Some(vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x2")]]);
assert!(!event_match_filter(&event, address, &keys));
}

#[test]
fn test_keys_do_not_match() {
let event = event();
let address = Some(Felt::from_hex_unchecked("0x1234"));
let keys = Some(vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x3")]]);
assert!(!event_match_filter(&event, address, &keys));
}

#[test]
fn test_no_address_provided() {
let event = event();
let address = None;
let keys = Some(vec![vec![Felt::from_hex_unchecked("0x1")], vec![Felt::from_hex_unchecked("0x2")]]);
assert!(event_match_filter(&event, address, &keys));
}

#[test]
fn test_no_keys_provided() {
let event = event();
let address = Some(Felt::from_hex_unchecked("0x1234"));
let keys = None;
assert!(event_match_filter(&event, address, &keys));
}

#[test]
fn test_keys_with_pattern() {
let event = event();
let address = Some(Felt::from_hex_unchecked("0x1234"));

// [0x1 | 0x2, 0x2]
let keys = Some(vec![
vec![Felt::from_hex_unchecked("0x1"), Felt::from_hex_unchecked("0x2")],
vec![Felt::from_hex_unchecked("0x2")],
]);
assert!(event_match_filter(&event, address, &keys));

// [_, 0x3 | 0x2]
let keys = Some(vec![vec![], vec![Felt::from_hex_unchecked("0x3"), Felt::from_hex_unchecked("0x2")]]);
assert!(event_match_filter(&event, address, &keys));
}
}
Loading

0 comments on commit 008a6c2

Please sign in to comment.