diff --git a/Cargo.lock b/Cargo.lock index ce3cd4a..b1b4615 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "async-task" version = "4.7.0" @@ -427,7 +449,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "async-trait", + "async-stream", "bytes", "car-mirror", "deterministic-bloom", @@ -456,7 +478,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "async-trait", "bytes", "car-mirror", "criterion", @@ -495,12 +516,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.83" +version = "1.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "9b918671670962b48bc23753aef0c51d072dca6f52f01f800854ada6ddb7f7d3" [[package]] name = "cfg-if" @@ -2567,8 +2585,7 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "wnfs-common" version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1395a47e38402df060d3448fe153c5af1eae6f27aeca9c2e79e5a39bb355efab" +source = "git+https://github.com/wnfs-wg/rs-wnfs?branch=matheus23/evolve-trait#e8184827a9e7d59d85eea6730e5015c689c305c2" dependencies = [ "anyhow", "async-once-cell", diff --git a/Cargo.toml b/Cargo.toml index aa734b0..4966187 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,9 +2,19 @@ members = [ "car-mirror", "car-mirror-benches", - "car-mirror-wasm" -, - "examples"] + "car-mirror-wasm", + "examples" +] + +[workspace.dependencies] +anyhow = "1.0" +async-stream = "0.3.5" +bytes = "1.4" +futures = "0.3" +libipld = "0.16" +libipld-core = "0.16" +serde_ipld_dagcbor = "0.4" +wnfs-common = { version = "0.1.26", git = "https://github.com/wnfs-wg/rs-wnfs", branch = "matheus23/evolve-trait" } # See https://doc.rust-lang.org/cargo/reference/profiles.html for more info. [profile.release.package.car-mirror-wasm] diff --git a/car-mirror-benches/Cargo.toml b/car-mirror-benches/Cargo.toml index a9590b0..b0c162c 100644 --- a/car-mirror-benches/Cargo.toml +++ b/car-mirror-benches/Cargo.toml @@ -6,14 +6,13 @@ edition = "2021" authors = ["Philipp Krüger "] [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } async-std = { version = "1.11", features = ["attributes"] } -async-trait = "0.1" -bytes = "1.4.0" +bytes = { workspace = true } car-mirror = { path = "../car-mirror", version = "0.1", features = ["test_utils", "quick_cache"] } -libipld = "0.16.0" -serde_ipld_dagcbor = "0.4.0" -wnfs-common = "0.1.23" +libipld = { workspace = true } +serde_ipld_dagcbor = { workspace = true } +wnfs-common = { workspace = true } [dev-dependencies] criterion = { version = "0.4", default-features = false } diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index 905bdd1..bdf5bea 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use async_trait::async_trait; use bytes::Bytes; use car_mirror::{ common::Config, @@ -35,9 +34,17 @@ pub fn push_throttled(c: &mut Criterion) { // Simulate a multi-round protocol run in-memory async_std::task::block_on(async move { - let mut request = - push::request(root, None, config, client_store, client_cache).await?; + let mut last_response = None; loop { + let request = push::request( + root, + last_response, + config, + client_store.clone(), + client_cache.clone(), + ) + .await?; + let response = push::response(root, request, config, server_store, server_cache) .await?; @@ -45,9 +52,8 @@ pub fn push_throttled(c: &mut Criterion) { if response.indicates_finished() { break; } - request = - push::request(root, Some(response), config, client_store, client_cache) - .await?; + + last_response = Some(response); } Ok::<(), anyhow::Error>(()) @@ -109,8 +115,6 @@ pub fn pull_throttled(c: &mut Criterion) { #[derive(Debug, Clone)] struct ThrottledBlockStore(MemoryBlockStore); -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for ThrottledBlockStore { async fn get_block(&self, cid: &Cid) -> Result { let bytes = self.0.get_block(cid).await?; diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index aa0363d..6623dfa 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -18,23 +18,23 @@ path = "src/lib.rs" doctest = true [dependencies] -anyhow = "1.0" -async-trait = "0.1.73" -bytes = "1.4" +anyhow = { workspace = true } +async-stream = { workspace = true } +bytes = { workspace = true } deterministic-bloom = "0.1" -futures = "0.3" +futures = { workspace = true } iroh-car = "0.4" -libipld = "0.16" -libipld-core = "0.16" +libipld = { workspace = true } +libipld-core = { workspace = true } proptest = { version = "1.1", optional = true } quick_cache = { version = "0.4", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "^1" -serde_ipld_dagcbor = "0.4" +serde_ipld_dagcbor = { workspace = true } thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" -wnfs-common = "0.1.26" +wnfs-common = { workspace = true } [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index da62de9..c4fec7c 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; -use futures::{future, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; @@ -72,8 +72,8 @@ pub async fn block_send( root: Cid, last_state: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let bytes = block_send_car_stream( root, @@ -94,13 +94,13 @@ pub async fn block_send( /// /// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`. #[instrument(skip_all, fields(root, last_state))] -pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( +pub async fn block_send_car_stream( root: Cid, last_state: Option, stream: W, send_limit: Option, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; write_blocks_into_car(stream, &mut block_stream, send_limit).await @@ -111,8 +111,8 @@ pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( pub async fn block_send_block_stream<'a>( root: Cid, last_state: Option, - store: &'a impl BlockStore, - cache: &'a impl Cache, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, ) -> Result, Error> { let ReceiverState { missing_subgraph_roots, @@ -124,7 +124,7 @@ pub async fn block_send_block_stream<'a>( // Verify that all missing subgraph roots are in the relevant DAG: let subgraph_roots = - verify_missing_subgraph_roots(root, &missing_subgraph_roots, store, cache).await?; + verify_missing_subgraph_roots(root, &missing_subgraph_roots, &store, &cache).await?; let bloom = handle_missing_bloom(have_cids_bloom); @@ -356,23 +356,22 @@ fn handle_missing_bloom(have_cids_bloom: Option) -> BloomFilter { fn stream_blocks_from_roots<'a>( subgraph_roots: Vec, bloom: BloomFilter, - store: &'a impl BlockStore, - cache: &'a impl Cache, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, ) -> BlockStream<'a> { - Box::pin( - DagWalk::breadth_first(subgraph_roots.clone()) - .stream(store, cache) - .try_filter(move |cid| { - future::ready(!should_block_be_skipped(cid, &bloom, &subgraph_roots)) - }) - .and_then(move |cid| async move { - let bytes = store - .get_block(&cid) - .await - .map_err(Error::BlockStoreError)?; - Ok((cid, bytes)) - }), - ) + Box::pin(async_stream::try_stream! { + let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); + + while let Some(cid) = dag_walk.next(&store, &cache).await? { + if should_block_be_skipped(&cid, &bloom, &subgraph_roots) { + continue; + } + + let bytes = store.get_block(&cid).await.map_err(Error::BlockStoreError)?; + + yield (cid, bytes); + } + }) } async fn write_blocks_into_car( @@ -585,8 +584,8 @@ mod tests { unimplemented!(), unimplemented!(), unimplemented!(), - unimplemented!() as &MemoryBlockStore, - &NoCache, + unimplemented!() as MemoryBlockStore, + NoCache, ) }); assert_cond_send_sync(|| { diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 51ec790..02b63d9 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -101,6 +101,27 @@ impl DagWalk { })) } + /// Turn this traversal into a stream that takes ownership of the store & cache. + /// + /// In most cases `store` and `cache` should be cheaply-clonable types, so giving + /// the traversal ownership of them shouldn't be a big deal. + /// + /// This helps with creating streams that are `: 'static`, which is useful for + /// anything that ends up being put into e.g. a tokio task. + pub fn stream_owned( + self, + store: impl BlockStore, + cache: impl Cache, + ) -> impl Stream> + Unpin { + Box::pin(try_unfold( + (self, store, cache), + move |(mut this, store, cache)| async move { + let maybe_block = this.next(&store, &cache).await?; + Ok(maybe_block.map(|b| (b, (this, store, cache)))) + }, + )) + } + /// Find out whether the traversal is finished. /// /// The next call to `next` would result in `None` if this returns true. diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index d13cfee..9c72aca 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -35,8 +35,8 @@ pub async fn response( root: Cid, request: PullRequest, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let receiver_state = Some(ReceiverState::from(request)); block_send(root, receiver_state, config, store, cache).await @@ -68,7 +68,7 @@ mod tests { loop { let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len(); let response = - crate::pull::response(root, request, config, server_store, &NoCache).await?; + crate::pull::response(root, request, config, server_store, NoCache).await?; let response_bytes = response.bytes.len(); metrics.push(Metrics { diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index f4e36bc..de75136 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -21,8 +21,8 @@ pub async fn request( root: Cid, last_response: Option, config: &Config, - store: &impl BlockStore, - cache: &impl Cache, + store: impl BlockStore, + cache: impl Cache, ) -> Result { let receiver_state = last_response.map(ReceiverState::from); block_send(root, receiver_state, config, store, cache).await diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 67eca42..cda7a61 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -1,10 +1,15 @@ +use std::ops::Deref; + use crate::common::references; use anyhow::Result; -use async_trait::async_trait; +use futures::Future; use libipld::{Cid, IpldCodec}; #[cfg(feature = "quick_cache")] use wnfs_common::utils::Arc; -use wnfs_common::{utils::CondSync, BlockStore, BlockStoreError}; +use wnfs_common::{ + utils::{CondSend, CondSync}, + BlockStore, BlockStoreError, +}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount @@ -15,18 +20,23 @@ use wnfs_common::{utils::CondSync, BlockStore, BlockStoreError}; /// /// See `InMemoryCache` for a `quick_cache`-based implementation /// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait Cache: CondSync { /// This returns further references from the block referenced by given CID, /// if the cache is hit. /// Returns `None` if it's a cache miss. /// /// This isn't meant to be called directly, instead use `Cache::references`. - async fn get_references_cache(&self, cid: Cid) -> Result>>; + fn get_references_cache( + &self, + cid: Cid, + ) -> impl Future>>> + CondSend; /// Populates the references cache for given CID with given references. - async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()>; + fn put_references_cache( + &self, + cid: Cid, + references: Vec, + ) -> impl Future> + CondSend; /// This returns whether the cache has the fact stored that a block with given /// CID is stored. @@ -36,31 +46,37 @@ pub trait Cache: CondSync { /// stored or not (it's always a cache miss). /// /// Don't call this directly, instead, use `Cache::has_block`. - async fn get_has_block_cache(&self, cid: &Cid) -> Result; + fn get_has_block_cache(&self, cid: &Cid) -> impl Future> + CondSend; /// This populates the cache with the fact that a block with given CID is stored. - async fn put_has_block_cache(&self, cid: Cid) -> Result<()>; + fn put_has_block_cache(&self, cid: Cid) -> impl Future> + CondSend; /// Find out any CIDs that are linked to from the block with given CID. /// /// This makes use of the cache via `get_references_cached`, if possible. /// If the cache is missed, then it will fetch the block, compute the references /// and automatically populate the cache using `put_references_cached`. - async fn references(&self, cid: Cid, store: &impl BlockStore) -> Result> { - // raw blocks don't have further links - let raw_codec: u64 = IpldCodec::Raw.into(); - if cid.codec() == raw_codec { - return Ok(Vec::new()); - } + fn references( + &self, + cid: Cid, + store: &impl BlockStore, + ) -> impl Future>> + CondSend { + async move { + // raw blocks don't have further links + let raw_codec: u64 = IpldCodec::Raw.into(); + if cid.codec() == raw_codec { + return Ok(Vec::new()); + } - if let Some(refs) = self.get_references_cache(cid).await? { - return Ok(refs); - } + if let Some(refs) = self.get_references_cache(cid).await? { + return Ok(refs); + } - let block = store.get_block(&cid).await?; - let refs = references(cid, block, Vec::new())?; - self.put_references_cache(cid, refs.clone()).await?; - Ok(refs) + let block = store.get_block(&cid).await?; + let refs = references(cid, block, Vec::new())?; + self.put_references_cache(cid, refs.clone()).await?; + Ok(refs) + } } /// Find out whether a given block is stored in given blockstore or not. @@ -74,24 +90,48 @@ pub trait Cache: CondSync { /// This makes use of the caches `get_has_block_cached`, if possible. /// On cache misses, this will actually fetch the block from the store /// and if successful, populate the cache using `put_has_block_cached`. - async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result { - if self.get_has_block_cache(&cid).await? { - return Ok(true); - } - - match store.get_block(&cid).await { - Ok(_) => { - self.put_has_block_cache(cid).await?; - Ok(true) + fn has_block( + &self, + cid: Cid, + store: &impl BlockStore, + ) -> impl Future> + CondSend { + async move { + if self.get_has_block_cache(&cid).await? { + return Ok(true); } - Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { - Ok(false) + + match store.get_block(&cid).await { + Ok(_) => { + self.put_has_block_cache(cid).await?; + Ok(true) + } + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { + Ok(false) + } + Err(e) => Err(e), } - Err(e) => Err(e), } } } +impl + CondSync> Cache for T { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + self.deref().get_references_cache(cid).await + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + self.deref().put_references_cache(cid, references).await + } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result { + self.deref().get_has_block_cache(cid).await + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.deref().put_has_block_cache(cid).await + } +} + /// A [quick-cache]-based implementation of a car mirror cache. /// /// [quick-cache]: https://github.com/arthurprs/quick-cache/ @@ -137,8 +177,6 @@ impl InMemoryCache { } #[cfg(feature = "quick_cache")] -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for InMemoryCache { async fn get_references_cache(&self, cid: Cid) -> Result>> { Ok(self.references.get(&cid)) @@ -163,8 +201,6 @@ impl Cache for InMemoryCache { #[derive(Debug)] pub struct NoCache; -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for NoCache { async fn get_references_cache(&self, _: Cid) -> Result>> { Ok(None) @@ -253,7 +289,6 @@ mod quick_cache_tests { mod tests { use super::{Cache, NoCache}; use anyhow::Result; - use async_trait::async_trait; use libipld::{Cid, Ipld, IpldCodec}; use std::{ collections::{HashMap, HashSet}, @@ -268,7 +303,6 @@ mod tests { has_blocks: RwLock>, } - #[async_trait] impl Cache for HashMapCache { async fn get_references_cache(&self, cid: Cid) -> Result>> { Ok(self.references.read().unwrap().get(&cid).cloned())