Skip to content

Commit

Permalink
refactor: Remove async_trait dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Feb 13, 2024
1 parent 3067020 commit 10c29ff
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 105 deletions.
35 changes: 26 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@
members = [
"car-mirror",
"car-mirror-benches",
"car-mirror-wasm"
,
"examples"]
"car-mirror-wasm",
"examples"
]

[workspace.dependencies]
anyhow = "1.0"
async-stream = "0.3.5"
bytes = "1.4"
futures = "0.3"
libipld = "0.16"
libipld-core = "0.16"
serde_ipld_dagcbor = "0.4"
wnfs-common = { version = "0.1.26", git = "https://github.com/wnfs-wg/rs-wnfs", branch = "matheus23/evolve-trait" }

# See https://doc.rust-lang.org/cargo/reference/profiles.html for more info.
[profile.release.package.car-mirror-wasm]
Expand Down
11 changes: 5 additions & 6 deletions car-mirror-benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ edition = "2021"
authors = ["Philipp Krüger <[email protected]>"]

[dependencies]
anyhow = "1.0"
anyhow = { workspace = true }
async-std = { version = "1.11", features = ["attributes"] }
async-trait = "0.1"
bytes = "1.4.0"
bytes = { workspace = true }
car-mirror = { path = "../car-mirror", version = "0.1", features = ["test_utils", "quick_cache"] }
libipld = "0.16.0"
serde_ipld_dagcbor = "0.4.0"
wnfs-common = "0.1.23"
libipld = { workspace = true }
serde_ipld_dagcbor = { workspace = true }
wnfs-common = { workspace = true }

[dev-dependencies]
criterion = { version = "0.4", default-features = false }
Expand Down
20 changes: 12 additions & 8 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use car_mirror::{
common::Config,
Expand Down Expand Up @@ -35,19 +34,26 @@ pub fn push_throttled(c: &mut Criterion) {

// Simulate a multi-round protocol run in-memory
async_std::task::block_on(async move {
let mut request =
push::request(root, None, config, client_store, client_cache).await?;
let mut last_response = None;
loop {
let request = push::request(
root,
last_response,
config,
client_store.clone(),
client_cache.clone(),
)
.await?;

let response =
push::response(root, request, config, server_store, server_cache)
.await?;

if response.indicates_finished() {
break;
}
request =
push::request(root, Some(response), config, client_store, client_cache)
.await?;

last_response = Some(response);
}

Ok::<(), anyhow::Error>(())
Expand Down Expand Up @@ -109,8 +115,6 @@ pub fn pull_throttled(c: &mut Criterion) {
#[derive(Debug, Clone)]
struct ThrottledBlockStore(MemoryBlockStore);

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self.0.get_block(cid).await?;
Expand Down
16 changes: 8 additions & 8 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ path = "src/lib.rs"
doctest = true

[dependencies]
anyhow = "1.0"
async-trait = "0.1.73"
bytes = "1.4"
anyhow = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
deterministic-bloom = "0.1"
futures = "0.3"
futures = { workspace = true }
iroh-car = "0.4"
libipld = "0.16"
libipld-core = "0.16"
libipld = { workspace = true }
libipld-core = { workspace = true }
proptest = { version = "1.1", optional = true }
quick_cache = { version = "0.4", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "^1"
serde_ipld_dagcbor = "0.4"
serde_ipld_dagcbor = { workspace = true }
thiserror = "1.0"
tokio = { version = "^1", default-features = false }
tracing = "0.1"
wnfs-common = "0.1.26"
wnfs-common = { workspace = true }

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
Expand Down
53 changes: 26 additions & 27 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::{future, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use iroh_car::{CarHeader, CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
Expand Down Expand Up @@ -72,8 +72,8 @@ pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
config: &Config,
store: &impl BlockStore,
cache: &impl Cache,
store: impl BlockStore,
cache: impl Cache,
) -> Result<CarFile, Error> {
let bytes = block_send_car_stream(
root,
Expand All @@ -94,13 +94,13 @@ pub async fn block_send(
///
/// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`.
#[instrument(skip_all, fields(root, last_state))]
pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>(
pub async fn block_send_car_stream<W: tokio::io::AsyncWrite + Unpin + Send>(
root: Cid,
last_state: Option<ReceiverState>,
stream: W,
send_limit: Option<usize>,
store: &impl BlockStore,
cache: &impl Cache,
store: impl BlockStore,
cache: impl Cache,
) -> Result<W, Error> {
let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?;
write_blocks_into_car(stream, &mut block_stream, send_limit).await
Expand All @@ -111,8 +111,8 @@ pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>(
pub async fn block_send_block_stream<'a>(
root: Cid,
last_state: Option<ReceiverState>,
store: &'a impl BlockStore,
cache: &'a impl Cache,
store: impl BlockStore + 'a,
cache: impl Cache + 'a,
) -> Result<BlockStream<'a>, Error> {
let ReceiverState {
missing_subgraph_roots,
Expand All @@ -124,7 +124,7 @@ pub async fn block_send_block_stream<'a>(

// Verify that all missing subgraph roots are in the relevant DAG:
let subgraph_roots =
verify_missing_subgraph_roots(root, &missing_subgraph_roots, store, cache).await?;
verify_missing_subgraph_roots(root, &missing_subgraph_roots, &store, &cache).await?;

let bloom = handle_missing_bloom(have_cids_bloom);

Expand Down Expand Up @@ -356,23 +356,22 @@ fn handle_missing_bloom(have_cids_bloom: Option<BloomFilter>) -> BloomFilter {
fn stream_blocks_from_roots<'a>(
subgraph_roots: Vec<Cid>,
bloom: BloomFilter,
store: &'a impl BlockStore,
cache: &'a impl Cache,
store: impl BlockStore + 'a,
cache: impl Cache + 'a,
) -> BlockStream<'a> {
Box::pin(
DagWalk::breadth_first(subgraph_roots.clone())
.stream(store, cache)
.try_filter(move |cid| {
future::ready(!should_block_be_skipped(cid, &bloom, &subgraph_roots))
})
.and_then(move |cid| async move {
let bytes = store
.get_block(&cid)
.await
.map_err(Error::BlockStoreError)?;
Ok((cid, bytes))
}),
)
Box::pin(async_stream::try_stream! {
let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());

while let Some(cid) = dag_walk.next(&store, &cache).await? {
if should_block_be_skipped(&cid, &bloom, &subgraph_roots) {
continue;
}

let bytes = store.get_block(&cid).await.map_err(Error::BlockStoreError)?;

yield (cid, bytes);
}
})
}

async fn write_blocks_into_car<W: tokio::io::AsyncWrite + Unpin + Send>(
Expand Down Expand Up @@ -585,8 +584,8 @@ mod tests {
unimplemented!(),
unimplemented!(),
unimplemented!(),
unimplemented!() as &MemoryBlockStore,
&NoCache,
unimplemented!() as MemoryBlockStore,
NoCache,
)
});
assert_cond_send_sync(|| {
Expand Down
21 changes: 21 additions & 0 deletions car-mirror/src/dag_walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ impl DagWalk {
}))
}

/// Turn this traversal into a stream that takes ownership of the store & cache.
///
/// In most cases `store` and `cache` should be cheaply-clonable types, so giving
/// the traversal ownership of them shouldn't be a big deal.
///
/// This helps with creating streams that are `: 'static`, which is useful for
/// anything that ends up being put into e.g. a tokio task.
pub fn stream_owned(
self,
store: impl BlockStore,
cache: impl Cache,
) -> impl Stream<Item = Result<Cid, Error>> + Unpin {
Box::pin(try_unfold(
(self, store, cache),
move |(mut this, store, cache)| async move {
let maybe_block = this.next(&store, &cache).await?;
Ok(maybe_block.map(|b| (b, (this, store, cache))))
},
))
}

/// Find out whether the traversal is finished.
///
/// The next call to `next` would result in `None` if this returns true.
Expand Down
6 changes: 3 additions & 3 deletions car-mirror/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub async fn response(
root: Cid,
request: PullRequest,
config: &Config,
store: &impl BlockStore,
cache: &impl Cache,
store: impl BlockStore,
cache: impl Cache,
) -> Result<CarFile, Error> {
let receiver_state = Some(ReceiverState::from(request));
block_send(root, receiver_state, config, store, cache).await
Expand Down Expand Up @@ -68,7 +68,7 @@ mod tests {
loop {
let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len();
let response =
crate::pull::response(root, request, config, server_store, &NoCache).await?;
crate::pull::response(root, request, config, server_store, NoCache).await?;
let response_bytes = response.bytes.len();

metrics.push(Metrics {
Expand Down
4 changes: 2 additions & 2 deletions car-mirror/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub async fn request(
root: Cid,
last_response: Option<PushResponse>,
config: &Config,
store: &impl BlockStore,
cache: &impl Cache,
store: impl BlockStore,
cache: impl Cache,
) -> Result<CarFile, Error> {
let receiver_state = last_response.map(ReceiverState::from);
block_send(root, receiver_state, config, store, cache).await
Expand Down
Loading

0 comments on commit 10c29ff

Please sign in to comment.