Skip to content

Commit

Permalink
API updates; Implement CarStore::create
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChat committed Jan 12, 2025
1 parent 4ee9bb9 commit 6281c10
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
51 changes: 41 additions & 10 deletions atrium-repo/src/blockstore/car.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, convert::Infallible};

use futures::{AsyncReadExt as _, AsyncSeekExt as _};
use ipld_core::cid::{multihash::Multihash, Cid, Version};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tokio::io::{
AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _,
Expand All @@ -15,7 +15,7 @@ use crate::blockstore::{self, AsyncBlockStoreRead, SHA2_256};

use super::AsyncBlockStoreWrite;

#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct V1Header {
pub version: u64,
pub roots: Vec<Cid>,
Expand Down Expand Up @@ -63,7 +63,8 @@ pub struct CarStore<S: AsyncRead + AsyncSeek> {
}

impl<R: AsyncRead + AsyncSeek + Unpin> CarStore<R> {
pub async fn new(mut storage: R) -> Result<Self, Error> {
/// Open a pre-existing CAR file.
pub async fn open(mut storage: R) -> Result<Self, Error> {
// Read the header.
let header_len = unsigned_varint::aio::read_usize((&mut storage).compat()).await?;
let mut header_bytes = vec![0; header_len];
Expand Down Expand Up @@ -114,8 +115,38 @@ impl<R: AsyncRead + AsyncSeek + Unpin> CarStore<R> {
Ok(Self { storage, header, index })
}

pub fn header(&self) -> &V1Header {
&self.header
pub fn roots(&self) -> impl Iterator<Item = Cid> {
self.header.roots.clone().into_iter()
}
}

impl<S: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> CarStore<S> {
pub async fn create(mut storage: S) -> Result<Self, Error> {
// HACK: Create a header with a single root entry (most commonly desired).
// We do this here because it's hard to delete/insert bytes into a pre-existing file.
let header = V1Header { version: 1, roots: vec![Cid::default()] };

let header_bytes = serde_ipld_dagcbor::to_vec(&header).unwrap();
let mut buf = unsigned_varint::encode::usize_buffer();
let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf);
storage.write_all(&buf).await?;
storage.write_all(&header_bytes).await?;

Ok(Self { storage, header, index: HashMap::new() })
}

pub async fn set_root(&mut self, root: Cid) -> Result<(), Error> {
// HACK: The root array must be the same length in order to avoid shifting the file's contents.
self.header.roots = vec![root];

let header_bytes = serde_ipld_dagcbor::to_vec(&self.header).unwrap();
let mut buf = unsigned_varint::encode::usize_buffer();
let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf);
self.storage.seek(SeekFrom::Start(0)).await?;
self.storage.write_all(&buf).await?;
self.storage.write_all(&header_bytes).await?;

Ok(())
}
}

Expand Down Expand Up @@ -176,19 +207,19 @@ impl<R: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> AsyncBlockStoreWrite
/// Errors that can occur while interacting with a CAR.
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Invalid CID: {0}")]
#[error("invalid CID: {0}")]
Cid(#[from] ipld_core::cid::Error),
#[error("CID does not exist in CAR")]
CidNotFound,
#[error("Invalid hash")]
#[error("file hash does not match computed hash for block")]
InvalidHash,
#[error("Invalid explicit CID v0")]
#[error("invalid explicit CID v0")]
InvalidCidV0,
#[error("Invalid varint: {0}")]
#[error("invalid varint: {0}")]
InvalidVarint(#[from] unsigned_varint::io::ReadError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Invalid Multihash: {0}")]
#[error("invalid Multihash: {0}")]
Multihash(#[from] ipld_core::cid::multihash::Error),
#[error("serde_ipld_dagcbor decoding error: {0}")]
Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
Expand Down
1 change: 1 addition & 0 deletions atrium-repo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod blockstore;
pub mod mst;
pub mod repo;

pub use ipld_core::cid::Cid;
pub use repo::Repository;
4 changes: 2 additions & 2 deletions atrium-repo/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ mod test {
async fn load(
bytes: &[u8],
) -> Result<Repository<CarStore<std::io::Cursor<&[u8]>>>, Box<dyn std::error::Error>> {
let db = CarStore::new(std::io::Cursor::new(bytes)).await?;
let root = db.header().roots[0];
let db = CarStore::open(std::io::Cursor::new(bytes)).await?;
let root = db.roots().next().unwrap();

Repository::open(db, root).await.map_err(Into::into)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/firehose/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Firehose;
impl CommitHandler for Firehose {
async fn handle_commit(&self, commit: &Commit) -> Result<()> {
let mut repo = Repository::open(
CarStore::new(std::io::Cursor::new(commit.blocks.as_slice())).await?,
CarStore::open(std::io::Cursor::new(commit.blocks.as_slice())).await?,
// N.B: This same CID is also specified inside of the `CarStore`, accessible
// via `car.header().roots[0]`.
commit.commit.0,
Expand Down

0 comments on commit 6281c10

Please sign in to comment.