diff --git a/Cargo.lock b/Cargo.lock index 6d406716eef1..b6426b59b39c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1934,10 +1934,13 @@ dependencies = [ "bytes", "common-error", "common-macro", + "common-test-util", "futures", "paste", + "pin-project", "serde", "snafu 0.8.5", + "tokio", "toml 0.8.19", "zeroize", ] @@ -8973,6 +8976,8 @@ dependencies = [ "auto_impl", "base64 0.21.7", "bitflags 2.6.0", + "bytes", + "common-base", "common-error", "common-macro", "common-runtime", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 5dfde47bf2ee..465599974dae 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -16,9 +16,12 @@ common-error.workspace = true common-macro.workspace = true futures.workspace = true paste = "1.0" +pin-project.workspace = true serde = { version = "1.0", features = ["derive"] } snafu.workspace = true +tokio.workspace = true zeroize = { version = "1.6", default-features = false, features = ["alloc"] } [dev-dependencies] +common-test-util.workspace = true toml.workspace = true diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs index 4e7a48a8b78b..91f865d17ef6 100644 --- a/src/common/base/src/range_read.rs +++ b/src/common/base/src/range_read.rs @@ -12,12 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; use std::io; use std::ops::Range; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::{BufMut, Bytes}; -use futures::{AsyncReadExt, AsyncSeekExt}; +use futures::AsyncRead; +use pin_project::pin_project; +use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _}; +use tokio::sync::Mutex; /// `Metadata` contains the metadata of a source. pub struct Metadata { @@ -61,7 +69,7 @@ pub trait RangeReader: Send + Unpin { } #[async_trait] -impl RangeReader for &mut R { +impl RangeReader for &mut R { async fn metadata(&mut self) -> io::Result { (*self).metadata().await } @@ -80,26 +88,212 @@ impl RangeReader for &mut R { } } -/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`. -pub struct RangeReaderAdapter(pub R); +/// `AsyncReadAdapter` adapts a `RangeReader` to an `AsyncRead`. +#[pin_project] +pub struct AsyncReadAdapter { + /// The inner `RangeReader`. + /// Use `Mutex` to get rid of the borrow checker issue. + inner: Arc>, + + /// The current position from the view of the reader. + position: u64, + + /// The buffer for the read bytes. + buffer: Vec, + + /// The length of the content. + content_length: u64, + + /// The future for reading the next bytes. + #[pin] + read_fut: Option> + Send>>>, +} + +impl AsyncReadAdapter { + pub async fn new(inner: R) -> io::Result { + let mut inner = inner; + let metadata = inner.metadata().await?; + Ok(AsyncReadAdapter { + inner: Arc::new(Mutex::new(inner)), + position: 0, + buffer: Vec::new(), + content_length: metadata.content_length, + read_fut: None, + }) + } +} + +/// The maximum size per read for the inner reader in `AsyncReadAdapter`. 
+const MAX_SIZE_PER_READ: usize = 8 * 1024 * 1024; // 8MB + +impl AsyncRead for AsyncReadAdapter { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let mut this = self.as_mut().project(); + + if *this.position >= *this.content_length { + return Poll::Ready(Ok(0)); + } + + if !this.buffer.is_empty() { + let to_read = this.buffer.len().min(buf.len()); + buf[..to_read].copy_from_slice(&this.buffer[..to_read]); + this.buffer.drain(..to_read); + *this.position += to_read as u64; + return Poll::Ready(Ok(to_read)); + } + + if this.read_fut.is_none() { + let size = (*this.content_length - *this.position).min(MAX_SIZE_PER_READ as u64); + let range = *this.position..(*this.position + size); + let inner = this.inner.clone(); + let fut = async move { + let mut inner = inner.lock().await; + inner.read(range).await + }; + + *this.read_fut = Some(Box::pin(fut)); + } + + match this + .read_fut + .as_mut() + .as_pin_mut() + .expect("checked above") + .poll(cx) + { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(bytes)) => { + *this.read_fut = None; + + if !bytes.is_empty() { + this.buffer.extend_from_slice(&bytes); + self.poll_read(cx, buf) + } else { + Poll::Ready(Ok(0)) + } + } + Poll::Ready(Err(e)) => { + *this.read_fut = None; + Poll::Ready(Err(e)) + } + } + } +} -/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`. -/// -/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`. -/// Until the codebase is fully ported to `RangeReader`, remove this implementation. #[async_trait] -impl RangeReader - for RangeReaderAdapter -{ +impl RangeReader for Vec { async fn metadata(&mut self) -> io::Result { - let content_length = self.0.seek(io::SeekFrom::End(0)).await?; - Ok(Metadata { content_length }) + Ok(Metadata { + content_length: self.len() as u64, + }) } - async fn read(&mut self, range: Range) -> io::Result { + async fn read(&mut self, mut range: Range) -> io::Result { + range.end = range.end.min(self.len() as u64); + + let bytes = Bytes::copy_from_slice(&self[range.start as usize..range.end as usize]); + Ok(bytes) + } +} + +/// `FileReader` is a `RangeReader` for reading a file. +pub struct FileReader { + content_length: u64, + position: u64, + file: tokio::fs::File, +} + +impl FileReader { + /// Creates a new `FileReader` for the file at the given path. 
+ pub async fn new(path: impl AsRef) -> io::Result { + let file = tokio::fs::File::open(path).await?; + let metadata = file.metadata().await?; + Ok(FileReader { + content_length: metadata.len(), + position: 0, + file, + }) + } +} + +#[async_trait] +impl RangeReader for FileReader { + async fn metadata(&mut self) -> io::Result { + Ok(Metadata { + content_length: self.content_length, + }) + } + + async fn read(&mut self, mut range: Range) -> io::Result { + if range.start != self.position { + self.file.seek(io::SeekFrom::Start(range.start)).await?; + self.position = range.start; + } + + range.end = range.end.min(self.content_length); + if range.end <= self.position { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Start of range is out of bounds", + )); + } + let mut buf = vec![0; (range.end - range.start) as usize]; - self.0.seek(io::SeekFrom::Start(range.start)).await?; - self.0.read_exact(&mut buf).await?; + + self.file.read_exact(&mut buf).await?; + self.position = range.end; + Ok(Bytes::from(buf)) } } + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_named_temp_file; + use futures::io::AsyncReadExt as _; + + use super::*; + + #[tokio::test] + async fn test_async_read_adapter() { + let data = b"hello world"; + let reader = Vec::from(data); + let mut adapter = AsyncReadAdapter::new(reader).await.unwrap(); + + let mut buf = Vec::new(); + adapter.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, data); + } + + #[tokio::test] + async fn test_async_read_adapter_large() { + let data = (0..20 * 1024 * 1024).map(|i| i as u8).collect::>(); + let mut adapter = AsyncReadAdapter::new(data.clone()).await.unwrap(); + + let mut buf = Vec::new(); + adapter.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, data); + } + + #[tokio::test] + async fn test_file_reader() { + let file = create_named_temp_file(); + let path = file.path(); + let data = b"hello world"; + tokio::fs::write(path, data).await.unwrap(); + + let mut reader = FileReader::new(path).await.unwrap(); + let metadata = reader.metadata().await.unwrap(); + assert_eq!(metadata.content_length, data.len() as u64); + + let bytes = reader.read(0..metadata.content_length).await.unwrap(); + assert_eq!(&*bytes, data); + + let bytes = reader.read(0..5).await.unwrap(); + assert_eq!(&*bytes, &data[..5]); + } +} diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index f79d651e7949..ace0e5c48536 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -80,9 +80,7 @@ impl InvertedIndexReader for InvertedIndexBlobReader { #[cfg(test)] mod tests { use common_base::bit_vec::prelude::*; - use common_base::range_read::RangeReaderAdapter; use fst::MapBuilder; - use futures::io::Cursor; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use prost::Message; @@ -163,8 +161,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_metadata() { let blob = create_inverted_index_blob(); - let cursor = RangeReaderAdapter(Cursor::new(blob)); - let mut blob_reader = InvertedIndexBlobReader::new(cursor); + let mut blob_reader = InvertedIndexBlobReader::new(blob); let metas = blob_reader.metadata().await.unwrap(); assert_eq!(metas.metas.len(), 2); @@ -191,8 +188,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_fst() { let blob = create_inverted_index_blob(); - let cursor = RangeReaderAdapter(Cursor::new(blob)); - let mut blob_reader = 
InvertedIndexBlobReader::new(cursor); + let mut blob_reader = InvertedIndexBlobReader::new(blob); let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); @@ -224,8 +220,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_bitmap() { let blob = create_inverted_index_blob(); - let cursor = RangeReaderAdapter(Cursor::new(blob)); - let mut blob_reader = InvertedIndexBlobReader::new(cursor); + let mut blob_reader = InvertedIndexBlobReader::new(blob); let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index ffcaf9d921ea..1f35237711ce 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -113,8 +113,6 @@ impl InvertedIndeFooterReader { #[cfg(test)] mod tests { - use common_base::range_read::RangeReaderAdapter; - use futures::io::Cursor; use prost::Message; use super::*; @@ -141,10 +139,9 @@ mod tests { ..Default::default() }; - let payload_buf = create_test_payload(meta); + let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); - let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let metas = reader.read_payload(payload_size).await.unwrap(); @@ -164,8 +161,7 @@ mod tests { let mut payload_buf = create_test_payload(meta); payload_buf.push(0xff); // Add an extra byte to corrupt the footer let blob_size = payload_buf.len() as u64; - let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); - let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); let payload_size_result = reader.read_payload_size().await; assert!(payload_size_result.is_err()); @@ -180,10 +176,9 @@ mod tests { ..Default::default() }; - let payload_buf = create_test_payload(meta); + let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); - let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let payload_result = reader.read_payload(payload_size).await; diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 26d3fb26d90e..7b95b8deafa9 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -99,8 +99,6 @@ impl InvertedIndexBlobWriter { #[cfg(test)] mod tests { - use common_base::range_read::RangeReaderAdapter; - use futures::io::Cursor; use futures::stream; use super::*; @@ -120,8 +118,7 @@ mod tests { .await .unwrap(); - let cursor = RangeReaderAdapter(Cursor::new(blob)); - let mut reader = InvertedIndexBlobReader::new(cursor); + let mut reader = InvertedIndexBlobReader::new(blob); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); @@ -161,8 +158,7 @@ mod tests { .await .unwrap(); - let cursor = RangeReaderAdapter(Cursor::new(blob)); - let mut 
reader = InvertedIndexBlobReader::new(cursor); + let mut reader = InvertedIndexBlobReader::new(blob); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index a3482cc07530..cac3ffedd74c 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -16,7 +16,6 @@ pub mod builder; use std::sync::Arc; -use common_base::range_read::RangeReaderAdapter; use common_telemetry::warn; use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ @@ -109,7 +108,6 @@ impl InvertedIndexApplier { self.remote_blob_reader(file_id).await? } }; - let blob = RangeReaderAdapter(blob); if let Some(index_cache) = &self.inverted_index_cache { let mut index_reader = CachedInvertedIndexBlobReader::new( diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index b83101e3fb62..8bfa25de1e87 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; -use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore}; +use object_store::{FuturesAsyncWriter, ObjectStore}; use puffin::error::{self as puffin_error, Result as PuffinResult}; use puffin::puffin_manager::file_accessor::PuffinFileAccessor; use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager; @@ -28,11 +28,11 @@ use snafu::ResultExt; use crate::error::{PuffinInitStagerSnafu, Result}; use crate::metrics::{ INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL, - INDEX_PUFFIN_SEEK_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, + INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, }; use crate::sst::index::store::{self, InstrumentedStore}; -type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>; +type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>; type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>; pub(crate) type SstPuffinManager = @@ -115,16 +115,15 @@ impl ObjectStorePuffinFileAccessor { #[async_trait] impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { - type Reader = InstrumentedAsyncRead; + type Reader = InstrumentedRangeReader; type Writer = InstrumentedAsyncWrite; async fn reader(&self, puffin_file_name: &str) -> PuffinResult { self.object_store - .reader( + .range_reader( puffin_file_name, &INDEX_PUFFIN_READ_BYTES_TOTAL, &INDEX_PUFFIN_READ_OP_TOTAL, - &INDEX_PUFFIN_SEEK_OP_TOTAL, ) .await .map_err(BoxedError::new) @@ -147,9 +146,9 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { #[cfg(test)] mod tests { + use common_base::range_read::RangeReader; use common_test_util::temp_dir::create_temp_dir; use futures::io::Cursor; - use futures::AsyncReadExt; use object_store::services::Memory; use puffin::blob_metadata::CompressionCodec; use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions}; @@ -193,9 +192,9 @@ mod tests { let reader = manager.reader(file_name).await.unwrap(); let blob_guard = reader.blob(blob_key).await.unwrap(); let mut blob_reader = blob_guard.reader().await.unwrap(); - let mut buf = Vec::new(); - 
blob_reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, raw_data); + let meta = blob_reader.metadata().await.unwrap(); + let bs = blob_reader.read(0..meta.content_length).await.unwrap(); + assert_eq!(&*bs, raw_data); let dir_guard = reader.dir(dir_key).await.unwrap(); let file = dir_guard.path().join("hello"); diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 3c709d500028..2750c69fc249 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -13,9 +13,13 @@ // limitations under the License. use std::io; +use std::ops::Range; use std::pin::Pin; use std::task::{Context, Poll}; +use async_trait::async_trait; +use bytes::{BufMut, Bytes}; +use common_base::range_read::{Metadata, RangeReader}; use futures::{AsyncRead, AsyncSeek, AsyncWrite}; use object_store::ObjectStore; use pin_project::pin_project; @@ -51,6 +55,22 @@ impl InstrumentedStore { self } + /// Returns an [`InstrumentedRangeReader`] for the given path. + /// Metrics like the number of bytes read are recorded using the provided `IntCounter`. + pub async fn range_reader<'a>( + &self, + path: &str, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, + ) -> Result> { + Ok(InstrumentedRangeReader { + store: self.object_store.clone(), + path: path.to_string(), + read_byte_count, + read_count, + }) + } + /// Returns an [`InstrumentedAsyncRead`] for the given path. /// Metrics like the number of bytes read, read and seek operations /// are recorded using the provided `IntCounter`s. @@ -236,6 +256,56 @@ impl AsyncWrite for InstrumentedAsyncWrite<'_, W> } } +/// Implements `RangeReader` for `ObjectStore` and record metrics. +pub(crate) struct InstrumentedRangeReader<'a> { + store: ObjectStore, + path: String, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, +} + +#[async_trait] +impl RangeReader for InstrumentedRangeReader<'_> { + async fn metadata(&mut self) -> io::Result { + let stat = self.store.stat(&self.path).await?; + Ok(Metadata { + content_length: stat.content_length(), + }) + } + + async fn read(&mut self, range: Range) -> io::Result { + let buf = self.store.reader(&self.path).await?.read(range).await?; + self.read_byte_count.inc_by(buf.len() as _); + self.read_count.inc_by(1); + Ok(buf.to_bytes()) + } + + async fn read_into( + &mut self, + range: Range, + buf: &mut (impl BufMut + Send), + ) -> io::Result<()> { + let reader = self.store.reader(&self.path).await?; + let size = reader.read_into(buf, range).await?; + self.read_byte_count.inc_by(size as _); + self.read_count.inc_by(1); + Ok(()) + } + + async fn read_vec(&mut self, ranges: &[Range]) -> io::Result> { + let bufs = self + .store + .reader(&self.path) + .await? + .fetch(ranges.to_owned()) + .await?; + let total_size: usize = bufs.iter().map(|buf| buf.len()).sum(); + self.read_byte_count.inc_by(total_size as _); + self.read_count.inc_by(1); + Ok(bufs.into_iter().map(|buf| buf.to_bytes()).collect()) + } +} + /// A guard that increments a counter when dropped. 
struct CounterGuard<'a> { count: usize, diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index 22c429cd8429..e4e6c74a5c9b 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -14,6 +14,8 @@ async-walkdir = "2.0.0" auto_impl = "1.2.0" base64.workspace = true bitflags.workspace = true +bytes.workspace = true +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs index 5b8ba3c1478f..3f48bf4b105e 100644 --- a/src/puffin/src/file_format/reader.rs +++ b/src/puffin/src/file_format/reader.rs @@ -16,6 +16,7 @@ mod file; mod footer; use async_trait::async_trait; +use common_base::range_read::RangeReader; use crate::blob_metadata::BlobMetadata; use crate::error::Result; @@ -38,7 +39,7 @@ pub trait SyncReader<'a> { /// `AsyncReader` defines an asynchronous reader for puffin data. #[async_trait] pub trait AsyncReader<'a> { - type Reader: futures::AsyncRead + futures::AsyncSeek; + type Reader: RangeReader; /// Fetches the FileMetadata. async fn metadata(&'a mut self) -> Result; diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 9a87d70592c2..3736ed5d2d8d 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -15,7 +15,7 @@ use std::io::{self, SeekFrom}; use async_trait::async_trait; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::blob_metadata::BlobMetadata; @@ -112,7 +112,7 @@ impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader { } #[async_trait] -impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> AsyncReader<'a> for PuffinFileReader { +impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { type Reader = PartialReader<&'a mut R>; async fn metadata(&'a mut self) -> Result { @@ -121,12 +121,8 @@ impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> AsyncReader<'a> for Puffi } // check the magic - let mut magic = [0; MAGIC_SIZE as usize]; - self.source - .read_exact(&mut magic) - .await - .context(ReadSnafu)?; - ensure!(magic == MAGIC, MagicNotMatchedSnafu); + let magic = self.source.read(0..MAGIC_SIZE).await.context(ReadSnafu)?; + ensure!(*magic == MAGIC, MagicNotMatchedSnafu); let file_size = self.get_file_size_async().await?; @@ -155,13 +151,14 @@ impl PuffinFileReader { } } -impl PuffinFileReader { +impl PuffinFileReader { async fn get_file_size_async(&mut self) -> Result { let file_size = self .source - .seek(SeekFrom::End(0)) + .metadata() .await - .context(SeekSnafu)?; + .context(ReadSnafu)? + .content_length; Self::validate_file_size(file_size)?; Ok(file_size) } diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 1d0e915ed369..aa764fd32a21 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -14,7 +14,7 @@ use std::io::{self, Cursor, SeekFrom}; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::error::{ @@ -72,24 +72,20 @@ impl FooterParser { } } -impl FooterParser { +impl FooterParser { /// Parses the footer from the IO source in a asynchronous manner. 
pub async fn parse_async(&mut self) -> Result { let mut parser = StageParser::new(self.file_size); let mut buf = vec![]; while let Some(byte_to_read) = parser.next_to_read() { + buf.clear(); + let range = byte_to_read.offset..byte_to_read.offset + byte_to_read.size; self.source - .seek(SeekFrom::Start(byte_to_read.offset)) + .read_into(range, &mut buf) .await - .context(SeekSnafu)?; - let size = byte_to_read.size as usize; - - buf.resize(size, 0); - let buf = &mut buf[..size]; - - self.source.read_exact(buf).await.context(ReadSnafu)?; - parser.consume_bytes(buf)?; + .context(ReadSnafu)?; + parser.consume_bytes(&buf)?; } parser.finish() diff --git a/src/puffin/src/partial_reader/async.rs b/src/puffin/src/partial_reader/async.rs index 2cc9fae5236a..3de40cb3a190 100644 --- a/src/puffin/src/partial_reader/async.rs +++ b/src/puffin/src/partial_reader/async.rs @@ -13,184 +13,121 @@ // limitations under the License. use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::ops::Range; -use futures::{ready, AsyncRead, AsyncSeek}; +use async_trait::async_trait; +use bytes::{BufMut, Bytes}; +use common_base::range_read::{Metadata, RangeReader}; -use crate::partial_reader::position::position_after_seek; use crate::partial_reader::PartialReader; -impl AsyncRead for PartialReader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // past end of portion - if self.position() > self.size() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid read past the end of the portion", - ))); - } +#[async_trait] +impl RangeReader for PartialReader { + async fn metadata(&mut self) -> io::Result { + Ok(Metadata { + content_length: self.size, + }) + } - // end of portion - if self.is_eof() { - return Poll::Ready(Ok(0)); + async fn read(&mut self, range: Range) -> io::Result { + let absolute_range_start = self.offset + range.start; + if absolute_range_start >= self.offset + self.size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Start of range is out of bounds", + )); } + let absolute_range_end = (self.offset + range.end).min(self.offset + self.size); + let absolute_range = absolute_range_start..absolute_range_end; - // first read, seek to the correct offset - if self.position_in_portion.is_none() { - // seek operation - let seek_from = io::SeekFrom::Start(self.offset); - ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?; + let result = self.source.read(absolute_range.clone()).await?; + self.position_in_portion = Some(absolute_range.end); + Ok(result) + } - self.position_in_portion = Some(0); + async fn read_into( + &mut self, + range: Range, + buf: &mut (impl BufMut + Send), + ) -> io::Result<()> { + let absolute_range_start = self.offset + range.start; + if absolute_range_start >= self.offset + self.size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Start of range is out of bounds", + )); } + let absolute_range_end = (self.offset + range.end).min(self.offset + self.size); + let absolute_range = absolute_range_start..absolute_range_end; - // prevent reading over the end - let max_len = (self.size() - self.position_in_portion.unwrap()) as usize; - let actual_len = max_len.min(buf.len()); - - // create a limited reader - let target_buf = &mut buf[..actual_len]; - - // read operation - let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?; - self.position_in_portion = Some(self.position() + read_bytes as u64); - - 
Poll::Ready(Ok(read_bytes)) + self.source.read_into(absolute_range.clone(), buf).await?; + self.position_in_portion = Some(absolute_range.end); + Ok(()) } -} -impl AsyncSeek for PartialReader { - fn poll_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: io::SeekFrom, - ) -> Poll> { - let new_position = position_after_seek(pos, self.position(), self.size())?; - let pos = io::SeekFrom::Start(self.offset + new_position); - ready!(self.as_mut().project().source.poll_seek(cx, pos))?; - - self.position_in_portion = Some(new_position); - Poll::Ready(Ok(new_position)) + async fn read_vec(&mut self, ranges: &[Range]) -> io::Result> { + let absolute_ranges = ranges + .iter() + .map(|range| { + let start = self.offset + range.start; + + if start >= self.offset + self.size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Start of range is out of bounds", + )); + } + + let end = (self.offset + range.end).min(self.offset + self.size); + Ok(start..end) + }) + .collect::>>()?; + + let results = self.source.read_vec(&absolute_ranges).await?; + if let Some(last_range) = absolute_ranges.last() { + self.position_in_portion = Some(last_range.end); + } + + Ok(results) } } #[cfg(test)] mod tests { - use futures::io::Cursor; - use futures::{AsyncReadExt as _, AsyncSeekExt as _}; - use super::*; #[tokio::test] async fn read_all_data_in_portion() { let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data.clone()), 0, 100); - let mut buf = vec![0; 100]; - assert_eq!(reader.read(&mut buf).await.unwrap(), 100); - assert_eq!(buf, data); + let mut reader = PartialReader::new(data.clone(), 0, 100); + let buf = reader.read(0..100).await.unwrap(); + assert_eq!(*buf, data); } #[tokio::test] async fn read_part_of_data_in_portion() { let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - let mut buf = vec![0; 30]; - assert_eq!(reader.read(&mut buf).await.unwrap(), 30); - assert_eq!(buf, (10..40).collect::>()); - } - - #[tokio::test] - async fn seek_and_read_data_in_portion() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); - let mut buf = vec![0; 10]; - assert_eq!(reader.read(&mut buf).await.unwrap(), 10); - assert_eq!(buf, (20..30).collect::>()); - } - - #[tokio::test] - async fn read_past_end_of_portion_is_eof() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - let mut buf = vec![0; 50]; - assert_eq!(reader.read(&mut buf).await.unwrap(), 30); - assert_eq!(reader.read(&mut buf).await.unwrap(), 0); // hit EOF + let mut reader = PartialReader::new(data, 10, 30); + let buf = reader.read(0..30).await.unwrap(); + assert_eq!(*buf, (10..40).collect::>()); } #[tokio::test] async fn seek_past_end_of_portion_returns_error() { let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut reader = PartialReader::new(data, 10, 30); // seeking past the portion returns an error - assert!(reader.seek(io::SeekFrom::Start(31)).await.is_err()); - } - - #[tokio::test] - async fn seek_to_negative_position_returns_error() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); - // seeking back to the start of the portion - 
assert_eq!(reader.seek(io::SeekFrom::Current(-10)).await.unwrap(), 0); - // seeking to a negative position returns an error - assert!(reader.seek(io::SeekFrom::Current(-1)).await.is_err()); - } - - #[tokio::test] - async fn seek_from_end_of_portion() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - let mut buf = vec![0; 10]; - // seek to 10 bytes before the end of the portion - assert_eq!(reader.seek(io::SeekFrom::End(-10)).await.unwrap(), 20); - assert_eq!(reader.read(&mut buf).await.unwrap(), 10); - // the final 10 bytes of the portion - assert_eq!(buf, (30..40).collect::>()); - assert!(reader.is_eof()); - } - - #[tokio::test] - async fn seek_from_end_to_negative_position_returns_error() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data.clone()), 10, 30); - // seeking to a negative position returns an error - assert!(reader.seek(io::SeekFrom::End(-31)).await.is_err()); - } - - #[tokio::test] - async fn zero_length_portion_returns_zero_on_read() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 0); - let mut buf = vec![0; 10]; - // reading a portion with zero length returns 0 bytes - assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert!(reader.read(31..32).await.is_err()); } #[tokio::test] async fn is_eof_returns_true_at_end_of_portion() { let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); + let mut reader = PartialReader::new(data, 10, 30); // we are not at the end of the portion assert!(!reader.is_eof()); - let mut buf = vec![0; 30]; - assert_eq!(reader.read(&mut buf).await.unwrap(), 30); + let _ = reader.read(0..20).await.unwrap(); // we are at the end of the portion assert!(reader.is_eof()); } - - #[tokio::test] - async fn position_resets_after_seek_to_start() { - let data: Vec = (0..100).collect(); - let mut reader = PartialReader::new(Cursor::new(data), 10, 30); - assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10); - assert_eq!(reader.position(), 10); - assert_eq!(reader.seek(io::SeekFrom::Start(0)).await.unwrap(), 0); - assert_eq!(reader.position(), 0); - } } diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index f77b79c007ba..7bd5e9039d03 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -22,7 +22,8 @@ mod tests; use std::path::PathBuf; use async_trait::async_trait; -use futures::{AsyncRead, AsyncSeek}; +use common_base::range_read::RangeReader; +use futures::AsyncRead; use crate::blob_metadata::CompressionCodec; use crate::error::Result; @@ -94,7 +95,7 @@ pub trait PuffinReader { #[async_trait] #[auto_impl::auto_impl(Arc)] pub trait BlobGuard { - type Reader: AsyncRead + AsyncSeek + Unpin; + type Reader: RangeReader; async fn reader(&self) -> Result; } diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs index 89ef8cc45192..dc32db6db50c 100644 --- a/src/puffin/src/puffin_manager/file_accessor.rs +++ b/src/puffin/src/puffin_manager/file_accessor.rs @@ -13,7 +13,8 @@ // limitations under the License. 
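// ---------------------------------------------------------------------------
// Editorial aside (not part of the patch): both `BlobGuard::Reader` above and
// `PuffinFileAccessor::Reader` in the next hunk now only require `RangeReader`.
// A minimal implementor needs just `metadata` and `read`; the `Vec<u8>` impl
// added earlier defines only these two, so `read_into`/`read_vec` appear to
// fall back to default trait methods. Illustrative sketch only; `InMemory` is
// a hypothetical type, not part of this change.
use std::io;
use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::{Metadata, RangeReader};

struct InMemory(Bytes);

#[async_trait]
impl RangeReader for InMemory {
    async fn metadata(&mut self) -> io::Result<Metadata> {
        Ok(Metadata {
            content_length: self.0.len() as u64,
        })
    }

    async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
        // Clamp to the available bytes, mirroring the `Vec<u8>` impl.
        let end = range.end.min(self.0.len() as u64) as usize;
        let start = (range.start as usize).min(end);
        Ok(self.0.slice(start..end))
    }
}
// ---------------------------------------------------------------------------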
use async_trait::async_trait; -use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use common_base::range_read::RangeReader; +use futures::AsyncWrite; use crate::error::Result; @@ -21,7 +22,7 @@ use crate::error::Result; #[async_trait] #[auto_impl::auto_impl(Arc)] pub trait PuffinFileAccessor: Send + Sync + 'static { - type Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync; + type Reader: RangeReader + Sync; type Writer: AsyncWrite + Unpin + Send; /// Opens a reader for the given puffin file. diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 01b367a78291..976eb239979a 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -20,8 +20,8 @@ use async_trait::async_trait; pub use reader::FsPuffinReader; pub use writer::FsPuffinWriter; -use super::file_accessor::PuffinFileAccessor; use crate::error::Result; +use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 3d3d2c880407..3de27fdb77b0 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -12,19 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::io; +use std::ops::Range; use async_compression::futures::bufread::ZstdDecoder; use async_trait::async_trait; +use bytes::{BufMut, Bytes}; +use common_base::range_read::{AsyncReadAdapter, Metadata, RangeReader}; use futures::io::BufReader; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite}; +use futures::{AsyncRead, AsyncWrite}; use snafu::{ensure, OptionExt, ResultExt}; use crate::blob_metadata::{BlobMetadata, CompressionCodec}; use crate::error::{ BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu, - ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, + MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, }; use crate::file_format::reader::{AsyncReader, PuffinFileReader}; use crate::partial_reader::PartialReader; @@ -132,11 +134,12 @@ where F: PuffinFileAccessor + Clone, { async fn init_blob_to_stager( - mut reader: PuffinFileReader, + reader: PuffinFileReader, blob_metadata: BlobMetadata, mut writer: BoxWriter, ) -> Result { - let reader = reader.blob_reader(&blob_metadata)?; + let reader = reader.into_blob_reader(&blob_metadata); + let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?; let compression = blob_metadata.compression_codec; let size = Self::handle_decompress(reader, &mut writer, compression).await?; Ok(size) @@ -159,10 +162,12 @@ where .context(BlobNotFoundSnafu { blob: key })?; let mut reader = file.blob_reader(blob_metadata)?; - let mut buf = vec![]; - reader.read_to_end(&mut buf).await.context(ReadSnafu)?; - let dir_meta: DirMetadata = - serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?; + let meta = reader.metadata().await.context(MetadataSnafu)?; + let buf = reader + .read(0..meta.content_length) + .await + .context(ReadSnafu)?; + let dir_meta: DirMetadata = serde_json::from_slice(&buf).context(DeserializeJsonSnafu)?; let mut tasks = vec![]; for file_meta in dir_meta.files { @@ -185,8 +190,8 
@@ where let reader = accessor.reader(&puffin_file_name).await?; let writer = writer_provider.writer(&file_meta.relative_path).await?; let task = common_runtime::spawn_global(async move { - let mut file = PuffinFileReader::new(reader); - let reader = file.blob_reader(&blob_meta)?; + let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta); + let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?; let compression = blob_meta.compression_codec; let size = Self::handle_decompress(reader, writer, compression).await?; Ok(size) @@ -256,43 +261,45 @@ impl BlobGuard for RandomReadBlob { /// `Either` is a type that represents either `A` or `B`. /// /// Used to: -/// impl `AsyncRead + AsyncSeek` for `Either`, +/// impl `RangeReader` for `Either`, /// impl `BlobGuard` for `Either`. pub enum Either { L(A), R(B), } -impl AsyncRead for Either +#[async_trait] +impl RangeReader for Either where - A: AsyncRead + Unpin, - B: AsyncRead + Unpin, + A: RangeReader, + B: RangeReader, { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - match self.get_mut() { - Either::L(a) => Pin::new(a).poll_read(cx, buf), - Either::R(b) => Pin::new(b).poll_read(cx, buf), + async fn metadata(&mut self) -> io::Result { + match self { + Either::L(a) => a.metadata().await, + Either::R(b) => b.metadata().await, } } -} - -impl AsyncSeek for Either -where - A: AsyncSeek + Unpin, - B: AsyncSeek + Unpin, -{ - fn poll_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: std::io::SeekFrom, - ) -> Poll> { - match self.get_mut() { - Either::L(a) => Pin::new(a).poll_seek(cx, pos), - Either::R(b) => Pin::new(b).poll_seek(cx, pos), + async fn read(&mut self, range: Range) -> io::Result { + match self { + Either::L(a) => a.read(range).await, + Either::R(b) => b.read(range).await, + } + } + async fn read_into( + &mut self, + range: Range, + buf: &mut (impl BufMut + Send), + ) -> io::Result<()> { + match self { + Either::L(a) => a.read_into(range, buf).await, + Either::R(b) => b.read_into(range, buf).await, + } + } + async fn read_vec(&mut self, ranges: &[Range]) -> io::Result> { + match self { + Either::L(a) => a.read_vec(ranges).await, + Either::R(b) => b.read_vec(ranges).await, } } } diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 9ab2be00ac27..19fd891438b3 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use async_walkdir::{Filtering, WalkDir}; use base64::prelude::BASE64_URL_SAFE; use base64::Engine; +use common_base::range_read::FileReader; use common_runtime::runtime::RuntimeTrait; use common_telemetry::{info, warn}; use futures::{FutureExt, StreamExt}; @@ -30,7 +31,7 @@ use snafu::ResultExt; use tokio::fs; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use tokio_util::compat::TokioAsyncWriteCompatExt; use crate::error::{ CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu, @@ -427,11 +428,10 @@ pub struct FsBlobGuard { #[async_trait] impl BlobGuard for FsBlobGuard { - type Reader = Compat; + type Reader = FileReader; async fn reader(&self) -> Result { - let file = fs::File::open(&self.path).await.context(OpenSnafu)?; - Ok(file.compat()) + 
FileReader::new(&self.path).await.context(OpenSnafu) } } @@ -530,8 +530,9 @@ impl BoundedStager { #[cfg(test)] mod tests { + use common_base::range_read::RangeReader; use common_test_util::temp_dir::create_temp_dir; - use futures::{AsyncReadExt, AsyncWriteExt}; + use futures::AsyncWriteExt; use tokio::io::AsyncReadExt as _; use super::*; @@ -564,9 +565,9 @@ mod tests { .await .unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"hello world"); + let m = reader.metadata().await.unwrap(); + let buf = reader.read(0..m.content_length).await.unwrap(); + assert_eq!(&*buf, b"hello world"); let mut file = stager.must_get_file(puffin_file_name, key).await; let mut buf = Vec::new(); @@ -695,9 +696,10 @@ mod tests { .reader() .await .unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"hello world"); + + let m = reader.metadata().await.unwrap(); + let buf = reader.read(0..m.content_length).await.unwrap(); + assert_eq!(&*buf, b"hello world"); let dir_path = stager .get_dir( @@ -751,9 +753,9 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, blob_key)); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"Hello world"); + let m = reader.metadata().await.unwrap(); + let buf = reader.read(0..m.content_length).await.unwrap(); + assert_eq!(&*buf, b"Hello world"); // Second time to get the blob, get from recycle bin let mut reader = stager @@ -772,9 +774,9 @@ mod tests { stager.cache.run_pending_tasks().await; assert!(!stager.in_cache(puffin_file_name, blob_key)); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"Hello world"); + let m = reader.metadata().await.unwrap(); + let buf = reader.read(0..m.content_length).await.unwrap(); + assert_eq!(&*buf, b"Hello world"); let dir_key = "dir_key"; let files_in_dir = [ diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 02073522bece..37b3ce7c65af 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -16,8 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use common_base::range_read::{FileReader, RangeReader}; use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use futures::AsyncReadExt as _; use tokio::fs::File; use tokio::io::AsyncReadExt as _; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; @@ -93,10 +93,9 @@ async fn test_put_get_file() { #[tokio::test] async fn test_put_get_files() { let capicities = [1, 16, u64::MAX]; + let compression_codecs = [None, Some(CompressionCodec::Zstd)]; for capacity in capicities { - let compression_codecs = [None, Some(CompressionCodec::Zstd)]; - for compression_codec in compression_codecs { let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await; let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_files_")); @@ -299,9 +298,9 @@ async fn check_blob( ) { let blob = puffin_reader.blob(key).await.unwrap(); let mut reader = blob.reader().await.unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, raw_data); + let meta = reader.metadata().await.unwrap(); + let bs = reader.read(0..meta.content_length).await.unwrap(); + assert_eq!(&*bs, raw_data); if !compressed { // If the blob is not compressed, it won't be exist in the stager. 
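// ---------------------------------------------------------------------------
// Editorial aside (not part of the patch): the hunk below switches
// `MockFileAccessor::Reader` to the new `common_base::range_read::FileReader`.
// For orientation, this is roughly how that reader is driven; it mirrors the
// `test_file_reader` case added earlier in this patch. The path is
// hypothetical and the file is assumed to be non-empty.
use common_base::range_read::{FileReader, RangeReader};

async fn peek(path: &str) -> std::io::Result<()> {
    let mut reader = FileReader::new(path).await?;
    // Total length comes from the file system metadata.
    let meta = reader.metadata().await?;
    // Any byte range up to `content_length` can then be fetched.
    let all = reader.read(0..meta.content_length).await?;
    let head = reader.read(0..5.min(meta.content_length)).await?;
    assert_eq!(&all[..head.len()], &*head);
    Ok(())
}
// ---------------------------------------------------------------------------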
@@ -386,14 +385,13 @@ impl MockFileAccessor { #[async_trait] impl PuffinFileAccessor for MockFileAccessor { - type Reader = Compat; + type Reader = FileReader; type Writer = Compat; async fn reader(&self, puffin_file_name: &str) -> Result { - let f = tokio::fs::File::open(self.tempdir.path().join(puffin_file_name)) + Ok(FileReader::new(self.tempdir.path().join(puffin_file_name)) .await - .unwrap(); - Ok(f.compat()) + .unwrap()) } async fn writer(&self, puffin_file_name: &str) -> Result { diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index bbde67067fcf..a152d4124bd6 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -15,10 +15,10 @@ use std::collections::HashMap; use std::fs::File; use std::io::{Cursor, Read}; +use std::vec; +use common_base::range_read::{FileReader, RangeReader}; use futures::io::Cursor as AsyncCursor; -use futures::AsyncReadExt; -use tokio_util::compat::TokioAsyncReadCompatExt; use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader}; use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; @@ -38,8 +38,8 @@ fn test_read_empty_puffin_sync() { async fn test_read_empty_puffin_async() { let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; - let file = tokio::fs::File::open(path).await.unwrap(); - let mut reader = PuffinFileReader::new(file.compat()); + let reader = FileReader::new(path).await.unwrap(); + let mut reader = PuffinFileReader::new(reader); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.properties.len(), 0); assert_eq!(metadata.blobs.len(), 0); @@ -84,8 +84,8 @@ fn test_sample_metric_data_puffin_sync() { async fn test_sample_metric_data_puffin_async() { let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; - let file = tokio::fs::File::open(path).await.unwrap(); - let mut reader = PuffinFileReader::new(file.compat()); + let reader = FileReader::new(path).await.unwrap(); + let mut reader = PuffinFileReader::new(reader); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.properties.len(), 1); @@ -104,13 +104,11 @@ async fn test_sample_metric_data_puffin_async() { assert_eq!(metadata.blobs[1].length, 83); let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).await.unwrap(); - assert_eq!(buf, "abcdefghi"); + let buf = read_all_range(&mut some_blob).await; + assert_eq!(&buf, b"abcdefghi"); let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).await.unwrap(); + let buf = read_all_range(&mut some_other_blob).await; let expected = include_bytes!("tests/resources/sample-metric-data.blob"); assert_eq!(buf, expected); } @@ -162,8 +160,7 @@ async fn test_writer_reader_empty_async() { let written_bytes = writer.finish().await.unwrap(); assert!(written_bytes > 0); - let mut buf = AsyncCursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); + let mut reader = PuffinFileReader::new(buf.into_inner()); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.properties.len(), 1); @@ -287,8 +284,7 @@ async fn test_writer_reader_async() { let written_bytes = writer.finish().await.unwrap(); assert!(written_bytes > 0); - let mut buf = AsyncCursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); + let mut reader = PuffinFileReader::new(buf.into_inner()); let metadata = reader.metadata().await.unwrap(); 
assert_eq!(metadata.properties.len(), 1); @@ -307,16 +303,20 @@ async fn test_writer_reader_async() { assert_eq!(metadata.blobs[1].length, 83); let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = Vec::new(); - some_blob.read_to_end(&mut buf).await.unwrap(); + let buf = read_all_range(&mut some_blob).await; assert_eq!(buf, blob1); let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).await.unwrap(); + let buf = read_all_range(&mut some_other_blob).await; assert_eq!(buf, blob2); } test_writer_reader_async(false).await; test_writer_reader_async(true).await; } + +async fn read_all_range(reader: &mut impl RangeReader) -> Vec { + let m = reader.metadata().await.unwrap(); + let buf = reader.read(0..m.content_length).await.unwrap(); + buf.to_vec() +}
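// ---------------------------------------------------------------------------
// Editorial aside (not part of the patch): call sites that still need a
// `futures::AsyncRead` (e.g. the zstd decompression path in
// `fs_puffin_manager/reader.rs`) can wrap any `RangeReader` in
// `AsyncReadAdapter`, exactly as this patch does before `handle_decompress`.
// Minimal sketch, assuming a tokio runtime and reusing the `Vec<u8>: RangeReader`
// impl added above; it mirrors `test_async_read_adapter`.
use common_base::range_read::AsyncReadAdapter;
use futures::AsyncReadExt as _; // `read_to_end` for `futures::AsyncRead`

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let data = b"hello world";
    let source = Vec::from(data); // `Vec<u8>` is a `RangeReader`
    let mut reader = AsyncReadAdapter::new(source).await?; // RangeReader -> AsyncRead

    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    assert_eq!(buf, data);
    Ok(())
}
// The adapter pulls at most MAX_SIZE_PER_READ (8 MiB) from the inner reader per
// request and buffers it, so large sources are streamed rather than read whole.
// ---------------------------------------------------------------------------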