Skip to content

Commit

Permalink
refactor: Use newer BlockStore trait methods
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Feb 15, 2024
1 parent 1d86575 commit 69de1f3
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 309 deletions.
38 changes: 11 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pub fn push_throttled(c: &mut Criterion) {
},
|(client_store, root)| {
let client_store = &ThrottledBlockStore(client_store);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(10_000);
let server_store = &ThrottledBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -81,9 +81,9 @@ pub fn pull_throttled(c: &mut Criterion) {
},
|(server_store, root)| {
let server_store = &ThrottledBlockStore(server_store);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(10_000);
let client_store = &ThrottledBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -117,14 +117,22 @@ struct ThrottledBlockStore(MemoryBlockStore);

impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self.0.get_block(cid).await?;
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
Ok(bytes)
self.0.get_block(cid).await
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
self.0.put_block(bytes, codec).await
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
self.0.put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
// The idea is that has_block would be faster than `get_block`, as it should be managed closer to CPU memory
self.0.has_block(cid).await
}
}

impl ThrottledBlockStore {
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) {
(store, root)
},
|(ref client_store, root)| {
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(10_000);
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) {
(store, root)
},
|(ref server_store, root)| {
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(10_000);
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/simulated_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000, 150_000);
let cache = InMemoryCache::new(10_000);
(store, cache, root)
},
|(ref server_store, ref server_cache, root)| {
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000, 150_000);
let cache = InMemoryCache::new(10_000);
(store, cache, root)
},
|(ref client_store, ref client_cache, root)| {
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(10_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
13 changes: 9 additions & 4 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ async fn car_frame_from_block(block: (Cid, Bytes)) -> Result<Bytes, Error> {
Ok(bytes.into())
}

/// Ensure that any requested subgraph roots are actually part
/// of the DAG from the root.
async fn verify_missing_subgraph_roots(
root: Cid,
missing_subgraph_roots: &[Cid],
Expand All @@ -316,9 +318,10 @@ async fn verify_missing_subgraph_roots(
) -> Result<Vec<Cid>, Error> {
let subgraph_roots: Vec<Cid> = DagWalk::breadth_first([root])
.stream(store, cache)
.try_filter_map(
|cid| async move { Ok(missing_subgraph_roots.contains(&cid).then_some(cid)) },
)
.try_filter_map(|item| async move {
let cid = item.to_cid()?;
Ok(missing_subgraph_roots.contains(&cid).then_some(cid))
})
.try_collect()
.await?;

Expand Down Expand Up @@ -362,7 +365,9 @@ fn stream_blocks_from_roots<'a>(
Box::pin(async_stream::try_stream! {
let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());

while let Some(cid) = dag_walk.next(&store, &cache).await? {
while let Some(item) = dag_walk.next(&store, &cache).await? {
let cid = item.to_cid()?;

if should_block_be_skipped(&cid, &bloom, &subgraph_roots) {
continue;
}
Expand Down
Loading

0 comments on commit 69de1f3

Please sign in to comment.