Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correctly handle raw-codec CIDs/blocks #37

Merged
merged 15 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use car_mirror::{
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use libipld::Cid;
use std::time::Duration;
use wnfs_common::{BlockStore, MemoryBlockStore};
use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore};

pub fn push_throttled(c: &mut Criterion) {
let mut rvg = car_mirror::test_utils::Rvg::deterministic();
Expand All @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) {
},
|(client_store, root)| {
let client_store = &ThrottledBlockStore(client_store);
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &ThrottledBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) {
},
|(server_store, root)| {
let server_store = &ThrottledBlockStore(server_store);
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &ThrottledBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -109,15 +109,16 @@ pub fn pull_throttled(c: &mut Criterion) {
#[derive(Debug, Clone)]
struct ThrottledBlockStore(MemoryBlockStore);

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self.0.get_block(cid).await?;
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
Ok(bytes)
}

async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid> {
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
self.0.put_block(bytes, codec).await
}
}
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) {
(store, root)
},
|(ref client_store, root)| {
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) {
(store, root)
},
|(ref server_store, root)| {
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/simulated_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref server_store, ref server_cache, root)| {
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref client_store, ref client_cache, root)| {
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
2 changes: 1 addition & 1 deletion car-mirror-wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![deny(unreachable_pub, private_in_public)]
#![deny(unreachable_pub)]

//! car-mirror

Expand Down
3 changes: 2 additions & 1 deletion car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ thiserror = "1.0"
tokio = { version = "^1", default-features = false }
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.24"
wnfs-common = "0.1.26"

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
car-mirror = { path = ".", features = ["test_utils"] }
proptest = "1.1"
roaring-graphs = "0.12"
test-strategy = "0.3"
testresult = "0.3.0"

[features]
default = []
Expand Down
74 changes: 70 additions & 4 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Config {

/// Some information that the block receiving end provides the block sending end
/// in order to deduplicate block transfers.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct ReceiverState {
/// At least *some* of the subgraph roots that are missing for sure on the receiving end.
pub missing_subgraph_roots: Vec<Cid>,
Expand All @@ -67,7 +67,7 @@ pub struct CarFile {
///
/// It returns a `CarFile` of (a subset) of all blocks below `root`, that
/// are thought to be missing on the receiving end.
#[instrument(skip(config, store, cache))]
#[instrument(skip_all, fields(root, last_state))]
pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
Expand Down Expand Up @@ -126,7 +126,7 @@ pub async fn block_send(
/// It takes a `CarFile`, verifies that its contents are related to the
/// `root` and returns some information to help the block sending side
/// figure out what blocks to send next.
#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
root: Cid,
last_car: Option<CarFile>,
Expand Down Expand Up @@ -221,7 +221,7 @@ pub fn references<E: Extend<Cid>>(

async fn verify_missing_subgraph_roots(
root: Cid,
missing_subgraph_roots: &Vec<Cid>,
missing_subgraph_roots: &[Cid],
store: &impl BlockStore,
cache: &impl Cache,
) -> Result<Vec<Cid>, Error> {
Expand Down Expand Up @@ -450,3 +450,69 @@ impl Default for Config {
}
}
}

impl std::fmt::Debug for ReceiverState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let have_cids_bloom = self
.have_cids_bloom
.as_ref()
.map_or("None".into(), |bloom| {
format!(
"Some(BloomFilter(k_hashes = {}, {} bytes))",
bloom.hash_count(),
bloom.as_bytes().len()
)
});
f.debug_struct("ReceiverState")
.field(
"missing_subgraph_roots.len() == ",
&self.missing_subgraph_roots.len(),
)
.field("have_cids_bloom", &have_cids_bloom)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::assert_cond_send_sync, traits::NoCache};
use testresult::TestResult;
use wnfs_common::MemoryBlockStore;

#[allow(clippy::unreachable, unused)]
fn test_assert_send() {
assert_cond_send_sync(|| {
block_send(
unimplemented!(),
unimplemented!(),
unimplemented!(),
unimplemented!() as &MemoryBlockStore,
&NoCache,
)
});
assert_cond_send_sync(|| {
block_receive(
unimplemented!(),
unimplemented!(),
unimplemented!(),
unimplemented!() as &MemoryBlockStore,
&NoCache,
)
})
}

#[test]
fn test_receiver_state_is_not_a_huge_debug() -> TestResult {
let state = ReceiverState {
have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)),
missing_subgraph_roots: vec![Cid::default(); 1000],
};

let debug_print = format!("{state:#?}");

assert!(debug_print.len() < 1000);

Ok(())
}
}
42 changes: 37 additions & 5 deletions car-mirror/src/incremental_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ impl IncrementalDagVerification {
Ok(this)
}

#[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))]
async fn update_have_cids(
/// Updates the state of incremental dag verification.
/// This goes through all "want" blocks and what they link to,
/// removing items that we now have and don't want anymore.
#[instrument(level = "trace", skip_all)]
pub async fn update_have_cids(
&mut self,
store: &impl BlockStore,
cache: &impl Cache,
Expand All @@ -68,25 +71,54 @@ impl IncrementalDagVerification {
if let Some(BlockStoreError::CIDNotFound(not_found)) =
e.downcast_ref::<BlockStoreError>()
{
self.want_cids.insert(*not_found);
tracing::trace!(%not_found, "Missing block, adding to want list");
self.mark_as_want(*not_found);
} else {
return Err(Error::BlockStoreError(e));
}
}
Err(e) => return Err(e),
Ok(Some(cid)) => {
self.want_cids.remove(&cid);
self.have_cids.insert(cid);
let not_found = matches!(
store.get_block(&cid).await,
Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_)))
);

if not_found {
tracing::trace!(%cid, "Missing block, adding to want list");
self.mark_as_want(cid);
} else {
self.mark_as_have(cid);
}
}
Ok(None) => {
break;
}
}
}

tracing::debug!(
num_want = self.want_cids.len(),
num_have = self.have_cids.len(),
"Finished dag verification"
);

Ok(())
}

fn mark_as_want(&mut self, want: Cid) {
if self.have_cids.contains(&want) {
tracing::warn!(%want, "Marking a CID as wanted, that we have previously marked as having!");
self.have_cids.remove(&want);
}
self.want_cids.insert(want);
}

fn mark_as_have(&mut self, have: Cid) {
self.want_cids.remove(&have);
self.have_cids.insert(have);
}

/// Check the state of a CID to find out whether
/// - we expect it as one of the next possible blocks to receive (Want)
/// - we have already stored it (Have)
Expand Down
2 changes: 1 addition & 1 deletion car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![deny(unreachable_pub, private_in_public)]
#![deny(unreachable_pub)]

//! car-mirror

Expand Down
Loading
Loading