diff --git a/Cargo.lock b/Cargo.lock index 1eb49fc..5731461 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -387,11 +399,13 @@ dependencies = [ "libipld", "libipld-core", "proptest", + "quick_cache", "roaring-graphs", "serde", "serde_ipld_dagcbor", "test-strategy", "thiserror", + "tokio", "tracing", "tracing-subscriber", "wnfs-common", @@ -696,6 +710,12 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.1" @@ -906,6 +926,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -951,7 +977,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -1145,6 +1171,16 @@ version = "0.3.8" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "lock_api" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.19" @@ -1295,6 +1331,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1407,6 +1466,18 @@ dependencies = [ "byteorder", ] +[[package]] +name = "quick_cache" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f69f8d22fa3f34f3083d9a4375c038732c7a7e964de1beb81c544da92dfc40b8" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.0", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.32" diff --git a/car-mirror-benches/Cargo.toml b/car-mirror-benches/Cargo.toml index ba1c467..ccda826 100644 --- a/car-mirror-benches/Cargo.toml +++ b/car-mirror-benches/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1.0" async-std = { version = "1.11", features = ["attributes"] } async-trait = "0.1" bytes = "1.4.0" -car-mirror = { path = "../car-mirror", version = "0.1", features = 
["test_utils"] } +car-mirror = { path = "../car-mirror", version = "0.1", features = ["test_utils", "quick_cache"] } libipld = "0.16.0" wnfs-common = "0.1.23" diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index f74ec72..b34bcbf 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -5,6 +5,7 @@ use car_mirror::{ common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, + traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use libipld::Cid; @@ -27,19 +28,26 @@ pub fn push_throttled(c: &mut Criterion) { }, |(client_store, root)| { let client_store = &ThrottledBlockStore(client_store); + let client_cache = &InMemoryCache::new(10_000); let server_store = &ThrottledBlockStore::new(); + let server_cache = &InMemoryCache::new(10_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = push::request(root, None, config, client_store).await?; + let mut request = + push::request(root, None, config, client_store, client_cache).await?; loop { - let response = push::response(root, request, config, server_store).await?; + let response = + push::response(root, request, config, server_store, server_cache) + .await?; if response.indicates_finished() { break; } - request = push::request(root, Some(response), config, client_store).await?; + request = + push::request(root, Some(response), config, client_store, client_cache) + .await?; } Ok::<(), anyhow::Error>(()) @@ -67,15 +75,22 @@ pub fn pull_throttled(c: &mut Criterion) { }, |(server_store, root)| { let server_store = &ThrottledBlockStore(server_store); + let server_cache = &InMemoryCache::new(10_000); let client_store = &ThrottledBlockStore::new(); + let client_cache = 
&InMemoryCache::new(10_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = pull::request(root, None, config, client_store).await?; + let mut request = + pull::request(root, None, config, client_store, client_cache).await?; loop { - let response = pull::response(root, request, config, server_store).await?; - request = pull::request(root, Some(response), config, client_store).await?; + let response = + pull::response(root, request, config, server_store, server_cache) + .await?; + request = + pull::request(root, Some(response), config, client_store, client_cache) + .await?; if request.indicates_finished() { break; diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index 84b4291..f77ac4b 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -2,6 +2,7 @@ use car_mirror::{ common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, + traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use wnfs_common::MemoryBlockStore; @@ -21,19 +22,26 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { + let client_cache = &InMemoryCache::new(10_000); let server_store = &MemoryBlockStore::new(); + let server_cache = &InMemoryCache::new(10_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = push::request(root, None, config, client_store).await?; + let mut request = + push::request(root, None, config, client_store, client_cache).await?; loop { - let response = push::response(root, request, config, server_store).await?; + let response = + push::response(root, request, config, server_store, server_cache) + .await?; if response.indicates_finished() { break; } - request = push::request(root, 
Some(response), config, client_store).await?; + request = + push::request(root, Some(response), config, client_store, client_cache) + .await?; } Ok::<(), anyhow::Error>(()) @@ -60,15 +68,22 @@ pub fn pull(c: &mut Criterion) { (store, root) }, |(ref server_store, root)| { + let server_cache = &InMemoryCache::new(10_000); let client_store = &MemoryBlockStore::new(); + let client_cache = &InMemoryCache::new(10_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = pull::request(root, None, config, client_store).await?; + let mut request = + pull::request(root, None, config, client_store, client_cache).await?; loop { - let response = pull::response(root, request, config, server_store).await?; - request = pull::request(root, Some(response), config, client_store).await?; + let response = + pull::response(root, request, config, server_store, server_cache) + .await?; + request = + pull::request(root, Some(response), config, client_store, client_cache) + .await?; if request.indicates_finished() { break; diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 89b7e99..7b8ed45 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -33,10 +33,12 @@ iroh-car = "0.3.0" libipld = "0.16.0" libipld-core = "0.16.0" proptest = { version = "1.1", optional = true } +quick_cache = { version = "0.4.0", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "1.0.183" serde_ipld_dagcbor = "0.4.0" thiserror = "1.0.47" +tokio = { version = "^1", default-features = false } tracing = "0.1" tracing-subscriber = "0.3" wnfs-common = "0.1.23" @@ -51,6 +53,7 @@ test-strategy = "0.3" [features] default = [] test_utils = ["proptest", "roaring-graphs"] +quick_cache = ["dep:quick_cache"] [package.metadata.docs.rs] all-features = true diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index aeac4d2..a5b1cb2 100644 --- a/car-mirror/src/common.rs +++ 
b/car-mirror/src/common.rs @@ -16,6 +16,7 @@ use crate::{ error::Error, incremental_verification::{BlockState, IncrementalDagVerification}, messages::{Bloom, PullRequest, PushResponse}, + traits::Cache, }; //-------------------------------------------------------------------------------------------------- @@ -67,12 +68,13 @@ pub struct CarFile { /// /// It returns a `CarFile` of (a subset) of all blocks below `root`, that /// are thought to be missing on the receiving end. -#[instrument(skip(config, store))] +#[instrument(skip(config, store, cache))] pub async fn block_send( root: Cid, last_state: Option, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { let ReceiverState { ref missing_subgraph_roots, @@ -83,39 +85,10 @@ pub async fn block_send( }); // Verify that all missing subgraph roots are in the relevant DAG: - let subgraph_roots: Vec = DagWalk::breadth_first([root]) - .stream(store) - .try_filter_map(|(cid, _)| async move { - Ok(missing_subgraph_roots.contains(&cid).then_some(cid)) - }) - .try_collect() - .await?; + let subgraph_roots = + verify_missing_subgraph_roots(root, missing_subgraph_roots, store, cache).await?; - if subgraph_roots.len() != missing_subgraph_roots.len() { - let unrelated_roots = missing_subgraph_roots - .iter() - .filter(|cid| !subgraph_roots.contains(cid)) - .map(|cid| cid.to_string()) - .collect::>() - .join(", "); - - warn!( - unrelated_roots = %unrelated_roots, - "got asked for DAG-unrelated blocks" - ); - } - - if let Some(bloom) = &have_cids_bloom { - debug!( - size_bits = bloom.as_bytes().len() * 8, - hash_count = bloom.hash_count(), - ones_count = bloom.count_ones(), - estimated_fpr = bloom.current_false_positive_rate(), - "received 'have cids' bloom", - ); - } - - let bloom = have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))); // An empty bloom that contains nothing + let bloom = handle_missing_bloom(have_cids_bloom); let mut writer = CarWriter::new( CarHeader::new_v1( @@ 
-136,38 +109,15 @@ pub async fn block_send( .await .map_err(|e| Error::CarFileError(anyhow!(e)))?; - let mut block_bytes = 0; - let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); - while let Some((cid, block)) = dag_walk.next(store).await? { - if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) { - debug!( - cid = %cid, - bloom_contains = bloom.contains(&cid.to_bytes()), - subgraph_roots_contains = subgraph_roots.contains(&cid), - "skipped writing block" - ); - continue; - } - - debug!( - cid = %cid, - num_bytes = block.len(), - frontier_size = dag_walk.frontier.len(), - "writing block to CAR", - ); - - writer - .write(cid, &block) - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))?; - - // TODO(matheus23): Count the actual bytes sent? - // At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes. - block_bytes += block.len(); - if block_bytes > config.send_minimum { - break; - } - } + write_blocks_into_car( + &mut writer, + subgraph_roots, + &bloom, + config.send_minimum, + store, + cache, + ) + .await?; Ok(CarFile { bytes: writer @@ -186,58 +136,29 @@ pub async fn block_send( /// It takes a `CarFile`, verifies that its contents are related to the /// `root` and returns some information to help the block sending side /// figure out what blocks to send next. 
-#[instrument(skip(last_car, config, store), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] +#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] pub async fn block_receive( root: Cid, last_car: Option, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { - let mut dag_verification = IncrementalDagVerification::new([root], store).await?; + let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; if let Some(car) = last_car { let mut reader = CarReader::new(Cursor::new(car.bytes)) .await .map_err(|e| Error::CarFileError(anyhow!(e)))?; - let mut block_bytes = 0; - - while let Some((cid, vec)) = reader - .next_block() - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))? - { - let block = Bytes::from(vec); - - debug!( - cid = %cid, - num_bytes = block.len(), - "reading block from CAR", - ); - - block_bytes += block.len(); - if block_bytes > config.receive_maximum { - return Err(Error::TooManyBytes { - block_bytes, - receive_maximum: config.receive_maximum, - }); - } - match dag_verification.block_state(cid) { - BlockState::Have => continue, - BlockState::Unexpected => { - trace!( - cid = %cid, - "received block out of order (possibly due to bloom false positive)" - ); - break; - } - BlockState::Want => { - dag_verification - .verify_and_store_block((cid, block), store) - .await?; - } - } - } + read_and_verify_blocks( + &mut dag_verification, + &mut reader, + config.receive_maximum, + store, + cache, + ) + .await?; } let missing_subgraph_roots = dag_verification @@ -296,17 +217,164 @@ pub fn references>( cid: Cid, block: impl AsRef<[u8]>, mut refs: E, -) -> Result { +) -> Result { let codec: IpldCodec = cid .codec() .try_into() .map_err(|_| Error::UnsupportedCodec { cid })?; - >::references(codec, &mut Cursor::new(block), &mut refs) - .map_err(Error::ParsingError)?; + >::references(codec, &mut Cursor::new(block), &mut refs)?; 
Ok(refs) } +//-------------------------------------------------------------------------------------------------- +// Private Functions +//-------------------------------------------------------------------------------------------------- + +async fn verify_missing_subgraph_roots( + root: Cid, + missing_subgraph_roots: &Vec, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result, Error> { + let subgraph_roots: Vec = DagWalk::breadth_first([root]) + .stream(store, cache) + .try_filter_map( + |cid| async move { Ok(missing_subgraph_roots.contains(&cid).then_some(cid)) }, + ) + .try_collect() + .await?; + + if subgraph_roots.len() != missing_subgraph_roots.len() { + let unrelated_roots = missing_subgraph_roots + .iter() + .filter(|cid| !subgraph_roots.contains(cid)) + .map(|cid| cid.to_string()) + .collect::>() + .join(", "); + + warn!( + unrelated_roots = %unrelated_roots, + "got asked for DAG-unrelated blocks" + ); + } + + Ok(subgraph_roots) +} + +fn handle_missing_bloom(have_cids_bloom: Option) -> BloomFilter { + if let Some(bloom) = &have_cids_bloom { + debug!( + size_bits = bloom.as_bytes().len() * 8, + hash_count = bloom.hash_count(), + ones_count = bloom.count_ones(), + estimated_fpr = bloom.current_false_positive_rate(), + "received 'have cids' bloom", + ); + } + + have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))) // An empty bloom that contains nothing +} + +async fn write_blocks_into_car( + writer: &mut CarWriter, + subgraph_roots: Vec, + bloom: &BloomFilter, + send_minimum: usize, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result<(), Error> { + let mut block_bytes = 0; + let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); + + while let Some(cid) = dag_walk.next(store, cache).await? 
{ + let block = store + .get_block(&cid) + .await + .map_err(Error::BlockStoreError)?; + + if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) { + debug!( + cid = %cid, + bloom_contains = bloom.contains(&cid.to_bytes()), + subgraph_roots_contains = subgraph_roots.contains(&cid), + "skipped writing block" + ); + continue; + } + + debug!( + cid = %cid, + num_bytes = block.len(), + frontier_size = dag_walk.frontier.len(), + "writing block to CAR", + ); + + writer + .write(cid, &block) + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; + + // TODO(matheus23): Count the actual bytes sent? + // At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes. + block_bytes += block.len(); + if block_bytes > send_minimum { + break; + } + } + + Ok(()) +} + +async fn read_and_verify_blocks( + dag_verification: &mut IncrementalDagVerification, + reader: &mut CarReader, + receive_maximum: usize, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result<(), Error> { + let mut block_bytes = 0; + while let Some((cid, vec)) = reader + .next_block() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))? 
+ { + let block = Bytes::from(vec); + + debug!( + cid = %cid, + num_bytes = block.len(), + "reading block from CAR", + ); + + block_bytes += block.len(); + if block_bytes > receive_maximum { + return Err(Error::TooManyBytes { + block_bytes, + receive_maximum, + }); + } + + match dag_verification.block_state(cid) { + BlockState::Have => continue, + BlockState::Unexpected => { + trace!( + cid = %cid, + "received block out of order (possibly due to bloom false positive)" + ); + break; + } + BlockState::Want => { + dag_verification + .verify_and_store_block((cid, block), store, cache) + .await?; + } + } + } + + Ok(()) +} + //-------------------------------------------------------------------------------------------------- // Implementations //-------------------------------------------------------------------------------------------------- diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 1ae84a7..3bc607f 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -1,4 +1,4 @@ -use crate::{common::references, error::Error}; +use crate::{common::references, error::Error, traits::Cache}; use bytes::Bytes; use futures::{stream::try_unfold, Stream}; use libipld_core::cid::Cid; @@ -53,7 +53,11 @@ impl DagWalk { /// Return the next node in the traversal. /// /// Returns `None` if no nodes are left to be visited. - pub async fn next(&mut self, store: &impl BlockStore) -> Result, Error> { + pub async fn next( + &mut self, + store: &impl BlockStore, + cache: &impl Cache, + ) -> Result, Error> { let cid = loop { let popped = if self.breadth_first { self.frontier.pop_back() @@ -71,29 +75,28 @@ impl DagWalk { } }; - // TODO: Two opportunities for performance improvement: - // - skip Raw CIDs. 
They can't have further links (but needs adjustment to this function's return type) - // - run multiple `get_block` calls concurrently - let block = store - .get_block(&cid) + let refs = cache + .references(cid, store) .await .map_err(Error::BlockStoreError)?; - for ref_cid in references(cid, &block, Vec::new())? { + + for ref_cid in refs { if !self.visited.contains(&ref_cid) { self.frontier.push_front(ref_cid); } } - Ok(Some((cid, block))) + Ok(Some(cid)) } /// Turn this traversal into a stream - pub fn stream( + pub fn stream<'a>( self, - store: &impl BlockStore, - ) -> impl Stream> + Unpin + '_ { + store: &'a impl BlockStore, + cache: &'a impl Cache, + ) -> impl Stream> + Unpin + 'a { Box::pin(try_unfold(self, move |mut this| async move { - let maybe_block = this.next(store).await?; + let maybe_block = this.next(store, cache).await?; Ok(maybe_block.map(|b| (b, this))) })) } @@ -114,7 +117,7 @@ impl DagWalk { /// Skip a node from the traversal for now. pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> { let (cid, bytes) = block; - let refs = references(cid, bytes, HashSet::new())?; + let refs = references(cid, bytes, HashSet::new()).map_err(Error::ParsingError)?; self.visited.insert(cid); self.frontier .retain(|frontier_cid| !refs.contains(frontier_cid)); @@ -126,6 +129,7 @@ impl DagWalk { #[cfg(test)] mod tests { use super::*; + use crate::traits::NoCache; use anyhow::Result; use futures::TryStreamExt; use libipld::Ipld; @@ -156,11 +160,10 @@ mod tests { .await?; let cids = DagWalk::breadth_first([cid_root]) - .stream(store) + .stream(store, &NoCache) .try_collect::>() .await? 
.into_iter() - .map(|(cid, _block)| cid) .collect::>(); assert_eq!(cids, vec![cid_root, cid_1_wrap, cid_2, cid_3, cid_1]); @@ -172,7 +175,7 @@ mod tests { #[cfg(test)] mod proptests { use super::*; - use crate::test_utils::arb_ipld_dag; + use crate::{test_utils::arb_ipld_dag, traits::NoCache}; use futures::TryStreamExt; use libipld::{ multihash::{Code, MultihashDigest}, @@ -199,6 +202,7 @@ mod proptests { async_std::task::block_on(async { let (dag, root) = dag; let store = &MemoryBlockStore::new(); + for (cid, ipld) in dag.iter() { let block: Bytes = encode(ipld).unwrap().into(); let cid_store = store @@ -209,8 +213,7 @@ mod proptests { } let mut cids = DagWalk::breadth_first([root]) - .stream(store) - .map_ok(|(cid, _)| cid) + .stream(store, &NoCache) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index 8c46009..c8fb908 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -3,6 +3,7 @@ use crate::{ dag_walk::DagWalk, error::{Error, IncrementalVerificationError}, + traits::Cache, }; use bytes::Bytes; use libipld_core::{ @@ -41,23 +42,28 @@ impl IncrementalDagVerification { pub async fn new( roots: impl IntoIterator, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { let mut this = Self { want_cids: roots.into_iter().collect(), have_cids: HashSet::new(), }; - this.update_have_cids(store).await?; + this.update_have_cids(store, cache).await?; Ok(this) } #[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))] - async fn update_have_cids(&mut self, store: &impl BlockStore) -> Result<(), Error> { + async fn update_have_cids( + &mut self, + store: &impl BlockStore, + cache: &impl Cache, + ) -> Result<(), Error> { let mut dag_walk = DagWalk::breadth_first(self.want_cids.iter().cloned()); loop { - match dag_walk.next(store).await { + match dag_walk.next(store, 
cache).await { Err(Error::BlockStoreError(e)) => { if let Some(BlockStoreError::CIDNotFound(not_found)) = e.downcast_ref::() @@ -68,7 +74,7 @@ impl IncrementalDagVerification { } } Err(e) => return Err(e), - Ok(Some((cid, _))) => { + Ok(Some(cid)) => { self.want_cids.remove(&cid); self.have_cids.insert(cid); } @@ -111,6 +117,7 @@ impl IncrementalDagVerification { &mut self, block: (Cid, Bytes), store: &impl BlockStore, + cache: &impl Cache, ) -> Result<(), Error> { let (cid, bytes) = block; @@ -154,7 +161,7 @@ impl IncrementalDagVerification { }); } - self.update_have_cids(store).await?; + self.update_have_cids(store, cache).await?; Ok(()) } diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 5b256e3..48ddeb0 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -23,3 +23,5 @@ pub mod messages; pub mod pull; /// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response` pub mod push; +/// Traits defined in this crate +pub mod traits; diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 4b018fa..9bf7abf 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -2,6 +2,7 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PullRequest, + traits::Cache, }; use libipld::Cid; use wnfs_common::BlockStore; @@ -22,8 +23,9 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { - Ok(block_receive(root, last_response, config, store) + Ok(block_receive(root, last_response, config, store, cache) .await? 
.into()) } @@ -34,9 +36,10 @@ pub async fn response( request: PullRequest, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { let receiver_state = Some(ReceiverState::from(request)); - block_send(root, receiver_state, config, store).await + block_send(root, receiver_state, config, store, cache).await } #[cfg(test)] @@ -45,6 +48,7 @@ mod tests { common::Config, dag_walk::DagWalk, test_utils::{setup_random_dag, Metrics}, + traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -59,10 +63,11 @@ mod tests { server_store: &MemoryBlockStore, ) -> Result> { let mut metrics = Vec::new(); - let mut request = crate::pull::request(root, None, config, client_store).await?; + let mut request = crate::pull::request(root, None, config, client_store, &NoCache).await?; loop { let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len(); - let response = crate::pull::response(root, request, config, server_store).await?; + let response = + crate::pull::response(root, request, config, server_store, &NoCache).await?; let response_bytes = response.bytes.len(); metrics.push(Metrics { @@ -70,7 +75,8 @@ mod tests { response_bytes, }); - request = crate::pull::request(root, Some(response), config, client_store).await?; + request = + crate::pull::request(root, Some(response), config, client_store, &NoCache).await?; if request.indicates_finished() { break; } @@ -88,13 +94,11 @@ mod tests { // client should have all data let client_cids = DagWalk::breadth_first([root]) - .stream(client_store) - .map_ok(|(cid, _)| cid) + .stream(client_store, &NoCache) .try_collect::>() .await?; let server_cids = DagWalk::breadth_first([root]) - .stream(server_store) - .map_ok(|(cid, _)| cid) + .stream(server_store, &NoCache) .try_collect::>() .await?; @@ -110,6 +114,7 @@ mod proptests { common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, + traits::NoCache, }; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -135,14 
+140,12 @@ mod proptests { // client should have all data let client_cids = DagWalk::breadth_first([root]) - .stream(client_store) - .map_ok(|(cid, _)| cid) + .stream(client_store, &NoCache) .try_collect::>() .await .unwrap(); let server_cids = DagWalk::breadth_first([root]) - .stream(server_store) - .map_ok(|(cid, _)| cid) + .stream(server_store, &NoCache) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index aa5618b..bb8d57f 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -2,6 +2,7 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PushResponse, + traits::Cache, }; use libipld_core::cid::Cid; use wnfs_common::BlockStore; @@ -21,9 +22,10 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { let receiver_state = last_response.map(ReceiverState::from); - block_send(root, receiver_state, config, store).await + block_send(root, receiver_state, config, store, cache).await } /// Create a response for a CAR mirror push request. @@ -39,8 +41,9 @@ pub async fn response( request: CarFile, config: &Config, store: &impl BlockStore, + cache: &impl Cache, ) -> Result { - Ok(block_receive(root, Some(request), config, store) + Ok(block_receive(root, Some(request), config, store, cache) .await? 
.into()) } @@ -54,6 +57,7 @@ mod tests { get_cid_at_approx_path, setup_random_dag, total_dag_blocks, total_dag_bytes, Metrics, Rvg, }, + traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -69,10 +73,11 @@ mod tests { server_store: &MemoryBlockStore, ) -> Result> { let mut metrics = Vec::new(); - let mut request = crate::push::request(root, None, config, client_store).await?; + let mut request = crate::push::request(root, None, config, client_store, &NoCache).await?; loop { let request_bytes = request.bytes.len(); - let response = crate::push::response(root, request, config, server_store).await?; + let response = + crate::push::response(root, request, config, server_store, &NoCache).await?; let response_bytes = serde_ipld_dagcbor::to_vec(&response)?.len(); metrics.push(Metrics { @@ -83,7 +88,8 @@ mod tests { if response.indicates_finished() { break; } - request = crate::push::request(root, Some(response), config, client_store).await?; + request = + crate::push::request(root, Some(response), config, client_store, &NoCache).await?; } Ok(metrics) @@ -97,13 +103,11 @@ mod tests { // receiver should have all data let client_cids = DagWalk::breadth_first([root]) - .stream(client_store) - .map_ok(|(cid, _)| cid) + .stream(client_store, &NoCache) .try_collect::>() .await?; let server_cids = DagWalk::breadth_first([root]) - .stream(server_store) - .map_ok(|(cid, _)| cid) + .stream(server_store, &NoCache) .try_collect::>() .await?; @@ -184,6 +188,7 @@ mod proptests { common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, + traits::NoCache, }; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -209,14 +214,12 @@ mod proptests { // client should have all data let client_cids = DagWalk::breadth_first([root]) - .stream(client_store) - .map_ok(|(cid, _)| cid) + .stream(client_store, &NoCache) .try_collect::>() .await .unwrap(); let server_cids = DagWalk::breadth_first([root]) - .stream(server_store) - .map_ok(|(cid, _)| 
cid) + .stream(server_store, &NoCache) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index 51f021d..a6708df 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -1,6 +1,6 @@ ///! Crate-local test utilities use super::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore, Rvg}; -use crate::{common::references, dag_walk::DagWalk}; +use crate::{common::references, dag_walk::DagWalk, error::Error, traits::NoCache}; use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -70,8 +70,14 @@ pub(crate) async fn setup_random_dag( pub(crate) async fn total_dag_bytes(root: Cid, store: &impl BlockStore) -> Result { Ok(DagWalk::breadth_first([root]) - .stream(store) - .map_ok(|(_, block)| block.len()) + .stream(store, &NoCache) + .try_filter_map(|cid| async move { + let block = store + .get_block(&cid) + .await + .map_err(Error::BlockStoreError)?; + Ok(Some(block.len())) + }) .try_collect::>() .await? .into_iter() @@ -80,8 +86,7 @@ pub(crate) async fn total_dag_bytes(root: Cid, store: &impl BlockStore) -> Resul pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Result { Ok(DagWalk::breadth_first([root]) - .stream(store) - .map_ok(|(_, block)| block.len()) + .stream(store, &NoCache) .try_collect::>() .await? .len()) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs new file mode 100644 index 0000000..c38079b --- /dev/null +++ b/car-mirror/src/traits.rs @@ -0,0 +1,113 @@ +use crate::common::references; +use anyhow::Result; +use async_trait::async_trait; +use libipld::{Cid, IpldCodec}; +use wnfs_common::BlockStore; + +/// This trait abstracts caches used by the car mirror implementation. +/// An efficient cache implementation can significantly reduce the amount +/// of lookups into the blockstore. 
+/// +/// At the moment, all caches are conceptually memoization tables, so you don't +/// necessarily need to be careful about cache eviction. +/// +/// See `InMemoryCache` for a `quick_cache`-based implementation +/// (enable the `quick_cache` feature), and `NoCache` for disabling the cache. +#[async_trait(?Send)] +pub trait Cache { + /// Returns the CIDs linked to from the block with the given CID, + /// if the cache is hit. + /// Returns `None` if it's a cache miss. + /// + /// This isn't meant to be called directly; use `Cache::references` instead. + async fn get_references_cached(&self, cid: Cid) -> Result>>; + + /// Populates the references cache for the given CID with the given references. + async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()>; + + /// Finds all CIDs that are linked to from the block with the given CID. + /// + /// This makes use of the cache via `get_references_cached`, if possible. + /// On a cache miss, it fetches the block from `store`, computes the references + /// and populates the cache using `put_references_cached`. CIDs with the raw codec yield an empty list without any cache or store lookup. + async fn references(&self, cid: Cid, store: &impl BlockStore) -> Result> { + // raw blocks don't have further links, so skip both the cache and the store + let raw_codec: u64 = IpldCodec::Raw.into(); + if cid.codec() == raw_codec { + return Ok(Vec::new()); + } + + if let Some(refs) = self.get_references_cached(cid).await? { + return Ok(refs); + } + + let block = store.get_block(&cid).await?; + let refs = references(cid, block, Vec::new())?; + self.put_references_cached(cid, refs.clone()).await?; + Ok(refs) + } +} + +/// A [quick-cache]-based implementation of a car mirror cache. 
+/// +/// [quick-cache]: https://github.com/arthurprs/quick-cache/ +#[cfg(feature = "quick_cache")] +#[derive(Debug)] +pub struct InMemoryCache { + references: quick_cache::sync::Cache>, +} + +#[cfg(feature = "quick_cache")] +impl InMemoryCache { + /// Creates a new in-memory cache that approximately holds + /// cached references for `approx_capacity` CIDs. + /// + /// Computing the expected memory requirements isn't easy. + /// A block can in theory have up to thousands of references. + /// [UnixFS] chunked files will reference up to 174 chunks + /// at a time. + /// Each CID takes up 96 bytes of memory. + /// In the UnixFS worst case of 175 CIDs per cache entry, + /// you need to reserve up to `175 * 96B = 16.8KB` of space + /// per entry. Thus, if you want your cache to not outgrow + /// ~100MB (ignoring the internal cache structure space + /// requirements), you can store up to `100MB / 16.8KB = 5952` + /// entries. + /// + /// In practice, the average fanout will be much lower than 174. + /// + /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout + pub fn new(approx_capacity: usize) -> Self { + Self { + references: quick_cache::sync::Cache::new(approx_capacity), + } + } +} + +#[cfg(feature = "quick_cache")] +#[async_trait(?Send)] +impl Cache for InMemoryCache { + async fn get_references_cached(&self, cid: Cid) -> Result>> { + Ok(self.references.get(&cid)) + } + + async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()> { + self.references.insert(cid, references); + Ok(()) + } +} + +/// An implementation of `Cache` that doesn't cache at all: every lookup is a miss and every insert is a no-op. +#[derive(Debug)] +pub struct NoCache; + +#[async_trait(?Send)] +impl Cache for NoCache { + async fn get_references_cached(&self, _: Cid) -> Result>> { + Ok(None) + } + + async fn put_references_cached(&self, _: Cid, _: Vec) -> Result<()> { + Ok(()) + } +}