Skip to content

Commit

Permalink
feat: Implement a positive cache for blocks we already have
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Jan 1, 2024
1 parent e1cbd40 commit 1645c90
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 32 deletions.
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) {
},
|(client_store, root)| {
let client_store = &ThrottledBlockStore(client_store);
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &ThrottledBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) {
},
|(server_store, root)| {
let server_store = &ThrottledBlockStore(server_store);
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &ThrottledBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) {
(store, root)
},
|(ref client_store, root)| {
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) {
(store, root)
},
|(ref server_store, root)| {
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/simulated_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref server_store, ref server_cache, root)| {
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref client_store, ref client_cache, root)| {
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
9 changes: 6 additions & 3 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ impl std::fmt::Debug for ReceiverState {
)
});
f.debug_struct("ReceiverState")
.field("missing_subgraph_roots", &self.missing_subgraph_roots)
.field(
"missing_subgraph_roots.len() == ",
&self.missing_subgraph_roots.len(),
)
.field("have_cids_bloom", &have_cids_bloom)
.finish()
}
Expand All @@ -86,7 +89,7 @@ pub struct CarFile {
///
/// It returns a `CarFile` of (a subset) of all blocks below `root`, that
/// are thought to be missing on the receiving end.
#[instrument(skip(config, store, cache))]
#[instrument(skip_all, fields(root, last_state))]
pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
Expand Down Expand Up @@ -145,7 +148,7 @@ pub async fn block_send(
/// It takes a `CarFile`, verifies that its contents are related to the
/// `root` and returns some information to help the block sending side
/// figure out what blocks to send next.
#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
root: Cid,
last_car: Option<CarFile>,
Expand Down
103 changes: 86 additions & 17 deletions car-mirror/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use crate::common::references;
use anyhow::Result;
use async_trait::async_trait;
use libipld::{Cid, IpldCodec};
use wnfs_common::{utils::CondSync, BlockStore};
use wnfs_common::{
utils::{Arc, CondSync},

Check warning on line 6 in car-mirror/src/traits.rs

View workflow job for this annotation

GitHub Actions / run-checks (1.66)

unused import: `Arc`

Check warning on line 6 in car-mirror/src/traits.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

unused import: `Arc`

Check warning on line 6 in car-mirror/src/traits.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

unused import: `Arc`
BlockStore, BlockStoreError,
};

/// This trait abstracts caches used by the car mirror implementation.
/// An efficient cache implementation can significantly reduce the amount
/// of lookups into the blockstore.
///
/// At the moment, all caches are conceptually memoization tables, so you don't
/// necessarily need to think about being careful about cache eviction.
/// At the moment, all caches are either memoization tables or informationally
/// monotonous, so you don't need to be careful about cache eviction.
///
/// See `InMemoryCache` for a `quick_cache`-based implementation
/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache.
Expand All @@ -21,10 +24,23 @@ pub trait Cache: CondSync {
/// Returns `None` if it's a cache miss.
///
/// This isn't meant to be called directly, instead use `Cache::references`.
async fn get_references_cached(&self, cid: Cid) -> Result<Option<Vec<Cid>>>;
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>>;

/// Populates the references cache for given CID with given references.
async fn put_references_cached(&self, cid: Cid, references: Vec<Cid>) -> Result<()>;
async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()>;

/// This returns whether the cache has the fact stored that a block with given
/// CID is stored.
///
/// This only returns `true` in case the block has been stored.
/// `false` simply indicates that the cache doesn't know whether the block is
/// stored or not (it's always a cache miss).
///
/// Don't call this directly, instead, use `Cache::has_block`.
async fn get_has_block_cache(&self, cid: &Cid) -> Result<bool>;

/// This populates the cache with the fact that a block with given CID is stored.
async fn put_has_block_cache(&self, cid: Cid) -> Result<()>;

/// Find out any CIDs that are linked to from the block with given CID.
///
Expand All @@ -38,32 +54,63 @@ pub trait Cache: CondSync {
return Ok(Vec::new());
}

if let Some(refs) = self.get_references_cached(cid).await? {
if let Some(refs) = self.get_references_cache(cid).await? {
return Ok(refs);
}

let block = store.get_block(&cid).await?;
let refs = references(cid, block, Vec::new())?;
self.put_references_cached(cid, refs.clone()).await?;
self.put_references_cache(cid, refs.clone()).await?;
Ok(refs)
}

/// Find out whether a given block is stored in given blockstore or not.
///
/// This cache is *only* effective on `true` values for `has_block`.
/// Repeatedly calling `has_block` with `Cid`s of blocks that are *not*
/// stored will cause repeated calls to given blockstore.
///
/// **Make sure to always use the same `BlockStore` when calling this function.**
///
    /// This makes use of the cache's `get_has_block_cache`, if possible.
    /// On cache misses, this will actually fetch the block from the store
    /// and if successful, populate the cache using `put_has_block_cache`.
async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result<bool> {
if self.get_has_block_cache(&cid).await? {
return Ok(true);
}

match store.get_block(&cid).await {
Ok(_) => {
self.put_has_block_cache(cid).await?;
Ok(true)
}
Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => {
Ok(false)
}
Err(e) => Err(e),
}
}
}

/// A [quick-cache]-based implementation of a car mirror cache.
///
/// [quick-cache]: https://github.com/arthurprs/quick-cache/
#[cfg(feature = "quick_cache")]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InMemoryCache {
references: quick_cache::sync::Cache<Cid, Vec<Cid>>,
references: Arc<quick_cache::sync::Cache<Cid, Vec<Cid>>>,
has_blocks: Arc<quick_cache::sync::Cache<Cid, ()>>,
}

#[cfg(feature = "quick_cache")]
impl InMemoryCache {
/// Create a new in-memory cache that approximately holds
/// cached references for `approx_capacity` CIDs.
/// cached references for `approx_references_capacity` CIDs
/// and `approx_has_blocks_capacity` CIDs known to be stored locally.
///
/// Computing the expected memory requirements isn't easy.
/// Computing the expected memory requirements for the reference
/// cache isn't easy.
    /// A block can in theory have up to thousands of references.
/// [UnixFS] chunked files will reference up to 174 chunks
/// at a time.
Expand All @@ -77,10 +124,15 @@ impl InMemoryCache {
///
/// In practice, the fanout average will be much lower than 174.
///
/// On the other hand, each cache entry for the `has_blocks` cache
/// will take a little more than 64 bytes, so for a 10MB
/// `has_blocks` cache, you would use `10MB / 64bytes = 156_250`.
///
/// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout
pub fn new(approx_capacity: usize) -> Self {
pub fn new(approx_references_capacity: usize, approx_has_blocks_capacity: usize) -> Self {
Self {
references: quick_cache::sync::Cache::new(approx_capacity),
references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)),
has_blocks: Arc::new(quick_cache::sync::Cache::new(approx_has_blocks_capacity)),
}
}
}
Expand All @@ -89,14 +141,23 @@ impl InMemoryCache {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Cache for InMemoryCache {
async fn get_references_cached(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
Ok(self.references.get(&cid))
}

async fn put_references_cached(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
self.references.insert(cid, references);
Ok(())
}

async fn get_has_block_cache(&self, cid: &Cid) -> Result<bool> {
Ok(self.has_blocks.get(cid).is_some())
}

async fn put_has_block_cache(&self, cid: Cid) -> Result<()> {
self.has_blocks.insert(cid, ());
Ok(())
}
}

/// An implementation of `Cache` that doesn't cache at all.
Expand All @@ -106,11 +167,19 @@ pub struct NoCache;
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Cache for NoCache {
async fn get_references_cached(&self, _: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(&self, _: Cid) -> Result<Option<Vec<Cid>>> {
Ok(None)
}

async fn put_references_cached(&self, _: Cid, _: Vec<Cid>) -> Result<()> {
async fn put_references_cache(&self, _: Cid, _: Vec<Cid>) -> Result<()> {
Ok(())
}

async fn get_has_block_cache(&self, _: &Cid) -> Result<bool> {
Ok(false)
}

async fn put_has_block_cache(&self, _: Cid) -> Result<()> {
Ok(())
}
}

0 comments on commit 1645c90

Please sign in to comment.