Skip to content

Commit

Permalink
feat: store event bloom filter in DB
Browse files Browse the repository at this point in the history
  • Loading branch information
jbcaron committed Jan 23, 2025
1 parent 6bd2e3b commit ca2b119
Show file tree
Hide file tree
Showing 31 changed files with 2,087 additions and 133 deletions.
856 changes: 750 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ members = [
"crates/madara/client/block_import",
"crates/madara/node",
"crates/madara/primitives/block",
"crates/madara/primitives/bloom_filter",
"crates/madara/primitives/convert",
"crates/madara/primitives/transactions",
"crates/madara/primitives/class",
Expand Down Expand Up @@ -46,6 +47,7 @@ default-members = [
"crates/madara/client/analytics",
"crates/madara/node",
"crates/madara/primitives/block",
"crates/madara/primitives/bloom_filter",
"crates/madara/primitives/convert",
"crates/madara/primitives/transactions",
"crates/madara/primitives/class",
Expand Down Expand Up @@ -110,6 +112,7 @@ m-proc-macros = { path = "crates/madara/proc-macros", default-features = false }

# Madara primtitives
mp-block = { path = "crates/madara/primitives/block", default-features = false }
mp-bloom-filter = { path = "crates/madara/primitives/bloom_filter", default-features = false }
mp-convert = { path = "crates/madara/primitives/convert", default-features = false }
mp-transactions = { path = "crates/madara/primitives/transactions", default-features = false }
mp-class = { path = "crates/madara/primitives/class", default-features = false }
Expand Down Expand Up @@ -167,6 +170,7 @@ alloy = { version = "0.4.0", features = [
# Other third party dependencies
paste = "1.0.15"
anyhow = "1.0"
ahash = "0.8"
bigdecimal = "0.4.5"
assert_matches = "1.5"
async-trait = "0.1"
Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/block_import/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tokio.workspace = true
mc-analytics.workspace = true
mc-db.workspace = true
mp-block.workspace = true
mp-bloom-filter.workspace = true
mp-chain-config.workspace = true
mp-class.workspace = true
mp-convert.workspace = true
Expand Down
23 changes: 18 additions & 5 deletions crates/madara/client/block_import/src/verify_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use mp_block::{
header::PendingHeader, BlockId, Header, MadaraBlockInfo, MadaraBlockInner, MadaraMaybePendingBlock,
MadaraMaybePendingBlockInfo, MadaraPendingBlockInfo,
};
use mp_bloom_filter::EventBloomWriter;
use mp_convert::{FeltHexDisplay, ToFelt};
use starknet_api::core::ChainId;
use starknet_types_core::felt::Felt;
Expand Down Expand Up @@ -82,6 +83,16 @@ pub fn verify_apply_inner(
// Update contract and its storage tries
let global_state_root = update_tries(backend, &block, &validation, block_number)?;

// Event bloom filter
let event_bloom = {
let mut events_iter = block.receipts.iter().flat_map(|tx| tx.events().iter()).peekable();
if events_iter.peek().is_none() {
None
} else {
Some(EventBloomWriter::from_events(events_iter))
}
};

// Block hash
let (block_hash, header) = block_hash(&block, &validation, block_number, parent_block_hash, global_state_root)?;

Expand All @@ -101,6 +112,7 @@ pub fn verify_apply_inner(
},
block.state_diff,
block.converted_classes,
event_bloom,
block.visited_segments,
None,
)
Expand Down Expand Up @@ -146,6 +158,7 @@ pub fn verify_apply_pending_inner(
},
block.state_diff,
block.converted_classes,
None,
block.visited_segments,
None,
)
Expand Down Expand Up @@ -411,7 +424,7 @@ mod verify_apply_tests {
if populate_db {
let header = create_dummy_header();
let pending_block = finalized_block_zero(header);
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None).unwrap();
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None, None).unwrap();
}

// Create a validation context with the specified ignore_block_order flag
Expand Down Expand Up @@ -665,7 +678,7 @@ mod verify_apply_tests {
let mut header = create_dummy_header();
header.block_number = 0;
let pending_block = finalized_block_zero(header);
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None).unwrap();
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_latest_block_n().unwrap(), Some(0));

Expand All @@ -691,7 +704,7 @@ mod verify_apply_tests {
let mut header = create_dummy_header();
header.block_number = 0;
let pending_block = finalized_block_zero(header);
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None).unwrap();
backend.store_block(pending_block.clone(), finalized_state_diff_zero(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_latest_block_n().unwrap(), Some(0));

Expand Down Expand Up @@ -727,7 +740,7 @@ mod verify_apply_tests {
let mut genesis_header = create_dummy_header();
genesis_header.block_number = 0;
let genesis_block = finalized_block_zero(genesis_header.clone());
backend.store_block(genesis_block, finalized_state_diff_zero(), vec![], None, None).unwrap();
backend.store_block(genesis_block, finalized_state_diff_zero(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_latest_block_n().unwrap(), Some(0));

Expand Down Expand Up @@ -773,7 +786,7 @@ mod verify_apply_tests {
let mut genesis_header = create_dummy_header();
genesis_header.block_number = 0;
let genesis_block = finalized_block_zero(genesis_header.clone());
backend.store_block(genesis_block, finalized_state_diff_zero(), vec![], None, None).unwrap();
backend.store_block(genesis_block, finalized_state_diff_zero(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_latest_block_n().unwrap(), Some(0));

Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
self.block.clone().into(),
new_state_diff,
self.declared_classes.clone(),
None,
Some(visited_segments),
Some(bouncer_weights),
)?;
Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ targets = ["x86_64-unknown-linux-gnu"]
# Madara
mc-analytics = { workspace = true }
mp-block = { workspace = true }
mp-bloom-filter = { workspace = true }
mp-chain-config = { workspace = true }
mp-class = { workspace = true }
mp-receipt = { workspace = true }
Expand Down
33 changes: 31 additions & 2 deletions crates/madara/client/db/src/block_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use mp_block::{
BlockId, BlockTag, MadaraBlock, MadaraBlockInfo, MadaraBlockInner, MadaraMaybePendingBlock,
MadaraMaybePendingBlockInfo, MadaraPendingBlock, MadaraPendingBlockInfo, VisitedSegments,
};
use mp_bloom_filter::{EventBloomReader, EventBloomWriter};
use mp_state_update::StateDiff;
use rocksdb::WriteOptions;
use rocksdb::{Direction, IteratorMode, WriteOptions};
use starknet_api::core::ChainId;
use starknet_types_core::felt::Felt;
use starknet_types_rpc::EmittedEvent;
Expand Down Expand Up @@ -285,14 +286,20 @@ impl MadaraBackend {

/// Also clears pending block
#[tracing::instrument(skip(self), fields(module = "BlockDB"))]
pub(crate) fn block_db_store_block(&self, block: &MadaraBlock, state_diff: &StateDiff) -> Result<()> {
pub(crate) fn block_db_store_block(
&self,
block: &MadaraBlock,
state_diff: &StateDiff,
events_bloom: Option<EventBloomWriter>,
) -> Result<()> {
let mut tx = WriteBatchWithTransaction::default();

let tx_hash_to_block_n = self.db.get_column(Column::TxHashToBlockN);
let block_hash_to_block_n = self.db.get_column(Column::BlockHashToBlockN);
let block_n_to_block = self.db.get_column(Column::BlockNToBlockInfo);
let block_n_to_block_inner = self.db.get_column(Column::BlockNToBlockInner);
let block_n_to_state_diff = self.db.get_column(Column::BlockNToStateDiff);
let event_bloom = self.db.get_column(Column::EventBloom);
let meta = self.db.get_column(Column::BlockStorageMeta);

let block_hash_encoded = bincode::serialize(&block.info.block_hash)?;
Expand All @@ -306,6 +313,9 @@ impl MadaraBackend {
tx.put_cf(&block_hash_to_block_n, block_hash_encoded, &block_n_encoded);
tx.put_cf(&block_n_to_block_inner, &block_n_encoded, bincode::serialize(&block.inner)?);
tx.put_cf(&block_n_to_state_diff, &block_n_encoded, bincode::serialize(state_diff)?);
if let Some(events_bloom) = events_bloom {
tx.put_cf(&event_bloom, &block_n_encoded, bincode::serialize(&events_bloom)?);
}
tx.put_cf(&meta, ROW_SYNC_TIP, block_n_encoded);

// susbcribers
Expand Down Expand Up @@ -480,4 +490,23 @@ impl MadaraBackend {
}
}
}

#[tracing::instrument(skip(self), fields(module = "BlockDB"))]
pub fn get_event_filter_stream(
&self,
block_n: u64,
) -> Result<impl Iterator<Item = Result<(u64, EventBloomReader)>> + '_> {
let col = self.db.get_column(Column::EventBloom);
let key = bincode::serialize(&block_n)?;
let iter_mode = IteratorMode::From(&key, Direction::Forward);
let iter = self.db.iterator_cf(&col, iter_mode);

Ok(iter.map(|kvs| {
kvs.map_err(MadaraStorageError::from).and_then(|(key, value)| {
let stored_block_n: u64 = bincode::deserialize(&key).map_err(MadaraStorageError::from)?;
let bloom = bincode::deserialize(&value).map_err(MadaraStorageError::from)?;
Ok((stored_block_n, bloom))
})
}))
}
}
10 changes: 10 additions & 0 deletions crates/madara/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub enum Column {
BlockHashToBlockN,
/// One To One
BlockNToStateDiff,
/// block_n => bloom filter for events
EventBloom,
/// Meta column for block storage (sync tip, pending block)
BlockStorageMeta,

Expand Down Expand Up @@ -178,6 +180,7 @@ impl Column {
BlockHashToBlockN,
BlockStorageMeta,
BlockNToStateDiff,
EventBloom,
ClassInfo,
ClassCompiled,
PendingClassInfo,
Expand Down Expand Up @@ -214,6 +217,7 @@ impl Column {
BlockHashToBlockN => "block_hash_to_block_n",
BlockStorageMeta => "block_storage_meta",
BlockNToStateDiff => "block_n_to_state_diff",
EventBloom => "event_bloom",
BonsaiContractsTrie => "bonsai_contracts_trie",
BonsaiContractsFlat => "bonsai_contracts_flat",
BonsaiContractsLog => "bonsai_contracts_log",
Expand Down Expand Up @@ -241,6 +245,12 @@ impl Column {
}
}

#[cfg(test)]
#[test]
fn test_column_all() {
assert_eq!(Column::ALL.len(), Column::NUM_COLUMNS);
}

pub trait DatabaseExt {
fn get_column(&self, col: Column) -> Arc<BoundColumnFamily<'_>>;
}
Expand Down
4 changes: 3 additions & 1 deletion crates/madara/client/db/src/storage_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::MadaraStorageError;
use blockifier::bouncer::BouncerWeights;
use mp_block::VisitedSegments;
use mp_block::{MadaraBlock, MadaraMaybePendingBlock, MadaraMaybePendingBlockInfo, MadaraPendingBlock};
use mp_bloom_filter::EventBloomWriter;
use mp_class::ConvertedClass;
use mp_state_update::{
ContractStorageDiffItem, DeployedContractItem, NonceUpdate, ReplacedClassItem, StateDiff, StorageEntry,
Expand All @@ -18,6 +19,7 @@ impl MadaraBackend {
block: MadaraMaybePendingBlock,
state_diff: StateDiff,
converted_classes: Vec<ConvertedClass>,
events_bloom: Option<EventBloomWriter>,
visited_segments: Option<VisitedSegments>,
bouncer_weights: Option<BouncerWeights>,
) -> Result<(), MadaraStorageError> {
Expand All @@ -35,7 +37,7 @@ impl MadaraBackend {
bouncer_weights,
),
MadaraMaybePendingBlockInfo::NotPending(info) => {
self.block_db_store_block(&MadaraBlock { info, inner: block.inner }, &state_diff_cpy)
self.block_db_store_block(&MadaraBlock { info, inner: block.inner }, &state_diff_cpy, events_bloom)
}
};

Expand Down
26 changes: 13 additions & 13 deletions crates/madara/client/db/src/tests/test_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ mod block_tests {
let block_hash = block.info.block_hash().unwrap();
let state_diff = finalized_state_diff_zero();

backend.store_block(block.clone(), state_diff.clone(), vec![], None, None).unwrap();
backend.store_block(pending_block_one(), pending_state_diff_one(), vec![], None, None).unwrap();
backend.store_block(block.clone(), state_diff.clone(), vec![], None, None, None).unwrap();
backend.store_block(pending_block_one(), pending_state_diff_one(), vec![], None, None, None).unwrap();

assert_eq!(backend.resolve_block_id(&BlockId::Hash(block_hash)).unwrap().unwrap(), DbBlockId::Number(0));
assert_eq!(backend.resolve_block_id(&BlockId::Number(0)).unwrap().unwrap(), DbBlockId::Number(0));
Expand All @@ -52,7 +52,7 @@ mod block_tests {
let block = finalized_block_zero(Header::default());
let state_diff = finalized_state_diff_zero();

backend.store_block(block.clone(), state_diff.clone(), vec![], None, None).unwrap();
backend.store_block(block.clone(), state_diff.clone(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_block_hash(&BLOCK_ID_0).unwrap().unwrap(), block.info.block_hash().unwrap());
assert_eq!(BLOCK_ID_0.resolve_db_block_id(backend).unwrap().unwrap(), BLOCK_ID_0);
Expand All @@ -75,7 +75,7 @@ mod block_tests {
let block = pending_block_one();
let state_diff = pending_state_diff_one();

backend.store_block(block.clone(), state_diff.clone(), vec![], None, None).unwrap();
backend.store_block(block.clone(), state_diff.clone(), vec![], None, None, None).unwrap();

assert!(backend.get_block_hash(&BLOCK_ID_PENDING).unwrap().is_none());
assert_eq!(backend.get_block_info(&BLOCK_ID_PENDING).unwrap().unwrap(), block.info);
Expand All @@ -92,9 +92,9 @@ mod block_tests {
let backend = db.backend();

backend
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None)
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None, None)
.unwrap();
backend.store_block(pending_block_one(), pending_state_diff_one(), vec![], None, None).unwrap();
backend.store_block(pending_block_one(), pending_state_diff_one(), vec![], None, None, None).unwrap();
backend.clear_pending_block().unwrap();

assert!(backend.get_block(&BLOCK_ID_PENDING).unwrap().unwrap().inner.transactions.is_empty());
Expand All @@ -104,11 +104,11 @@ mod block_tests {
"fake pending block parent hash must match with latest block in db"
);

backend.store_block(finalized_block_one(), finalized_state_diff_one(), vec![], None, None).unwrap();
backend.store_block(finalized_block_one(), finalized_state_diff_one(), vec![], None, None, None).unwrap();

let block_pending = pending_block_two();
let state_diff = pending_state_diff_two();
backend.store_block(block_pending.clone(), state_diff.clone(), vec![], None, None).unwrap();
backend.store_block(block_pending.clone(), state_diff.clone(), vec![], None, None, None).unwrap();

assert!(backend.get_block_hash(&BLOCK_ID_PENDING).unwrap().is_none());
assert_eq!(backend.get_block_info(&BLOCK_ID_PENDING).unwrap().unwrap(), block_pending.info);
Expand All @@ -123,11 +123,11 @@ mod block_tests {
let backend = db.backend();

backend
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None)
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None, None)
.unwrap();

let latest_block = finalized_block_one();
backend.store_block(latest_block.clone(), finalized_state_diff_one(), vec![], None, None).unwrap();
backend.store_block(latest_block.clone(), finalized_state_diff_one(), vec![], None, None, None).unwrap();

assert_eq!(backend.get_latest_block_n().unwrap().unwrap(), 1);
}
Expand All @@ -152,7 +152,7 @@ mod block_tests {
let block = finalized_block_zero(Header::default());
let state_diff = finalized_state_diff_zero();

backend.store_block(block.clone(), state_diff.clone(), vec![], None, None).unwrap();
backend.store_block(block.clone(), state_diff.clone(), vec![], None, None, None).unwrap();

let tx_hash_1 = block.info.tx_hashes()[1];
assert_eq!(backend.find_tx_hash_block_info(&tx_hash_1).unwrap().unwrap(), (block.info.clone(), TxIndex(1)));
Expand All @@ -165,11 +165,11 @@ mod block_tests {
let backend = db.backend();

backend
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None)
.store_block(finalized_block_zero(Header::default()), finalized_state_diff_zero(), vec![], None, None, None)
.unwrap();

let block_pending = pending_block_one();
backend.store_block(block_pending.clone(), pending_state_diff_one(), vec![], None, None).unwrap();
backend.store_block(block_pending.clone(), pending_state_diff_one(), vec![], None, None, None).unwrap();

let tx_hash_1 = block_pending.info.tx_hashes()[1];
assert_eq!(
Expand Down
2 changes: 2 additions & 0 deletions crates/madara/client/mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,7 @@ mod test {
vec![],
None,
None,
None,
)
.expect("Failed to store block");

Expand Down Expand Up @@ -1722,6 +1723,7 @@ mod test {
vec![],
None,
None,
None,
)
.expect("Failed to store block");

Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mc-exec = { workspace = true }
mc-gateway-client = { workspace = true }
mc-mempool = { workspace = true }
mp-block = { workspace = true, default-features = true }
mp-bloom-filter = { workspace = true }
mp-chain-config = { workspace = true }
mp-class = { workspace = true }
mp-convert = { workspace = true, default-features = true }
Expand Down
Loading

0 comments on commit ca2b119

Please sign in to comment.