Skip to content

Commit

Permalink
feat: Implement thiserror types
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Aug 28, 2023
1 parent 436c1b6 commit 6252e84
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 45 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proptest = { version = "1.1", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "1.0.183"
serde_ipld_dagcbor = "0.4.0"
thiserror = "1.0.47"
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.23"
Expand Down
52 changes: 37 additions & 15 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, bail, Result};
use anyhow::anyhow;
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::TryStreamExt;
Expand All @@ -11,6 +11,7 @@ use wnfs_common::BlockStore;

use crate::{
dag_walk::DagWalk,
error::Error,
incremental_verification::{BlockState, IncrementalDagVerification},
messages::{Bloom, PullRequest, PushResponse},
};
Expand Down Expand Up @@ -70,7 +71,7 @@ pub async fn block_send(
last_state: Option<ReceiverState>,
config: &Config,
store: &impl BlockStore,
) -> Result<CarFile> {
) -> Result<CarFile, Error> {
let ReceiverState {
ref missing_subgraph_roots,
have_cids_bloom,
Expand Down Expand Up @@ -128,7 +129,10 @@ pub async fn block_send(
Vec::new(),
);

writer.write_header().await?;
writer
.write_header()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

let mut block_bytes = 0;
let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());
Expand All @@ -150,7 +154,10 @@ pub async fn block_send(
"writing block to CAR",
);

writer.write(cid, &block).await?;
writer
.write(cid, &block)
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

// TODO(matheus23): Count the actual bytes sent?
// At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes.
Expand All @@ -161,7 +168,11 @@ pub async fn block_send(
}

Ok(CarFile {
bytes: writer.finish().await?.into(),
bytes: writer
.finish()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
.into(),
})
}

Expand All @@ -179,14 +190,20 @@ pub async fn block_receive(
last_car: Option<CarFile>,
config: &Config,
store: &impl BlockStore,
) -> Result<ReceiverState> {
) -> Result<ReceiverState, Error> {
let mut dag_verification = IncrementalDagVerification::new([root], store).await?;

if let Some(car) = last_car {
let mut reader = CarReader::new(Cursor::new(car.bytes)).await?;
let mut reader = CarReader::new(Cursor::new(car.bytes))
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;
let mut block_bytes = 0;

while let Some((cid, vec)) = reader.next_block().await? {
while let Some((cid, vec)) = reader
.next_block()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
{
let block = Bytes::from(vec);

debug!(
Expand All @@ -197,10 +214,10 @@ pub async fn block_receive(

block_bytes += block.len();
if block_bytes > config.receive_maximum {
bail!(
"Received more than {} bytes ({block_bytes}), aborting request.",
config.receive_maximum
);
return Err(Error::TooManyBytes {
block_bytes,
receive_maximum: config.receive_maximum,
});
}

match dag_verification.block_state(cid) {
Expand Down Expand Up @@ -273,13 +290,18 @@ pub async fn block_receive(
/// This will error out if
/// - the codec is not supported
/// - the block can't be parsed.
pub fn references<E: Extend<Cid>>(cid: Cid, block: impl AsRef<[u8]>, mut refs: E) -> Result<E> {
pub fn references<E: Extend<Cid>>(
cid: Cid,
block: impl AsRef<[u8]>,
mut refs: E,
) -> Result<E, Error> {
let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;
.map_err(|_| Error::UnsupportedCodec { cid })?;

<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)
.map_err(Error::ParsingError)?;
Ok(refs)
}

Expand Down
15 changes: 9 additions & 6 deletions car-mirror/src/dag_walk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::common::references;
use anyhow::Result;
use crate::{common::references, error::Error};
use bytes::Bytes;
use futures::{stream::try_unfold, Stream};
use libipld_core::cid::Cid;
Expand Down Expand Up @@ -54,7 +53,7 @@ impl DagWalk {
/// Return the next node in the traversal.
///
/// Returns `None` if no nodes are left to be visited.
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>> {
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>, Error> {
let cid = loop {
let popped = if self.breadth_first {
self.frontier.pop_back()
Expand All @@ -75,7 +74,10 @@ impl DagWalk {
// TODO: Two opportunities for performance improvement:
// - skip Raw CIDs. They can't have further links (but needs adjustment to this function's return type)
// - run multiple `get_block` calls concurrently
let block = store.get_block(&cid).await?;
let block = store
.get_block(&cid)
.await
.map_err(Error::BlockStoreError)?;
for ref_cid in references(cid, &block, Vec::new())? {
if !self.visited.contains(&ref_cid) {
self.frontier.push_front(ref_cid);
Expand All @@ -89,7 +91,7 @@ impl DagWalk {
pub fn stream(
self,
store: &impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + '_ {
) -> impl Stream<Item = Result<(Cid, Bytes), Error>> + Unpin + '_ {
Box::pin(try_unfold(self, move |mut this| async move {
let maybe_block = this.next(store).await?;
Ok(maybe_block.map(|b| (b, this)))
Expand All @@ -110,7 +112,7 @@ impl DagWalk {
}

/// Skip a node from the traversal for now.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> {
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> {
let (cid, bytes) = block;
let refs = references(cid, bytes, HashSet::new())?;
self.visited.insert(cid);
Expand All @@ -124,6 +126,7 @@ impl DagWalk {
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use futures::TryStreamExt;
use libipld::Ipld;
use wnfs_common::MemoryBlockStore;
Expand Down
92 changes: 92 additions & 0 deletions car-mirror/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use libipld::Cid;

use crate::incremental_verification::BlockState;

/// Errors raised from the CAR mirror library.
///
/// The variants fall into three groups: errors with structured fields,
/// wrappers around an opaque `anyhow::Error` coming from a dependency, and
/// sub-errors defined by this crate (see [`IncrementalVerificationError`]).
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// An error raised while receiving blocks, when more than the configured maximum
/// number of bytes is received in a single batch. See the `Config` type.
#[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request.")]
TooManyBytes {
/// The configured maximum number of bytes to receive
receive_maximum: usize,
/// The actual number of bytes received so far
block_bytes: usize,
},

/// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more.
/// This is raised if an unknown codec is read from a CID. See the `libipld` library for more information.
#[error("Unsupported codec in Cid: {cid}")]
UnsupportedCodec {
/// The CID with the unsupported codec
cid: Cid,
},

/// This library only supports a subset of default hash functions, including SHA-256, SHA-3, BLAKE3 and more.
/// This is raised if an unknown hash code is read from a CID. See the `libipld` library for more information.
#[error("Unsupported hash code in CID {cid}")]
UnsupportedHashCode {
/// The CID with the unsupported hash function
cid: Cid,
},

/// This error is raised when the `BlockStore` uses a different hashing function
/// than the blocks which are received over the wire.
/// This error will be removed in the future, when the block store trait gets modified to support specifying
/// the hash function.
#[error("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {actual_cid}")]
BlockStoreIncompatible {
/// The expected CID
// NOTE(review): boxed here while `UnsupportedCodec`/`UnsupportedHashCode` hold a plain `Cid`;
// presumably to keep the enum small — confirm whether the difference is intentional.
cid: Box<Cid>,
/// The CID returned from the BlockStore implementation
actual_cid: Box<Cid>,
},

// -------------
// Anyhow Errors
// -------------
/// An error raised when trying to parse a block (e.g. to look for further links).
///
/// The wrapped error's message is appended to this variant's `Display` output via `{0}`.
#[error("Error during block parsing: {0}")]
ParsingError(anyhow::Error),

/// An error raised when trying to read or write a CAR file.
///
/// The wrapped error's message is appended to this variant's `Display` output via `{0}`.
#[error("CAR (de)serialization error: {0}")]
CarFileError(anyhow::Error),

/// An error raised from the blockstore.
///
/// The wrapped error's message is appended to this variant's `Display` output via `{0}`.
#[error("BlockStore error: {0}")]
BlockStoreError(anyhow::Error),

// ----------
// Sub-errors
// ----------
/// Errors related to incremental verification.
///
/// `#[error(transparent)]` forwards `Display` and `source` to the inner error, and
/// `#[from]` generates the `From<IncrementalVerificationError>` impl so that `?` can
/// convert into this variant.
#[error(transparent)]
IncrementalVerificationError(#[from] IncrementalVerificationError),
}

/// Errors related to incremental verification.
///
/// These are surfaced to callers through the `Error::IncrementalVerificationError`
/// variant, which wraps this type transparently.
#[derive(thiserror::Error, Debug)]
pub enum IncrementalVerificationError {
/// Raised when we receive a block with a CID that we don't expect.
/// We only expect blocks when they're related to the root CID of a DAG.
/// So a CID needs to have a path back to the root.
#[error("Expected to want block {cid}, but block state is: {block_state:?}")]
ExpectedWantedBlock {
/// The CID of the block we're currently processing
cid: Box<Cid>,
/// The block state it has during incremental verification.
/// So either we already have it or it's unexpected.
/// Rendered with its `Debug` representation (`{block_state:?}`) in the error message.
block_state: BlockState,
},

/// Raised when the block stored in the CAR file doesn't match its hash.
#[error("Digest mismatch in CAR file: expected {cid}, got {actual_cid}")]
DigestMismatch {
/// The expected CID
cid: Box<Cid>,
/// The CID it actually hashes to
actual_cid: Box<Cid>,
},
}
Loading

0 comments on commit 6252e84

Please sign in to comment.