From b41fc3fc78feb861bff0f8e7866e1de5b8266140 Mon Sep 17 00:00:00 2001
From: sistemd
Date: Tue, 5 Nov 2024 14:32:37 +0100
Subject: [PATCH] aggregate filter creation and storage

---
 crates/rpc/Cargo.toml                        |   5 +
 crates/rpc/fixtures/mainnet.sqlite           | Bin 454656 -> 454656 bytes
 crates/rpc/src/method/get_events.rs          |  45 ++
 crates/rpc/src/method/subscribe_events.rs    |  76 +--
 crates/storage/src/bloom.rs                  | 318 ++++++------
 crates/storage/src/connection/event.rs       | 510 +++++++++++++++----
 crates/storage/src/connection/transaction.rs |  12 +
 crates/storage/src/schema.rs                 |   4 +
 crates/storage/src/schema/revision_0066.rs   |  77 +++
 9 files changed, 761 insertions(+), 286 deletions(-)
 create mode 100644 crates/storage/src/schema/revision_0066.rs

diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml
index ad7698ada5..c52d643acf 100644
--- a/crates/rpc/Cargo.toml
+++ b/crates/rpc/Cargo.toml
@@ -7,6 +7,11 @@ license = { workspace = true }
 rust-version = { workspace = true }
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
+[features]
+aggregate_bloom = []
+
+default = []
+
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
diff --git a/crates/rpc/fixtures/mainnet.sqlite b/crates/rpc/fixtures/mainnet.sqlite
index f86d30b16ead2d8fd10a17413238fc3772b5e9d1..88b5ca98c3427f34d954d0e5d8cd88fee940f60c 100644
GIT binary patch
(base85-encoded binary deltas omitted: test fixture update, 454656 -> 454656 bytes)

diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs
index ab59924e00..c52c0c8130 100644
--- a/crates/rpc/src/method/get_events.rs
+++ b/crates/rpc/src/method/get_events.rs
@@ -217,6 +217,17 @@ pub async fn get_events(
         offset: requested_offset,
     };
 
+    // TODO:
+    // Instrumentation and the `AggregateBloom` version of fetching events
+    // for the given `EventFilter` are behind a feature flag for now, and
+    // we do not execute them during testing because they would only slow
+    // the tests down without affecting their outcome.
+    // A follow-up PR will use the `AggregateBloom` logic to create the
+    // output, at which point these conditions will be removed.
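+    // (Both paths are timed below, and any divergence between their
+    // results is logged as an error.)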
+ + #[cfg(all(feature = "aggregate_bloom", not(test)))] + let start = std::time::Instant::now(); + let page = transaction .events( &filter, @@ -228,6 +239,40 @@ pub async fn get_events( EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), })?; + #[cfg(all(feature = "aggregate_bloom", not(test)))] + { + let elapsed = start.elapsed(); + + tracing::info!( + "Getting events (individual Bloom filters) took {:?}", + elapsed + ); + + let start = std::time::Instant::now(); + let page_from_aggregate = transaction + .events_from_aggregate(&filter, context.config.get_events_max_blocks_to_scan) + .map_err(|e| match e { + EventFilterError::Internal(e) => GetEventsError::Internal(e), + EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), + })?; + let elapsed = start.elapsed(); + + tracing::info!( + "Getting events (aggregate Bloom filters) took {:?}", + elapsed + ); + + if page != page_from_aggregate { + tracing::error!( + "Page of events from individual and aggregate bloom filters does not match!" + ); + tracing::error!("Individual: {:?}", page); + tracing::error!("Aggregate: {:?}", page_from_aggregate); + } else { + tracing::info!("Page of events from individual and aggregate bloom filters match!"); + } + } + let mut events = GetEventsResult { events: page.events.into_iter().map(|e| e.into()).collect(), continuation_token: page.continuation_token.map(|token| { diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs index 537cba978d..ab3683d482 100644 --- a/crates/rpc/src/method/subscribe_events.rs +++ b/crates/rpc/src/method/subscribe_events.rs @@ -250,14 +250,14 @@ mod tests { use tokio::sync::mpsc; use crate::context::{RpcConfig, RpcContext}; - use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter}; + use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter, CATCH_UP_BATCH_SIZE}; use crate::pending::PendingWatcher; use crate::v02::types::syncing::Syncing; use crate::{v08, Notifications, Reorg, SyncState}; #[tokio::test] async fn no_filtering() { - let num_blocks = 2000; + let num_blocks = 80; let router = setup(num_blocks).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); @@ -319,14 +319,14 @@ mod tests { #[tokio::test] async fn filter_from_address() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", + "from_address": "0x46", } ); receiver_tx @@ -351,7 +351,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -371,9 +371,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -385,14 +385,14 @@ mod tests { #[tokio::test] async fn 
filter_keys() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "keys": [["0x90"], [], ["0x92", "0x93"]], + "keys": [["0x46"], [], ["0x47", "0x48"]], } ); receiver_tx @@ -417,7 +417,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -437,9 +437,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -451,15 +451,15 @@ mod tests { #[tokio::test] async fn filter_from_address_and_keys() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", - "keys": [["0x90"], [], ["0x92", "0x93"]], + "from_address": "0x46", + "keys": [["0x46"], [], ["0x47", "0x48"]], } ); receiver_tx @@ -484,7 +484,7 @@ mod tests { } _ => panic!("Expected text message"), }; - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -504,9 +504,9 @@ mod tests { .context .notifications .l2_blocks - .send(sample_block(0x90).into()) + .send(sample_block(0x46).into()) .unwrap(); - let expected = sample_event_message(0x90, subscription_id); + let expected = sample_event_message(0x46, subscription_id); let event = sender_rx.recv().await.unwrap().unwrap(); let json: serde_json::Value = match event { Message::Text(json) => serde_json::from_str(&json).unwrap(), @@ -518,32 +518,32 @@ mod tests { #[tokio::test] async fn too_many_keys_filter() { - let router = setup(2000).await; + let router = setup(80).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( { "block": {"block_number": 0}, - "from_address": "0x90", + "from_address": "0x46", "keys": [ - ["0x91"], - ["0x92"], - ["0x93"], - ["0x94"], - ["0x95"], - ["0x96"], - ["0x97"], - ["0x98"], - ["0x99"], - ["0x9a"], - ["0x9b"], - ["0x9c"], - ["0x9d"], - ["0x9e"], - ["0x9f"], - ["0xa0"], - ["0xa1"], + ["0x46"], + ["0x47"], + ["0x48"], + ["0x49"], + ["0x4a"], + ["0x4b"], + ["0x4c"], + ["0x4d"], + ["0x4e"], + ["0x4f"], + ["0x50"], + ["0x51"], + ["0x52"], + ["0x53"], + ["0x54"], + ["0x55"], + ["0x56"], ], } ); @@ -644,6 +644,8 @@ mod tests { } async fn setup(num_blocks: u64) -> RpcRouter { + assert!(num_blocks == 0 || num_blocks > 
CATCH_UP_BATCH_SIZE); + let storage = StorageBuilder::in_memory().unwrap(); tokio::task::spawn_blocking({ let storage = storage.clone(); diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index 055428272a..f0927b9180 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -60,6 +60,7 @@ //! specific set of keys without having to load and check each individual bloom //! filter. +use std::collections::BTreeSet; use std::sync::{Mutex, MutexGuard}; use bloomfilter::Bloom; @@ -79,71 +80,80 @@ pub const EVENT_KEY_FILTER_LIMIT: usize = 16; /// Before being added to `AggregateBloom`, each [`BloomFilter`] is /// rotated by 90 degrees. #[derive(Debug)] -pub(crate) struct AggregateBloom { +pub struct AggregateBloom { /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in /// a single array. bitmap: Vec, - /// Block range for which the aggregate filter is constructed. - block_range: std::ops::Range, - next_block: BlockNumber, + /// Starting (inclusive) block number for the range of blocks that this + /// aggregate covers. + pub from_block: BlockNumber, + /// Ending (inclusive) block number for the range of blocks that this + /// aggregate covers. + pub to_block: BlockNumber, } +// TODO: +// Delete after cfg flag is removed +#[allow(dead_code)] impl AggregateBloom { - // TODO: - // Remove #[allow(dead_code)] when follow up is done. - /// Maximum number of blocks to aggregate in a single `AggregateBloom`. - const BLOCK_RANGE_LEN: u64 = 32_768; + pub const BLOCK_RANGE_LEN: u64 = 32_768; const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8; - /// Create a new `AggregateBloom` for the (`from_block`, `from_block + - /// [Self::BLOCK_RANGE_LEN]`) range. - #[allow(dead_code)] + /// Create a new `AggregateBloom` for the (`from_block`, `from_block` + + /// [`block_range_length`](Self::BLOCK_RANGE_LEN) - 1) range. 
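+    ///
+    /// The aggregate starts out empty; per-block filters are rotated in
+    /// via [`Self::add_bloom`].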
pub fn new(from_block: BlockNumber) -> Self { - let bitmap = vec![0; (Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN) as usize]; + let to_block = from_block + Self::BLOCK_RANGE_LEN - 1; + let bitmap = vec![0; Self::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize]; + Self::from_parts(from_block, to_block, bitmap) + } - let to_block = from_block + Self::BLOCK_RANGE_LEN; + pub fn from_existing_compressed( + from_block: u64, + to_block: u64, + compressed_bitmap: Vec, + ) -> Self { + let from_block = BlockNumber::new_or_panic(from_block); + let to_block = BlockNumber::new_or_panic(to_block); - Self { - bitmap, - block_range: from_block..to_block, - next_block: from_block, - } + let bitmap = zstd::bulk::decompress( + &compressed_bitmap, + AggregateBloom::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize, + ) + .expect("Decompressing aggregate Bloom filter"); + + Self::from_parts(from_block, to_block, bitmap) } - #[allow(dead_code)] - pub fn from_bytes(from_block: BlockNumber, bytes: Vec) -> Self { + fn from_parts(from_block: BlockNumber, to_block: BlockNumber, bitmap: Vec) -> Self { assert_eq!( - bytes.len() as u64, + from_block + Self::BLOCK_RANGE_LEN - 1, + to_block, + "Block range mismatch" + ); + assert_eq!( + bitmap.len() as u64, Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN, - "Bitmap size mismatch" + "Bitmap length mismatch" ); - let to_block = from_block + Self::BLOCK_RANGE_LEN; - Self { - bitmap: bytes, - block_range: from_block..to_block, - next_block: from_block, + bitmap, + from_block, + to_block, } } - #[allow(dead_code)] - pub fn to_bytes(&self) -> &[u8] { - &self.bitmap + pub fn compress_bitmap(&self) -> Vec { + zstd::bulk::compress(&self.bitmap, 10).expect("Compressing aggregate Bloom filter") } /// Rotate the bloom filter by 90 degrees and add it to the aggregate. - #[allow(dead_code)] - pub fn add_bloom( - &mut self, - bloom: &BloomFilter, - insert_pos: BlockNumber, - ) -> Result<(), AddBloomError> { - if !self.block_range.contains(&insert_pos) { - return Err(AddBloomError::InvalidBlockNumber); - } - assert_eq!(self.next_block, insert_pos, "Unexpected insert position"); + pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) { + assert!( + (self.from_block..=self.to_block).contains(&block_number), + "Invalid block number", + ); assert_eq!( bloom.0.number_of_hash_functions(), BloomFilter::K_NUM, @@ -157,8 +167,9 @@ impl AggregateBloom { "Bit vector length mismatch" ); - let byte_index = (insert_pos.get() / 8) as usize; - let bit_index = (insert_pos.get() % 8) as usize; + let relative_block_number = block_number.get() - self.from_block.get(); + let byte_idx = (relative_block_number / 8) as usize; + let bit_idx = (relative_block_number % 8) as usize; for (i, bloom_byte) in bloom.iter().enumerate() { if *bloom_byte == 0 { continue; @@ -167,48 +178,54 @@ impl AggregateBloom { let base = 8 * i; for j in 0..8 { let row_idx = base + j; - let idx = Self::bitmap_index_at(row_idx, byte_index); - self.bitmap[idx] |= ((bloom_byte >> (7 - j)) & 1) << bit_index; + *self.bitmap_at_mut(row_idx, byte_idx) |= ((bloom_byte >> (7 - j)) & 1) << bit_idx; } } + } - self.next_block += 1; - if self.next_block >= self.block_range.end { - tracing::info!( - "Block limit reached for [{}, {}) range", - self.block_range.start, - self.block_range.end - ); - return Err(AddBloomError::BlockLimitReached); + pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> BTreeSet { + // Empty filters are considered present in all blocks. 
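+        // Such a filter would match everything, so every block in this
+        // aggregate's range is returned as a candidate.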
+ if filter.contract_address.is_none() && (filter.keys.iter().flatten().count() == 0) { + return (self.from_block.get()..=self.to_block.get()) + .map(BlockNumber::new_or_panic) + .collect(); } - Ok(()) - } - - #[allow(dead_code)] - pub fn blocks_for_filter(&self, filter: &crate::EventFilter) -> Vec { - let mut keys = vec![]; + let mut blocks: BTreeSet<_> = filter + .keys + .iter() + .enumerate() + .flat_map(|(idx, keys)| { + let keys: Vec<_> = keys + .iter() + .map(|key| { + let mut key_with_idx = key.0; + key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4; + key_with_idx + }) + .collect(); + + self.blocks_for_keys(&keys) + }) + .collect(); if let Some(contract_address) = filter.contract_address { - keys.push(contract_address.0); + blocks.extend(self.blocks_for_keys(&[contract_address.0])); } - filter.keys.iter().flatten().for_each(|k| keys.push(k.0)); - self.blocks_for_keys(keys) + blocks } - #[allow(dead_code)] - fn blocks_for_keys(&self, keys: Vec) -> Vec { + fn blocks_for_keys(&self, keys: &[Felt]) -> Vec { let mut block_matches = vec![]; for k in keys { let mut row_to_check = vec![u8::MAX; Self::BLOCK_RANGE_BYTES as usize]; - let indices = BloomFilter::indices_for_key(&k); + let indices = BloomFilter::indices_for_key(k); for row_idx in indices { for (col_idx, row_byte) in row_to_check.iter_mut().enumerate() { - let idx = Self::bitmap_index_at(row_idx, col_idx); - *row_byte &= self.bitmap[idx]; + *row_byte &= self.bitmap_at(row_idx, col_idx); } } @@ -219,7 +236,8 @@ impl AggregateBloom { for i in 0..8 { if byte & (1 << i) != 0 { - block_matches.push(BlockNumber::new_or_panic((col_idx * 8 + i) as u64)); + let match_number = self.from_block + col_idx as u64 * 8 + i as u64; + block_matches.push(match_number); } } } @@ -228,16 +246,15 @@ impl AggregateBloom { block_matches } - #[allow(dead_code)] - fn bitmap_index_at(row: usize, col: usize) -> usize { - row * Self::BLOCK_RANGE_BYTES as usize + col + fn bitmap_at(&self, row: usize, col: usize) -> u8 { + let idx = row * Self::BLOCK_RANGE_BYTES as usize + col; + self.bitmap[idx] } -} -#[derive(Debug)] -pub enum AddBloomError { - BlockLimitReached, - InvalidBlockNumber, + fn bitmap_at_mut(&mut self, row: usize, col: usize) -> &mut u8 { + let idx = row * Self::BLOCK_RANGE_BYTES as usize + col; + &mut self.bitmap[idx] + } } #[derive(Clone)] @@ -354,6 +371,9 @@ impl BloomFilter { // Workaround to get the indices of the keys in the filter. // Needed because the `bloomfilter` crate doesn't provide a // way to get this information. 
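+    // (The key is set on an empty filter and the positions of the resulting
+    // one-bits are collected; these are the rows the aggregate probes.)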
+ // TODO: + // Delete after cfg flag is removed + #[allow(dead_code)] fn indices_for_key(key: &Felt) -> Vec { // Use key on an empty Bloom filter let mut bloom = Self::new(); @@ -400,7 +420,6 @@ impl Cache { #[cfg(test)] mod tests { - use assert_matches::assert_matches; use pathfinder_common::felt; use super::*; @@ -439,148 +458,141 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + let filter = crate::EventFilter { + keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); assert_eq!(block_matches, vec![from_block]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn add_blooms_and_check_multiple_blocks_found() { - let from_block = BlockNumber::new_or_panic(0); + fn aggregate_bloom_past_first_range() { + let from_block = BlockNumber::new_or_panic(AggregateBloom::BLOCK_RANGE_LEN); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); + bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); + let filter = crate::EventFilter { + keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - assert_eq!(block_matches, vec![from_block, from_block + 1]); + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); + assert_eq!(block_matches, vec![from_block]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn key_not_in_filter_returns_empty_vec() { + fn add_blooms_and_check_multiple_blocks_found() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + let filter = crate::EventFilter { + keys: vec![vec![EventKey(KEY)]], + contract_address: None, + ..Default::default() + }; - assert_eq!(block_matches_empty, Vec::::new()); + let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); + assert_eq!(block_matches, vec![from_block, from_block + 1]); + + let block_matches: Vec<_> = aggregate_bloom_filter + .blocks_for_filter(&filter) + .into_iter() + .collect(); + assert_eq!(block_matches, vec![from_block, from_block + 1]); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn serialize_aggregate_roundtrip() { + fn key_not_in_filter_returns_empty_vec() { let from_block = BlockNumber::new_or_panic(0); let mut 
aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); + bloom.set(&KEY1); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - aggregate_bloom_filter - .add_bloom(&bloom, from_block + 1) - .unwrap(); - - let bytes = aggregate_bloom_filter.to_bytes(); - let aggregate_bloom_filter = AggregateBloom::from_bytes(from_block, bytes.to_vec()); + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let block_matches = aggregate_bloom_filter.blocks_for_keys(vec![KEY]); - let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(vec![KEY_NOT_IN_FILTER]); + let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches, vec![from_block, from_block + 1]); assert_eq!(block_matches_empty, Vec::::new()); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn block_limit_reached_after_full_range() { - impl AggregateBloom { - /// Real [Self::add_bloom] makes this test last way to long - fn add_bloom_mock(&mut self) { - self.next_block += 1; - } - } - + fn serialize_aggregate_roundtrip() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - for _ in from_block.get()..(AggregateBloom::BLOCK_RANGE_LEN - 1) { - aggregate_bloom_filter.add_bloom_mock(); - } + aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); - let last_block = from_block + AggregateBloom::BLOCK_RANGE_LEN - 1; - assert_matches!( - aggregate_bloom_filter.add_bloom(&bloom, last_block), - Err(AddBloomError::BlockLimitReached) + let compressed_bitmap = aggregate_bloom_filter.compress_bitmap(); + let mut decompressed = AggregateBloom::from_existing_compressed( + aggregate_bloom_filter.from_block.get(), + aggregate_bloom_filter.to_block.get(), + compressed_bitmap, ); - } - - #[test] - #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] - fn invalid_insert_pos() { - let from_block = BlockNumber::new_or_panic(0); - let mut aggregate_bloom_filter = AggregateBloom::new(from_block); + decompressed.add_bloom(&bloom, from_block + 2); - let mut bloom = BloomFilter::new(); - bloom.set(&KEY); + let block_matches = decompressed.blocks_for_keys(&[KEY]); + let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); - - let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; - assert_matches!( - aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos), - Err(AddBloomError::InvalidBlockNumber) + assert_eq!( + block_matches, + vec![from_block, from_block + 1, from_block + 2] ); + assert_eq!(block_matches_empty, Vec::::new()); } #[test] #[cfg_attr(not(feature = "aggregate_bloom"), ignore)] #[should_panic] - fn skipping_a_block_panics() { + fn invalid_insert_pos() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter - .add_bloom(&bloom, from_block) - .unwrap(); + aggregate_bloom_filter.add_bloom(&bloom, from_block); - let skipped_block = from_block + 2; - aggregate_bloom_filter - .add_bloom(&bloom, skipped_block) - .unwrap(); + let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; + aggregate_bloom_filter.add_bloom(&bloom, 
invalid_insert_pos); } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index c779dac3c2..726efd1305 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,5 +1,7 @@ use std::num::NonZeroUsize; +#[cfg(feature = "aggregate_bloom")] +use anyhow::Context; use anyhow::Result; use pathfinder_common::event::Event; use pathfinder_common::{ @@ -12,8 +14,8 @@ use pathfinder_common::{ }; #[cfg(feature = "aggregate_bloom")] -use crate::bloom::AddBloomError; -use crate::bloom::{AggregateBloom, BloomFilter}; +use crate::bloom::AggregateBloom; +use crate::bloom::BloomFilter; use crate::prelude::*; use crate::ReorgCounter; @@ -66,14 +68,51 @@ pub struct PageOfEvents { } impl Transaction<'_> { + #[cfg(feature = "aggregate_bloom")] + pub(super) fn upsert_block_events_aggregate( + &self, + block_number: BlockNumber, + events: &[Event], + ) -> anyhow::Result<()> { + #[rustfmt::skip] + let mut stmt = self.inner().prepare_cached( + "INSERT INTO starknet_event_filters_aggregate (from_block, to_block, bloom) \ + VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET bloom=excluded.bloom", + )?; + + let mut running_aggregate = match self.load_aggregate_bloom(block_number)? { + // Loading existing block range + Some(aggregate) => aggregate, + // New block range reached + None => AggregateBloom::new(block_number), + }; + + let mut bloom = BloomFilter::new(); + for event in events { + bloom.set_keys(&event.keys); + bloom.set_address(&event.from_address); + } + + running_aggregate.add_bloom(&bloom, block_number); + + stmt.execute(params![ + &running_aggregate.from_block, + &running_aggregate.to_block, + &running_aggregate.compress_bitmap() + ])?; + + Ok(()) + } + pub(super) fn upsert_block_events<'a>( &self, block_number: BlockNumber, events: impl Iterator, ) -> anyhow::Result<()> { + #[rustfmt::skip] let mut stmt = self.inner().prepare_cached( - "INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?) ON CONFLICT \ - DO UPDATE SET bloom=excluded.bloom", + "INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?) \ + ON CONFLICT DO UPDATE SET bloom=excluded.bloom", )?; let mut bloom = BloomFilter::new(); @@ -249,87 +288,131 @@ impl Transaction<'_> { } }; - // TODO: - // The logic that constructs aggregate bloom filters is temporarily - // placed here, in order to compare with the current implementation. - // It will be moved to sync as a follow up. - #[cfg(feature = "aggregate_bloom")] - { - let mut aggregates = vec![]; - let mut running_aggregate = AggregateBloom::new(from_block); + match result { + ScanResult::Done => { + return Ok(PageOfEvents { + events: emitted_events, + continuation_token: None, + }) + } + ScanResult::PageFull => { + assert!(emitted_events.len() > filter.page_size); + let continuation_token = continuation_token( + &emitted_events, + ContinuationToken { + block_number: from_block, + offset: filter.offset, + }, + ) + .unwrap(); + emitted_events.truncate(filter.page_size); - let mut blocks_from_individual = vec![]; + return Ok(PageOfEvents { + events: emitted_events, + continuation_token: Some(ContinuationToken { + block_number: continuation_token.block_number, + // account for the extra event + offset: continuation_token.offset - 1, + }), + }); + } + ScanResult::ContinueFrom(block_number) => { + // We've reached a search limit without filling the page. + // We'll need to continue from the next block. 
+ return Ok(PageOfEvents { + events: emitted_events, + continuation_token: Some(ContinuationToken { + block_number, + offset: 0, + }), + }); + } + } + } - for block_num in from_block.get()..=to_block.get() { - if block_num as usize >= max_blocks_to_scan.get() { - break; - } + // TODO: + // This function is temporarily here to compare the performance of the new + // aggregate bloom filter. + #[cfg(feature = "aggregate_bloom")] + pub fn events_from_aggregate( + &self, + filter: &EventFilter, + max_blocks_to_scan: NonZeroUsize, + ) -> Result { + use std::collections::BTreeSet; - let block_num = BlockNumber::new_or_panic(block_num); + if filter.page_size < 1 { + return Err(EventFilterError::PageSizeTooSmall); + } - // TODO: - // Using single block `BloomFilter` API for now since we don't have - // a table for `AggregateBloom` yet. - let bloom = self.load_bloom(reorg_counter, block_num)?; - match bloom { - Filter::Missing => {} - Filter::Cached(bloom) => { - if bloom.check_filter(filter) { - blocks_from_individual.push(block_num); - } + let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS); + let to_block = filter.to_block.unwrap_or(BlockNumber::MAX); + let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - match running_aggregate.add_bloom(&bloom, block_num) { - Ok(_) => {} - Err(AddBloomError::BlockLimitReached) => { - aggregates.push(running_aggregate); - running_aggregate = AggregateBloom::new(block_num + 1); - } - Err(AddBloomError::InvalidBlockNumber) => { - unreachable!() // For now. - } - } - } - Filter::Loaded(bloom) => { - if bloom.check_filter(filter) { - blocks_from_individual.push(block_num); - } + let mut emitted_events = Vec::new(); - match running_aggregate.add_bloom(&bloom, block_num) { - Ok(_) => {} - Err(AddBloomError::BlockLimitReached) => { - aggregates.push(running_aggregate); - running_aggregate = AggregateBloom::new(block_num + 1); - } - Err(AddBloomError::InvalidBlockNumber) => { - unreachable!() // For now. - } - } - } - } - } + let mut blocks_scanned: usize = 0; + let mut offset = filter.offset; - // Remainder of (to_block - from_block) % AggregateBloom::BLOCK_RANGE_LEN - aggregates.push(running_aggregate); + enum ScanResult { + Done, + PageFull, + ContinueFrom(BlockNumber), + } - let blocks_from_aggregate = aggregates.iter().fold(vec![], |mut acc, aggregate| { + let aggregates = self.load_aggregate_bloom_range(from_block, to_block)?; + let mut filtered_blocks = aggregates + .iter() + .fold(BTreeSet::new(), |mut acc, aggregate| { acc.extend(aggregate.blocks_for_filter(filter)); acc }); - if blocks_from_individual != blocks_from_aggregate { - tracing::error!("Blocks from individual and aggregate bloom filter do not match"); - tracing::error!("Individual: {:?}", blocks_from_individual,); - tracing::error!("Aggregate: {:?}", blocks_from_aggregate,); + filtered_blocks.retain(|&block| block >= from_block && block <= to_block); + + let mut blocks_iter = filtered_blocks.iter(); + let result = loop { + let Some(&block) = blocks_iter.next() else { + break ScanResult::Done; + }; + + // Stop if we're past the last block. 
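+            // (Defensive only: `filtered_blocks` was already clamped to the
+            // [from_block, to_block] range above.)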
+ if block > to_block { + break ScanResult::Done; } - } - match result { - ScanResult::Done => { - return Ok(PageOfEvents { - events: emitted_events, - continuation_token: None, - }) + // Check if we've reached our block scan limit + blocks_scanned += 1; + if blocks_scanned > max_blocks_to_scan.get() { + tracing::trace!("Block scan limit reached"); + break ScanResult::ContinueFrom(block); + } + + match self.scan_block_into( + block, + filter, + key_filter_is_empty, + offset, + &mut emitted_events, + )? { + BlockScanResult::NoSuchBlock => break ScanResult::Done, + BlockScanResult::Done { new_offset } => { + offset = new_offset; + } } + + // Stop if we have a page of events plus an extra one to decide if we're on + // the last page. + if emitted_events.len() > filter.page_size { + break ScanResult::PageFull; + } + }; + + match result { + ScanResult::Done => Ok(PageOfEvents { + events: emitted_events, + continuation_token: None, + }), ScanResult::PageFull => { assert!(emitted_events.len() > filter.page_size); let continuation_token = continuation_token( @@ -342,25 +425,25 @@ impl Transaction<'_> { .unwrap(); emitted_events.truncate(filter.page_size); - return Ok(PageOfEvents { + Ok(PageOfEvents { events: emitted_events, continuation_token: Some(ContinuationToken { block_number: continuation_token.block_number, // account for the extra event offset: continuation_token.offset - 1, }), - }); + }) } ScanResult::ContinueFrom(block_number) => { // We've reached a search limit without filling the page. // We'll need to continue from the next block. - return Ok(PageOfEvents { + Ok(PageOfEvents { events: emitted_events, continuation_token: Some(ContinuationToken { block_number, offset: 0, }), - }); + }) } } } @@ -467,28 +550,63 @@ impl Transaction<'_> { }) } - // TODO: - // Implement once [`AggregateBloom`] table is added. - fn _running_bloom_aggregate(&self) -> Result, anyhow::Error> { - // Fetch running aggregate from DB - unimplemented!() + #[cfg(feature = "aggregate_bloom")] + fn load_aggregate_bloom( + &self, + block_number: BlockNumber, + ) -> anyhow::Result> { + #[rustfmt::skip] + let mut select_stmt = self.inner().prepare_cached( + "SELECT from_block, to_block, bloom FROM starknet_event_filters_aggregate \ + WHERE from_block <= ? AND to_block >= ?", + )?; + + let aggregate = select_stmt + .query_row(params![&block_number, &block_number], |row| { + let from_block: u64 = row.get(0)?; + let to_block: u64 = row.get(1)?; + let compressed_bitmap: Vec = row.get(2)?; + + Ok((from_block, to_block, compressed_bitmap)) + }) + .optional() + .context("Querying running bloom aggregate")? + .map(|(from_block, to_block, compressed_bitmap)| { + AggregateBloom::from_existing_compressed(from_block, to_block, compressed_bitmap) + }); + + Ok(aggregate) } - fn _load_bloom_range( + #[cfg(feature = "aggregate_bloom")] + fn load_aggregate_bloom_range( &self, - _from_block: BlockNumber, - _to_block: BlockNumber, + start_block: BlockNumber, + end_block: BlockNumber, ) -> anyhow::Result> { - // Should be something like: - // (from_block..to_block) - // .chunks(AggregateBloom::BLOCK_RANGE_LEN) - // .iter() - // .enumerate() - // .map(|(i, _)| { - // // load from DB where ID is i - // }) - // .collect() - unimplemented!() + #[rustfmt::skip] + let mut stmt = self.inner().prepare_cached( + "SELECT from_block, to_block, bloom FROM starknet_event_filters_aggregate \ + WHERE from_block <= ? AND to_block >= ? 
\ + ORDER BY from_block", + )?; + + let aggregates = stmt + .query_map(params![&end_block, &start_block], |row| { + let from_block: u64 = row.get(0)?; + let to_block: u64 = row.get(1)?; + let compressed_bitmap: Vec = row.get(2)?; + + Ok(AggregateBloom::from_existing_compressed( + from_block, + to_block, + compressed_bitmap, + )) + }) + .context("Querying bloom filter range")? + .collect::, _>>()?; + + Ok(aggregates) } } @@ -584,6 +702,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -724,6 +850,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -755,6 +889,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -788,6 +930,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // test continuation token let filter = EventFilter { from_block: Some(events.continuation_token.unwrap().block_number), @@ -810,6 +960,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -840,6 +998,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -870,6 +1036,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -900,11 +1074,20 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // try event keys in the wrong order, should not match let filter = EventFilter { keys: vec![vec![expected_event.keys[1]], vec![expected_event.keys[0]]], ..filter }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -915,6 +1098,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -943,6 +1134,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -960,6 +1159,7 @@ mod tests { page_size: 10, offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, 
*MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -974,6 +1174,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: None, to_block: None, @@ -982,6 +1190,7 @@ mod tests { page_size: 10, offset: 10, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -996,6 +1205,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: None, to_block: None, @@ -1004,6 +1221,7 @@ mod tests { page_size: 10, offset: 30, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1014,6 +1232,14 @@ mod tests { continuation_token: None } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1032,6 +1258,7 @@ mod tests { // _after_ the last one offset: test_utils::NUM_BLOCKS * test_utils::EVENTS_PER_BLOCK, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1042,6 +1269,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1065,6 +1300,7 @@ mod tests { page_size: 2, offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1079,6 +1315,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // increase offset let filter: EventFilter = EventFilter { from_block: None, @@ -1088,6 +1332,7 @@ mod tests { page_size: 2, offset: 2, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1102,6 +1347,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // using the continuation token should be equivalent to the previous query let filter: EventFilter = EventFilter { from_block: Some(BlockNumber::new_or_panic(0)), @@ -1111,6 +1364,7 @@ mod tests { page_size: 2, offset: 2, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1125,6 +1379,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + // increase offset by two let filter = EventFilter { from_block: None, @@ -1134,6 +1396,7 @@ mod tests { page_size: 2, offset: 4, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1145,6 +1408,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + 
assert_eq!(events_from_aggregate, events); + } + // using the continuation token should be equivalent to the previous query let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(3)), @@ -1154,6 +1425,7 @@ mod tests { page_size: 2, offset: 1, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1164,6 +1436,14 @@ mod tests { continuation_token: None, } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] @@ -1181,6 +1461,7 @@ mod tests { page_size: 20, offset: 0, }; + let events = tx .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1195,6 +1476,14 @@ mod tests { } ); + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, 1.try_into().unwrap()) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, @@ -1203,6 +1492,7 @@ mod tests { page_size: 20, offset: 0, }; + let events = tx .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) .unwrap(); @@ -1216,13 +1506,17 @@ mod tests { }), } ); + + #[cfg(feature = "aggregate_bloom")] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, 1.try_into().unwrap()) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } #[test] - // TODO: - // This fails when "aggregate_bloom" feature is enabled because in that case all filters are - // loaded twice. We can ignore it for now. - #[cfg_attr(feature = "aggregate_bloom", ignore)] fn bloom_filter_load_limit() { let (storage, test_data) = test_utils::setup_test_storage(); let emitted_events = test_data.events; @@ -1237,6 +1531,7 @@ mod tests { page_size: emitted_events.len(), offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); @@ -1251,6 +1546,17 @@ mod tests { } ); + // TODO: + // This does not match at the moment because aggregate bloom implementation + // does not have a limit on the number of bloom filters to load. + #[cfg(all(feature = "aggregate_bloom", any()))] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } + let filter = EventFilter { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, @@ -1259,6 +1565,7 @@ mod tests { page_size: emitted_events.len(), offset: 0, }; + let events = tx .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); @@ -1272,5 +1579,16 @@ mod tests { }), } ); + + // TODO: + // This does not match at the moment because aggregate bloom implementation + // does not have a limit on the number of bloom filters to load. 
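+        // (The empty `any()` in the cfg below is always false, so this
+        // block stays compiled out until such a limit exists.)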
+ #[cfg(all(feature = "aggregate_bloom", any()))] + { + let events_from_aggregate = tx + .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN) + .unwrap(); + assert_eq!(events_from_aggregate, events); + } } } diff --git a/crates/storage/src/connection/transaction.rs b/crates/storage/src/connection/transaction.rs index 7187bec55e..91894e86dc 100644 --- a/crates/storage/src/connection/transaction.rs +++ b/crates/storage/src/connection/transaction.rs @@ -167,6 +167,12 @@ impl Transaction<'_> { .context("Inserting transaction data")?; if let Some(events) = events { + #[cfg(feature = "aggregate_bloom")] + { + let events: Vec = events.iter().flatten().cloned().collect(); + self.upsert_block_events_aggregate(block_number, &events) + .context("Inserting events into Bloom filter aggregate")?; + } let events = events.iter().flatten(); self.upsert_block_events(block_number, events) .context("Inserting events into Bloom filter")?; @@ -210,6 +216,12 @@ impl Transaction<'_> { ]) .context("Updating events")?; + #[cfg(feature = "aggregate_bloom")] + { + let events: Vec = events.iter().flatten().cloned().collect(); + self.upsert_block_events_aggregate(block_number, &events) + .context("Inserting events into Bloom filter aggregate")?; + } self.upsert_block_events(block_number, events.iter().flatten()) .context("Inserting events into Bloom filter")?; diff --git a/crates/storage/src/schema.rs b/crates/storage/src/schema.rs index 88a8af88e8..ffe92147ec 100644 --- a/crates/storage/src/schema.rs +++ b/crates/storage/src/schema.rs @@ -25,6 +25,8 @@ mod revision_0062; mod revision_0063; mod revision_0064; mod revision_0065; +#[cfg(feature = "aggregate_bloom")] +mod revision_0066; pub(crate) use base::base_schema; @@ -58,6 +60,8 @@ pub fn migrations() -> &'static [MigrationFn] { revision_0063::migrate, revision_0064::migrate, revision_0065::migrate, + #[cfg(feature = "aggregate_bloom")] + revision_0066::migrate, ] } diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs new file mode 100644 index 0000000000..52159524a0 --- /dev/null +++ b/crates/storage/src/schema/revision_0066.rs @@ -0,0 +1,77 @@ +use anyhow::Context; +use pathfinder_common::BlockNumber; +use rusqlite::params; + +use crate::bloom::{AggregateBloom, BloomFilter}; + +#[allow(dead_code)] +pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { + tracing::warn!("Creating starknet_event_filters table with aggregate bloom filters"); + + let mut select_old_filters_query = + tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?; + + let mut bloom_filters_bytes = select_old_filters_query + .query_map(params![], |row| { + let bytes = row.get::<_, Vec>(0)?; + + Ok(bytes) + }) + .context("Selecting old filters")?; + + let mut bloom_filters = vec![]; + loop { + let Some(bloom) = bloom_filters_bytes.next().transpose()? 
else { + break; + }; + + bloom_filters.push(BloomFilter::from_compressed_bytes(&bloom)); + } + + tx.execute( + "CREATE TABLE starknet_event_filters_aggregate ( + from_block INTEGER NOT NULL, + to_block INTEGER NOT NULL, + bloom BLOB, + UNIQUE(from_block, to_block) + )", + params![], + ) + .context("Creating starknet_event_filters_aggregate table")?; + + bloom_filters + .chunks(AggregateBloom::BLOCK_RANGE_LEN as usize) + .enumerate() + .try_for_each(|(i, bloom_filter_chunk)| -> anyhow::Result<()> { + let from_block = i as u64 * AggregateBloom::BLOCK_RANGE_LEN; + let to_block = from_block + AggregateBloom::BLOCK_RANGE_LEN - 1; + let from_block = BlockNumber::new_or_panic(from_block); + let to_block = BlockNumber::new_or_panic(to_block); + + let mut aggregate = AggregateBloom::new(from_block); + + for (j, bloom_filter) in bloom_filter_chunk.iter().enumerate() { + let block_number = from_block + j as u64; + + aggregate.add_bloom(bloom_filter, block_number); + } + + tx.execute( + "INSERT INTO starknet_event_filters_aggregate (from_block, to_block, bloom) + VALUES (?, ?, ?)", + params![ + &from_block.get(), + &to_block.get(), + &aggregate.compress_bitmap() + ], + ) + .context("Inserting aggregate bloom filter")?; + + Ok(()) + })?; + + // TODO: + // Delete old filters table + + Ok(()) +}
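
For reference, a minimal sketch of the lifecycle this patch introduces: build an
aggregate from per-block Bloom filters, compress it for storage, restore it, and
query it for candidate blocks. It assumes the code runs inside `crates/storage`
so that the crate-internal types are in scope; the helper name and sample
arguments are illustrative, not part of the patch.

use pathfinder_common::{BlockNumber, ContractAddress, EventKey};

use crate::bloom::{AggregateBloom, BloomFilter};
use crate::EventFilter;

fn aggregate_lifecycle_sketch(keys: &[EventKey], from_address: ContractAddress) {
    // Start an aggregate covering [GENESIS, GENESIS + BLOCK_RANGE_LEN - 1].
    let mut aggregate = AggregateBloom::new(BlockNumber::GENESIS);

    // Build the per-block filter and rotate it into the aggregate, as
    // `upsert_block_events_aggregate` does for each new block.
    let mut bloom = BloomFilter::new();
    bloom.set_keys(keys);
    bloom.set_address(&from_address);
    aggregate.add_bloom(&bloom, BlockNumber::GENESIS);

    // Persist: only the zstd-compressed bitmap is written to
    // `starknet_event_filters_aggregate`.
    let compressed = aggregate.compress_bitmap();

    // Restore and query for candidate blocks.
    let restored = AggregateBloom::from_existing_compressed(
        aggregate.from_block.get(),
        aggregate.to_block.get(),
        compressed,
    );
    let filter = EventFilter {
        contract_address: Some(from_address),
        ..Default::default()
    };
    let candidates = restored.blocks_for_filter(&filter);
    assert!(candidates.contains(&BlockNumber::GENESIS));
}

Candidate blocks still have to be scanned in full, since Bloom filters admit
false positives; `events_from_aggregate` does exactly that via `scan_block_into`.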