diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 45636034e2..1f770f13cd 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -7,6 +7,11 @@ license = { workspace = true } rust-version = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +aggregate_bloom = [] + +default = [] + [dependencies] anyhow = { workspace = true } base64 = { workspace = true } diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index f1b0e22722..055428272a 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -1,3 +1,65 @@ +//! A Bloom filter is a space-efficient probabilistic data structure that +//! is used to test whether an element is a member of a set. An empty Bloom +//! filter is a bit array of m bits, all set to 0. It is equipped with k +//! different hash functions, which map set elements to one of the m possible +//! array positions. To check whether an element is in the set, the element is +//! hashed k times and the bits at the resulting positions are checked. If all k +//! bits are set to 1, the element is considered to be in the set (false +//! positives are possible). In our case, each block is a set with its own +//! [`BloomFilter`] and the elements are block events' keys and contract +//! addresses. +//! +//! When considering scenarios where keys need to be checked for large ranges +//! of blocks, it can certainly take a long time if [`BloomFilter`]s for each +//! block are loaded and checked against one by one. A possible optimization is +//! to store aggregates of [`BloomFilter`]s for ranges of blocks and, once keys +//! need to be checked for that range, this [`AggregateBloom`] filter can +//! be loaded and checked against in a single shot. +//! +//! Example: A key K1 that is mapped to three (out of *eight) indices of the +//! bloom filter, needs to be added to an aggregate bloom filter for a range +//! 
of *ten blocks. (*these are illustrative numbers, in practice much larger +//! values are used). +//! +//! We start with an empty aggregate filter, an 8x10 bitmap full of zeroes. Rows +//! of this matrix represent bloom filter indices that keys can be mapped to +//! whereas the columns represent the blocks within the range for which the +//! aggregate bloom filter is used. +//! +//! HashFn(K1) = [0, 1, 0, 1, 1, 0, 0, 0] +//! +//! We are inserting K1 as a part of the first block. In order to insert the key +//! into the aggregate, we first rotate it clockwise by 90 degrees (turning it +//! from a row vector into a column vector). Then, we set the first bit (since +//! we are adding to the first block) of rows 1, 3 and 4 (zero based) because +//! bloom filter hash functions map K1 to these indices. After this, we are left +//! with the following bitmap: +//! +//! [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [1, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [1, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [1, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +//! +//! which we can now store. Next, to check if K1 has been added to any of the +//! blocks, we perform the following steps: +//! 1) Load an [`AggregateBloom`] filter with the previously stored bitmap. +//! 2) Obtain the indices to which K1 maps (1, 3 and 4 in this example), +//! pluck out the corresponding rows and bitwise AND them together, leaving +//! us with a 1x10 bit vector. +//! 3) Indices of bits that are set in the bit vector obtained through step 2) +//! are block numbers to which K1 could have been added (or are false +//! positives due to how Bloom filters work). In this example, the first bit +//! will be set, meaning block 0 could contain K1 (and it does since this is a +//! very simplified example). +//! +//! This way, it's possible to quickly figure out which blocks correspond to a +//! 
specific set of keys without having to load and check each individual bloom +//! filter. + use std::sync::{Mutex, MutexGuard}; use bloomfilter::Bloom; @@ -13,18 +75,185 @@ use crate::ReorgCounter; // filter. pub const EVENT_KEY_FILTER_LIMIT: usize = 16; +/// An aggregate of all Bloom filters for a given range of blocks. +/// Before being added to `AggregateBloom`, each [`BloomFilter`] is +/// rotated by 90 degrees. +#[derive(Debug)] +pub(crate) struct AggregateBloom { + /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in + /// a single array. + bitmap: Vec<u8>, + /// Block range for which the aggregate filter is constructed. + block_range: std::ops::Range<BlockNumber>, + next_block: BlockNumber, +} + +impl AggregateBloom { + // TODO: + // Remove #[allow(dead_code)] when follow up is done. + + /// Maximum number of blocks to aggregate in a single `AggregateBloom`. + const BLOCK_RANGE_LEN: u64 = 32_768; + const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8; + + /// Create a new `AggregateBloom` for the (`from_block`, `from_block + + /// [Self::BLOCK_RANGE_LEN]`) range. + #[allow(dead_code)] + pub fn new(from_block: BlockNumber) -> Self { + let bitmap = vec![0; (Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN) as usize]; + + let to_block = from_block + Self::BLOCK_RANGE_LEN; + + Self { + bitmap, + block_range: from_block..to_block, + next_block: from_block, + } + } + + #[allow(dead_code)] + pub fn from_bytes(from_block: BlockNumber, bytes: Vec<u8>) -> Self { + assert_eq!( + bytes.len() as u64, + Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN, + "Bitmap size mismatch" + ); + + let to_block = from_block + Self::BLOCK_RANGE_LEN; + + Self { + bitmap: bytes, + block_range: from_block..to_block, + next_block: from_block, + } + } + + #[allow(dead_code)] + pub fn to_bytes(&self) -> &[u8] { + &self.bitmap + } + + /// Rotate the bloom filter by 90 degrees and add it to the aggregate. 
+ #[allow(dead_code)] + pub fn add_bloom( + &mut self, + bloom: &BloomFilter, + insert_pos: BlockNumber, + ) -> Result<(), AddBloomError> { + if !self.block_range.contains(&insert_pos) { + return Err(AddBloomError::InvalidBlockNumber); + } + assert_eq!(self.next_block, insert_pos, "Unexpected insert position"); + assert_eq!( + bloom.0.number_of_hash_functions(), + BloomFilter::K_NUM, + "Hash function count mismatch" + ); + + let bloom = bloom.0.bit_vec().to_bytes(); + assert_eq!( + bloom.len() as u64, + BloomFilter::BITVEC_BYTES, + "Bit vector length mismatch" + ); + + let byte_index = (insert_pos.get() / 8) as usize; + let bit_index = (insert_pos.get() % 8) as usize; + for (i, bloom_byte) in bloom.iter().enumerate() { + if *bloom_byte == 0 { + continue; + } + + let base = 8 * i; + for j in 0..8 { + let row_idx = base + j; + let idx = Self::bitmap_index_at(row_idx, byte_index); + self.bitmap[idx] |= ((bloom_byte >> (7 - j)) & 1) << bit_index; + } + } + + self.next_block += 1; + if self.next_block >= self.block_range.end { + tracing::info!( + "Block limit reached for [{}, {}) range", + self.block_range.start, + self.block_range.end + ); + return Err(AddBloomError::BlockLimitReached); + } + + Ok(()) + } + + #[allow(dead_code)] + pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> Vec<BlockNumber> { + let mut keys = vec![]; + + if let Some(contract_address) = filter.contract_address { + keys.push(contract_address.0); + } + filter.keys.iter().flatten().for_each(|k| keys.push(k.0)); + + self.blocks_for_keys(keys) + } + + #[allow(dead_code)] + fn blocks_for_keys(&self, keys: Vec<Felt>) -> Vec<BlockNumber> { + let mut block_matches = vec![]; + + for k in keys { + let mut row_to_check = vec![u8::MAX; Self::BLOCK_RANGE_BYTES as usize]; + + let indices = BloomFilter::indices_for_key(&k); + for row_idx in indices { + for (col_idx, row_byte) in row_to_check.iter_mut().enumerate() { + let idx = Self::bitmap_index_at(row_idx, col_idx); + *row_byte &= self.bitmap[idx]; + } + } + + for (col_idx, 
byte) in row_to_check.iter().enumerate() { + if *byte == 0 { + continue; + } + + for i in 0..8 { + if byte & (1 << i) != 0 { + block_matches.push(BlockNumber::new_or_panic((col_idx * 8 + i) as u64)); + } + } + } + } + + block_matches + } + + #[allow(dead_code)] + fn bitmap_index_at(row: usize, col: usize) -> usize { + row * Self::BLOCK_RANGE_BYTES as usize + col + } +} + +#[derive(Debug)] +pub enum AddBloomError { + BlockLimitReached, + InvalidBlockNumber, +} + #[derive(Clone)] pub(crate) struct BloomFilter(Bloom<Felt>); impl BloomFilter { + // The size of the bitmap used by the Bloom filter. + const BITVEC_LEN: u64 = 16_384; // The size of the bitmap used by the Bloom filter (in bytes). - const BITMAP_BYTES: u64 = 2048; - // The maximal number of items anticipated to be inserted into the Bloom filter. - const ITEMS_COUNT: u32 = 1024; + const BITVEC_BYTES: u64 = Self::BITVEC_LEN / 8; // The number of hash functions used by the Bloom filter. // We need this value to be able to re-create the filter with the deserialized // bitmap. const K_NUM: u32 = 12; + // The maximal number of items anticipated to be inserted into the Bloom filter. + const ITEMS_COUNT: u32 = 1024; // The seed used by the hash functions of the filter. // This is a randomly generated vector of 32 bytes. 
const SEED: [u8; 32] = [ @@ -35,7 +264,7 @@ impl BloomFilter { pub fn new() -> Self { let bloom = Bloom::new_with_seed( - Self::BITMAP_BYTES as usize, + Self::BITVEC_BYTES as usize, Self::ITEMS_COUNT as usize, &Self::SEED, ); @@ -45,7 +274,7 @@ impl BloomFilter { } pub fn from_compressed_bytes(bytes: &[u8]) -> Self { - let bytes = zstd::bulk::decompress(bytes, Self::BITMAP_BYTES as usize * 2) + let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES as usize * 2) .expect("Decompressing Bloom filter"); Self::from_bytes(&bytes) } @@ -57,7 +286,7 @@ impl BloomFilter { let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap()); let bloom = Bloom::from_existing( bytes, - Self::BITMAP_BYTES * 8, + Self::BITVEC_BYTES * 8, Self::K_NUM, [(k1, k2), (k3, k4)], ); @@ -121,6 +350,24 @@ impl BloomFilter { self.check_keys(&filter.keys) } + + // Workaround to get the indices of the keys in the filter. + // Needed because the `bloomfilter` crate doesn't provide a + // way to get this information. 
+ fn indices_for_key(key: &Felt) -> Vec<usize> { + // Use key on an empty Bloom filter + let mut bloom = Self::new(); + bloom.set(key); + + bloom + .0 + .bit_vec() + .iter() + .enumerate() + .filter(|(_, bit)| *bit) + .map(|(i, _)| i) + .collect() + } } type CacheKey = (crate::ReorgCounter, BlockNumber); @@ -153,11 +400,13 @@ impl Cache { #[cfg(test)] mod tests { + use assert_matches::assert_matches; use pathfinder_common::felt; use super::*; - const KEY: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69eb"); + const KEY: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ea"); + const KEY1: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69eb"); const KEY_NOT_IN_FILTER: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec"); @@ -179,4 +428,159 @@ mod tests { assert!(bloom.check(&KEY)); assert!(!bloom.check(&KEY_NOT_IN_FILTER)); } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn add_bloom_and_check_single_block_found() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + bloom.set(&KEY1); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + + let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + + assert_eq!(block_matches, vec![from_block]); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn add_blooms_and_check_multiple_blocks_found() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + aggregate_bloom_filter + .add_bloom(&bloom, from_block + 1) + .unwrap(); + + let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + + 
assert_eq!(block_matches, vec![from_block, from_block + 1]); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn key_not_in_filter_returns_empty_vec() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + bloom.set(&KEY1); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + aggregate_bloom_filter + .add_bloom(&bloom, from_block + 1) + .unwrap(); + + let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + + assert_eq!(block_matches_empty, Vec::<BlockNumber>::new()); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn serialize_aggregate_roundtrip() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + aggregate_bloom_filter + .add_bloom(&bloom, from_block + 1) + .unwrap(); + + let bytes = aggregate_bloom_filter.to_bytes(); + let aggregate_bloom_filter = AggregateBloom::from_bytes(from_block, bytes.to_vec()); + + let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + + assert_eq!(block_matches, vec![from_block, from_block + 1]); + assert_eq!(block_matches_empty, Vec::<BlockNumber>::new()); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn block_limit_reached_after_full_range() { + impl AggregateBloom { + /// Real [Self::add_bloom] makes this test last way too long + fn add_bloom_mock(&mut self) { + self.next_block += 1; + } + } + + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + + for _ in 
from_block.get()..(AggregateBloom::BLOCK_RANGE_LEN - 1) { + aggregate_bloom_filter.add_bloom_mock(); + } + + let last_block = from_block + AggregateBloom::BLOCK_RANGE_LEN - 1; + assert_matches!( + aggregate_bloom_filter.add_bloom(&bloom, last_block), + Err(AddBloomError::BlockLimitReached) + ); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + fn invalid_insert_pos() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + + let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; + assert_matches!( + aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos), + Err(AddBloomError::InvalidBlockNumber) + ); + } + + #[test] + #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] + #[should_panic] + fn skipping_a_block_panics() { + let from_block = BlockNumber::new_or_panic(0); + let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + + let mut bloom = BloomFilter::new(); + bloom.set(&KEY); + + aggregate_bloom_filter + .add_bloom(&bloom, from_block) + .unwrap(); + + let skipped_block = from_block + 2; + aggregate_bloom_filter + .add_bloom(&bloom, skipped_block) + .unwrap(); + } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index 52fce6f21c..c779dac3c2 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,5 +1,6 @@ use std::num::NonZeroUsize; +use anyhow::Result; use pathfinder_common::event::Event; use pathfinder_common::{ BlockHash, @@ -10,7 +11,9 @@ use pathfinder_common::{ TransactionHash, }; -use crate::bloom::BloomFilter; +#[cfg(feature = "aggregate_bloom")] +use crate::bloom::AddBloomError; +use crate::bloom::{AggregateBloom, BloomFilter}; use crate::prelude::*; use crate::ReorgCounter; @@ -231,8 +234,8 @@ impl 
Transaction<'_> { } } - // Stop if we have a page of events plus an extra one to decide if we're on the - // last page. + // Stop if we have a page of events plus an extra one to decide if we're on + // the last page. if emitted_events.len() > filter.page_size { break ScanResult::PageFull; } @@ -246,6 +249,80 @@ impl Transaction<'_> { } }; + // TODO: + // The logic that constructs aggregate bloom filters is temporarily + // placed here, in order to compare with the current implementation. + // It will be moved to sync as a follow up. + #[cfg(feature = "aggregate_bloom")] + { + let mut aggregates = vec![]; + let mut running_aggregate = AggregateBloom::new(from_block); + + let mut blocks_from_individual = vec![]; + + for block_num in from_block.get()..=to_block.get() { + if block_num as usize >= max_blocks_to_scan.get() { + break; + } + + let block_num = BlockNumber::new_or_panic(block_num); + + // TODO: + // Using single block `BloomFilter` API for now since we don't have + // a table for `AggregateBloom` yet. + let bloom = self.load_bloom(reorg_counter, block_num)?; + match bloom { + Filter::Missing => {} + Filter::Cached(bloom) => { + if bloom.check_filter(filter) { + blocks_from_individual.push(block_num); + } + + match running_aggregate.add_bloom(&bloom, block_num) { + Ok(_) => {} + Err(AddBloomError::BlockLimitReached) => { + aggregates.push(running_aggregate); + running_aggregate = AggregateBloom::new(block_num + 1); + } + Err(AddBloomError::InvalidBlockNumber) => { + unreachable!() // For now. + } + } + } + Filter::Loaded(bloom) => { + if bloom.check_filter(filter) { + blocks_from_individual.push(block_num); + } + + match running_aggregate.add_bloom(&bloom, block_num) { + Ok(_) => {} + Err(AddBloomError::BlockLimitReached) => { + aggregates.push(running_aggregate); + running_aggregate = AggregateBloom::new(block_num + 1); + } + Err(AddBloomError::InvalidBlockNumber) => { + unreachable!() // For now. 
+ } + } + } + } + } + + // Remainder of (to_block - from_block) % AggregateBloom::BLOCK_RANGE_LEN + aggregates.push(running_aggregate); + + let blocks_from_aggregate = aggregates.iter().fold(vec![], |mut acc, aggregate| { + acc.extend(aggregate.blocks_for_filter(filter)); + acc + }); + + if blocks_from_individual != blocks_from_aggregate { + tracing::error!("Blocks from individual and aggregate bloom filter do not match"); + tracing::error!("Individual: {:?}", blocks_from_individual,); + tracing::error!("Aggregate: {:?}", blocks_from_aggregate,); + } + } + match result { ScanResult::Done => { return Ok(PageOfEvents { @@ -389,6 +466,30 @@ impl Transaction<'_> { None => Filter::Missing, }) } + + // TODO: + // Implement once [`AggregateBloom`] table is added. + fn _running_bloom_aggregate(&self) -> Result<Option<AggregateBloom>, anyhow::Error> { + // Fetch running aggregate from DB + unimplemented!() + } + + fn _load_bloom_range( + &self, + _from_block: BlockNumber, + _to_block: BlockNumber, + ) -> anyhow::Result<Vec<AggregateBloom>> { + // Should be something like: + // (from_block..to_block) + // .chunks(AggregateBloom::BLOCK_RANGE_LEN) + // .iter() + // .enumerate() + // .map(|(i, _)| { + // // load from DB where ID is i + // }) + // .collect() + unimplemented!() + } } fn continuation_token( @@ -1118,6 +1219,10 @@ mod tests { } #[test] + // TODO: + // This fails when "aggregate_bloom" feature is enabled because in that case all filters are + // loaded twice. We can ignore it for now. + #[cfg_attr(feature = "aggregate_bloom", ignore)] fn bloom_filter_load_limit() { let (storage, test_data) = test_utils::setup_test_storage(); let emitted_events = test_data.events;