From 1b878d503d3a1cd80f3539bf5d03d9f710276b36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 15 Feb 2024 17:20:43 +0100 Subject: [PATCH] feat: Implement `CacheMissing` blockstore wrapper --- .../benches/artificially_slow_blockstore.rs | 20 +- car-mirror-benches/benches/in_memory.rs | 10 +- .../benches/simulated_latency.rs | 10 +- car-mirror/src/cache.rs | 388 ++++++++++++++++++ car-mirror/src/common.rs | 4 +- car-mirror/src/dag_walk.rs | 6 +- car-mirror/src/incremental_verification.rs | 2 +- car-mirror/src/lib.rs | 4 +- car-mirror/src/pull.rs | 6 +- car-mirror/src/push.rs | 6 +- car-mirror/src/test_utils/local_utils.rs | 2 +- car-mirror/src/traits.rs | 298 -------------- 12 files changed, 423 insertions(+), 333 deletions(-) create mode 100644 car-mirror/src/cache.rs delete mode 100644 car-mirror/src/traits.rs diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index e6ef642..7e29f01 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -1,10 +1,10 @@ use anyhow::Result; use bytes::Bytes; use car_mirror::{ + cache::{CacheMissing, InMemoryCache}, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use libipld::Cid; @@ -26,10 +26,10 @@ pub fn push_throttled(c: &mut Criterion) { (store, root) }, |(client_store, root)| { - let client_store = &ThrottledBlockStore(client_store); - let client_cache = &InMemoryCache::new(10_000); - let server_store = &ThrottledBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let client_store = &CacheMissing::new(100_000, ThrottledBlockStore(client_store)); + let client_cache = &InMemoryCache::new(100_000); + let server_store = &CacheMissing::new(100_000, ThrottledBlockStore::new()); + 
let server_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -80,10 +80,10 @@ pub fn pull_throttled(c: &mut Criterion) { (store, root) }, |(server_store, root)| { - let server_store = &ThrottledBlockStore(server_store); - let server_cache = &InMemoryCache::new(10_000); - let client_store = &ThrottledBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let server_store = &CacheMissing::new(100_000, ThrottledBlockStore(server_store)); + let server_cache = &InMemoryCache::new(100_000); + let client_store = &CacheMissing::new(100_000, ThrottledBlockStore::new()); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -130,7 +130,7 @@ impl BlockStore for ThrottledBlockStore { } async fn has_block(&self, cid: &Cid) -> Result { - // The idea is that has_block would be faster than `get_block`, as it should be managed closer to CPU memory + async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artificially slowed by 50 microseconds self.0.has_block(cid).await } } diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index f77ac4b..c7b7623 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -1,8 +1,8 @@ use car_mirror::{ + cache::InMemoryCache, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use wnfs_common::MemoryBlockStore; @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(100_000); let server_store = &MemoryBlockStore::new(); - let server_cache = 
&InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) { (store, root) }, |(ref server_store, root)| { - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(100_000); let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/simulated_latency.rs b/car-mirror-benches/benches/simulated_latency.rs index 0c05689..7b83f0a 100644 --- a/car-mirror-benches/benches/simulated_latency.rs +++ b/car-mirror-benches/benches/simulated_latency.rs @@ -1,8 +1,8 @@ use car_mirror::{ + cache::InMemoryCache, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use std::{ops::Range, time::Duration}; @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(100_000); (store, cache, root) }, |(ref server_store, ref server_cache, root)| { let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(100_000); (store, cache, root) }, |(ref client_store, ref client_cache, root)| { let server_store = 
&MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror/src/cache.rs b/car-mirror/src/cache.rs new file mode 100644 index 0000000..21aeff4 --- /dev/null +++ b/car-mirror/src/cache.rs @@ -0,0 +1,388 @@ +use crate::common::references; +use anyhow::Result; +use futures::Future; +use libipld::{Cid, IpldCodec}; +use wnfs_common::{ + utils::{CondSend, CondSync}, + BlockStore, +}; + +/// This trait abstracts caches used by the car mirror implementation. +/// An efficient cache implementation can significantly reduce the amount +/// of lookups into the blockstore. +/// +/// At the moment, all caches are either memoization tables or informationally +/// monotonous, so you don't need to be careful about cache eviction. +/// +/// See `InMemoryCache` for a `quick_cache`-based implementation +/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. +pub trait Cache: CondSync { + /// This returns further references from the block referenced by given CID, + /// if the cache is hit. + /// Returns `None` if it's a cache miss. + /// + /// This isn't meant to be called directly, instead use `Cache::references`. + fn get_references_cache( + &self, + cid: Cid, + ) -> impl Future>>> + CondSend; + + /// Populates the references cache for given CID with given references. + fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> impl Future> + CondSend; + + /// Find out any CIDs that are linked to from the block with given CID. + /// + /// This makes use of the cache via `get_references_cached`, if possible. + /// If the cache is missed, then it will fetch the block, compute the references + /// and automatically populate the cache using `put_references_cached`. 
+ fn references( + &self, + cid: Cid, + store: &impl BlockStore, + ) -> impl Future>> + CondSend { + async move { + // raw blocks don't have further links + let raw_codec: u64 = IpldCodec::Raw.into(); + if cid.codec() == raw_codec { + return Ok(Vec::new()); + } + + if let Some(refs) = self.get_references_cache(cid).await? { + return Ok(refs); + } + + let block = store.get_block(&cid).await?; + let refs = references(cid, block, Vec::new())?; + self.put_references_cache(cid, refs.clone()).await?; + Ok(refs) + } + } +} + +impl Cache for &C { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + (**self).get_references_cache(cid).await + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + (**self).put_references_cache(cid, references).await + } +} + +impl Cache for Box { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + (**self).get_references_cache(cid).await + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + (**self).put_references_cache(cid, references).await + } +} + +/// An implementation of `Cache` that doesn't cache at all. +#[derive(Debug)] +pub struct NoCache; + +impl Cache for NoCache { + async fn get_references_cache(&self, _: Cid) -> Result>> { + Ok(None) + } + + async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<()> { + Ok(()) + } +} + +#[cfg(feature = "quick_cache")] +pub use quick_cache::*; + +#[cfg(feature = "quick_cache")] +mod quick_cache { + use super::Cache; + use anyhow::{anyhow, Result}; + use bytes::Bytes; + use libipld::Cid; + use quick_cache::{sync, OptionsBuilder, Weighter}; + use wnfs_common::{ + utils::{Arc, CondSend}, + BlockStore, BlockStoreError, + }; + + /// A [quick-cache]-based implementation of a car mirror cache. 
+ /// + /// [quick-cache]: https://github.com/arthurprs/quick-cache/ + #[derive(Debug, Clone)] + pub struct InMemoryCache { + references: Arc, ReferencesWeighter>>, + } + + /// A wrapper struct for a `BlockStore` that attaches an in-memory cache + /// of which blocks are available and which aren't, speeding up + /// consecutive `has_block` and `get_block` calls. + #[derive(Debug, Clone)] + pub struct CacheMissing { + inner: B, + has_blocks: Arc>, + } + + impl InMemoryCache { + /// Create a new in-memory cache that approximately holds + /// cached references for `approx_cids` CIDs. + /// + /// Memory requirements can be eye-balled by calculating ~100 bytes + /// per CID in the cache. + /// + /// So if you want this cache to never exceed roughly ~100MB, set + /// `approx_cids` to `1_000_000`. + pub fn new(approx_cids: usize) -> Self { + let max_links_per_unixfs = 175; + let est_average_links = max_links_per_unixfs / 10; + Self { + references: Arc::new(sync::Cache::with_options( + OptionsBuilder::new() + .estimated_items_capacity(approx_cids / est_average_links) + .weight_capacity(approx_cids as u64) + .build() + .expect("Couldn't create options for quick cache?"), + ReferencesWeighter, + Default::default(), + Default::default(), + )), + } + } + } + + impl Cache for InMemoryCache { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + Ok(self.references.get(&cid)) + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + self.references.insert(cid, references); + Ok(()) + } + } + + impl CacheMissing { + /// Wrap an existing `BlockStore`, caching `has_block` responses. + /// + /// This will also intercept `get_block` requests and fail early, if the + /// block is known to be missing. + /// In some circumstances this can be problematic, e.g. if blocks can be + /// added and removed to the underlying blockstore without going through + /// the wrapped instance's `put_block` or `put_block_keyed` interfaces. 
+ /// + /// In these cases, consider a more advanced caching strategy that answers + /// `has_block` early from a cache with a TTL & eviction strategy. + /// + /// The additional memory requirements for this cache can be estimated + /// using the `approx_capacity`: Each cache line is roughly ~100 bytes + /// in size, so for a 100MB cache, set this value to `1_000_000`. + pub fn new(approx_capacity: usize, inner: B) -> Self { + Self { + inner, + has_blocks: Arc::new(sync::Cache::new(approx_capacity)), + } + } + } + + impl BlockStore for CacheMissing { + async fn get_block(&self, cid: &Cid) -> Result { + match self.has_blocks.get_value_or_guard_async(cid).await { + Ok(false) => Err(anyhow!(BlockStoreError::CIDNotFound(*cid))), + Ok(true) => self.inner.get_block(cid).await, + Err(guard) => match self.inner.get_block(cid).await { + Ok(block) => { + let _ignore_meantime_eviction = guard.insert(true); + Ok(block) + } + Err(e) => { + if let Some(BlockStoreError::CIDNotFound(_)) = e.downcast_ref() { + let _ignore_meantime_eviction = guard.insert(false); + Err(e) + } else { + Err(e) + } + } + }, + } + } + + async fn put_block_keyed( + &self, + cid: Cid, + bytes: impl Into + CondSend, + ) -> Result<()> { + self.inner.put_block_keyed(cid, bytes).await?; + self.has_blocks.insert(cid, true); + Ok(()) + } + + async fn has_block(&self, cid: &Cid) -> Result { + self.has_blocks + .get_or_insert_async(cid, self.inner.has_block(cid)) + .await + } + } + + #[derive(Debug, Clone)] + struct ReferencesWeighter; + + impl Weighter> for ReferencesWeighter { + fn weight(&self, _key: &Cid, val: &Vec) -> u32 { + 1 + val.len() as u32 + } + } + + #[cfg(test)] + mod tests { + use super::{Cache, InMemoryCache}; + use libipld::{cbor::DagCborCodec, Ipld, IpldCodec}; + use testresult::TestResult; + use wnfs_common::{encode, BlockStore, MemoryBlockStore}; + + #[test_log::test(async_std::test)] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = 
InMemoryCache::new(100_000); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::{Cache, NoCache}; + use anyhow::Result; + use libipld::{cbor::DagCborCodec, Cid, Ipld, IpldCodec}; + use std::{collections::HashMap, sync::RwLock}; + use testresult::TestResult; + use wnfs_common::{encode, BlockStore, MemoryBlockStore}; + + #[derive(Debug, Default)] + struct HashMapCache { + references: RwLock>>, + } + + impl Cache for HashMapCache { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + Ok(self.references.read().unwrap().get(&cid).cloned()) + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + self.references.write().unwrap().insert(cid, references); + Ok(()) + } + } + + #[test_log::test(async_std::test)] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + 
DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_no_cache_references() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache should start out unpopulated + assert_eq!(cache.get_references_cache(cid).await?, None); + + // We should get the correct answer for our queries + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // We don't have a populated cache though + assert_eq!(cache.get_references_cache(cid).await?, None); + + Ok(()) + } +} diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 6d9815b..7699d9e 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -12,11 +12,11 @@ use wnfs_common::{ }; use crate::{ + cache::Cache, dag_walk::DagWalk, error::Error, incremental_verification::{BlockState, IncrementalDagVerification}, messages::{Bloom, PullRequest, PushResponse}, - traits::Cache, }; //-------------------------------------------------------------------------------------------------- @@ -578,7 +578,7 @@ impl std::fmt::Debug for ReceiverState { #[cfg(test)] mod tests { use super::*; - use 
crate::{test_utils::assert_cond_send_sync, traits::NoCache}; + use crate::{cache::NoCache, test_utils::assert_cond_send_sync}; use testresult::TestResult; use wnfs_common::MemoryBlockStore; diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 9995a82..fa7edcf 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -1,4 +1,4 @@ -use crate::{common::references, error::Error, traits::Cache}; +use crate::{cache::Cache, common::references, error::Error}; use bytes::Bytes; use futures::{stream::try_unfold, Stream}; use libipld_core::cid::Cid; @@ -201,7 +201,7 @@ impl DagWalk { #[cfg(test)] mod tests { use super::*; - use crate::traits::NoCache; + use crate::cache::NoCache; use futures::TryStreamExt; use libipld::{cbor::DagCborCodec, Ipld}; use testresult::TestResult; @@ -272,7 +272,7 @@ mod tests { #[cfg(test)] mod proptests { use super::*; - use crate::{test_utils::arb_ipld_dag, traits::NoCache}; + use crate::{cache::NoCache, test_utils::arb_ipld_dag}; use futures::TryStreamExt; use libipld::{ multihash::{Code, MultihashDigest}, diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index acdb2f3..95bbd9d 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::ReceiverState, dag_walk::{DagWalk, TraversedItem}, error::{Error, IncrementalVerificationError}, - traits::Cache, }; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 6514d7b..d994424 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -9,6 +9,8 @@ #[cfg_attr(docsrs, doc(cfg(feature = "test_utils")))] pub mod test_utils; +/// Module with local caching strategies and mechanisms that greatly enhance CAR mirror performance +pub mod cache; /// Common utilities pub mod common; /// Algorithms for walking IPLD directed acyclic graphs @@ 
-23,5 +25,3 @@ pub mod messages; pub mod pull; /// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response` pub mod push; -/// Traits defined in this crate -pub mod traits; diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 65438da..f4aa5bd 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PullRequest, - traits::Cache, }; use libipld::Cid; use wnfs_common::BlockStore; @@ -46,10 +46,10 @@ pub async fn response( #[cfg(test)] mod tests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_random_dag, Metrics}, - traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -115,10 +115,10 @@ mod tests { #[cfg(test)] mod proptests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, - traits::NoCache, }; use futures::TryStreamExt; use libipld::{Cid, Ipld}; diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index 59c0f70..f0d5c17 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PushResponse, - traits::Cache, }; use libipld_core::cid::Cid; use wnfs_common::BlockStore; @@ -51,13 +51,13 @@ pub async fn response( #[cfg(test)] mod tests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{ get_cid_at_approx_path, setup_random_dag, total_dag_blocks, total_dag_bytes, Metrics, Rvg, }, - traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -188,10 +188,10 @@ mod tests { #[cfg(test)] mod proptests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, - traits::NoCache, }; use 
futures::TryStreamExt; use libipld::{Cid, Ipld}; diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index 5d3276b..e1b08ab 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -1,6 +1,6 @@ ///! Crate-local test utilities use super::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore, Rvg}; -use crate::{common::references, dag_walk::DagWalk, error::Error, traits::NoCache}; +use crate::{cache::NoCache, common::references, dag_walk::DagWalk, error::Error}; use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs deleted file mode 100644 index df4c43c..0000000 --- a/car-mirror/src/traits.rs +++ /dev/null @@ -1,298 +0,0 @@ -use crate::common::references; -use anyhow::Result; -use futures::Future; -use libipld::{Cid, IpldCodec}; -#[cfg(feature = "quick_cache")] -use wnfs_common::utils::Arc; -use wnfs_common::{ - utils::{CondSend, CondSync}, - BlockStore, -}; - -/// This trait abstracts caches used by the car mirror implementation. -/// An efficient cache implementation can significantly reduce the amount -/// of lookups into the blockstore. -/// -/// At the moment, all caches are either memoization tables or informationally -/// monotonous, so you don't need to be careful about cache eviction. -/// -/// See `InMemoryCache` for a `quick_cache`-based implementation -/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. -pub trait Cache: CondSync { - /// This returns further references from the block referenced by given CID, - /// if the cache is hit. - /// Returns `None` if it's a cache miss. - /// - /// This isn't meant to be called directly, instead use `Cache::references`. - fn get_references_cache( - &self, - cid: Cid, - ) -> impl Future>>> + CondSend; - - /// Populates the references cache for given CID with given references. 
- fn put_references_cache( - &self, - cid: Cid, - references: Vec, - ) -> impl Future> + CondSend; - - /// Find out any CIDs that are linked to from the block with given CID. - /// - /// This makes use of the cache via `get_references_cached`, if possible. - /// If the cache is missed, then it will fetch the block, compute the references - /// and automatically populate the cache using `put_references_cached`. - fn references( - &self, - cid: Cid, - store: &impl BlockStore, - ) -> impl Future>> + CondSend { - async move { - // raw blocks don't have further links - let raw_codec: u64 = IpldCodec::Raw.into(); - if cid.codec() == raw_codec { - return Ok(Vec::new()); - } - - if let Some(refs) = self.get_references_cache(cid).await? { - return Ok(refs); - } - - let block = store.get_block(&cid).await?; - let refs = references(cid, block, Vec::new())?; - self.put_references_cache(cid, refs.clone()).await?; - Ok(refs) - } - } -} - -impl Cache for &C { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - (**self).get_references_cache(cid).await - } - - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - (**self).put_references_cache(cid, references).await - } -} - -impl Cache for Box { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - (**self).get_references_cache(cid).await - } - - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - (**self).put_references_cache(cid, references).await - } -} - -/// A [quick-cache]-based implementation of a car mirror cache. -/// -/// [quick-cache]: https://github.com/arthurprs/quick-cache/ -#[cfg(feature = "quick_cache")] -#[derive(Debug, Clone)] -pub struct InMemoryCache { - references: Arc>>, -} - -#[cfg(feature = "quick_cache")] -impl InMemoryCache { - /// Create a new in-memory cache that approximately holds - /// cached references for `approx_references_capacity` CIDs. 
- /// - /// Computing the expected memory requirements for the reference - /// cache isn't easy. - /// A block in theory have up to thousands of references. - /// [UnixFS] chunked files will reference up to 174 chunks - /// at a time. - /// Each CID takes up 96 bytes of memory. - /// In the UnixFS worst case of 175 CIDs per cache entry, - /// you need to reserve up to `175 * 96B = 16.8KB` of space - /// per entry. Thus, if you want your cache to not outgrow - /// ~100MB (ignoring the internal cache structure space - /// requirements), you can store up to `100MB / 16.8KB = 5952` - /// entries. - /// - /// In practice, the fanout average will be much lower than 174. - /// - /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout - pub fn new(approx_references_capacity: usize) -> Self { - Self { - references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)), - } - } -} - -#[cfg(feature = "quick_cache")] -impl Cache for InMemoryCache { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - Ok(self.references.get(&cid)) - } - - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - self.references.insert(cid, references); - Ok(()) - } -} - -/// An implementation of `Cache` that doesn't cache at all. 
-#[derive(Debug)] -pub struct NoCache; - -impl Cache for NoCache { - async fn get_references_cache(&self, _: Cid) -> Result>> { - Ok(None) - } - - async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<()> { - Ok(()) - } -} - -#[cfg(feature = "quick_cache")] -#[cfg(test)] -mod quick_cache_tests { - use super::{Cache, InMemoryCache}; - use libipld::{cbor::DagCborCodec, Ipld, IpldCodec}; - use testresult::TestResult; - use wnfs_common::{encode, BlockStore, MemoryBlockStore}; - - #[test_log::test(async_std::test)] - async fn test_references_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = InMemoryCache::new(10_000); - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_block( - encode( - &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), - DagCborCodec, - )?, - DagCborCodec.into(), - ) - .await?; - - // Cache unpopulated initially - assert_eq!(cache.get_references_cache(cid).await?, None); - - // This should populate the references cache - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // Cache should now contain the references - assert_eq!( - cache.get_references_cache(cid).await?, - Some(vec![hello_one_cid, hello_two_cid]) - ); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::{Cache, NoCache}; - use anyhow::Result; - use libipld::{cbor::DagCborCodec, Cid, Ipld, IpldCodec}; - use std::{collections::HashMap, sync::RwLock}; - use testresult::TestResult; - use wnfs_common::{encode, BlockStore, MemoryBlockStore}; - - #[derive(Debug, Default)] - struct HashMapCache { - references: RwLock>>, - } - - impl Cache for HashMapCache { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - Ok(self.references.read().unwrap().get(&cid).cloned()) - } - - async fn 
put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - self.references.write().unwrap().insert(cid, references); - Ok(()) - } - } - - #[test_log::test(async_std::test)] - async fn test_references_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = HashMapCache::default(); - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_block( - encode( - &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), - DagCborCodec, - )?, - DagCborCodec.into(), - ) - .await?; - - // Cache unpopulated initially - assert_eq!(cache.get_references_cache(cid).await?, None); - - // This should populate the references cache - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // Cache should now contain the references - assert_eq!( - cache.get_references_cache(cid).await?, - Some(vec![hello_one_cid, hello_two_cid]) - ); - - Ok(()) - } - - #[test_log::test(async_std::test)] - async fn test_no_cache_references() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = NoCache; - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_block( - encode( - &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), - DagCborCodec, - )?, - DagCborCodec.into(), - ) - .await?; - - // Cache should start out unpopulated - assert_eq!(cache.get_references_cache(cid).await?, None); - - // We should get the correct answer for our queries - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // We don't have a populated cache though - assert_eq!(cache.get_references_cache(cid).await?, None); - - 
Ok(()) - } -}