running aggregate stored in memory
sistemd committed Nov 12, 2024
1 parent 3e16b77 commit 1a3205d
Showing 11 changed files with 535 additions and 209 deletions.
Binary file modified crates/rpc/fixtures/mainnet.sqlite
10 changes: 5 additions & 5 deletions crates/rpc/src/method/subscribe_events.rs
@@ -257,7 +257,7 @@ mod tests {

#[tokio::test]
async fn no_filtering() {
let num_blocks = 80;
let num_blocks = 2000;
let router = setup(num_blocks).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
@@ -319,7 +319,7 @@ mod tests {

#[tokio::test]
async fn filter_from_address() {
let router = setup(80).await;
let router = setup(2000).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
@@ -385,7 +385,7 @@ mod tests {

#[tokio::test]
async fn filter_keys() {
let router = setup(80).await;
let router = setup(2000).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
@@ -451,7 +451,7 @@ mod tests {

#[tokio::test]
async fn filter_from_address_and_keys() {
let router = setup(80).await;
let router = setup(2000).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
@@ -518,7 +518,7 @@ mod tests {

#[tokio::test]
async fn too_many_keys_filter() {
let router = setup(80).await;
let router = setup(2000).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
14 changes: 14 additions & 0 deletions crates/rpc/src/v06/method/trace_block_transactions.rs
@@ -336,6 +336,7 @@ pub(crate) mod tests {
block_hash,
felt,
BlockHeader,
BlockNumber,
Chain,
GasPrice,
SequencerAddress,
@@ -639,6 +640,19 @@ pub(crate) mod tests {
let context = RpcContext::for_tests_on(Chain::Mainnet);
let mut connection = context.storage.connection().unwrap();
let transaction = connection.transaction().unwrap();

// Need to avoid skipping blocks for `insert_transaction_data`.
(0..619596)
.collect::<Vec<_>>()
.chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.map(|range| *range.last().unwrap() as u64)
.for_each(|block| {
let block = BlockNumber::new_or_panic(block);
transaction
.insert_transaction_data(block, &[], None)
.unwrap();
});

let block: starknet_gateway_types::reply::Block =
serde_json::from_str(include_str!("../../../fixtures/mainnet-619596.json")).unwrap();
let transaction_count = block.transactions.len();
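Aside on the padding loop added above (the same pattern is repeated in trace_transaction.rs below): the test pre-inserts empty transaction data at the last block of every `BLOCK_RANGE_LEN`-sized window up to block 619596, which appears to be what the storage layer needs so `insert_transaction_data` never sees a skipped block range. A rough sketch of an equivalent boundary computation (`window_boundaries` is our name, not part of the commit):

    // Last block of each `range_len`-sized window covering 0..target
    // (the final window may be partial).
    fn window_boundaries(target: u64, range_len: u64) -> Vec<u64> {
        (0..target)
            .step_by(range_len as usize)
            .map(|start| (start + range_len - 1).min(target - 1))
            .collect()
    }

    // With target = 10 and range_len = 4 this yields [3, 7, 9], matching
    // (0..10).collect::<Vec<_>>().chunks(4).map(|c| *c.last().unwrap()).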
22 changes: 21 additions & 1 deletion crates/rpc/src/v06/method/trace_transaction.rs
@@ -257,7 +257,14 @@ pub async fn trace_transaction_impl(

#[cfg(test)]
pub mod tests {
use pathfinder_common::{block_hash, transaction_hash, BlockHeader, Chain, SequencerAddress};
use pathfinder_common::{
block_hash,
transaction_hash,
BlockHeader,
BlockNumber,
Chain,
SequencerAddress,
};
use pathfinder_crypto::Felt;

use super::super::trace_block_transactions::tests::{
@@ -305,6 +312,19 @@ pub mod tests {
let context = RpcContext::for_tests_on(Chain::Mainnet);
let mut connection = context.storage.connection().unwrap();
let transaction = connection.transaction().unwrap();

// Need to avoid skipping blocks for `insert_transaction_data`.
(0..619596)
.collect::<Vec<_>>()
.chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.map(|range| *range.last().unwrap() as u64)
.for_each(|block| {
let block = BlockNumber::new_or_panic(block);
transaction
.insert_transaction_data(block, &[], None)
.unwrap();
});

let block: starknet_gateway_types::reply::Block =
serde_json::from_str(include_str!("../../../fixtures/mainnet-619596.json")).unwrap();
let transaction_count = block.transactions.len();
103 changes: 32 additions & 71 deletions crates/storage/src/bloom.rs
@@ -68,18 +68,20 @@ use cached::{Cached, SizedCache};
use pathfinder_common::{BlockNumber, ContractAddress, EventKey};
use pathfinder_crypto::Felt;

use crate::ReorgCounter;
use crate::{EventFilter, ReorgCounter};

// We're using the upper 4 bits of the 32-byte representation of a felt
// to store the index of the key in the values set in the Bloom filter.
// This allows a maximum of 16 keys per event to be stored in the
// filter.
pub const EVENT_KEY_FILTER_LIMIT: usize = 16;

pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
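
// Illustration (ours, not part of this commit): one way to pack a key's
// position into the felt's top nibble. Felt values fit in 252 bits, so the
// upper 4 bits of the 32-byte big-endian encoding are always free; that is
// where the 16-keys-per-event limit above comes from. Plain byte arrays are
// used so as not to assume any particular Felt API.
fn indexed_key_sketch(key_bytes: [u8; 32], index: u8) -> [u8; 32] {
    assert!((index as usize) < EVENT_KEY_FILTER_LIMIT);
    let mut bytes = key_bytes;
    bytes[0] |= index << 4; // upper 4 bits hold the key's index
    bytes
}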

/// An aggregate of all Bloom filters for a given range of blocks.
/// Before being added to `AggregateBloom`, each [`BloomFilter`] is
/// rotated by 90 degrees (transposed).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AggregateBloom {
/// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in
/// a single array.
@@ -109,13 +111,10 @@ impl AggregateBloom {
}

pub fn from_existing_compressed(
from_block: u64,
to_block: u64,
from_block: BlockNumber,
to_block: BlockNumber,
compressed_bitmap: Vec<u8>,
) -> Self {
let from_block = BlockNumber::new_or_panic(from_block);
let to_block = BlockNumber::new_or_panic(to_block);

let bitmap = zstd::bulk::decompress(
&compressed_bitmap,
AggregateBloom::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize,
@@ -126,15 +125,10 @@ impl AggregateBloom {
}

fn from_parts(from_block: BlockNumber, to_block: BlockNumber, bitmap: Vec<u8>) -> Self {
assert_eq!(
from_block + Self::BLOCK_RANGE_LEN - 1,
to_block,
"Block range mismatch"
);
assert_eq!(from_block + Self::BLOCK_RANGE_LEN - 1, to_block);
assert_eq!(
bitmap.len() as u64,
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN,
"Bitmap length mismatch"
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN
);

Self {
@@ -148,24 +142,21 @@ impl AggregateBloom {
zstd::bulk::compress(&self.bitmap, 10).expect("Compressing aggregate Bloom filter")
}
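
// Sketch (ours, not part of this commit): `compress_bitmap` pairs with
// `from_existing_compressed` above via a plain zstd roundtrip of the raw
// bitmap, using the same `zstd::bulk` calls as the code above.
fn zstd_roundtrip_sketch(bitmap: &[u8]) -> bool {
    let compressed = zstd::bulk::compress(bitmap, 10).unwrap();
    let restored = zstd::bulk::decompress(&compressed, bitmap.len()).unwrap();
    restored.as_slice() == bitmap
}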

/// Rotate the bloom filter by 90 degrees and add it to the aggregate.
/// Rotate the [`BloomFilter`] by 90 degrees (transpose) and add it to the
/// aggregate. It is up to the user to keep track of when the aggregate
/// filter's block range has been exhausted and respond accordingly.
pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
assert!(
(self.from_block..=self.to_block).contains(&block_number),
"Invalid block number",
);
assert_eq!(
bloom.0.number_of_hash_functions(),
BloomFilter::K_NUM,
"Hash function count mismatch"
"Block number {} is not in the range {}..={}",
block_number,
self.from_block,
self.to_block
);
assert_eq!(bloom.0.number_of_hash_functions(), BloomFilter::K_NUM);

let bloom = bloom.0.bit_vec().to_bytes();
assert_eq!(
bloom.len() as u64,
BloomFilter::BITVEC_BYTES,
"Bit vector length mismatch"
);
assert_eq!(bloom.len() as u64, BloomFilter::BITVEC_BYTES);

let relative_block_number = block_number.get() - self.from_block.get();
let byte_idx = (relative_block_number / 8) as usize;
@@ -183,7 +174,9 @@ impl AggregateBloom {
}
}
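
// Usage sketch (ours, not part of this commit): per the new doc comment on
// `add_bloom`, the caller tracks when the aggregate's block range has been
// exhausted and rolls over to a fresh one. Assumes blocks arrive in order;
// `persist` is a hypothetical callback that stores the finished aggregate.
fn add_rolling_sketch(
    mut aggregate: AggregateBloom,
    blooms: Vec<(BlockNumber, BloomFilter)>,
    mut persist: impl FnMut(&AggregateBloom),
) -> AggregateBloom {
    for (block_number, bloom) in blooms {
        if block_number > aggregate.to_block {
            persist(&aggregate); // range exhausted: store and roll over
            aggregate = AggregateBloom::new(block_number);
        }
        aggregate.add_bloom(&bloom, block_number);
    }
    aggregate
}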

pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> BTreeSet<BlockNumber> {
/// Returns a set of [block numbers](BlockNumber) for which the given keys
/// are present in the aggregate.
pub fn blocks_for_filter(&self, filter: &EventFilter) -> BTreeSet<BlockNumber> {
// Empty filters are considered present in all blocks.
if filter.contract_address.is_none() && (filter.keys.iter().flatten().count() == 0) {
return (self.from_block.get()..=self.to_block.get())
@@ -449,7 +442,7 @@ mod tests {
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
#[cfg(feature = "aggregate_bloom")]
fn add_bloom_and_check_single_block_found() {
let from_block = BlockNumber::new_or_panic(0);
let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
@@ -460,43 +453,14 @@

aggregate_bloom_filter.add_bloom(&bloom, from_block);

let filter = crate::EventFilter {
keys: vec![vec![EventKey(KEY)]],
contract_address: None,
..Default::default()
};

let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
assert_eq!(block_matches, vec![from_block]);

let block_matches: Vec<_> = aggregate_bloom_filter
.blocks_for_filter(&filter)
.into_iter()
.collect();
assert_eq!(block_matches, vec![from_block]);
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
fn aggregate_bloom_past_first_range() {
let from_block = BlockNumber::new_or_panic(AggregateBloom::BLOCK_RANGE_LEN);
let mut aggregate_bloom_filter = AggregateBloom::new(from_block);

let mut bloom = BloomFilter::new();
bloom.set(&KEY);
bloom.set(&KEY1);

let filter = crate::EventFilter {
let filter = EventFilter {
keys: vec![vec![EventKey(KEY)]],
contract_address: None,
..Default::default()
};

aggregate_bloom_filter.add_bloom(&bloom, from_block);

let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
assert_eq!(block_matches, vec![from_block]);

let block_matches: Vec<_> = aggregate_bloom_filter
.blocks_for_filter(&filter)
.into_iter()
@@ -505,7 +469,7 @@
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
#[cfg(feature = "aggregate_bloom")]
fn add_blooms_and_check_multiple_blocks_found() {
let from_block = BlockNumber::new_or_panic(0);
let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
@@ -516,15 +480,14 @@
aggregate_bloom_filter.add_bloom(&bloom, from_block);
aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);

let filter = crate::EventFilter {
let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
assert_eq!(block_matches, vec![from_block, from_block + 1]);

let filter = EventFilter {
keys: vec![vec![EventKey(KEY)]],
contract_address: None,
..Default::default()
};

let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
assert_eq!(block_matches, vec![from_block, from_block + 1]);

let block_matches: Vec<_> = aggregate_bloom_filter
.blocks_for_filter(&filter)
.into_iter()
@@ -533,7 +496,7 @@ mod tests {
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
#[cfg(feature = "aggregate_bloom")]
fn key_not_in_filter_returns_empty_vec() {
let from_block = BlockNumber::new_or_panic(0);
let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
@@ -546,12 +509,11 @@
aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);

let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]);

assert_eq!(block_matches_empty, Vec::<BlockNumber>::new());
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
#[cfg(feature = "aggregate_bloom")]
fn serialize_aggregate_roundtrip() {
let from_block = BlockNumber::new_or_panic(0);
let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
@@ -564,15 +526,14 @@

let compressed_bitmap = aggregate_bloom_filter.compress_bitmap();
let mut decompressed = AggregateBloom::from_existing_compressed(
aggregate_bloom_filter.from_block.get(),
aggregate_bloom_filter.to_block.get(),
aggregate_bloom_filter.from_block,
aggregate_bloom_filter.to_block,
compressed_bitmap,
);
decompressed.add_bloom(&bloom, from_block + 2);

let block_matches = decompressed.blocks_for_keys(&[KEY]);
let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]);

assert_eq!(
block_matches,
vec![from_block, from_block + 1, from_block + 2]
Expand All @@ -581,7 +542,7 @@ mod tests {
}

#[test]
#[cfg_attr(not(feature = "aggregate_bloom"), ignore)]
#[cfg(feature = "aggregate_bloom")]
#[should_panic]
fn invalid_insert_pos() {
let from_block = BlockNumber::new_or_panic(0);