Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ability to open a hash as a seekable bao file #72

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-channel = "2.3.1"
bao-tree = { version = "0.13", features = [
"tokio_fsm",
"validate",
"experimental-mixed",
], default-features = false }
blake3 = { version = "1.4.5", package = "iroh-blake3" }
bytes = { version = "1.7", features = ["serde"] }
Expand Down Expand Up @@ -77,6 +78,7 @@ walkdir = { version = "2.5.0", optional = true }
# Examples
console = { version = "0.15.8", optional = true }
tracing-test = "0.2.5"
positioned-io = "0.3.3"

[dev-dependencies]
http-body = "1.0"
Expand Down Expand Up @@ -189,3 +191,4 @@ incremental = false
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "read_and_seek" }
24 changes: 19 additions & 5 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use proto::{
AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
ReadAtResponse, ValidateRequest,
BatchUpdate, BlobEntryInfoRequest, BlobStatusRequest, BlobStatusResponse,
ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
ReadAtRequest, ReadAtResponse, ValidateRequest,
},
tags::{
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
Expand All @@ -51,7 +51,9 @@ use crate::{
},
net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
provider::{AddProgress, BatchAddPathProgress},
store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
store::{
ConsistencyCheckProgress, EntryPathOrData, ImportProgress, MapEntry, ValidateProgress,
},
util::{
local_pool::LocalPoolHandle,
progress::{AsyncChannelProgressSender, ProgressSender},
Expand Down Expand Up @@ -203,6 +205,7 @@ impl<D: crate::store::Store> Handler<D> {
.await
}
BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
EntryInfo(msg) => chan.rpc(msg, self, Self::blob_entry_info).await,
}
}

Expand Down Expand Up @@ -309,6 +312,17 @@ impl<D: crate::store::Store> Handler<D> {
Ok(())
}

/// Handle a `BlobEntryInfoRequest`: look up where the entry for the given
/// hash is stored (inline bytes or a file path for data and outboard).
///
/// Returns `Ok(None)` when the store has no entry for the hash; store
/// errors are converted into `RpcError` for transport.
async fn blob_entry_info(
    self,
    msg: BlobEntryInfoRequest,
) -> RpcResult<Option<EntryPathOrData>> {
    // `map_err` already produces the desired Result type, so no
    // `Ok(..?)` round-trip is needed (clippy::needless_question_mark).
    self.store()
        .entry_path_or_data(msg.hash)
        .await
        .map_err(|e| RpcError::new(&e))
}

fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
tracing::info!("blob_list_tags");
let blobs = self;
Expand Down
35 changes: 32 additions & 3 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ use std::{
};

use anyhow::{anyhow, Context as _, Result};
use bao_tree::{
io::{baofile::BaoFile, outboard::PreOrderOutboard},
BaoTree,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
Expand All @@ -87,10 +91,10 @@ use crate::{
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
net_protocol::BlobDownloadRequest,
rpc::proto::RpcService,
rpc::proto::{blobs::BlobEntryInfoRequest, RpcService},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
util::{MemOrFile, SetTagOption},
BlobFormat, Hash, Tag, IROH_BLOCK_SIZE,
};

mod batch;
Expand Down Expand Up @@ -380,6 +384,31 @@ where
))
}

/// Open a blob as an independent bao file
///
/// Queries the node for the location of the entry (inline bytes or files on
/// disk), opens the backing files if needed, and wraps everything in a
/// seekable cursor over the verified content.
///
/// # Errors
///
/// Fails if the blob is not present in the local store, if the RPC fails,
/// or if a backing file cannot be opened.
pub async fn open(&self, hash: Hash) -> Result<impl std::io::Read + std::io::Seek> {
    let Some(info) = self.rpc.rpc(BlobEntryInfoRequest { hash }).await?? else {
        return Err(anyhow!("Blob not found"));
    };
    let (data, size) = match info.data {
        // take the length before moving the bytes — no clone needed,
        // since `data` is owned here
        MemOrFile::Mem(data) => {
            let size = data.len() as u64;
            (MemOrFile::Mem(data), size)
        }
        MemOrFile::File((path, size)) => (MemOrFile::File(std::fs::File::open(path)?), size),
    };
    let outboard = match info.outboard {
        // owned bytes can be moved directly — the previous clone was redundant
        MemOrFile::Mem(data) => MemOrFile::Mem(data),
        MemOrFile::File(path) => MemOrFile::File(std::fs::File::open(path)?),
    };
    let file = BaoFile {
        data,
        outboard: PreOrderOutboard {
            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
            root: hash.into(),
            data: outboard,
        },
    };
    // positioned_io::Cursor provides Read + Seek over the bao file
    Ok(positioned_io::Cursor::new(file))
}

/// Export a blob from the internal blob store to a path on the node's filesystem.
///
/// `destination` should be an writeable, absolute path on the local node's filesystem.
Expand Down
14 changes: 12 additions & 2 deletions src/rpc/proto/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::{
provider::{AddProgress, BatchAddPathProgress},
rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
store::{
BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
ValidateProgress,
BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, ExportFormat, ExportMode,
ImportMode, ValidateProgress,
},
util::SetTagOption,
BlobFormat, Hash, HashAndFormat, Tag,
Expand Down Expand Up @@ -63,6 +63,8 @@ pub enum Request {
BatchAddPath(BatchAddPathRequest),
#[rpc(response = RpcResult<()>)]
BatchCreateTempTag(BatchCreateTempTagRequest),
#[rpc(response = RpcResult<Option<EntryPathOrData>>)]
EntryInfo(BlobEntryInfoRequest),
}

#[allow(missing_docs)]
Expand All @@ -83,6 +85,7 @@ pub enum Response {
BatchCreate(BatchCreateResponse),
BatchAddStream(BatchAddStreamResponse),
BatchAddPath(BatchAddPathResponse),
EntryInfo(RpcResult<Option<EntryPathOrData>>),
}

/// A request to the node to provide the data at the given path
Expand Down Expand Up @@ -313,6 +316,13 @@ pub struct BatchAddPathRequest {
pub batch: BatchId,
}

/// Request the storage location of a blob entry (inline data or file paths)
#[derive(Serialize, Deserialize, Debug)]
pub struct BlobEntryInfoRequest {
    /// The hash of the blob
    pub hash: Hash,
}

/// Response to a batch add path request
#[derive(Serialize, Deserialize, Debug)]
pub struct BatchAddPathResponse(pub BatchAddPathProgress);
23 changes: 13 additions & 10 deletions src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,7 @@ impl FileStorage {
}

fn current_size(&self) -> io::Result<u64> {
let len = self.sizes.metadata()?.len();
if len < 8 {
Ok(0)
} else {
// todo: use the last full u64 in case the sizes file is not a multiple of 8
// bytes. Not sure how that would happen, but we should handle it.
let mut buf = [0u8; 8];
self.sizes.read_exact_at(len - 8, &mut buf)?;
Ok(u64::from_le_bytes(buf))
}
read_current_size(&self.sizes)
}

fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
Expand Down Expand Up @@ -470,6 +461,18 @@ impl AsyncSliceReader for OutboardReader {
}
}

/// Read the most recently recorded size from a sizes file.
///
/// The sizes file is a sequence of little-endian `u64` values; the last
/// complete entry is returned. Returns 0 when the file does not yet hold a
/// full `u64`. A trailing partial entry (file length not a multiple of 8)
/// is ignored.
pub fn read_current_size(sizes: &File) -> io::Result<u64> {
    let file_len = sizes.metadata()?.len();
    if file_len < 8 {
        return Ok(0);
    }
    // round down to the end of the last full 8-byte record
    let end = file_len & !7;
    let mut size_bytes = [0u8; 8];
    sizes.read_exact_at(end - 8, &mut size_bytes)?;
    Ok(u64::from_le_bytes(size_bytes))
}

enum HandleChange {
None,
MemToFile,
Expand Down
86 changes: 83 additions & 3 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ use tables::{ReadOnlyTables, ReadableTables, Tables};

use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
use super::{
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
bao_file::{read_current_size, BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, EntryStatus,
ExportMode, ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
};
use crate::{
store::{
Expand Down Expand Up @@ -532,6 +532,10 @@ pub(crate) enum ActorMessage {
hash: Hash,
tx: oneshot::Sender<ActorResult<EntryStatus>>,
},
EntryPathOrData {
hash: Hash,
tx: oneshot::Sender<ActorResult<Option<EntryPathOrData>>>,
},
#[cfg(test)]
/// Query method: get the full entry state for a hash, both in memory and in redb.
/// This is everything we got about the entry, including the actual inline outboard and data.
Expand Down Expand Up @@ -664,6 +668,7 @@ impl ActorMessage {
| Self::Tags { .. }
| Self::GcStart { .. }
| Self::GetFullEntryState { .. }
| Self::EntryPathOrData { .. }
| Self::Dump => MessageCategory::ReadOnly,
Self::Import { .. }
| Self::Export { .. }
Expand Down Expand Up @@ -870,6 +875,14 @@ impl StoreInner {
Ok(tags)
}

/// Ask the store actor where the entry for `hash` lives (path or inline data).
///
/// Sends an `EntryPathOrData` message and awaits the actor's reply over a
/// oneshot channel.
async fn entry_path_or_data(&self, hash: Hash) -> OuterResult<Option<EntryPathOrData>> {
    let (tx, rx) = oneshot::channel();
    let msg = ActorMessage::EntryPathOrData { hash, tx };
    self.tx.send(msg).await?;
    let reply = rx.await??;
    Ok(reply)
}

async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx
Expand Down Expand Up @@ -1371,6 +1384,10 @@ impl super::Store for Store {
.await??)
}

/// Delegate to the inner store; `?` converts the outer error type into
/// `io::Error`, so the `Ok(..?)` wrapping here is required.
async fn entry_path_or_data(&self, hash: Hash) -> io::Result<Option<EntryPathOrData>> {
    Ok(self.0.entry_path_or_data(hash).await?)
}

async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
Ok(self.0.set_tag(name, hash).await?)
}
Expand Down Expand Up @@ -2266,6 +2283,65 @@ impl ActorState {
Ok(())
}

/// Resolve where the data and outboard for `hash` are stored: as files on
/// disk (owned or external) or as inline bytes from the database tables.
///
/// Returns `Ok(None)` when there is no entry for the hash. For a partial
/// entry, the size reported for the data file is taken from the last full
/// `u64` in the entry's sizes file.
fn entry_path_or_data(
    &mut self,
    tables: &impl ReadableTables,
    hash: Hash,
) -> ActorResult<Option<EntryPathOrData>> {
    // lazily computed canonical paths for this hash's owned files
    let data_path = || self.options.path.owned_data_path(&hash);
    let outboard_path = || self.options.path.owned_outboard_path(&hash);
    let sizes_path = || self.options.path.owned_sizes_path(&hash);
    Ok(match tables.blobs().get(hash)? {
        Some(guard) => match guard.value() {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => {
                let data = match data_location {
                    DataLocation::External(paths, size) => {
                        // an external location with no paths is a corrupt entry
                        let path = paths.first().ok_or_else(|| {
                            ActorError::Inconsistent("external data missing".to_owned())
                        })?;
                        MemOrFile::File((path.clone(), size))
                    }
                    DataLocation::Owned(size) => MemOrFile::File((data_path(), size)),
                    DataLocation::Inline(_) => {
                        // inline data is stored in its own table, keyed by hash;
                        // a missing row here contradicts the entry state
                        let data = tables.inline_data().get(hash)?.ok_or_else(|| {
                            ActorError::Inconsistent("inline data missing".to_owned())
                        })?;
                        MemOrFile::Mem(data.value().to_vec().into())
                    }
                };
                let outboard = match outboard_location {
                    OutboardLocation::Owned => MemOrFile::File(outboard_path()),
                    OutboardLocation::Inline(_) => MemOrFile::Mem(
                        tables
                            .inline_outboard()
                            .get(hash)?
                            .ok_or_else(|| {
                                ActorError::Inconsistent("inline outboard missing".to_owned())
                            })?
                            .value()
                            .to_vec()
                            .into(),
                    ),
                    // no outboard exists for this entry; represent it as empty bytes
                    OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
                };
                Some(EntryPathOrData { data, outboard })
            }
            EntryState::Partial { .. } => {
                // partial entries have no recorded size in the table; read the
                // current size from the on-disk sizes file instead
                let sizes = std::fs::File::open(sizes_path())?;
                let size = read_current_size(&sizes)?;
                Some(EntryPathOrData {
                    data: MemOrFile::File((data_path(), size)),
                    outboard: MemOrFile::File(outboard_path()),
                })
            }
        },
        None => None,
    })
}

fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
match msg {
ActorMessage::UpdateInlineOptions {
Expand Down Expand Up @@ -2339,6 +2415,10 @@ impl ActorState {
let res = self.get_full_entry_state(tables, hash);
tx.send(res).ok();
}
ActorMessage::EntryPathOrData { hash, tx } => {
let res = self.entry_path_or_data(tables, hash);
tx.send(res).ok();
}
x => return Ok(Err(x)),
}
Ok(Ok(()))
Expand Down
Loading
Loading