diff --git a/Cargo.lock b/Cargo.lock index ce3cd4a..c7df932 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,22 +234,33 @@ dependencies = [ ] [[package]] -name = "async-task" -version = "4.7.0" +name = "async-stream" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] [[package]] -name = "async-trait" -version = "0.1.77" +name = "async-stream-impl" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", "syn 2.0.48", ] +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -427,7 +438,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "async-trait", + "async-stream", "bytes", "car-mirror", "deterministic-bloom", @@ -456,7 +467,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "async-trait", "bytes", "car-mirror", "criterion", @@ -1050,9 +1060,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" +checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" [[package]] name = "iana-time-zone" @@ -1102,7 +1112,7 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.5", + "hermit-abi 0.3.6", "libc", "windows-sys 0.48.0", ] @@ -1732,21 +1742,14 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" -[[package]] -name = "retain_mut" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" - [[package]] name = "roaring" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873" +checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf" dependencies = [ "bytemuck", "byteorder", - "retain_mut", ] [[package]] @@ -2566,15 +2569,15 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "wnfs-common" -version = "0.1.26" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1395a47e38402df060d3448fe153c5af1eae6f27aeca9c2e79e5a39bb355efab" +checksum = "1b281a9ecccc4d450fae5e5d1b3f31d28dbdcae1e9e8c9b6233a75d7374ef4f3" dependencies = [ "anyhow", "async-once-cell", - "async-trait", "bytes", "chrono", + "cid", "dashmap", "futures", "libipld", diff --git a/Cargo.toml b/Cargo.toml index aa734b0..8a0a57d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,9 +2,19 @@ members = [ "car-mirror", "car-mirror-benches", - "car-mirror-wasm" -, - "examples"] + "car-mirror-wasm", + "examples" +] + +[workspace.dependencies] +anyhow = "1.0" +async-stream = "0.3.5" +bytes = "1.4" +futures = "0.3" +libipld = "0.16" +libipld-core = "0.16" +serde_ipld_dagcbor = "0.4" +wnfs-common = { version = "0.2.0" } # See https://doc.rust-lang.org/cargo/reference/profiles.html for more info. [profile.release.package.car-mirror-wasm] diff --git a/car-mirror-benches/Cargo.toml b/car-mirror-benches/Cargo.toml index a9590b0..b0c162c 100644 --- a/car-mirror-benches/Cargo.toml +++ b/car-mirror-benches/Cargo.toml @@ -6,14 +6,13 @@ edition = "2021" authors = ["Philipp Krüger "] [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } async-std = { version = "1.11", features = ["attributes"] } -async-trait = "0.1" -bytes = "1.4.0" +bytes = { workspace = true } car-mirror = { path = "../car-mirror", version = "0.1", features = ["test_utils", "quick_cache"] } -libipld = "0.16.0" -serde_ipld_dagcbor = "0.4.0" -wnfs-common = "0.1.23" +libipld = { workspace = true } +serde_ipld_dagcbor = { workspace = true } +wnfs-common = { workspace = true } [dev-dependencies] criterion = { version = "0.4", default-features = false } diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index 905bdd1..f2af76b 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -1,16 +1,15 @@ use anyhow::Result; -use async_trait::async_trait; use bytes::Bytes; use car_mirror::{ + cache::{CacheMissing, InMemoryCache}, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use libipld::Cid; use std::time::Duration; -use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSend, BlockStore, BlockStoreError, MemoryBlockStore}; pub fn push_throttled(c: &mut Criterion) { let mut rvg = car_mirror::test_utils::Rvg::deterministic(); @@ -27,17 +26,25 @@ pub fn push_throttled(c: &mut Criterion) { (store, root) }, |(client_store, root)| { - let client_store = &ThrottledBlockStore(client_store); - let client_cache = &InMemoryCache::new(10_000, 150_000); - let server_store = &ThrottledBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000, 150_000); + let client_store = &CacheMissing::new(100_000, ThrottledBlockStore(client_store)); + let client_cache = &InMemoryCache::new(100_000); + let server_store = &CacheMissing::new(100_000, ThrottledBlockStore::new()); + let server_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = - push::request(root, None, config, client_store, client_cache).await?; + let mut last_response = None; loop { + let request = push::request( + root, + last_response, + config, + client_store.clone(), + client_cache.clone(), + ) + .await?; + let response = push::response(root, request, config, server_store, server_cache) .await?; @@ -45,9 +52,8 @@ pub fn push_throttled(c: &mut Criterion) { if response.indicates_finished() { break; } - request = - push::request(root, Some(response), config, client_store, client_cache) - .await?; + + last_response = Some(response); } Ok::<(), anyhow::Error>(()) @@ -74,10 +80,10 @@ pub fn pull_throttled(c: &mut Criterion) { (store, root) }, |(server_store, root)| { - let server_store = &ThrottledBlockStore(server_store); - let server_cache = &InMemoryCache::new(10_000, 150_000); - let client_store = &ThrottledBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000, 150_000); + let server_store = &CacheMissing::new(100_000, ThrottledBlockStore(server_store)); + let server_cache = &InMemoryCache::new(100_000); + let client_store = &CacheMissing::new(100_000, ThrottledBlockStore::new()); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -109,18 +115,32 @@ pub fn pull_throttled(c: &mut Criterion) { #[derive(Debug, Clone)] struct ThrottledBlockStore(MemoryBlockStore); -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for ThrottledBlockStore { - async fn get_block(&self, cid: &Cid) -> Result { - let bytes = self.0.get_block(cid).await?; + async fn get_block(&self, cid: &Cid) -> Result { async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds - Ok(bytes) + self.0.get_block(cid).await } - async fn put_block(&self, bytes: impl Into + CondSend, codec: u64) -> Result { + async fn put_block( + &self, + bytes: impl Into + CondSend, + codec: u64, + ) -> Result { self.0.put_block(bytes, codec).await } + + async fn put_block_keyed( + &self, + cid: Cid, + bytes: impl Into + CondSend, + ) -> Result<(), BlockStoreError> { + self.0.put_block_keyed(cid, bytes).await + } + + async fn has_block(&self, cid: &Cid) -> Result { + async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds + self.0.has_block(cid).await + } } impl ThrottledBlockStore { diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index 8d03744..c7b7623 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -1,8 +1,8 @@ use car_mirror::{ + cache::InMemoryCache, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use wnfs_common::MemoryBlockStore; @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { - let client_cache = &InMemoryCache::new(10_000, 150_000); + let client_cache = &InMemoryCache::new(100_000); let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000, 150_000); + let server_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) { (store, root) }, |(ref server_store, root)| { - let server_cache = &InMemoryCache::new(10_000, 150_000); + let server_cache = &InMemoryCache::new(100_000); let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000, 150_000); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/simulated_latency.rs b/car-mirror-benches/benches/simulated_latency.rs index 155ef59..7b83f0a 100644 --- a/car-mirror-benches/benches/simulated_latency.rs +++ b/car-mirror-benches/benches/simulated_latency.rs @@ -1,8 +1,8 @@ use car_mirror::{ + cache::InMemoryCache, common::Config, pull, push, test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore}, - traits::InMemoryCache, }; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use std::{ops::Range, time::Duration}; @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000, 150_000); + let cache = InMemoryCache::new(100_000); (store, cache, root) }, |(ref server_store, ref server_cache, root)| { let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000, 150_000); + let client_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000, 150_000); + let cache = InMemoryCache::new(100_000); (store, cache, root) }, |(ref client_store, ref client_cache, root)| { let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000, 150_000); + let server_cache = &InMemoryCache::new(100_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index aa0363d..6623dfa 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -18,23 +18,23 @@ path = "src/lib.rs" doctest = true [dependencies] -anyhow = "1.0" -async-trait = "0.1.73" -bytes = "1.4" +anyhow = { workspace = true } +async-stream = { workspace = true } +bytes = { workspace = true } deterministic-bloom = "0.1" -futures = "0.3" +futures = { workspace = true } iroh-car = "0.4" -libipld = "0.16" -libipld-core = "0.16" +libipld = { workspace = true } +libipld-core = { workspace = true } proptest = { version = "1.1", optional = true } quick_cache = { version = "0.4", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "^1" -serde_ipld_dagcbor = "0.4" +serde_ipld_dagcbor = { workspace = true } thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" -wnfs-common = "0.1.26" +wnfs-common = { workspace = true } [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } diff --git a/car-mirror/src/cache.rs b/car-mirror/src/cache.rs new file mode 100644 index 0000000..6613a24 --- /dev/null +++ b/car-mirror/src/cache.rs @@ -0,0 +1,419 @@ +use crate::common::references; +use futures::Future; +use libipld::{Cid, IpldCodec}; +use wnfs_common::{ + utils::{CondSend, CondSync}, + BlockStore, BlockStoreError, +}; + +/// This trait abstracts caches used by the car mirror implementation. +/// An efficient cache implementation can significantly reduce the amount +/// of lookups into the blockstore. +/// +/// At the moment, all caches are either memoization tables or informationally +/// monotonous, so you don't need to be careful about cache eviction. +/// +/// See `InMemoryCache` for a `quick_cache`-based implementation +/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. +pub trait Cache: CondSync { + /// This returns further references from the block referenced by given CID, + /// if the cache is hit. + /// Returns `None` if it's a cache miss. + /// + /// This isn't meant to be called directly, instead use `Cache::references`. + fn get_references_cache( + &self, + cid: Cid, + ) -> impl Future>, BlockStoreError>> + CondSend; + + /// Populates the references cache for given CID with given references. + fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> impl Future> + CondSend; + + /// Find out any CIDs that are linked to from the block with given CID. + /// + /// This makes use of the cache via `get_references_cached`, if possible. + /// If the cache is missed, then it will fetch the block, compute the references + /// and automatically populate the cache using `put_references_cached`. + fn references( + &self, + cid: Cid, + store: &impl BlockStore, + ) -> impl Future, BlockStoreError>> + CondSend { + async move { + // raw blocks don't have further links + let raw_codec: u64 = IpldCodec::Raw.into(); + if cid.codec() == raw_codec { + return Ok(Vec::new()); + } + + if let Some(refs) = self.get_references_cache(cid).await? { + return Ok(refs); + } + + let block = store.get_block(&cid).await?; + let refs = references(cid, block, Vec::new())?; + self.put_references_cache(cid, refs.clone()).await?; + Ok(refs) + } + } +} + +impl Cache for &C { + async fn get_references_cache(&self, cid: Cid) -> Result>, BlockStoreError> { + (**self).get_references_cache(cid).await + } + + async fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> Result<(), BlockStoreError> { + (**self).put_references_cache(cid, references).await + } +} + +impl Cache for Box { + async fn get_references_cache(&self, cid: Cid) -> Result>, BlockStoreError> { + (**self).get_references_cache(cid).await + } + + async fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> Result<(), BlockStoreError> { + (**self).put_references_cache(cid, references).await + } +} + +/// An implementation of `Cache` that doesn't cache at all. +#[derive(Debug)] +pub struct NoCache; + +impl Cache for NoCache { + async fn get_references_cache(&self, _: Cid) -> Result>, BlockStoreError> { + Ok(None) + } + + async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<(), BlockStoreError> { + Ok(()) + } +} + +#[cfg(feature = "quick_cache")] +pub use quick_cache::*; + +#[cfg(feature = "quick_cache")] +mod quick_cache { + use super::Cache; + use bytes::Bytes; + use libipld::Cid; + use quick_cache::{sync, OptionsBuilder, Weighter}; + use wnfs_common::{ + utils::{Arc, CondSend}, + BlockStore, BlockStoreError, + }; + + /// A [quick-cache]-based implementation of a car mirror cache. + /// + /// [quick-cache]: https://github.com/arthurprs/quick-cache/ + #[derive(Debug, Clone)] + pub struct InMemoryCache { + references: Arc, ReferencesWeighter>>, + } + + /// A wrapper struct for a `BlockStore` that attaches an in-memory cache + /// of which blocks are available and which aren't, speeding up + /// consecutive `has_block` and `get_block` calls. + #[derive(Debug, Clone)] + pub struct CacheMissing { + inner: B, + has_blocks: Arc>, + } + + impl InMemoryCache { + /// Create a new in-memory cache that approximately holds + /// cached references for `approx_cids` CIDs. + /// + /// Memory requirements can be eye-balled by calculating ~100 bytes + /// per CID in the cache. + /// + /// So if you want this cache to never exceed roughly ~100MB, set + /// `approx_cids` to `1_000_000`. + pub fn new(approx_cids: usize) -> Self { + let max_links_per_unixfs = 175; + let est_average_links = max_links_per_unixfs / 10; + Self { + references: Arc::new(sync::Cache::with_options( + OptionsBuilder::new() + .estimated_items_capacity(approx_cids / est_average_links) + .weight_capacity(approx_cids as u64) + .build() + .expect("Couldn't create options for quick cache?"), + ReferencesWeighter, + Default::default(), + Default::default(), + )), + } + } + } + + impl Cache for InMemoryCache { + async fn get_references_cache( + &self, + cid: Cid, + ) -> Result>, BlockStoreError> { + Ok(self.references.get(&cid)) + } + + async fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> Result<(), BlockStoreError> { + self.references.insert(cid, references); + Ok(()) + } + } + + impl CacheMissing { + /// Wrap an existing `BlockStore`, caching `has_block` responses. + /// + /// This will also intercept `get_block` requests and fail early, if the + /// block is known to be missing. + /// In some circumstances this can be problematic, e.g. if blocks can be + /// added and removed to the underlying blockstore without going through + /// the wrapped instance's `put_block` or `put_block_keyed` interfaces. + /// + /// In these cases a more advanced caching strategy that may answer + /// `has_block` early from a cache with a cache TTL & eviction strategy. + /// + /// The additional memory requirements for this cache can be estimated + /// using the `approx_capacity`: Each cache line is roughly ~100 bytes + /// in size, so for a 100MB cache, set this value to `1_000_000`. + pub fn new(approx_capacity: usize, inner: B) -> Self { + Self { + inner, + has_blocks: Arc::new(sync::Cache::new(approx_capacity)), + } + } + } + + impl BlockStore for CacheMissing { + async fn get_block(&self, cid: &Cid) -> Result { + match self.has_blocks.get_value_or_guard_async(cid).await { + Ok(false) => Err(BlockStoreError::CIDNotFound(*cid)), + Ok(true) => self.inner.get_block(cid).await, + Err(guard) => match self.inner.get_block(cid).await { + Ok(block) => { + let _ignore_meantime_eviction = guard.insert(true); + Ok(block) + } + e @ Err(BlockStoreError::CIDNotFound(_)) => { + let _ignore_meantime_eviction = guard.insert(false); + e + } + Err(e) => Err(e), + }, + } + } + + async fn put_block_keyed( + &self, + cid: Cid, + bytes: impl Into + CondSend, + ) -> Result<(), BlockStoreError> { + self.inner.put_block_keyed(cid, bytes).await?; + self.has_blocks.insert(cid, true); + Ok(()) + } + + async fn has_block(&self, cid: &Cid) -> Result { + self.has_blocks + .get_or_insert_async(cid, self.inner.has_block(cid)) + .await + } + + async fn put_block( + &self, + bytes: impl Into + CondSend, + codec: u64, + ) -> Result { + let cid = self.inner.put_block(bytes, codec).await?; + self.has_blocks.insert(cid, true); + Ok(cid) + } + + fn create_cid(&self, bytes: &[u8], codec: u64) -> Result { + self.inner.create_cid(bytes, codec) + } + } + + #[derive(Debug, Clone)] + struct ReferencesWeighter; + + impl Weighter> for ReferencesWeighter { + fn weight(&self, _key: &Cid, val: &Vec) -> u32 { + 1 + val.len() as u32 + } + } + + #[cfg(test)] + mod tests { + use super::{Cache, InMemoryCache}; + use libipld::{cbor::DagCborCodec, Ipld, IpldCodec}; + use testresult::TestResult; + use wnfs_common::{encode, BlockStore, MemoryBlockStore}; + + #[test_log::test(async_std::test)] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = InMemoryCache::new(100_000); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::{Cache, NoCache}; + use anyhow::Result; + use libipld::{cbor::DagCborCodec, Cid, Ipld, IpldCodec}; + use std::{collections::HashMap, sync::RwLock}; + use testresult::TestResult; + use wnfs_common::{encode, BlockStore, BlockStoreError, MemoryBlockStore}; + + #[derive(Debug, Default)] + struct HashMapCache { + references: RwLock>>, + } + + impl Cache for HashMapCache { + async fn get_references_cache( + &self, + cid: Cid, + ) -> Result>, BlockStoreError> { + Ok(self.references.read().unwrap().get(&cid).cloned()) + } + + async fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> Result<(), BlockStoreError> { + self.references.write().unwrap().insert(cid, references); + Ok(()) + } + } + + #[test_log::test(async_std::test)] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_no_cache_references() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_block( + encode( + &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) + .await?; + + // Cache should start out unpopulated + assert_eq!(cache.get_references_cache(cid).await?, None); + + // We should get the correct answer for our queries + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // We don't have a populated cache though + assert_eq!(cache.get_references_cache(cid).await?, None); + + Ok(()) + } +} diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index da62de9..7699d9e 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; -use futures::{future, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; @@ -12,11 +12,11 @@ use wnfs_common::{ }; use crate::{ + cache::Cache, dag_walk::DagWalk, error::Error, incremental_verification::{BlockState, IncrementalDagVerification}, messages::{Bloom, PullRequest, PushResponse}, - traits::Cache, }; //-------------------------------------------------------------------------------------------------- @@ -72,8 +72,8 @@ pub async fn block_send( root: Cid, last_state: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let bytes = block_send_car_stream( root, @@ -94,13 +94,13 @@ pub async fn block_send( /// /// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`. #[instrument(skip_all, fields(root, last_state))] -pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( +pub async fn block_send_car_stream( root: Cid, last_state: Option, stream: W, send_limit: Option, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; write_blocks_into_car(stream, &mut block_stream, send_limit).await @@ -111,8 +111,8 @@ pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( pub async fn block_send_block_stream<'a>( root: Cid, last_state: Option, - store: &'a impl BlockStore, - cache: &'a impl Cache, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, ) -> Result, Error> { let ReceiverState { missing_subgraph_roots, @@ -124,7 +124,7 @@ pub async fn block_send_block_stream<'a>( // Verify that all missing subgraph roots are in the relevant DAG: let subgraph_roots = - verify_missing_subgraph_roots(root, &missing_subgraph_roots, store, cache).await?; + verify_missing_subgraph_roots(root, &missing_subgraph_roots, &store, &cache).await?; let bloom = handle_missing_bloom(have_cids_bloom); @@ -146,8 +146,8 @@ pub async fn block_receive( root: Cid, last_car: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let mut receiver_state = match last_car { Some(car) => { @@ -160,7 +160,7 @@ pub async fn block_receive( block_receive_car_stream(root, Cursor::new(car.bytes), config, store, cache).await? } - None => IncrementalDagVerification::new([root], store, cache) + None => IncrementalDagVerification::new([root], &store, &cache) .await? .into_receiver_state(config.bloom_fpr), }; @@ -178,8 +178,8 @@ pub async fn block_receive_car_stream Result { let reader = CarReader::new(reader).await?; @@ -199,13 +199,13 @@ pub async fn block_receive_block_stream( root: Cid, stream: &mut BlockStream<'_>, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { - let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; + let mut dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?; while let Some((cid, block)) = stream.try_next().await? { - match read_and_verify_block(&mut dag_verification, (cid, block), store, cache).await? { + match read_and_verify_block(&mut dag_verification, (cid, block), &store, &cache).await? { BlockState::Have => { // This can happen because we've just discovered a subgraph we already have. // Let's update the endpoint with our new receiver state. @@ -308,6 +308,8 @@ async fn car_frame_from_block(block: (Cid, Bytes)) -> Result { Ok(bytes.into()) } +/// Ensure that any requested subgraph roots are actually part +/// of the DAG from the root. async fn verify_missing_subgraph_roots( root: Cid, missing_subgraph_roots: &[Cid], @@ -316,9 +318,10 @@ async fn verify_missing_subgraph_roots( ) -> Result, Error> { let subgraph_roots: Vec = DagWalk::breadth_first([root]) .stream(store, cache) - .try_filter_map( - |cid| async move { Ok(missing_subgraph_roots.contains(&cid).then_some(cid)) }, - ) + .try_filter_map(|item| async move { + let cid = item.to_cid()?; + Ok(missing_subgraph_roots.contains(&cid).then_some(cid)) + }) .try_collect() .await?; @@ -356,23 +359,24 @@ fn handle_missing_bloom(have_cids_bloom: Option) -> BloomFilter { fn stream_blocks_from_roots<'a>( subgraph_roots: Vec, bloom: BloomFilter, - store: &'a impl BlockStore, - cache: &'a impl Cache, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, ) -> BlockStream<'a> { - Box::pin( - DagWalk::breadth_first(subgraph_roots.clone()) - .stream(store, cache) - .try_filter(move |cid| { - future::ready(!should_block_be_skipped(cid, &bloom, &subgraph_roots)) - }) - .and_then(move |cid| async move { - let bytes = store - .get_block(&cid) - .await - .map_err(Error::BlockStoreError)?; - Ok((cid, bytes)) - }), - ) + Box::pin(async_stream::try_stream! { + let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); + + while let Some(item) = dag_walk.next(&store, &cache).await? { + let cid = item.to_cid()?; + + if should_block_be_skipped(&cid, &bloom, &subgraph_roots) { + continue; + } + + let bytes = store.get_block(&cid).await.map_err(Error::BlockStoreError)?; + + yield (cid, bytes); + } + }) } async fn write_blocks_into_car( @@ -574,7 +578,7 @@ impl std::fmt::Debug for ReceiverState { #[cfg(test)] mod tests { use super::*; - use crate::{test_utils::assert_cond_send_sync, traits::NoCache}; + use crate::{cache::NoCache, test_utils::assert_cond_send_sync}; use testresult::TestResult; use wnfs_common::MemoryBlockStore; @@ -585,8 +589,8 @@ mod tests { unimplemented!(), unimplemented!(), unimplemented!(), - unimplemented!() as &MemoryBlockStore, - &NoCache, + unimplemented!() as MemoryBlockStore, + NoCache, ) }); assert_cond_send_sync(|| { diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 51ec790..045d0ec 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -1,9 +1,9 @@ -use crate::{common::references, error::Error, traits::Cache}; +use crate::{cache::Cache, common::references, error::Error}; use bytes::Bytes; use futures::{stream::try_unfold, Stream}; use libipld_core::cid::Cid; use std::collections::{HashSet, VecDeque}; -use wnfs_common::BlockStore; +use wnfs_common::{BlockStore, BlockStoreError}; /// A struct that represents an ongoing walk through the Dag. #[derive(Clone, Debug)] @@ -17,6 +17,29 @@ pub struct DagWalk { pub breadth_first: bool, } +/// Represents the state that a traversed block was found in. +/// If it's `Have`, then +#[derive(Debug, Clone, Copy)] +pub enum TraversedItem { + /// The block is available locally, and further + /// links from this block will be traversed. + Have(Cid), + /// The block is not available locally, so its links + /// can't be followed. + Missing(Cid), +} + +impl TraversedItem { + /// Return the CID of this traversed item. If the block for this CID + /// is missing, turn this item into the appropriate error. + pub fn to_cid(self) -> Result { + match self { + Self::Have(cid) => Ok(cid), + Self::Missing(cid) => Err(Error::BlockStoreError(BlockStoreError::CIDNotFound(cid))), + } + } +} + impl DagWalk { /// Start a breadth-first traversal of given roots. /// @@ -50,43 +73,65 @@ impl DagWalk { } } + fn frontier_next(&mut self) -> Option { + loop { + let cid = if self.breadth_first { + self.frontier.pop_back()? + } else { + self.frontier.pop_front()? + }; + + // We loop until we find an unvisited block + if self.visited.insert(cid) { + return Some(cid); + } + } + } + /// Return the next node in the traversal. /// /// Returns `None` if no nodes are left to be visited. + /// + /// Returns `Some((cid, item_state))` where `cid` is the next block in + /// the traversal, and `item_state` tells you whether the block is available + /// in the blockstore locally. If not, its links won't be followed and the + /// traversal will be incomplete. + /// This is not an error! If you want this to be an error, consider using + /// `TraversedItem::to_cid`. pub async fn next( &mut self, store: &impl BlockStore, cache: &impl Cache, - ) -> Result, Error> { - let cid = loop { - let popped = if self.breadth_first { - self.frontier.pop_back() - } else { - self.frontier.pop_front() - }; - - let Some(cid) = popped else { - return Ok(None); - }; - - // We loop until we find an unvisited block - if self.visited.insert(cid) { - break cid; - } + ) -> Result, Error> { + let Some(cid) = self.frontier_next() else { + return Ok(None); }; - let refs = cache - .references(cid, store) + let has_block = store + .has_block(&cid) .await .map_err(Error::BlockStoreError)?; - for ref_cid in refs { - if !self.visited.contains(&ref_cid) { - self.frontier.push_front(ref_cid); + if has_block { + let refs = cache + .references(cid, store) + .await + .map_err(Error::BlockStoreError)?; + + for ref_cid in refs { + if !self.visited.contains(&ref_cid) { + self.frontier.push_front(ref_cid); + } } } - Ok(Some(cid)) + let item = if has_block { + TraversedItem::Have(cid) + } else { + TraversedItem::Missing(cid) + }; + + Ok(Some(item)) } /// Turn this traversal into a stream @@ -94,13 +139,34 @@ impl DagWalk { self, store: &'a impl BlockStore, cache: &'a impl Cache, - ) -> impl Stream> + Unpin + 'a { + ) -> impl Stream> + Unpin + 'a { Box::pin(try_unfold(self, move |mut this| async move { - let maybe_block = this.next(store, cache).await?; - Ok(maybe_block.map(|b| (b, this))) + let item = this.next(store, cache).await?; + Ok(item.map(|b| (b, this))) })) } + /// Turn this traversal into a stream that takes ownership of the store & cache. + /// + /// In most cases `store` and `cache` should be cheaply-clonable types, so giving + /// the traversal ownership of them shouldn't be a big deal. + /// + /// This helps with creating streams that are `: 'static`, which is useful for + /// anything that ends up being put into e.g. a tokio task. + pub fn stream_owned( + self, + store: impl BlockStore, + cache: impl Cache, + ) -> impl Stream> + Unpin { + Box::pin(try_unfold( + (self, store, cache), + move |(mut this, store, cache)| async move { + let item = this.next(&store, &cache).await?; + Ok(item.map(|b| (b, (this, store, cache)))) + }, + )) + } + /// Find out whether the traversal is finished. /// /// The next call to `next` would result in `None` if this returns true. @@ -129,11 +195,11 @@ impl DagWalk { #[cfg(test)] mod tests { use super::*; - use crate::traits::NoCache; + use crate::cache::NoCache; use futures::TryStreamExt; - use libipld::Ipld; + use libipld::{cbor::DagCborCodec, Ipld}; use testresult::TestResult; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{encode, MemoryBlockStore}; #[test_log::test(async_std::test)] async fn test_walk_dag_breadth_first() -> TestResult { @@ -143,24 +209,49 @@ mod tests { // -> cid_2 // -> cid_3 - let cid_1 = store.put_serializable(&Ipld::String("1".into())).await?; - let cid_2 = store.put_serializable(&Ipld::String("2".into())).await?; - let cid_3 = store.put_serializable(&Ipld::String("3".into())).await?; + let cid_1 = store + .put_block( + encode(&Ipld::String("1".into()), DagCborCodec)?, + DagCborCodec.into(), + ) + .await?; + let cid_2 = store + .put_block( + encode(&Ipld::String("2".into()), DagCborCodec)?, + DagCborCodec.into(), + ) + .await?; + let cid_3 = store + .put_block( + encode(&Ipld::String("3".into()), DagCborCodec)?, + DagCborCodec.into(), + ) + .await?; let cid_1_wrap = store - .put_serializable(&Ipld::List(vec![Ipld::Link(cid_1)])) + .put_block( + encode(&Ipld::List(vec![Ipld::Link(cid_1)]), DagCborCodec)?, + DagCborCodec.into(), + ) .await?; let cid_root = store - .put_serializable(&Ipld::List(vec![ - Ipld::Link(cid_1_wrap), - Ipld::Link(cid_2), - Ipld::Link(cid_3), - ])) + .put_block( + encode( + &Ipld::List(vec![ + Ipld::Link(cid_1_wrap), + Ipld::Link(cid_2), + Ipld::Link(cid_3), + ]), + DagCborCodec, + )?, + DagCborCodec.into(), + ) .await?; let cids = DagWalk::breadth_first([cid_root]) .stream(store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await? .into_iter() @@ -175,7 +266,7 @@ mod tests { #[cfg(test)] mod proptests { use super::*; - use crate::{test_utils::arb_ipld_dag, traits::NoCache}; + use crate::{cache::NoCache, test_utils::arb_ipld_dag}; use futures::TryStreamExt; use libipld::{ multihash::{Code, MultihashDigest}, @@ -214,6 +305,7 @@ mod proptests { let mut cids = DagWalk::breadth_first([root]) .stream(store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/error.rs b/car-mirror/src/error.rs index 1dbc1ac..2d76029 100644 --- a/car-mirror/src/error.rs +++ b/car-mirror/src/error.rs @@ -1,4 +1,5 @@ use libipld::Cid; +use wnfs_common::BlockStoreError; use crate::incremental_verification::BlockState; @@ -31,17 +32,9 @@ pub enum Error { cid: Cid, }, - /// This error is raised when the hash function that the `BlockStore` uses a different hashing function - /// than the blocks which are received over the wire. - /// This error will be removed in the future, when the block store trait gets modified to support specifying - /// the hash function. - #[error("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {actual_cid}")] - BlockStoreIncompatible { - /// The expected CID - cid: Box, - /// The CID returned from the BlockStore implementation - actual_cid: Box, - }, + /// An error rasied from the blockstore. + #[error("BlockStore error: {0}")] + BlockStoreError(#[from] BlockStoreError), // ------------- // Anyhow Errors @@ -50,10 +43,6 @@ pub enum Error { #[error("Error during block parsing: {0}")] ParsingError(anyhow::Error), - /// An error rasied from the blockstore. - #[error("BlockStore error: {0}")] - BlockStoreError(anyhow::Error), - // ---------- // Sub-errors // ---------- diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index 30a0eb7..95bbd9d 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::ReceiverState, - dag_walk::DagWalk, + dag_walk::{DagWalk, TraversedItem}, error::{Error, IncrementalVerificationError}, - traits::Cache, }; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; @@ -12,7 +12,7 @@ use libipld_core::{ }; use std::{collections::HashSet, matches}; use tracing::instrument; -use wnfs_common::{BlockStore, BlockStoreError}; +use wnfs_common::BlockStore; /// A data structure that keeps state about incremental DAG verification. #[derive(Clone, Debug)] @@ -65,34 +65,14 @@ impl IncrementalDagVerification { ) -> Result<(), Error> { let mut dag_walk = DagWalk::breadth_first(self.want_cids.iter().cloned()); - loop { - match dag_walk.next(store, cache).await { - Err(Error::BlockStoreError(e)) => { - if let Some(BlockStoreError::CIDNotFound(not_found)) = - e.downcast_ref::() - { - tracing::trace!(%not_found, "Missing block, adding to want list"); - self.mark_as_want(*not_found); - } else { - return Err(Error::BlockStoreError(e)); - } + while let Some(item) = dag_walk.next(store, cache).await? { + match item { + TraversedItem::Have(cid) => { + self.mark_as_have(cid); } - Err(e) => return Err(e), - Ok(Some(cid)) => { - let not_found = matches!( - store.get_block(&cid).await, - Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) - ); - - if not_found { - tracing::trace!(%cid, "Missing block, adding to want list"); - self.mark_as_want(cid); - } else { - self.mark_as_have(cid); - } - } - Ok(None) => { - break; + TraversedItem::Missing(cid) => { + tracing::trace!(%cid, "Missing block, adding to want list"); + self.mark_as_want(cid); } } } @@ -179,20 +159,11 @@ impl IncrementalDagVerification { .into()); } - let actual_cid = store - .put_block(bytes, cid.codec()) + store + .put_block_keyed(cid, bytes) .await .map_err(Error::BlockStoreError)?; - // TODO(matheus23): The BlockStore chooses the hashing function, - // so it may choose a different hashing function, causing a mismatch - if actual_cid != cid { - return Err(Error::BlockStoreIncompatible { - cid: Box::new(cid), - actual_cid: Box::new(actual_cid), - }); - } - self.update_have_cids(store, cache).await?; Ok(()) diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 6514d7b..d994424 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -9,6 +9,8 @@ #[cfg_attr(docsrs, doc(cfg(feature = "test_utils")))] pub mod test_utils; +/// Module with local caching strategies and mechanisms that greatly enhance CAR mirror performance +pub mod cache; /// Common utilities pub mod common; /// Algorithms for walking IPLD directed acyclic graphs @@ -23,5 +25,3 @@ pub mod messages; pub mod pull; /// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response` pub mod push; -/// Traits defined in this crate -pub mod traits; diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index d13cfee..f4aa5bd 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PullRequest, - traits::Cache, }; use libipld::Cid; use wnfs_common::BlockStore; @@ -17,13 +17,14 @@ use wnfs_common::BlockStore; /// /// Before actually sending the request over the network, /// make sure to check the `request.indicates_finished()`. -/// If true, the client already has all data. +/// If true, the client already has all data and the request +/// doesn't need to be sent. pub async fn request( root: Cid, last_response: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { Ok(block_receive(root, last_response, config, store, cache) .await? @@ -35,8 +36,8 @@ pub async fn response( root: Cid, request: PullRequest, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let receiver_state = Some(ReceiverState::from(request)); block_send(root, receiver_state, config, store, cache).await @@ -45,10 +46,10 @@ pub async fn response( #[cfg(test)] mod tests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_random_dag, Metrics}, - traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -68,7 +69,7 @@ mod tests { loop { let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len(); let response = - crate::pull::response(root, request, config, server_store, &NoCache).await?; + crate::pull::response(root, request, config, server_store, NoCache).await?; let response_bytes = response.bytes.len(); metrics.push(Metrics { @@ -96,10 +97,12 @@ mod tests { // client should have all data let client_cids = DagWalk::breadth_first([root]) .stream(client_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await?; let server_cids = DagWalk::breadth_first([root]) .stream(server_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await?; @@ -112,10 +115,10 @@ mod tests { #[cfg(test)] mod proptests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, - traits::NoCache, }; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -142,11 +145,13 @@ mod proptests { // client should have all data let client_cids = DagWalk::breadth_first([root]) .stream(client_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await .unwrap(); let server_cids = DagWalk::breadth_first([root]) .stream(server_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index f4e36bc..f0d5c17 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -1,8 +1,8 @@ use crate::{ + cache::Cache, common::{block_receive, block_send, CarFile, Config, ReceiverState}, error::Error, messages::PushResponse, - traits::Cache, }; use libipld_core::cid::Cid; use wnfs_common::BlockStore; @@ -21,8 +21,8 @@ pub async fn request( root: Cid, last_response: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let receiver_state = last_response.map(ReceiverState::from); block_send(root, receiver_state, config, store, cache).await @@ -40,8 +40,8 @@ pub async fn response( root: Cid, request: CarFile, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { Ok(block_receive(root, Some(request), config, store, cache) .await? @@ -51,13 +51,13 @@ pub async fn response( #[cfg(test)] mod tests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{ get_cid_at_approx_path, setup_random_dag, total_dag_blocks, total_dag_bytes, Metrics, Rvg, }, - traits::NoCache, }; use anyhow::Result; use futures::TryStreamExt; @@ -105,10 +105,12 @@ mod tests { // receiver should have all data let client_cids = DagWalk::breadth_first([root]) .stream(client_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await?; let server_cids = DagWalk::breadth_first([root]) .stream(server_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await?; @@ -186,10 +188,10 @@ mod tests { #[cfg(test)] mod proptests { use crate::{ + cache::NoCache, common::Config, dag_walk::DagWalk, test_utils::{setup_blockstore, variable_blocksize_dag}, - traits::NoCache, }; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -216,11 +218,13 @@ mod proptests { // client should have all data let client_cids = DagWalk::breadth_first([root]) .stream(client_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await .unwrap(); let server_cids = DagWalk::breadth_first([root]) .stream(server_store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_collect::>() .await .unwrap(); diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index 2281d78..e1b08ab 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -1,6 +1,6 @@ ///! Crate-local test utilities use super::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore, Rvg}; -use crate::{common::references, dag_walk::DagWalk, error::Error, traits::NoCache}; +use crate::{cache::NoCache, common::references, dag_walk::DagWalk, error::Error}; use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; @@ -71,6 +71,7 @@ pub(crate) async fn setup_random_dag( pub(crate) async fn total_dag_bytes(root: Cid, store: &impl BlockStore) -> Result { Ok(DagWalk::breadth_first([root]) .stream(store, &NoCache) + .and_then(|item| async move { item.to_cid() }) .try_filter_map(|cid| async move { let block = store .get_block(&cid) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs deleted file mode 100644 index 67eca42..0000000 --- a/car-mirror/src/traits.rs +++ /dev/null @@ -1,407 +0,0 @@ -use crate::common::references; -use anyhow::Result; -use async_trait::async_trait; -use libipld::{Cid, IpldCodec}; -#[cfg(feature = "quick_cache")] -use wnfs_common::utils::Arc; -use wnfs_common::{utils::CondSync, BlockStore, BlockStoreError}; - -/// This trait abstracts caches used by the car mirror implementation. -/// An efficient cache implementation can significantly reduce the amount -/// of lookups into the blockstore. -/// -/// At the moment, all caches are either memoization tables or informationally -/// monotonous, so you don't need to be careful about cache eviction. -/// -/// See `InMemoryCache` for a `quick_cache`-based implementation -/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait Cache: CondSync { - /// This returns further references from the block referenced by given CID, - /// if the cache is hit. - /// Returns `None` if it's a cache miss. - /// - /// This isn't meant to be called directly, instead use `Cache::references`. - async fn get_references_cache(&self, cid: Cid) -> Result>>; - - /// Populates the references cache for given CID with given references. - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()>; - - /// This returns whether the cache has the fact stored that a block with given - /// CID is stored. - /// - /// This only returns `true` in case the block has been stored. - /// `false` simply indicates that the cache doesn't know whether the block is - /// stored or not (it's always a cache miss). - /// - /// Don't call this directly, instead, use `Cache::has_block`. - async fn get_has_block_cache(&self, cid: &Cid) -> Result; - - /// This populates the cache with the fact that a block with given CID is stored. - async fn put_has_block_cache(&self, cid: Cid) -> Result<()>; - - /// Find out any CIDs that are linked to from the block with given CID. - /// - /// This makes use of the cache via `get_references_cached`, if possible. - /// If the cache is missed, then it will fetch the block, compute the references - /// and automatically populate the cache using `put_references_cached`. - async fn references(&self, cid: Cid, store: &impl BlockStore) -> Result> { - // raw blocks don't have further links - let raw_codec: u64 = IpldCodec::Raw.into(); - if cid.codec() == raw_codec { - return Ok(Vec::new()); - } - - if let Some(refs) = self.get_references_cache(cid).await? { - return Ok(refs); - } - - let block = store.get_block(&cid).await?; - let refs = references(cid, block, Vec::new())?; - self.put_references_cache(cid, refs.clone()).await?; - Ok(refs) - } - - /// Find out whether a given block is stored in given blockstore or not. - /// - /// This cache is *only* effective on `true` values for `has_block`. - /// Repeatedly calling `has_block` with `Cid`s of blocks that are *not* - /// stored will cause repeated calls to given blockstore. - /// - /// **Make sure to always use the same `BlockStore` when calling this function.** - /// - /// This makes use of the caches `get_has_block_cached`, if possible. - /// On cache misses, this will actually fetch the block from the store - /// and if successful, populate the cache using `put_has_block_cached`. - async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result { - if self.get_has_block_cache(&cid).await? { - return Ok(true); - } - - match store.get_block(&cid).await { - Ok(_) => { - self.put_has_block_cache(cid).await?; - Ok(true) - } - Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { - Ok(false) - } - Err(e) => Err(e), - } - } -} - -/// A [quick-cache]-based implementation of a car mirror cache. -/// -/// [quick-cache]: https://github.com/arthurprs/quick-cache/ -#[cfg(feature = "quick_cache")] -#[derive(Debug, Clone)] -pub struct InMemoryCache { - references: Arc>>, - has_blocks: Arc>, -} - -#[cfg(feature = "quick_cache")] -impl InMemoryCache { - /// Create a new in-memory cache that approximately holds - /// cached references for `approx_references_capacity` CIDs - /// and `approx_has_blocks_capacity` CIDs known to be stored locally. - /// - /// Computing the expected memory requirements for the reference - /// cache isn't easy. - /// A block in theory have up to thousands of references. - /// [UnixFS] chunked files will reference up to 174 chunks - /// at a time. - /// Each CID takes up 96 bytes of memory. - /// In the UnixFS worst case of 175 CIDs per cache entry, - /// you need to reserve up to `175 * 96B = 16.8KB` of space - /// per entry. Thus, if you want your cache to not outgrow - /// ~100MB (ignoring the internal cache structure space - /// requirements), you can store up to `100MB / 16.8KB = 5952` - /// entries. - /// - /// In practice, the fanout average will be much lower than 174. - /// - /// On the other hand, each cache entry for the `has_blocks` cache - /// will take a little more than 64 bytes, so for a 10MB - /// `has_blocks` cache, you would use `10MB / 64bytes = 156_250`. - /// - /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout - pub fn new(approx_references_capacity: usize, approx_has_blocks_capacity: usize) -> Self { - Self { - references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)), - has_blocks: Arc::new(quick_cache::sync::Cache::new(approx_has_blocks_capacity)), - } - } -} - -#[cfg(feature = "quick_cache")] -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl Cache for InMemoryCache { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - Ok(self.references.get(&cid)) - } - - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - self.references.insert(cid, references); - Ok(()) - } - - async fn get_has_block_cache(&self, cid: &Cid) -> Result { - Ok(self.has_blocks.get(cid).is_some()) - } - - async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { - self.has_blocks.insert(cid, ()); - Ok(()) - } -} - -/// An implementation of `Cache` that doesn't cache at all. -#[derive(Debug)] -pub struct NoCache; - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl Cache for NoCache { - async fn get_references_cache(&self, _: Cid) -> Result>> { - Ok(None) - } - - async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<()> { - Ok(()) - } - - async fn get_has_block_cache(&self, _: &Cid) -> Result { - Ok(false) - } - - async fn put_has_block_cache(&self, _: Cid) -> Result<()> { - Ok(()) - } -} - -#[cfg(feature = "quick_cache")] -#[cfg(test)] -mod quick_cache_tests { - use super::{Cache, InMemoryCache}; - use libipld::{Ipld, IpldCodec}; - use testresult::TestResult; - use wnfs_common::{BlockStore, MemoryBlockStore}; - - #[test_log::test(async_std::test)] - async fn test_has_block_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = InMemoryCache::new(10_000, 150_000); - - let cid = store - .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) - .await?; - - // Initially, the cache is unpopulated - assert!(!cache.get_has_block_cache(&cid).await?); - - // Then, we populate that cache - assert!(cache.has_block(cid, store).await?); - - // Now, the cache should be populated - assert!(cache.get_has_block_cache(&cid).await?); - - Ok(()) - } - - #[test_log::test(async_std::test)] - async fn test_references_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = InMemoryCache::new(10_000, 150_000); - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_serializable(&Ipld::List(vec![ - Ipld::Link(hello_one_cid), - Ipld::Link(hello_two_cid), - ])) - .await?; - - // Cache unpopulated initially - assert_eq!(cache.get_references_cache(cid).await?, None); - - // This should populate the references cache - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // Cache should now contain the references - assert_eq!( - cache.get_references_cache(cid).await?, - Some(vec![hello_one_cid, hello_two_cid]) - ); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::{Cache, NoCache}; - use anyhow::Result; - use async_trait::async_trait; - use libipld::{Cid, Ipld, IpldCodec}; - use std::{ - collections::{HashMap, HashSet}, - sync::RwLock, - }; - use testresult::TestResult; - use wnfs_common::{BlockStore, MemoryBlockStore}; - - #[derive(Debug, Default)] - struct HashMapCache { - references: RwLock>>, - has_blocks: RwLock>, - } - - #[async_trait] - impl Cache for HashMapCache { - async fn get_references_cache(&self, cid: Cid) -> Result>> { - Ok(self.references.read().unwrap().get(&cid).cloned()) - } - - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { - self.references.write().unwrap().insert(cid, references); - Ok(()) - } - - async fn get_has_block_cache(&self, cid: &Cid) -> Result { - Ok(self.has_blocks.read().unwrap().contains(cid)) - } - - async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { - self.has_blocks.write().unwrap().insert(cid); - Ok(()) - } - } - - #[test_log::test(async_std::test)] - async fn test_has_block_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = HashMapCache::default(); - - let cid = store - .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) - .await?; - - // Initially, the cache is unpopulated - assert!(!cache.get_has_block_cache(&cid).await?); - - // Then, we populate that cache - assert!(cache.has_block(cid, store).await?); - - // Now, the cache should be populated - assert!(cache.get_has_block_cache(&cid).await?); - - Ok(()) - } - - #[test_log::test(async_std::test)] - async fn test_references_cache() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = HashMapCache::default(); - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_serializable(&Ipld::List(vec![ - Ipld::Link(hello_one_cid), - Ipld::Link(hello_two_cid), - ])) - .await?; - - // Cache unpopulated initially - assert_eq!(cache.get_references_cache(cid).await?, None); - - // This should populate the references cache - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // Cache should now contain the references - assert_eq!( - cache.get_references_cache(cid).await?, - Some(vec![hello_one_cid, hello_two_cid]) - ); - - Ok(()) - } - - #[test_log::test(async_std::test)] - async fn test_no_cache_has_block() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = NoCache; - - let cid = store - .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) - .await?; - - let not_stored_cid = store.create_cid(b"Hi!", IpldCodec::Raw.into())?; - - // Cache should start out unpopulated - assert!(!cache.get_has_block_cache(&cid).await?); - - // Then we "try to populate it". - assert!(cache.has_block(cid, store).await?); - - // It should still give correct answers - assert!(!cache.has_block(not_stored_cid, store).await?); - - // Still, it should stay unpopulated - assert!(!cache.get_has_block_cache(&cid).await?); - - Ok(()) - } - - #[test_log::test(async_std::test)] - async fn test_no_cache_references() -> TestResult { - let store = &MemoryBlockStore::new(); - let cache = NoCache; - - let hello_one_cid = store - .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) - .await?; - let hello_two_cid = store - .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) - .await?; - let cid = store - .put_serializable(&Ipld::List(vec![ - Ipld::Link(hello_one_cid), - Ipld::Link(hello_two_cid), - ])) - .await?; - - // Cache should start out unpopulated - assert_eq!(cache.get_references_cache(cid).await?, None); - - // We should get the correct answer for our queries - assert_eq!( - cache.references(cid, store).await?, - vec![hello_one_cid, hello_two_cid] - ); - - // We don't have a populated cache though - assert_eq!(cache.get_references_cache(cid).await?, None); - - Ok(()) - } -} diff --git a/deny.toml b/deny.toml index a6c5ca1..120a945 100644 --- a/deny.toml +++ b/deny.toml @@ -191,7 +191,7 @@ unknown-git = "deny" # if not specified. If it is specified but empty, no registries are allowed. allow-registry = ["https://github.com/rust-lang/crates.io-index"] # List of URLs for allowed Git repositories -allow-git = [] +allow-git = ["https://github.com/wnfs-wg/rs-wnfs"] #[sources.allow-org] # 1 or more github.com organizations to allow git sources for