diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index ad7698ada5..c52d643acf 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -7,6 +7,11 @@ license = { workspace = true } rust-version = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +aggregate_bloom = [] + +default = [] + [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/rpc/fixtures/mainnet.sqlite b/crates/rpc/fixtures/mainnet.sqlite index f86d30b16e..88b5ca98c3 100644 Binary files a/crates/rpc/fixtures/mainnet.sqlite and b/crates/rpc/fixtures/mainnet.sqlite differ diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs index ab59924e00..c52c0c8130 100644 --- a/crates/rpc/src/method/get_events.rs +++ b/crates/rpc/src/method/get_events.rs @@ -217,6 +217,17 @@ pub async fn get_events( offset: requested_offset, }; + // TODO: + // The instrumentation and the `AggregateBloom` version of fetching events + // for the given `EventFilter` are behind a feature flag for now. They are + // skipped during testing because they would only slow the tests down + // without affecting their outcome. A follow-up PR will use the + // `AggregateBloom` logic to create the output, at which point these + // conditions will be removed. + + #[cfg(all(feature = "aggregate_bloom", not(test)))] + let start = std::time::Instant::now(); + let page = transaction .events( &filter, @@ -228,6 +239,40 @@ pub async fn get_events( EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), })?; + #[cfg(all(feature = "aggregate_bloom", not(test)))] + { + let elapsed = start.elapsed(); + + tracing::info!( + "Getting events (individual Bloom filters) took {:?}", + elapsed + ); + + let start = std::time::Instant::now(); + let page_from_aggregate = transaction + .events_from_aggregate(&filter, context.config.get_events_max_blocks_to_scan) + .map_err(|e| match e { + EventFilterError::Internal(e) => GetEventsError::Internal(e), + EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), + })?; + let elapsed = start.elapsed(); + + tracing::info!( + "Getting events (aggregate Bloom filters) took {:?}", + elapsed + ); + + if page != page_from_aggregate { + tracing::error!( + "Page of events from individual and aggregate bloom filters does not match!"
+ ); + tracing::error!("Individual: {:?}", page); + tracing::error!("Aggregate: {:?}", page_from_aggregate); + } else { + tracing::info!("Page of events from individual and aggregate bloom filters match!"); + } + } + let mut events = GetEventsResult { events: page.events.into_iter().map(|e| e.into()).collect(), continuation_token: page.continuation_token.map(|token| { diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs index 537cba978d..ab3683d482 100644 --- a/crates/rpc/src/method/subscribe_events.rs +++ b/crates/rpc/src/method/subscribe_events.rs @@ -250,14 +250,14 @@ mod tests { use tokio::sync::mpsc; use crate::context::{RpcConfig, RpcContext}; - use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter}; + use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter, CATCH_UP_BATCH_SIZE}; use crate::pending::PendingWatcher; use crate::v02::types::syncing::Syncing; use crate::{v08, Notifications, Reorg, SyncState}; #[tokio::test] async fn no_filtering() { - let num_blocks = 2000; + let num_blocks = 80; let router = setup(num_blocks).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); @@ -319,14 +319,14 @@ mod tests { #[tokio::test] async fn filter_from_address() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", + "from_address": "0x46", } ); receiver_tx @@ -351,7 +351,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -371,9 +371,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -385,14 +385,14 @@ mod tests { #[tokio::test] async fn filter_keys() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "keys": [["0x90"], [], ["0x92", "0x93"]], + "keys": [["0x46"], [], ["0x47", "0x48"]], } ); receiver_tx @@ -417,7 +417,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -437,9 +437,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, 
subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), _ => panic!("Expected text message"), }; assert_eq!(json, expected); } #[tokio::test] async fn filter_from_address_and_keys() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", - "keys": [["0x90"], [], ["0x92", "0x93"]], + "from_address": "0x46", + "keys": [["0x46"], [], ["0x47", "0x48"]], } ); receiver_tx @@ -484,7 +484,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -504,9 +504,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -518,32 +518,32 @@ mod tests { #[tokio::test] async fn too_many_keys_filter() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", + "from_address": "0x46", "keys": [ - ["0x91"], - ["0x92"], - ["0x93"], - ["0x94"], - ["0x95"], - ["0x96"], - ["0x97"], - ["0x98"], - ["0x99"], - ["0x9a"], - ["0x9b"], - ["0x9c"], - ["0x9d"], - ["0x9e"], - ["0x9f"], - ["0xa0"], - ["0xa1"], + ["0x46"], + ["0x47"], + ["0x48"], + ["0x49"], + ["0x4a"], + ["0x4b"], + ["0x4c"], + ["0x4d"], + ["0x4e"], + ["0x4f"], + ["0x50"], + ["0x51"], + ["0x52"], + ["0x53"], + ["0x54"], + ["0x55"], + ["0x56"], ], } ); @@ -644,6 +644,8 @@ mod tests { } async fn setup(num_blocks: u64) -> RpcRouter { + assert!(num_blocks == 0 || num_blocks > CATCH_UP_BATCH_SIZE); + let storage = StorageBuilder::in_memory().unwrap(); tokio::task::spawn_blocking({ let storage = storage.clone(); diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index f8fcd01a0a..b0ce2adcf8 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -60,6 +60,7 @@ //! specific set of keys without having to load and check each individual bloom //! filter. +use std::collections::BTreeSet; use std::sync::{Mutex, MutexGuard}; use bloomfilter::Bloom; @@ -79,71 +80,80 @@ pub const EVENT_KEY_FILTER_LIMIT: usize = 16; /// Before being added to `AggregateBloom`, each [`BloomFilter`] is /// rotated by 90 degrees (transposed). #[derive(Debug)] -pub(crate) struct AggregateBloom { /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in /// a single array. bitmap: Vec<u8>, +pub struct AggregateBloom { - /// Block range for which the aggregate filter is constructed.
- block_range: std::ops::Range<BlockNumber>, - next_block: BlockNumber, + /// Starting (inclusive) block number for the range of blocks that this + /// aggregate covers. + pub from_block: BlockNumber, + /// Ending (inclusive) block number for the range of blocks that this + /// aggregate covers. + pub to_block: BlockNumber, } +// TODO: +// Delete after cfg flag is removed +#[allow(dead_code)] impl AggregateBloom { - // TODO: - // Remove #[allow(dead_code)] when follow up is done. - /// Maximum number of blocks to aggregate in a single `AggregateBloom`. - const BLOCK_RANGE_LEN: u64 = 32_768; + pub const BLOCK_RANGE_LEN: u64 = 32_768; const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8; - /// Create a new `AggregateBloom` for the (`from_block`, `from_block + - /// [Self::BLOCK_RANGE_LEN]`) range. - #[allow(dead_code)] + /// Create a new `AggregateBloom` for the inclusive range `from_block` to + /// `from_block` + [`BLOCK_RANGE_LEN`](Self::BLOCK_RANGE_LEN) - 1. pub fn new(from_block: BlockNumber) -> Self { - let bitmap = vec![0; (Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN) as usize]; + let to_block = from_block + Self::BLOCK_RANGE_LEN - 1; + let bitmap = vec![0; Self::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize]; + Self::from_parts(from_block, to_block, bitmap) + } - let to_block = from_block + Self::BLOCK_RANGE_LEN; + pub fn from_existing_compressed( + from_block: u64, + to_block: u64, + compressed_bitmap: Vec<u8>, + ) -> Self { + let from_block = BlockNumber::new_or_panic(from_block); + let to_block = BlockNumber::new_or_panic(to_block); - Self { - bitmap, - block_range: from_block..to_block, - next_block: from_block, - } + let bitmap = zstd::bulk::decompress( + &compressed_bitmap, + AggregateBloom::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize, + ) + .expect("Decompressing aggregate Bloom filter"); + + Self::from_parts(from_block, to_block, bitmap) } - #[allow(dead_code)] - pub fn from_bytes(from_block: BlockNumber, bytes: Vec<u8>) -> Self { + fn from_parts(from_block: BlockNumber, to_block: BlockNumber, bitmap: Vec<u8>) -> Self { assert_eq!( - bytes.len() as u64, + from_block + Self::BLOCK_RANGE_LEN - 1, + to_block, + "Block range mismatch" + ); + assert_eq!( + bitmap.len() as u64, Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN, - "Bitmap size mismatch" + "Bitmap length mismatch" ); - let to_block = from_block + Self::BLOCK_RANGE_LEN; - Self { - bitmap: bytes, - block_range: from_block..to_block, - next_block: from_block, + bitmap, + from_block, + to_block, } } - #[allow(dead_code)] - pub fn to_bytes(&self) -> &[u8] { - &self.bitmap + pub fn compress_bitmap(&self) -> Vec<u8> { + zstd::bulk::compress(&self.bitmap, 10).expect("Compressing aggregate Bloom filter") } /// Rotate the bloom filter by 90 degrees and add it to the aggregate.
- #[allow(dead_code)] - pub fn add_bloom( - &mut self, - bloom: &BloomFilter, - insert_pos: BlockNumber, - ) -> Result<(), AddBloomError> { - if !self.block_range.contains(&insert_pos) { - return Err(AddBloomError::InvalidBlockNumber); - } - assert_eq!(self.next_block, insert_pos, "Unexpected insert position"); + pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) { + assert!( + (self.from_block..=self.to_block).contains(&block_number), + "Invalid block number", + ); assert_eq!( bloom.0.number_of_hash_functions(), BloomFilter::K_NUM, @@ -157,8 +167,9 @@ impl AggregateBloom { "Bit vector length mismatch" ); - let byte_index = (insert_pos.get() / 8) as usize; - let bit_index = (insert_pos.get() % 8) as usize; + let relative_block_number = block_number.get() - self.from_block.get(); + let byte_idx = (relative_block_number / 8) as usize; + let bit_idx = (relative_block_number % 8) as usize; for (i, bloom_byte) in bloom.iter().enumerate() { if *bloom_byte == 0 { continue; } @@ -167,48 +178,54 @@ impl AggregateBloom { let base = 8 * i; for j in 0..8 { let row_idx = base + j; - let idx = Self::bitmap_index_at(row_idx, byte_index); - self.bitmap[idx] |= ((bloom_byte >> (7 - j)) & 1) << bit_index; + *self.bitmap_at_mut(row_idx, byte_idx) |= ((bloom_byte >> (7 - j)) & 1) << bit_idx; } } + } - self.next_block += 1; - if self.next_block >= self.block_range.end { - tracing::info!( - "Block limit reached for [{}, {}) range", - self.block_range.start, - self.block_range.end - ); - return Err(AddBloomError::BlockLimitReached); + pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> BTreeSet<BlockNumber> { + // Empty filters are considered present in all blocks. + if filter.contract_address.is_none() && (filter.keys.iter().flatten().count() == 0) { + return (self.from_block.get()..=self.to_block.get()) + .map(BlockNumber::new_or_panic) + .collect(); } - Ok(()) - } - - #[allow(dead_code)] - pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> Vec<BlockNumber> { - let mut keys = vec![]; + let mut blocks: BTreeSet<_> = filter + .keys + .iter() + .enumerate() + .flat_map(|(idx, keys)| { + let keys: Vec<_> = keys + .iter() + .map(|key| { + let mut key_with_idx = key.0; + key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4; + key_with_idx + }) + .collect(); + + self.blocks_for_keys(&keys) + }) + .collect(); if let Some(contract_address) = filter.contract_address { - keys.push(contract_address.0); + blocks.extend(self.blocks_for_keys(&[contract_address.0])); } - filter.keys.iter().flatten().for_each(|k| keys.push(k.0)); - self.blocks_for_keys(keys) + blocks } - #[allow(dead_code)] - fn blocks_for_keys(&self, keys: Vec<Felt>) -> Vec<BlockNumber> { + fn blocks_for_keys(&self, keys: &[Felt]) -> Vec<BlockNumber> { let mut block_matches = vec![]; for k in keys { let mut row_to_check = vec![u8::MAX; Self::BLOCK_RANGE_BYTES as usize]; - let indices = BloomFilter::indices_for_key(&k); + let indices = BloomFilter::indices_for_key(k); for row_idx in indices { for (col_idx, row_byte) in row_to_check.iter_mut().enumerate() { - let idx = Self::bitmap_index_at(row_idx, col_idx); - *row_byte &= self.bitmap[idx]; + *row_byte &= self.bitmap_at(row_idx, col_idx); } } @@ -219,7 +236,8 @@ impl AggregateBloom { for i in 0..8 { if byte & (1 << i) != 0 { - block_matches.push(BlockNumber::new_or_panic((col_idx * 8 + i) as u64)); + let match_number = self.from_block + col_idx as u64 * 8 + i as u64; + block_matches.push(match_number); } } } @@ -228,16 +246,15 @@ impl AggregateBloom { block_matches } - #[allow(dead_code)] - fn bitmap_index_at(row: 
usize, col: usize) -> usize { - row * Self::BLOCK_RANGE_BYTES as usize + col + fn bitmap_at(&self, row: usize, col: usize) -> u8 { + let idx = row * Self::BLOCK_RANGE_BYTES as usize + col; + self.bitmap[idx] } -} -#[derive(Debug)] -pub enum AddBloomError { - BlockLimitReached, - InvalidBlockNumber, + fn bitmap_at_mut(&mut self, row: usize, col: usize) -> &mut u8 { + let idx = row * Self::BLOCK_RANGE_BYTES as usize + col; + &mut self.bitmap[idx] + } } #[derive(Clone)] @@ -354,6 +371,9 @@ impl BloomFilter { // Workaround to get the indices of the keys in the filter. // Needed because the `bloomfilter` crate doesn't provide a // way to get this information. + // TODO: + // Delete after cfg flag is removed + #[allow(dead_code)] fn indices_for_key(key: &Felt) -> Vec<usize> { // Use key on an empty Bloom filter let mut bloom = Self::new(); @@ -400,7 +420,6 @@ impl Cache { #[cfg(test)] mod tests { - use assert_matches::assert_matches; use pathfinder_common::felt; use super::*; @@ -439,148 +458,141 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + let filter = crate::EventFilter { + keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); assert_eq!(block_matches, vec![from_block]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn add_blooms_and_check_multiple_blocks_found() { - let from_block = BlockNumber::new_or_panic(0); + fn aggregate_bloom_past_first_range() { + let from_block = BlockNumber::new_or_panic(AggregateBloom::BLOCK_RANGE_LEN); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); + bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); + let filter = crate::EventFilter { 
+ keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - assert_eq!(block_matches, vec![from_block, from_block + 1]); + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); + assert_eq!(block_matches, vec![from_block]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn key_not_in_filter_returns_empty_vec() { + fn add_blooms_and_check_multiple_blocks_found() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + let filter = crate::EventFilter { + keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; - assert_eq!(block_matches_empty, Vec::<BlockNumber>::new()); + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block, from_block + 1]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); + assert_eq!(block_matches, vec![from_block, from_block + 1]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn serialize_aggregate_roundtrip() { + fn key_not_in_filter_returns_empty_vec() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); + bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); - - let bytes = aggregate_bloom_filter.to_bytes(); - let aggregate_bloom_filter = AggregateBloom::from_bytes(from_block, bytes.to_vec()); + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); - let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches, vec![from_block, from_block + 1]); assert_eq!(block_matches_empty, Vec::<BlockNumber>::new()); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn block_limit_reached_after_full_range() { - impl AggregateBloom { - /// Real [Self::add_bloom] makes this test last way to long - fn add_bloom_mock(&mut self) { - self.next_block += 1; - } - } - + fn serialize_aggregate_roundtrip() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - for _ in from_block.get()..(AggregateBloom::BLOCK_RANGE_LEN - 1) { - aggregate_bloom_filter.add_bloom_mock(); - } + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let last_block = from_block + AggregateBloom::BLOCK_RANGE_LEN - 1; - assert_matches!( - aggregate_bloom_filter.add_bloom(&bloom, last_block), - Err(AddBloomError::BlockLimitReached) + let compressed_bitmap = aggregate_bloom_filter.compress_bitmap(); + let mut decompressed = AggregateBloom::from_existing_compressed( + aggregate_bloom_filter.from_block.get(), + aggregate_bloom_filter.to_block.get(), + compressed_bitmap, ); - } - - #[test] - #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn invalid_insert_pos() { - let from_block = BlockNumber::new_or_panic(0); - let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + decompressed.add_bloom(&bloom, from_block + 2); - let mut bloom = BloomFilter::new(); - bloom.set(&KEY); + let block_matches = decompressed.blocks_for_keys(&[KEY]); + let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - - let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; - assert_matches!( - aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos), - Err(AddBloomError::InvalidBlockNumber) + assert_eq!( + block_matches, + vec![from_block, from_block + 1, from_block + 2] ); + assert_eq!(block_matches_empty, Vec::<BlockNumber>::new()); } #[test] 
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)] #[should_panic] - fn skipping_a_block_panics() { + fn invalid_insert_pos() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - let skipped_block = from_block + 2; - aggregate_bloom_filter - .add_bloom(&bloom, skipped_block) - .unwrap(); + let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; + aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos); } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index c779dac3c2..e6fe68711a 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,5 +1,7 @@ use std::num::NonZeroUsize; +#[cfg(feature = "aggregate_bloom")] +use anyhow::Context; use anyhow::Result; use pathfinder_common::event::Event; use pathfinder_common::{ @@ -12,8 +14,8 @@ use pathfinder_common::{ }; #[cfg(feature = "aggregate_bloom")] -use crate::bloom::AddBloomError; -use crate::bloom::{AggregateBloom, BloomFilter}; +use crate::bloom::AggregateBloom; +use crate::bloom::BloomFilter; use crate::prelude::*; use crate::ReorgCounter; @@ -66,14 +68,51 @@ pub struct PageOfEvents { } impl Transaction<'_> { + #[cfg(feature = "aggregate_bloom")] + pub(super) fn upsert_block_events_aggregate( + &self, + block_number: BlockNumber, + events: &[Event], + ) -> anyhow::Result<()> { + #[rustfmt::skip] + let mut stmt = self.inner().prepare_cached( + "INSERT INTO starknet_event_filters_aggregate (from_block, to_block, bloom) \ + VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET bloom=excluded.bloom", + )?; + + let mut running_aggregate = match self.load_aggregate_bloom(block_number)? { + // Loading existing block range + Some(aggregate) => aggregate, + // New block range reached + None => AggregateBloom::new(block_number), + }; + + let mut bloom = BloomFilter::new(); + for event in events { + bloom.set_keys(&event.keys); + bloom.set_address(&event.from_address); + } + + running_aggregate.add_bloom(&bloom, block_number); + + stmt.execute(params![ + &running_aggregate.from_block, + &running_aggregate.to_block, + &running_aggregate.compress_bitmap() + ])?; + + Ok(()) + } + pub(super) fn upsert_block_events<'a>( &self, block_number: BlockNumber, events: impl Iterator<Item = &'a Event>, ) -> anyhow::Result<()> { + #[rustfmt::skip] let mut stmt = self.inner().prepare_cached( - "INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?) ON CONFLICT \ - DO UPDATE SET bloom=excluded.bloom", + "INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?) \ + ON CONFLICT DO UPDATE SET bloom=excluded.bloom", )?; let mut bloom = BloomFilter::new(); @@ -249,87 +288,131 @@ impl Transaction<'_> { } }; - // TODO: - // The logic that constructs aggregate bloom filters is temporarily - // placed here, in order to compare with the current implementation. - // It will be moved to sync as a follow up. 
- #[cfg(feature = "aggregate_bloom")] - { - let mut aggregates = vec![]; - let mut running_aggregate = AggregateBloom::new(from_block); + match result { + ScanResult::Done => { + return Ok(PageOfEvents { + events: emitted_events, + continuation_token: None, + }) + } + ScanResult::PageFull => { + assert!(emitted_events.len() > filter.page_size); + let continuation_token = continuation_token( + &emitted_events, + ContinuationToken { + block_number: from_block, + offset: filter.offset, + }, + ) + .unwrap(); + emitted_events.truncate(filter.page_size); - let mut blocks_from_individual = vec![]; + return Ok(PageOfEvents { + events: emitted_events, + continuation_token: Some(ContinuationToken { + block_number: continuation_token.block_number, + // account for the extra event + offset: continuation_token.offset - 1, + }), + }); + } + ScanResult::ContinueFrom(block_number) => { + // We've reached a search limit without filling the page. + // We'll need to continue from the next block. + return Ok(PageOfEvents { + events: emitted_events, + continuation_token: Some(ContinuationToken { + block_number, + offset: 0, + }), + }); + } + } + } - for block_num in from_block.get()..=to_block.get() { - if block_num as usize >= max_blocks_to_scan.get() { - break; - } + // TODO: + // This function is temporarily here to compare the performance of the new + // aggregate bloom filter. + #[cfg(feature = "aggregate_bloom")] + pub fn events_from_aggregate( + &self, + filter: &EventFilter, + max_blocks_to_scan: NonZeroUsize, + ) -> Result<PageOfEvents, EventFilterError> { + use std::collections::BTreeSet; - let block_num = BlockNumber::new_or_panic(block_num); + if filter.page_size < 1 { + return Err(EventFilterError::PageSizeTooSmall); + } - // TODO: - // Using single block `BloomFilter` API for now since we don't have - // a table for `AggregateBloom` yet. - let bloom = self.load_bloom(reorg_counter, block_num)?; - match bloom { - Filter::Missing => {} - Filter::Cached(bloom) => { - if bloom.check_filter(filter) { - blocks_from_individual.push(block_num); - } + let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS); + let to_block = filter.to_block.unwrap_or(BlockNumber::MAX); + let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - match running_aggregate.add_bloom(&bloom, block_num) { - Ok(_) => {} - Err(AddBloomError::BlockLimitReached) => { - aggregates.push(running_aggregate); - running_aggregate = AggregateBloom::new(block_num + 1); - } - Err(AddBloomError::InvalidBlockNumber) => { - unreachable!() // For now. - } - } - } - Filter::Loaded(bloom) => { - if bloom.check_filter(filter) { - blocks_from_individual.push(block_num); - } + let mut emitted_events = Vec::new(); - match running_aggregate.add_bloom(&bloom, block_num) { - Ok(_) => {} - Err(AddBloomError::BlockLimitReached) => { - aggregates.push(running_aggregate); - running_aggregate = AggregateBloom::new(block_num + 1); - } - Err(AddBloomError::InvalidBlockNumber) => { - unreachable!() // For now. 
- } - } - } - } - } + let mut blocks_scanned: usize = 0; + let mut offset = filter.offset; - // Remainder of (to_block - from_block) % AggregateBloom::BLOCK_RANGE_LEN - aggregates.push(running_aggregate); + enum ScanResult { + Done, + PageFull, + ContinueFrom(BlockNumber), + } - let blocks_from_aggregate = aggregates.iter().fold(vec![], |mut acc, aggregate| { + let aggregates = self.load_aggregate_bloom_range(from_block, to_block)?; + let mut filtered_blocks = aggregates + .iter() + .fold(BTreeSet::new(), |mut acc, aggregate| { acc.extend(aggregate.blocks_for_filter(filter)); acc }); - if blocks_from_individual != blocks_from_aggregate { - tracing::error!("Blocks from individual and aggregate bloom filter do not match"); - tracing::error!("Individual: {:?}", blocks_from_individual,); - tracing::error!("Aggregate: {:?}", blocks_from_aggregate,); + filtered_blocks.retain(|&block| block >= from_block && block <= to_block); + + let mut blocks_iter = filtered_blocks.iter(); + let result = loop { + let Some(&block) = blocks_iter.next() else { + break ScanResult::Done; + }; + + // Stop if we're past the last block. + if block > to_block { + break ScanResult::Done; } - match result { - ScanResult::Done => { - return Ok(PageOfEvents { - events: emitted_events, - continuation_token: None, - }) + // Check if we've reached our block scan limit + blocks_scanned += 1; + if blocks_scanned > max_blocks_to_scan.get() { + tracing::trace!("Block scan limit reached"); + break ScanResult::ContinueFrom(block); + } + + match self.scan_block_into( + block, + filter, + key_filter_is_empty, + offset, + &mut emitted_events, + )? { + BlockScanResult::NoSuchBlock => break ScanResult::Done, + BlockScanResult::Done { new_offset } => { + offset = new_offset; + } + } + + // Stop if we have a page of events plus an extra one to decide if we're on + // the last page. + if emitted_events.len() > filter.page_size { + break ScanResult::PageFull; + } + }; + + match result { + ScanResult::Done => Ok(PageOfEvents { + events: emitted_events, + continuation_token: None, + }), ScanResult::PageFull => { assert!(emitted_events.len() > filter.page_size); let continuation_token = continuation_token( @@ -342,25 +425,25 @@ impl Transaction<'_> { .unwrap(); emitted_events.truncate(filter.page_size); - return Ok(PageOfEvents { + Ok(PageOfEvents { events: emitted_events, continuation_token: Some(ContinuationToken { block_number: continuation_token.block_number, // account for the extra event offset: continuation_token.offset - 1, }), - }); + }) } ScanResult::ContinueFrom(block_number) => { // We've reached a search limit without filling the page. // We'll need to continue from the next block. - return Ok(PageOfEvents { + Ok(PageOfEvents { events: emitted_events, continuation_token: Some(ContinuationToken { block_number, offset: 0, }), - }); + }) } } } @@ -467,28 +550,69 @@ impl Transaction<'_> { }) } - // TODO: - // Implement once [`AggregateBloom`] table is added. - fn _running_bloom_aggregate(&self) -> Result<Option<AggregateBloom>, anyhow::Error> { - // Fetch running aggregate from DB - unimplemented!() + #[cfg(feature = "aggregate_bloom")] + fn load_aggregate_bloom( + &self, + block_number: BlockNumber, + ) -> anyhow::Result<Option<AggregateBloom>> { + #[rustfmt::skip] + let mut select_stmt = self.inner().prepare_cached( + "SELECT from_block, to_block, bloom FROM starknet_event_filters_aggregate \ + WHERE from_block <= ? 
AND to_block >= ?", + )?; + + let aggregate = select_stmt + .query_row(params![&block_number, &block_number], |row| { + let from_block: u64 = row.get(0)?; + let to_block: u64 = row.get(1)?; + let compressed_bitmap: Vec = row.get(2)?; + + Ok((from_block, to_block, compressed_bitmap)) + }) + .optional() + .context("Querying running bloom aggregate")? + .map(|(from_block, to_block, compressed_bitmap)| { + AggregateBloom::from_existing_compressed(from_block, to_block, compressed_bitmap) + }); + + Ok(aggregate) } - fn _load_bloom_range( + #[cfg(feature = "aggregate_bloom")] + fn load_aggregate_bloom_range( &self, - _from_block: BlockNumber, - _to_block: BlockNumber, + start_block: BlockNumber, + end_block: BlockNumber, ) -> anyhow::Result> { - // Should be something like: - // (from_block..to_block) - // .chunks(AggregateBloom::BLOCK_RANGE_LEN) - // .iter() - // .enumerate() - // .map(|(i, _)| { - // // load from DB where ID is i - // }) - // .collect() - unimplemented!() + #[rustfmt::skip] + let mut stmt = self.inner().prepare_cached( + "SELECT from_block, to_block, bloom FROM starknet_event_filters_aggregate \ + WHERE from_block <= :end_block AND to_block >= :start_block \ + ORDER BY from_block", + )?; + + let aggregates = stmt + .query_map( + named_params![ + ":end_block": &end_block, + ":start_block": &start_block + ], + |row| { + let from_block: u64 = row.get(0)?; + let to_block: u64 = row.get(1)?; + let compressed_bitmap: Vec = row.get(2)?; + + Ok(AggregateBloom::from_existing_compressed( + from_block, + to_block, + compressed_bitmap, + )) + }, + ) + .context("Querying bloom filter range")? + .collect::, _>>()?; + + Ok(aggregates) } } @@ -584,6 +708,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -724,6 +856,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -755,6 +895,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -788,6 +936,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // test continuation token let filter = EventFilter { from_block: Some(events.continuation_token.unwrap().block_number), @@ -810,6 +966,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -840,6 +1004,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -870,6 +1042,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) 
+ .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -900,11 +1080,20 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // try event keys in the wrong order, should not match let filter = EventFilter { keys: vec![vec![expected_event.keys[1]], vec![expected_event.keys[0]]], ..filter }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -915,6 +1104,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -943,6 +1140,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -960,6 +1165,7 @@ mod tests { page_size: 10, offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -974,6 +1180,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: None, to_block: None, @@ -982,6 +1196,7 @@ mod tests { page_size: 10, offset: 10, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -996,6 +1211,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: None, to_block: None, @@ -1004,6 +1227,7 @@ mod tests { page_size: 10, offset: 30, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1014,6 +1238,14 @@ mod tests { continuation_token: None } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1032,6 +1264,7 @@ mod tests { // _after_ the last one offset: test_utils::NUM_BLOCKS * test_utils::EVENTS_PER_BLOCK, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1042,6 +1275,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1065,6 +1306,7 @@ mod tests { page_size: 2, offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1079,6 +1321,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // increase offset let filter: EventFilter = EventFilter { from_block: None, @@ -1088,6 +1338,7 @@ mod tests { page_size: 2, offset: 2, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ 
-1102,6 +1353,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // using the continuation token should be equivalent to the previous query let filter: EventFilter = EventFilter { from_block: Some(BlockNumber::new_or_panic(0)), @@ -1111,6 +1370,7 @@ mod tests { page_size: 2, offset: 2, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1125,6 +1385,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // increase offset by two let filter = EventFilter { from_block: None, @@ -1134,6 +1402,7 @@ mod tests { page_size: 2, offset: 4, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1145,6 +1414,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // using the continuation token should be equivalent to the previous query let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(3)), @@ -1154,6 +1431,7 @@ mod tests { page_size: 2, offset: 1, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1164,6 +1442,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1181,6 +1467,7 @@ mod tests { page_size: 20, offset: 0, }; + let events = tx .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1195,6 +1482,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, 1.try_into().unwrap()) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, @@ -1203,6 +1498,7 @@ mod tests { page_size: 20, offset: 0, }; + let events = tx .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1216,13 +1512,17 @@ mod tests { }), } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, 1.try_into().unwrap()) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] - // TODO: - // This fails when "aggregate_bloom" feature is enabled because in that case all filters are - // loaded twice. We can ignore it for now. - #[cfg_attr(feature = "aggregate_bloom", ignore)] fn bloom_filter_load_limit() { let (storage, test_data) = test_utils::setup_test_storage(); let emitted_events = test_data.events; @@ -1237,6 +1537,7 @@ mod tests { page_size: emitted_events.len(), offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); @@ -1251,6 +1552,17 @@ mod tests { } ); + // TODO: + // This does not match at the moment because aggregate bloom implementation + // does not have a limit on the number of bloom filters to load. 
+ #[cfg(all(feature = "aggregate_bloom", any()))] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, @@ -1259,6 +1571,7 @@ page_size: emitted_events.len(), offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); @@ -1272,5 +1585,16 @@ }), } ); + + // TODO: + // This does not match at the moment because aggregate bloom implementation + // does not have a limit on the number of bloom filters to load. + #[cfg(all(feature = "aggregate_bloom", any()))] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } } diff --git a/crates/storage/src/connection/transaction.rs b/crates/storage/src/connection/transaction.rs index 7187bec55e..91894e86dc 100644 --- a/crates/storage/src/connection/transaction.rs +++ b/crates/storage/src/connection/transaction.rs @@ -167,6 +167,12 @@ impl Transaction<'_> { .context("Inserting transaction data")?; if let Some(events) = events { + #[cfg(feature = "aggregate_bloom")] + { + let events: Vec<Event> = events.iter().flatten().cloned().collect(); + self.upsert_block_events_aggregate(block_number, &events) + .context("Inserting events into Bloom filter aggregate")?; + } let events = events.iter().flatten(); self.upsert_block_events(block_number, events) .context("Inserting events into Bloom filter")?; @@ -210,6 +216,12 @@ impl Transaction<'_> { ]) .context("Updating events")?; + #[cfg(feature = "aggregate_bloom")] + { + let events: Vec<Event> = events.iter().flatten().cloned().collect(); + self.upsert_block_events_aggregate(block_number, &events) + .context("Inserting events into Bloom filter aggregate")?; + } self.upsert_block_events(block_number, events.iter().flatten()) .context("Inserting events into Bloom filter")?; diff --git a/crates/storage/src/schema.rs b/crates/storage/src/schema.rs index 88a8af88e8..ffe92147ec 100644 --- a/crates/storage/src/schema.rs +++ b/crates/storage/src/schema.rs @@ -25,6 +25,8 @@ mod revision_0062; mod revision_0063; mod revision_0064; mod revision_0065; +#[cfg(feature = "aggregate_bloom")] +mod revision_0066; pub(crate) use base::base_schema; @@ -58,6 +60,8 @@ pub fn migrations() -> &'static [MigrationFn] { revision_0063::migrate, revision_0064::migrate, revision_0065::migrate, + #[cfg(feature = "aggregate_bloom")] + revision_0066::migrate, ] } diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs new file mode 100644 index 0000000000..52159524a0 --- /dev/null +++ b/crates/storage/src/schema/revision_0066.rs @@ -0,0 +1,77 @@ +use anyhow::Context; +use pathfinder_common::BlockNumber; +use rusqlite::params; + +use crate::bloom::{AggregateBloom, BloomFilter}; + +#[allow(dead_code)] +pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { + tracing::warn!("Creating starknet_event_filters_aggregate table with aggregate bloom filters"); + + let mut select_old_filters_query = + tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?; + + let mut bloom_filters_bytes = select_old_filters_query + .query_map(params![], |row| { + let bytes = row.get::<_, Vec<u8>>(0)?; + + Ok(bytes) + }) + .context("Selecting old filters")?; + + let mut bloom_filters = vec![]; + loop { + let Some(bloom) = 
bloom_filters_bytes.next().transpose()? else { + break; + }; + + bloom_filters.push(BloomFilter::from_compressed_bytes(&bloom)); + } + + tx.execute( + "CREATE TABLE starknet_event_filters_aggregate ( + from_block INTEGER NOT NULL, + to_block INTEGER NOT NULL, + bloom BLOB, + UNIQUE(from_block, to_block) + )", + params![], + ) + .context("Creating starknet_event_filters_aggregate table")?; + + bloom_filters + .chunks(AggregateBloom::BLOCK_RANGE_LEN as usize) + .enumerate() + .try_for_each(|(i, bloom_filter_chunk)| -> anyhow::Result<()> { + let from_block = i as u64 * AggregateBloom::BLOCK_RANGE_LEN; + let to_block = from_block + AggregateBloom::BLOCK_RANGE_LEN - 1; + let from_block = BlockNumber::new_or_panic(from_block); + let to_block = BlockNumber::new_or_panic(to_block); + + let mut aggregate = AggregateBloom::new(from_block); + + for (j, bloom_filter) in bloom_filter_chunk.iter().enumerate() { + let block_number = from_block + j as u64; + + aggregate.add_bloom(bloom_filter, block_number); + } + + tx.execute( + "INSERT INTO starknet_event_filters_aggregate (from_block, to_block, bloom) + VALUES (?, ?, ?)", + params![ + &from_block.get(), + &to_block.get(), + &aggregate.compress_bitmap() + ], + ) + .context("Inserting aggregate bloom filter")?; + + Ok(()) + })?; + + // TODO: + // Delete old filters table + + Ok(()) +}
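
Reviewer note: the "rotate by 90 degrees" layout that this PR introduces is easier to see at toy scale. The sketch below is illustrative only and not part of the change; the names (`ToyAggregate`, `add_block`, `blocks_for_indices`) and the sizes (8 blocks per aggregate, 16-bit filters) are made up for the example, whereas the real code uses `BLOCK_RANGE_LEN = 32_768` blocks and `BloomFilter::BITVEC_LEN` rows. The idea is the same: each per-block Bloom filter becomes a column of the aggregate, so a key lookup only touches one row per hash index instead of loading every block's filter.

// Self-contained toy version of the transposed-bitmap idea (assumed sizes).
const BLOCKS: usize = 8; // blocks covered by one aggregate -> one byte per row
const BLOOM_BITS: usize = 16; // bits in each per-block Bloom filter

/// One row per Bloom-filter bit. Bit `b` of row `r` is set iff block `b`
/// of this range had bit `r` set in its own Bloom filter.
struct ToyAggregate {
    rows: [u8; BLOOM_BITS],
}

impl ToyAggregate {
    fn new() -> Self {
        Self { rows: [0; BLOOM_BITS] }
    }

    /// The "rotate by 90 degrees" step: a block's filter becomes a column.
    fn add_block(&mut self, block: usize, bloom: u16) {
        for r in 0..BLOOM_BITS {
            if bloom & (1u16 << r) != 0 {
                self.rows[r] |= 1u8 << block;
            }
        }
    }

    /// Candidate blocks for a key hashing to `indices`: AND one row per
    /// index, then read the surviving bits out as block numbers.
    fn blocks_for_indices(&self, indices: &[usize]) -> Vec<usize> {
        let mut acc = u8::MAX;
        for &r in indices {
            acc &= self.rows[r];
        }
        (0..BLOCKS).filter(|&b| acc & (1u8 << b) != 0).collect()
    }
}

fn main() {
    let mut agg = ToyAggregate::new();
    // A key hashing to bits {1, 4} occurs in blocks 2 and 5.
    agg.add_block(2, (1 << 1) | (1 << 4));
    agg.add_block(5, (1 << 1) | (1 << 4) | (1 << 9));
    // An unrelated filter in block 3.
    agg.add_block(3, 1 << 7);

    assert_eq!(agg.blocks_for_indices(&[1, 4]), vec![2, 5]);
    assert_eq!(agg.blocks_for_indices(&[7]), vec![3]);
}

This is why `blocks_for_keys` in bloom.rs can answer "which blocks in this 32k range might contain the key" with `K_NUM` row reads and a byte-wise AND, rather than decompressing and checking 32k individual filters as the pre-existing `events` path does.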