diff --git a/Cargo.lock b/Cargo.lock
index b599df9d1..16671946a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -349,8 +349,7 @@ dependencies = [
 [[package]]
 name = "bao-tree"
 version = "0.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef"
+source = "git+https://github.com/n0-computer/bao-tree?branch=read_and_seek#bff3195faa89636b8bdb2ff69bf7e9d532565df6"
 dependencies = [
  "bytes",
  "futures-lite",
@@ -361,6 +360,7 @@ dependencies = [
  "range-collections",
  "self_cell",
  "smallvec",
+ "tokio",
 ]
 
 [[package]]
@@ -2103,6 +2103,7 @@ dependencies = [
  "oneshot",
  "parking_lot",
  "portable-atomic",
+ "positioned-io",
  "postcard",
  "proptest",
  "quic-rpc",
@@ -3277,6 +3278,7 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ccabfeeb89c73adf4081f0dca7f8e28dbda90981a222ceea37f619e93ea6afe9"
 dependencies = [
+ "byteorder",
  "libc",
  "winapi",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index abe0eec71..9b4b5c2a7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ async-channel = "2.3.1"
 bao-tree = { version = "0.13", features = [
     "tokio_fsm",
     "validate",
+    "experimental-mixed",
 ], default-features = false }
 blake3 = { version = "1.4.5", package = "iroh-blake3" }
 bytes = { version = "1.7", features = ["serde"] }
@@ -77,6 +78,7 @@ walkdir = { version = "2.5.0", optional = true }
 # Examples
 console = { version = "0.15.8", optional = true }
 tracing-test = "0.2.5"
+positioned-io = "0.3.3"
 
 [dev-dependencies]
 http-body = "1.0"
@@ -189,3 +191,4 @@ incremental = false
 iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
 iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
 quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
+bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "read_and_seek" }
diff --git a/src/rpc.rs b/src/rpc.rs
index a17cbb77c..025d77dfa 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -23,10 +23,10 @@ use proto::{
         AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
         BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
         BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
-        BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
-        CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
-        ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
-        ReadAtResponse, ValidateRequest,
+        BatchUpdate, BlobEntryInfoRequest, BlobStatusRequest, BlobStatusResponse,
+        ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
+        DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
+        ReadAtRequest, ReadAtResponse, ValidateRequest,
     },
     tags::{
         CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
@@ -51,7 +51,9 @@ use crate::{
     },
     net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
     provider::{AddProgress, BatchAddPathProgress},
-    store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
+    store::{
+        ConsistencyCheckProgress, EntryPathOrData, ImportProgress, MapEntry, ValidateProgress,
+    },
     util::{
         local_pool::LocalPoolHandle,
         progress::{AsyncChannelProgressSender, ProgressSender},
@@ -203,6 +205,7 @@ impl<D: crate::store::Store> Handler<D> {
                     .await
             }
             BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
+            EntryInfo(msg) => chan.rpc(msg, self, Self::blob_entry_info).await,
         }
     }
 
@@ -309,6 +312,17 @@ impl<D: crate::store::Store> Handler<D> {
         Ok(())
     }
 
+    async fn blob_entry_info(
+        self,
+        msg: BlobEntryInfoRequest,
+    ) -> RpcResult<Option<EntryPathOrData>> {
+        Ok(self
+            .store()
+            .entry_path_or_data(msg.hash)
+            .await
+            .map_err(|e| RpcError::new(&e))?)
+    }
+
     fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
         tracing::info!("blob_list_tags");
         let blobs = self;
diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs
index 633034c8d..d0f24d436 100644
--- a/src/rpc/client/blobs.rs
+++ b/src/rpc/client/blobs.rs
@@ -66,6 +66,10 @@ use std::{
 };
 
 use anyhow::{anyhow, Context as _, Result};
+use bao_tree::{
+    io::{baofile::BaoFile, outboard::PreOrderOutboard},
+    BaoTree,
+};
 use bytes::Bytes;
 use futures_lite::{Stream, StreamExt};
 use futures_util::SinkExt;
@@ -87,10 +91,10 @@ use crate::{
     format::collection::{Collection, SimpleStore},
     get::db::DownloadProgress as BytesDownloadProgress,
     net_protocol::BlobDownloadRequest,
-    rpc::proto::RpcService,
+    rpc::proto::{blobs::BlobEntryInfoRequest, RpcService},
     store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
-    util::SetTagOption,
-    BlobFormat, Hash, Tag,
+    util::{MemOrFile, SetTagOption},
+    BlobFormat, Hash, Tag, IROH_BLOCK_SIZE,
 };
 
 mod batch;
@@ -380,6 +384,31 @@ where
         ))
     }
 
+    /// Open a blob as an independent bao file
+    pub async fn open(&self, hash: Hash) -> Result<impl std::io::Read + std::io::Seek> {
+        let Some(info) = self.rpc.rpc(BlobEntryInfoRequest { hash }).await?? else {
+            return Err(anyhow!("Blob not found"));
+        };
+        let (data, size) = match info.data {
+            MemOrFile::Mem(data) => (MemOrFile::Mem(data.clone()), data.len() as u64),
+            MemOrFile::File((path, size)) => (MemOrFile::File(std::fs::File::open(path)?), size),
+        };
+        let outboard = match info.outboard {
+            MemOrFile::Mem(data) => MemOrFile::Mem(data.clone()),
+            MemOrFile::File(path) => MemOrFile::File(std::fs::File::open(path)?),
+        };
+        let file = BaoFile {
+            data,
+            outboard: PreOrderOutboard {
+                tree: BaoTree::new(size, IROH_BLOCK_SIZE),
+                root: hash.into(),
+                data: outboard,
+            },
+        };
+        let file = positioned_io::Cursor::new(file);
+        Ok(file)
+    }
+
     /// Export a blob from the internal blob store to a path on the node's filesystem.
     ///
     /// `destination` should be a writeable, absolute path on the local node's filesystem.
diff --git a/src/rpc/proto/blobs.rs b/src/rpc/proto/blobs.rs
index 75fdad1c7..b773d3652 100644
--- a/src/rpc/proto/blobs.rs
+++ b/src/rpc/proto/blobs.rs
@@ -15,8 +15,8 @@ use crate::{
     provider::{AddProgress, BatchAddPathProgress},
     rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
     store::{
-        BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
-        ValidateProgress,
+        BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, ExportFormat, ExportMode,
+        ImportMode, ValidateProgress,
     },
     util::SetTagOption,
     BlobFormat, Hash, HashAndFormat, Tag,
@@ -63,6 +63,8 @@ pub enum Request {
     BatchAddPath(BatchAddPathRequest),
     #[rpc(response = RpcResult<()>)]
     BatchCreateTempTag(BatchCreateTempTagRequest),
+    #[rpc(response = RpcResult<Option<EntryPathOrData>>)]
+    EntryInfo(BlobEntryInfoRequest),
 }
 
 #[allow(missing_docs)]
@@ -83,6 +85,7 @@ pub enum Response {
     BatchCreate(BatchCreateResponse),
     BatchAddStream(BatchAddStreamResponse),
     BatchAddPath(BatchAddPathResponse),
+    EntryInfo(RpcResult<Option<EntryPathOrData>>),
 }
 
 /// A request to the node to provide the data at the given path
@@ -313,6 +316,13 @@ pub struct BatchAddPathRequest {
     pub batch: BatchId,
 }
 
+/// Get the path or inline data for a blob
+#[derive(Serialize, Deserialize, Debug)]
+pub struct BlobEntryInfoRequest {
+    /// The hash of the blob
+    pub hash: Hash,
+}
+
 /// Response to a batch add path request
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BatchAddPathResponse(pub BatchAddPathProgress);
diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs
index c06328669..7564f2d5d 100644
--- a/src/store/bao_file.rs
+++ b/src/store/bao_file.rs
@@ -189,16 +189,7 @@ impl FileStorage {
     }
 
     fn current_size(&self) -> io::Result<u64> {
-        let len = self.sizes.metadata()?.len();
-        if len < 8 {
-            Ok(0)
-        } else {
-            // todo: use the last full u64 in case the sizes file is not a multiple of 8
-            // bytes. Not sure how that would happen, but we should handle it.
-            let mut buf = [0u8; 8];
-            self.sizes.read_exact_at(len - 8, &mut buf)?;
-            Ok(u64::from_le_bytes(buf))
-        }
+        read_current_size(&self.sizes)
     }
 
     fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
@@ -470,6 +461,18 @@ impl AsyncSliceReader for OutboardReader {
     }
 }
 
+pub fn read_current_size(sizes: &File) -> io::Result<u64> {
+    let len = sizes.metadata()?.len();
+    if len < 8 {
+        Ok(0)
+    } else {
+        let len = len & !7;
+        let mut buf = [0u8; 8];
+        sizes.read_exact_at(len - 8, &mut buf)?;
+        Ok(u64::from_le_bytes(buf))
+    }
+}
+
 enum HandleChange {
     None,
     MemToFile,
diff --git a/src/store/fs.rs b/src/store/fs.rs
index 8f6adb37f..fb583da28 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -98,9 +98,9 @@ use tables::{ReadOnlyTables, ReadableTables, Tables};
 
 use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
 use super::{
-    bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
-    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
-    ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
+    bao_file::{read_current_size, BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
+    temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, EntryStatus,
+    ExportMode, ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
 };
 use crate::{
     store::{
@@ -532,6 +532,10 @@ pub(crate) enum ActorMessage {
         hash: Hash,
         tx: oneshot::Sender<ActorResult<EntryStatus>>,
     },
+    EntryPathOrData {
+        hash: Hash,
+        tx: oneshot::Sender<ActorResult<Option<EntryPathOrData>>>,
+    },
     #[cfg(test)]
     /// Query method: get the full entry state for a hash, both in memory and in redb.
     /// This is everything we got about the entry, including the actual inline outboard and data.
@@ -664,6 +668,7 @@ impl ActorMessage {
             | Self::Tags { .. }
             | Self::GcStart { .. }
             | Self::GetFullEntryState { .. }
+            | Self::EntryPathOrData { .. }
             | Self::Dump => MessageCategory::ReadOnly,
             Self::Import { .. }
             | Self::Export { .. }
@@ -870,6 +875,14 @@ impl StoreInner {
         Ok(tags)
     }
 
+    async fn entry_path_or_data(&self, hash: Hash) -> OuterResult<Option<EntryPathOrData>> {
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send(ActorMessage::EntryPathOrData { hash, tx })
+            .await?;
+        Ok(rx.await??)
+    }
+
     async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
         let (tx, rx) = oneshot::channel();
         self.tx
@@ -1371,6 +1384,10 @@ impl super::Store for Store {
             .await??)
     }
 
+    async fn entry_path_or_data(&self, hash: Hash) -> io::Result<Option<EntryPathOrData>> {
+        Ok(self.0.entry_path_or_data(hash).await?)
+    }
+
     async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
         Ok(self.0.set_tag(name, hash).await?)
     }
@@ -2266,6 +2283,65 @@ impl ActorState {
         Ok(())
     }
 
+    fn entry_path_or_data(
+        &mut self,
+        tables: &impl ReadableTables,
+        hash: Hash,
+    ) -> ActorResult<Option<EntryPathOrData>> {
+        let data_path = || self.options.path.owned_data_path(&hash);
+        let outboard_path = || self.options.path.owned_outboard_path(&hash);
+        let sizes_path = || self.options.path.owned_sizes_path(&hash);
+        Ok(match tables.blobs().get(hash)? {
+            Some(guard) => match guard.value() {
+                EntryState::Complete {
+                    data_location,
+                    outboard_location,
+                } => {
+                    let data = match data_location {
+                        DataLocation::External(paths, size) => {
+                            let path = paths.first().ok_or_else(|| {
+                                ActorError::Inconsistent("external data missing".to_owned())
+                            })?;
+                            MemOrFile::File((path.clone(), size))
+                        }
+                        DataLocation::Owned(size) => MemOrFile::File((data_path(), size)),
+                        DataLocation::Inline(_) => {
+                            let data = tables.inline_data().get(hash)?.ok_or_else(|| {
+                                ActorError::Inconsistent("inline data missing".to_owned())
+                            })?;
+                            MemOrFile::Mem(data.value().to_vec().into())
+                        }
+                    };
+                    let outboard = match outboard_location {
+                        OutboardLocation::Owned => MemOrFile::File(outboard_path()),
+                        OutboardLocation::Inline(_) => MemOrFile::Mem(
+                            tables
+                                .inline_outboard()
+                                .get(hash)?
+                                .ok_or_else(|| {
+                                    ActorError::Inconsistent("inline outboard missing".to_owned())
+                                })?
+                                .value()
+                                .to_vec()
+                                .into(),
+                        ),
+                        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
+                    };
+                    Some(EntryPathOrData { data, outboard })
+                }
+                EntryState::Partial { .. } => {
+                    let sizes = std::fs::File::open(sizes_path())?;
+                    let size = read_current_size(&sizes)?;
+                    Some(EntryPathOrData {
+                        data: MemOrFile::File((data_path(), size)),
+                        outboard: MemOrFile::File(outboard_path()),
+                    })
+                }
+            },
+            None => None,
+        })
+    }
+
     fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
         match msg {
             ActorMessage::UpdateInlineOptions {
@@ -2339,6 +2415,10 @@
                 let res = self.get_full_entry_state(tables, hash);
                 tx.send(res).ok();
             }
+            ActorMessage::EntryPathOrData { hash, tx } => {
+                let res = self.entry_path_or_data(tables, hash);
+                tx.send(res).ok();
+            }
             x => return Ok(Err(x)),
         }
         Ok(Ok(()))
diff --git a/src/store/mem.rs b/src/store/mem.rs
index 89d8fffd7..93da9fb7d 100644
--- a/src/store/mem.rs
+++ b/src/store/mem.rs
@@ -19,8 +19,8 @@ use futures_lite::{Stream, StreamExt};
 use iroh_io::AsyncSliceReader;
 
 use super::{
-    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
-    ImportProgress, Map, TempCounterMap,
+    temp_name, BaoBatchWriter, ConsistencyCheckProgress, EntryPathOrData, ExportMode,
+    ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
 };
 use crate::{
     store::{
@@ -28,7 +28,7 @@ use crate::{
     },
     util::{
         progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
-        TagCounter, TagDrop,
+        MemOrFile, TagCounter, TagDrop,
     },
     BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
 };
@@ -246,6 +246,22 @@ impl super::Store for Store {
         Ok(())
     }
 
+    async fn entry_path_or_data(&self, hash: Hash) -> io::Result<Option<EntryPathOrData>> {
+        let state = self.read_lock();
+        match state.entries.get(&hash) {
+            Some(entry) => {
+                let inner = entry.inner.data.read().unwrap();
+                let data = inner.data.to_vec().into();
+                let outboard = inner.outboard.to_vec().into();
+                Ok(Some(EntryPathOrData {
+                    data: MemOrFile::Mem(data),
+                    outboard: MemOrFile::Mem(outboard),
+                }))
+            }
+            None => Ok(None),
+        }
+    }
+
     async fn shutdown(&self) {}
 
     async fn sync(&self) -> io::Result<()> {
diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs
index a04161554..5957ca4ce 100644
--- a/src/store/readonly_mem.rs
+++ b/src/store/readonly_mem.rs
@@ -18,7 +18,10 @@ use futures_lite::Stream;
 use iroh_io::AsyncSliceReader;
 use tokio::io::AsyncWriteExt;
 
-use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
+use super::{
+    BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, EntryPathOrData,
+    ExportProgressCb,
+};
 use crate::{
     store::{
         EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
@@ -26,7 +29,7 @@ use crate::{
     },
     util::{
         progress::{BoxedProgressSender, IdGenerator, ProgressSender},
-        Tag,
+        MemOrFile, Tag,
     },
     BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
 };
@@ -321,6 +324,24 @@ impl super::Store for Store {
         Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
     }
 
+    fn entry_path_or_data(
+        &self,
+        hash: Hash,
+    ) -> impl Future<Output = io::Result<Option<super::EntryPathOrData>>> + Send {
+        let res = match self.0.get(&hash) {
+            Some((outboard, data)) => {
+                let outboard = outboard.data.clone();
+                let data = data.clone();
+                Ok(Some(EntryPathOrData {
+                    outboard: MemOrFile::Mem(outboard),
+                    data: MemOrFile::Mem(data),
+                }))
+            }
+            None => Ok(None),
+        };
+        futures_lite::future::ready(res)
+    }
+
     fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
         TempTag::new(inner, None)
     }
diff --git a/src/store/traits.rs b/src/store/traits.rs
index 01c48229d..78085be40 100644
--- a/src/store/traits.rs
+++ b/src/store/traits.rs
@@ -20,7 +20,7 @@ use crate::{
     util::{
         local_pool::{self, LocalPool},
         progress::{BoxedProgressSender, IdGenerator, ProgressSender},
-        Tag,
+        MemOrFile, Tag,
     },
     BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
 };
@@ -42,6 +42,15 @@ pub enum EntryStatus {
     NotFound,
 }
 
+/// The path or inline data for an entry, together with its outboard
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
+pub struct EntryPathOrData {
+    /// The path to the data file and its size, or the inline data
+    pub data: MemOrFile<Bytes, (PathBuf, u64)>,
+    /// The path to the outboard file, or the inline outboard
+    pub outboard: MemOrFile<Bytes, PathBuf>,
+}
+
 /// The size of a bao file
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
 pub enum BaoBlobSize {
@@ -384,6 +393,12 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
     ) -> impl Future<Output = io::Result<()>> + Send {
         validate_impl(self, repair, tx)
     }
+
+    /// Get the info needed to open an entry independently of the store.
+    fn entry_path_or_data(
+        &self,
+        hash: Hash,
+    ) -> impl Future<Output = io::Result<Option<EntryPathOrData>>> + Send;
 }
 
 async fn validate_impl(
diff --git a/src/util/mem_or_file.rs b/src/util/mem_or_file.rs
index d929a19c9..768aa99ee 100644
--- a/src/util/mem_or_file.rs
+++ b/src/util/mem_or_file.rs
@@ -1,12 +1,16 @@
 use std::{fs::File, io};
 
-use bao_tree::io::sync::{ReadAt, Size};
+use bao_tree::io::{
+    mixed::ReadBytesAt,
+    sync::{ReadAt, Size},
+};
 use bytes::Bytes;
+use serde::{Deserialize, Serialize};
 
 /// This is a general purpose Either, just like Result, except that the two cases
 /// are Mem for something that is in memory, and File for something that is somewhere
 /// external and only available via io.
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum MemOrFile<M, F> {
     /// We got it all in memory
     Mem(M),
@@ -14,6 +18,19 @@ pub enum MemOrFile<M, F> {
     /// Out of memory
     File(F),
 }
 
+impl<M, F> ReadBytesAt for MemOrFile<M, F>
+where
+    M: ReadBytesAt,
+    F: ReadBytesAt,
+{
+    fn read_bytes_at(&self, offset: u64, size: usize) -> io::Result<Bytes> {
+        match self {
+            MemOrFile::Mem(mem) => mem.read_bytes_at(offset, size),
+            MemOrFile::File(file) => file.read_bytes_at(offset, size),
+        }
+    }
+}
+
 /// Helper methods for a common way to use MemOrFile, where the memory part is something
 /// like a slice, and the file part is a tuple consisting of path or file and size.
 impl<M, F> MemOrFile<M, (F, u64)>
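
Reviewer note (not part of the patch): the new `read_current_size` helper resolves the old TODO in `FileStorage::current_size` by rounding the length of the sizes file down to a full multiple of 8 (`len & !7`) before reading the last u64, so a torn trailing write can no longer yield a garbage size. Below is a minimal, hypothetical usage sketch of the new `Client::open` API added in src/rpc/client/blobs.rs; `blobs` and `hash` are assumed to be a connected blobs RPC client and a hash already known to the local store.

    use std::io::{Read, Seek, SeekFrom};

    // Open the blob as a standalone bao file: the EntryInfo RPC returns the
    // paths (or inline bytes) for data and outboard, which `open` wraps in a
    // BaoFile behind a positioned_io::Cursor.
    let mut reader = blobs.open(hash).await?;

    // The handle is plain synchronous Read + Seek, so ranged access works
    // without exporting the blob; in async code, run this on a blocking thread.
    reader.seek(SeekFrom::Start(1024 * 1024))?;
    let mut buf = vec![0u8; 16 * 1024];
    reader.read_exact(&mut buf)?;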