diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index e6f1859..905bdd1 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) { }, |(client_store, root)| { let client_store = &ThrottledBlockStore(client_store); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &ThrottledBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) { }, |(server_store, root)| { let server_store = &ThrottledBlockStore(server_store); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &ThrottledBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index f77ac4b..8d03744 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) { (store, 
root) }, |(ref server_store, root)| { - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/simulated_latency.rs b/car-mirror-benches/benches/simulated_latency.rs index 0c05689..155ef59 100644 --- a/car-mirror-benches/benches/simulated_latency.rs +++ b/car-mirror-benches/benches/simulated_latency.rs @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref server_store, ref server_cache, root)| { let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref client_store, ref client_cache, root)| { let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 7f7ab3b..dc3dd67 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -61,7 +61,10 @@ impl std::fmt::Debug for ReceiverState { ) }); 
f.debug_struct("ReceiverState") - .field("missing_subgraph_roots", &self.missing_subgraph_roots) + .field( + "missing_subgraph_roots.len() == ", + &self.missing_subgraph_roots.len(), + ) .field("have_cids_bloom", &have_cids_bloom) .finish() } @@ -86,7 +89,7 @@ pub struct CarFile { /// /// It returns a `CarFile` of (a subset) of all blocks below `root`, that /// are thought to be missing on the receiving end. -#[instrument(skip(config, store, cache))] +#[instrument(skip_all, fields(root, last_state))] pub async fn block_send( root: Cid, last_state: Option<ReceiverState>, @@ -145,7 +148,7 @@ pub async fn block_send( /// It takes a `CarFile`, verifies that its contents are related to the /// `root` and returns some information to help the block sending side /// figure out what blocks to send next. -#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] +#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] pub async fn block_receive( root: Cid, last_car: Option<CarFile>, diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index f032570..6e8eebb 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -2,14 +2,17 @@ use crate::common::references; use anyhow::Result; use async_trait::async_trait; use libipld::{Cid, IpldCodec}; -use wnfs_common::{utils::CondSync, BlockStore}; +use wnfs_common::{ + utils::{Arc, CondSync}, + BlockStore, BlockStoreError, +}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount /// of lookups into the blockstore. /// -/// At the moment, all caches are conceptually memoization tables, so you don't -/// necessarily need to think about being careful about cache eviction. +/// At the moment, all caches are either memoization tables or informationally +/// monotonous, so you don't need to be careful about cache eviction.
/// /// See `InMemoryCache` for a `quick_cache`-based implementation /// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. @@ -21,10 +24,23 @@ pub trait Cache: CondSync { /// Returns `None` if it's a cache miss. /// /// This isn't meant to be called directly, instead use `Cache::references`. - async fn get_references_cached(&self, cid: Cid) -> Result<Option<Vec<Cid>>>; + async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>>; /// Populates the references cache for given CID with given references. - async fn put_references_cached(&self, cid: Cid, references: Vec<Cid>) -> Result<()>; + async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()>; + + /// This returns whether the cache has the fact stored that a block with given + /// CID is stored. + /// + /// This only returns `true` in case the block has been stored. + /// `false` simply indicates that the cache doesn't know whether the block is + /// stored or not (it's always a cache miss). + /// + /// Don't call this directly, instead, use `Cache::has_block`. + async fn get_has_block_cache(&self, cid: &Cid) -> Result<bool>; + + /// This populates the cache with the fact that a block with given CID is stored. + async fn put_has_block_cache(&self, cid: Cid) -> Result<()>; /// Find out any CIDs that are linked to from the block with given CID. /// @@ -38,32 +54,63 @@ pub trait Cache: CondSync { return Ok(Vec::new()); } - if let Some(refs) = self.get_references_cached(cid).await? { + if let Some(refs) = self.get_references_cache(cid).await? { return Ok(refs); } let block = store.get_block(&cid).await?; let refs = references(cid, block, Vec::new())?; - self.put_references_cached(cid, refs.clone()).await?; + self.put_references_cache(cid, refs.clone()).await?; Ok(refs) } + + /// Find out whether a given block is stored in given blockstore or not. + /// + /// This cache is *only* effective on `true` values for `has_block`.
+ /// Repeatedly calling `has_block` with `Cid`s of blocks that are *not* + /// stored will cause repeated calls to given blockstore. + /// + /// **Make sure to always use the same `BlockStore` when calling this function.** + /// + /// This makes use of the cache's `get_has_block_cache`, if possible. + /// On cache misses, this will actually fetch the block from the store + /// and if successful, populate the cache using `put_has_block_cache`. + async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result<bool> { + if self.get_has_block_cache(&cid).await? { + return Ok(true); + } + + match store.get_block(&cid).await { + Ok(_) => { + self.put_has_block_cache(cid).await?; + Ok(true) + } + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { + Ok(false) + } + Err(e) => Err(e), + } + } } /// A [quick-cache]-based implementation of a car mirror cache. /// /// [quick-cache]: https://github.com/arthurprs/quick-cache/ #[cfg(feature = "quick_cache")] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryCache { - references: quick_cache::sync::Cache<Cid, Vec<Cid>>, + references: Arc<quick_cache::sync::Cache<Cid, Vec<Cid>>>, + has_blocks: Arc<quick_cache::sync::Cache<Cid, ()>>, } #[cfg(feature = "quick_cache")] impl InMemoryCache { /// Create a new in-memory cache that approximately holds - /// cached references for `approx_capacity` CIDs. + /// cached references for `approx_references_capacity` CIDs + /// and `approx_has_blocks_capacity` CIDs known to be stored locally. /// - /// Computing the expected memory requirements isn't easy. + /// Computing the expected memory requirements for the reference + /// cache isn't easy. /// A block in theory have up to thousands of references. /// [UnixFS] chunked files will reference up to 174 chunks /// at a time. @@ -77,10 +124,15 @@ impl InMemoryCache { /// /// In practice, the fanout average will be much lower than 174.
/// + /// On the other hand, each cache entry for the `has_blocks` cache + /// will take a little more than 64 bytes, so for a 10MB + /// `has_blocks` cache, you would use `10MB / 64bytes = 156_250`. + /// /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout - pub fn new(approx_capacity: usize) -> Self { + pub fn new(approx_references_capacity: usize, approx_has_blocks_capacity: usize) -> Self { Self { - references: quick_cache::sync::Cache::new(approx_capacity), + references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)), + has_blocks: Arc::new(quick_cache::sync::Cache::new(approx_has_blocks_capacity)), } } } @@ -89,14 +141,23 @@ impl InMemoryCache { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for InMemoryCache { - async fn get_references_cached(&self, cid: Cid) -> Result<Option<Vec<Cid>>> { + async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> { Ok(self.references.get(&cid)) } - async fn put_references_cached(&self, cid: Cid, references: Vec<Cid>) -> Result<()> { + async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> { self.references.insert(cid, references); Ok(()) } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result<bool> { + Ok(self.has_blocks.get(cid).is_some()) + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.has_blocks.insert(cid, ()); + Ok(()) + } } /// An implementation of `Cache` that doesn't cache at all.
@@ -106,11 +167,19 @@ pub struct NoCache; #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for NoCache { - async fn get_references_cached(&self, _: Cid) -> Result<Option<Vec<Cid>>> { + async fn get_references_cache(&self, _: Cid) -> Result<Option<Vec<Cid>>> { Ok(None) } - async fn put_references_cached(&self, _: Cid, _: Vec<Cid>) -> Result<()> { + async fn put_references_cache(&self, _: Cid, _: Vec<Cid>) -> Result<()> { + Ok(()) + } + + async fn get_has_block_cache(&self, _: &Cid) -> Result<bool> { + Ok(false) + } + + async fn put_has_block_cache(&self, _: Cid) -> Result<()> { Ok(()) } }