From 6281c10ba5a922e19fdcc5a53d13406ded588009 Mon Sep 17 00:00:00 2001 From: "Dr. Chat" Date: Sun, 12 Jan 2025 15:09:16 -0600 Subject: [PATCH] API updates; Implement `CarStore::create` --- atrium-repo/src/blockstore/car.rs | 51 +++++++++++++++++++++++++------ atrium-repo/src/lib.rs | 1 + atrium-repo/src/repo.rs | 4 +-- examples/firehose/src/main.rs | 2 +- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/atrium-repo/src/blockstore/car.rs b/atrium-repo/src/blockstore/car.rs index 4d9e0eef..7e7788d6 100644 --- a/atrium-repo/src/blockstore/car.rs +++ b/atrium-repo/src/blockstore/car.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, convert::Infallible}; use futures::{AsyncReadExt as _, AsyncSeekExt as _}; use ipld_core::cid::{multihash::Multihash, Cid, Version}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use sha2::Digest; use tokio::io::{ AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _, @@ -15,7 +15,7 @@ use crate::blockstore::{self, AsyncBlockStoreRead, SHA2_256}; use super::AsyncBlockStoreWrite; -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct V1Header { pub version: u64, pub roots: Vec, @@ -63,7 +63,8 @@ pub struct CarStore { } impl CarStore { - pub async fn new(mut storage: R) -> Result { + /// Open a pre-existing CAR file. + pub async fn open(mut storage: R) -> Result { // Read the header. let header_len = unsigned_varint::aio::read_usize((&mut storage).compat()).await?; let mut header_bytes = vec![0; header_len]; @@ -114,8 +115,38 @@ impl CarStore { Ok(Self { storage, header, index }) } - pub fn header(&self) -> &V1Header { - &self.header + pub fn roots(&self) -> impl Iterator { + self.header.roots.clone().into_iter() + } +} + +impl CarStore { + pub async fn create(mut storage: S) -> Result { + // HACK: Create a header with a single root entry (most commonly desired). + // We do this here because it's hard to delete/insert bytes into a pre-existing file. + let header = V1Header { version: 1, roots: vec![Cid::default()] }; + + let header_bytes = serde_ipld_dagcbor::to_vec(&header).unwrap(); + let mut buf = unsigned_varint::encode::usize_buffer(); + let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf); + storage.write_all(&buf).await?; + storage.write_all(&header_bytes).await?; + + Ok(Self { storage, header, index: HashMap::new() }) + } + + pub async fn set_root(&mut self, root: Cid) -> Result<(), Error> { + // HACK: The root array must be the same length in order to avoid shifting the file's contents. + self.header.roots = vec![root]; + + let header_bytes = serde_ipld_dagcbor::to_vec(&self.header).unwrap(); + let mut buf = unsigned_varint::encode::usize_buffer(); + let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf); + self.storage.seek(SeekFrom::Start(0)).await?; + self.storage.write_all(&buf).await?; + self.storage.write_all(&header_bytes).await?; + + Ok(()) } } @@ -176,19 +207,19 @@ impl AsyncBlockStoreWrite /// Errors that can occur while interacting with a CAR. #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Invalid CID: {0}")] + #[error("invalid CID: {0}")] Cid(#[from] ipld_core::cid::Error), #[error("CID does not exist in CAR")] CidNotFound, - #[error("Invalid hash")] + #[error("file hash does not match computed hash for block")] InvalidHash, - #[error("Invalid explicit CID v0")] + #[error("invalid explicit CID v0")] InvalidCidV0, - #[error("Invalid varint: {0}")] + #[error("invalid varint: {0}")] InvalidVarint(#[from] unsigned_varint::io::ReadError), #[error("IO error: {0}")] Io(#[from] std::io::Error), - #[error("Invalid Multihash: {0}")] + #[error("invalid Multihash: {0}")] Multihash(#[from] ipld_core::cid::multihash::Error), #[error("serde_ipld_dagcbor decoding error: {0}")] Parse(#[from] serde_ipld_dagcbor::DecodeError), diff --git a/atrium-repo/src/lib.rs b/atrium-repo/src/lib.rs index cdf240fa..c8b8e857 100644 --- a/atrium-repo/src/lib.rs +++ b/atrium-repo/src/lib.rs @@ -2,4 +2,5 @@ pub mod blockstore; pub mod mst; pub mod repo; +pub use ipld_core::cid::Cid; pub use repo::Repository; diff --git a/atrium-repo/src/repo.rs b/atrium-repo/src/repo.rs index 3e7cde82..ca4f4c47 100644 --- a/atrium-repo/src/repo.rs +++ b/atrium-repo/src/repo.rs @@ -192,8 +192,8 @@ mod test { async fn load( bytes: &[u8], ) -> Result>>, Box> { - let db = CarStore::new(std::io::Cursor::new(bytes)).await?; - let root = db.header().roots[0]; + let db = CarStore::open(std::io::Cursor::new(bytes)).await?; + let root = db.roots().next().unwrap(); Repository::open(db, root).await.map_err(Into::into) } diff --git a/examples/firehose/src/main.rs b/examples/firehose/src/main.rs index ebd648ac..35e38ea6 100644 --- a/examples/firehose/src/main.rs +++ b/examples/firehose/src/main.rs @@ -59,7 +59,7 @@ struct Firehose; impl CommitHandler for Firehose { async fn handle_commit(&self, commit: &Commit) -> Result<()> { let mut repo = Repository::open( - CarStore::new(std::io::Cursor::new(commit.blocks.as_slice())).await?, + CarStore::open(std::io::Cursor::new(commit.blocks.as_slice())).await?, // N.B: This same CID is also specified inside of the `CarStore`, accessible // via `car.header().roots[0]`. commit.commit.0,