diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 07a42b8b8767..49816e63c463 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -26,14 +26,6 @@ use crate::inverted_index::search::predicate::Predicate; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to seek"))] - Seek { - #[snafu(source)] - error: IoError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to read"))] Read { #[snafu(source)] @@ -215,8 +207,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - Seek { .. } - | Read { .. } + Read { .. } | Write { .. } | Flush { .. } | Close { .. } diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 57aec44d1fb8..d9247fefea3a 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -25,14 +25,6 @@ use snafu::{Location, Snafu}; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to seek"))] - Seek { - #[snafu(source)] - error: IoError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to read"))] Read { #[snafu(source)] @@ -119,14 +111,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to convert bytes to integer"))] - BytesToInteger { - #[snafu(source)] - error: std::array::TryFromSliceError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Unsupported decompression: {}", decompression))] UnsupportedDecompression { decompression: String, @@ -150,17 +134,15 @@ pub enum Error { location: Location, }, - #[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))] - ParseStageNotMatch { - expected: String, - actual: String, + #[snafu(display("Unexpected footer payload size: {}", size))] + UnexpectedFooterPayloadSize { + size: i32, #[snafu(implicit)] location: Location, }, - #[snafu(display("Unexpected footer payload size: {}", size))] - UnexpectedFooterPayloadSize { - size: i32, + #[snafu(display("Invalid puffin footer"))] + InvalidPuffinFooter { #[snafu(implicit)] location: Location, }, @@ -177,20 +159,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))] - InvalidBlobOffset { - offset: i64, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))] - InvalidBlobAreaEnd { - offset: u64, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to compress lz4"))] Lz4Compression { #[snafu(source)] @@ -256,14 +224,19 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Puffin file size too small"))] + PuffinFileSizeTooSmall { + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - Seek { .. } - | Read { .. } + Read { .. } | MagicNotMatched { .. } | DeserializeJson { .. } | Write { .. } @@ -275,18 +248,16 @@ impl ErrorExt for Error { | Remove { .. } | Rename { .. } | SerializeJson { .. } - | BytesToInteger { .. } - | ParseStageNotMatch { .. } | UnexpectedFooterPayloadSize { .. } | UnexpectedPuffinFileSize { .. } - | InvalidBlobOffset { .. } - | InvalidBlobAreaEnd { .. } | Lz4Compression { .. } | Lz4Decompression { .. } | BlobNotFound { .. } | BlobIndexOutOfBound { .. } | FileKeyNotMatch { .. } - | WalkDir { .. } => StatusCode::Unexpected, + | WalkDir { .. } + | InvalidPuffinFooter { .. } + | PuffinFileSizeTooSmall { .. } => StatusCode::Unexpected, UnsupportedCompression { .. } | UnsupportedDecompression { .. } => { StatusCode::Unsupported diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs index 3f48bf4b105e..162d7116a578 100644 --- a/src/puffin/src/file_format/reader.rs +++ b/src/puffin/src/file_format/reader.rs @@ -21,21 +21,9 @@ use common_base::range_read::RangeReader; use crate::blob_metadata::BlobMetadata; use crate::error::Result; pub use crate::file_format::reader::file::PuffinFileReader; +pub use crate::file_format::reader::footer::PuffinFileFooterReader; use crate::file_metadata::FileMetadata; -/// `SyncReader` defines a synchronous reader for puffin data. -pub trait SyncReader<'a> { - type Reader: std::io::Read + std::io::Seek; - - /// Fetches the FileMetadata. - fn metadata(&'a mut self) -> Result; - - /// Reads particular blob data based on given metadata. - /// - /// Data read from the reader is compressed leaving the caller to decompress the data. - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; -} - /// `AsyncReader` defines an asynchronous reader for puffin data. #[async_trait] pub trait AsyncReader<'a> { diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 3736ed5d2d8d..31e8e10bc4d5 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -12,20 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, SeekFrom}; - use async_trait::async_trait; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::blob_metadata::BlobMetadata; -use crate::error::{ - MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu, - UnsupportedDecompressionSnafu, -}; -use crate::file_format::reader::footer::FooterParser; -use crate::file_format::reader::{AsyncReader, SyncReader}; -use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE}; +use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu}; +use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE; +use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader}; +use crate::file_format::MIN_FILE_SIZE; use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; @@ -72,45 +67,6 @@ impl PuffinFileReader { } } -impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader { - type Reader = PartialReader<&'a mut R>; - - fn metadata(&mut self) -> Result { - if let Some(metadata) = &self.metadata { - return Ok(metadata.clone()); - } - - // check the magic - let mut magic = [0; MAGIC_SIZE as usize]; - self.source.read_exact(&mut magic).context(ReadSnafu)?; - ensure!(magic == MAGIC, MagicNotMatchedSnafu); - - let file_size = self.get_file_size_sync()?; - - // parse the footer - let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?; - self.metadata = Some(metadata.clone()); - Ok(metadata) - } - - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { - // TODO(zhongzc): support decompression - let compression = blob_metadata.compression_codec.as_ref(); - ensure!( - compression.is_none(), - UnsupportedDecompressionSnafu { - decompression: compression.unwrap().to_string() - } - ); - - Ok(PartialReader::new( - &mut self.source, - blob_metadata.offset as _, - blob_metadata.length as _, - )) - } -} - #[async_trait] impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { type Reader = PartialReader<&'a mut R>; @@ -119,17 +75,10 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { if let Some(metadata) = &self.metadata { return Ok(metadata.clone()); } - - // check the magic - let magic = self.source.read(0..MAGIC_SIZE).await.context(ReadSnafu)?; - ensure!(*magic == MAGIC, MagicNotMatchedSnafu); - let file_size = self.get_file_size_async().await?; - - // parse the footer - let metadata = FooterParser::new(&mut self.source, file_size) - .parse_async() - .await?; + let mut reader = PuffinFileFooterReader::new(&mut self.source, file_size) + .with_prefetch_size(DEFAULT_PREFETCH_SIZE); + let metadata = reader.metadata().await?; self.metadata = Some(metadata.clone()); Ok(metadata) } @@ -143,14 +92,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { } } -impl PuffinFileReader { - fn get_file_size_sync(&mut self) -> Result { - let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?; - Self::validate_file_size(file_size)?; - Ok(file_size) - } -} - impl PuffinFileReader { async fn get_file_size_async(&mut self) -> Result { let file_size = self diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index aa764fd32a21..0ec5f50f7e7e 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -12,240 +12,100 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, Cursor, SeekFrom}; +use std::io::Cursor; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::error::{ - BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, - Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, - SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, + PuffinFileSizeTooSmallSnafu, ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; -/// Parser for the footer of a Puffin data file +/// The default prefetch size for the footer reader. +pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB + +/// Reader for the footer of a Puffin data file /// /// The footer has a specific layout that needs to be read and parsed to /// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type. /// +/// This reader supports prefetching, allowing for more efficient reading +/// of the footer by fetching additional data ahead of time. +/// /// ```text /// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic /// [4] [?] [4] [4] [4] /// ``` -pub struct FooterParser { - // The underlying IO source +pub struct PuffinFileFooterReader { + /// The source of the puffin file source: R, - - // The size of the file, used for calculating offsets to read from + /// The content length of the puffin file file_size: u64, + /// The prefetch footer size + prefetch_size: Option, } -impl FooterParser { - pub fn new(source: R, file_size: u64) -> Self { - Self { source, file_size } - } -} - -impl FooterParser { - /// Parses the footer from the IO source in a synchronous manner. - pub fn parse_sync(&mut self) -> Result { - let mut parser = StageParser::new(self.file_size); - - let mut buf = vec![]; - while let Some(byte_to_read) = parser.next_to_read() { - self.source - .seek(SeekFrom::Start(byte_to_read.offset)) - .context(SeekSnafu)?; - let size = byte_to_read.size as usize; - - buf.resize(size, 0); - let buf = &mut buf[..size]; - - self.source.read_exact(buf).context(ReadSnafu)?; - - parser.consume_bytes(buf)?; - } - - parser.finish() - } -} - -impl FooterParser { - /// Parses the footer from the IO source in a asynchronous manner. - pub async fn parse_async(&mut self) -> Result { - let mut parser = StageParser::new(self.file_size); - - let mut buf = vec![]; - while let Some(byte_to_read) = parser.next_to_read() { - buf.clear(); - let range = byte_to_read.offset..byte_to_read.offset + byte_to_read.size; - self.source - .read_into(range, &mut buf) - .await - .context(ReadSnafu)?; - parser.consume_bytes(&buf)?; - } - - parser.finish() - } -} - -/// The internal stages of parsing the footer. -/// This enum allows the StageParser to keep track of which part -/// of the footer needs to be parsed next. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ParseStage { - FootMagic, - Flags, - PayloadSize, - Payload, - HeadMagic, - Done, -} - -/// Manages the parsing process of the file's footer. -struct StageParser { - /// Current stage in the parsing sequence of the footer. - stage: ParseStage, - - /// Total file size; used for calculating offsets to read from. - file_size: u64, - - /// Flags from the footer, set when the `Flags` field is parsed. - flags: Flags, - - /// Size of the footer's payload, set when the `PayloadSize` is parsed. - payload_size: u64, - - /// Metadata from the footer's payload, set when the `Payload` is parsed. - metadata: Option, -} - -/// Represents a read operation that needs to be performed, including the -/// offset from the start of the file and the number of bytes to read. -struct BytesToRead { - offset: u64, - size: u64, -} - -impl StageParser { - fn new(file_size: u64) -> Self { +impl<'a, R: RangeReader + 'a> PuffinFileFooterReader { + pub fn new(source: R, content_len: u64) -> Self { Self { - stage: ParseStage::FootMagic, - file_size, - payload_size: 0, - flags: Flags::empty(), - metadata: None, + source, + file_size: content_len, + prefetch_size: None, } } - /// Determines the next segment of bytes to read based on the current parsing stage. - /// This method returns information like the offset and size of the next read, - /// or None if parsing is complete. - fn next_to_read(&self) -> Option { - if self.stage == ParseStage::Done { - return None; - } - - let btr = match self.stage { - ParseStage::FootMagic => BytesToRead { - offset: self.foot_magic_offset(), - size: MAGIC_SIZE, - }, - ParseStage::Flags => BytesToRead { - offset: self.flags_offset(), - size: FLAGS_SIZE, - }, - ParseStage::PayloadSize => BytesToRead { - offset: self.payload_size_offset(), - size: PAYLOAD_SIZE_SIZE, - }, - ParseStage::Payload => BytesToRead { - offset: self.payload_offset(), - size: self.payload_size, - }, - ParseStage::HeadMagic => BytesToRead { - offset: self.head_magic_offset(), - size: MAGIC_SIZE, - }, - ParseStage::Done => unreachable!(), - }; - - Some(btr) + fn prefetch_size(&self) -> u64 { + self.prefetch_size.unwrap_or(MIN_FILE_SIZE) } - /// Processes the bytes that have been read according to the current parsing stage - /// and advances the parsing stage. It ensures the correct sequence of bytes is - /// encountered and stores the necessary information in the `StageParser`. - fn consume_bytes(&mut self, bytes: &[u8]) -> Result<()> { - match self.stage { - ParseStage::FootMagic => { - ensure!(bytes == MAGIC, MagicNotMatchedSnafu); - self.stage = ParseStage::Flags; - } - ParseStage::Flags => { - self.flags = Self::parse_flags(bytes)?; - self.stage = ParseStage::PayloadSize; - } - ParseStage::PayloadSize => { - self.payload_size = Self::parse_payload_size(bytes)?; - self.validate_payload_size()?; - self.stage = ParseStage::Payload; - } - ParseStage::Payload => { - self.metadata = Some(self.parse_payload(bytes)?); - self.validate_metadata()?; - self.stage = ParseStage::HeadMagic; - } - ParseStage::HeadMagic => { - ensure!(bytes == MAGIC, MagicNotMatchedSnafu); - self.stage = ParseStage::Done; - } - ParseStage::Done => unreachable!(), - } - - Ok(()) + pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self { + self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE)); + self } - /// Finalizes the parsing process, ensuring all stages are complete, and returns - /// the parsed `FileMetadata`. It converts the raw footer payload into structured data. - fn finish(self) -> Result { - ensure!( - self.stage == ParseStage::Done, - ParseStageNotMatchSnafu { - expected: format!("{:?}", ParseStage::Done), - actual: format!("{:?}", self.stage), - } - ); + pub async fn metadata(&'a mut self) -> Result { + ensure!(self.file_size >= MIN_FILE_SIZE, PuffinFileSizeTooSmallSnafu); - Ok(self.metadata.unwrap()) - } + // Note: prefetch > content_len is allowed, since we're using saturating_sub. + let footer_start = self.file_size.saturating_sub(self.prefetch_size()); + let suffix = self + .source + .read(footer_start..self.file_size) + .await + .context(ReadSnafu)?; + let suffix_len = suffix.len(); - fn parse_flags(bytes: &[u8]) -> Result { - let n = u32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); - Ok(Flags::from_bits_truncate(n)) - } + // check the magic + let magic = Self::read_tailing_four_bytes(&suffix)?; + ensure!(magic == MAGIC, MagicNotMatchedSnafu); - fn parse_payload_size(bytes: &[u8]) -> Result { - let n = i32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); - ensure!(n >= 0, UnexpectedFooterPayloadSizeSnafu { size: n }); - Ok(n as u64) - } + let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize])?; + let length = self.decode_payload_size( + &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize], + )?; + let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE; - fn validate_payload_size(&self) -> Result<()> { - ensure!( - self.payload_size <= self.file_size - MIN_FILE_SIZE, - UnexpectedFooterPayloadSizeSnafu { - size: self.payload_size as i32 - } - ); - Ok(()) + // Did not fetch the entire file metadata in the initial read, need to make a second request. + if length > suffix_len as u64 - footer_size { + let metadata_start = self.file_size - length - footer_size; + let meta = self + .source + .read(metadata_start..self.file_size - footer_size) + .await + .context(ReadSnafu)?; + self.parse_payload(&flags, &meta) + } else { + let metadata_start = self.file_size - length - footer_size - footer_start; + let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; + self.parse_payload(&flags, meta) + } } - fn parse_payload(&self, bytes: &[u8]) -> Result { - if self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) { + fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result { + if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) { let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes)); let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?; Ok(res) @@ -254,54 +114,35 @@ impl StageParser { } } - fn validate_metadata(&self) -> Result<()> { - let metadata = self.metadata.as_ref().expect("metadata is not set"); - - let mut next_blob_offset = MAGIC_SIZE; - // check blob offsets - for blob in &metadata.blobs { - ensure!( - blob.offset as u64 == next_blob_offset, - InvalidBlobOffsetSnafu { - offset: blob.offset - } - ); - next_blob_offset += blob.length as u64; - } - - let blob_area_end = metadata - .blobs - .last() - .map_or(MAGIC_SIZE, |b| (b.offset + b.length) as u64); - ensure!( - blob_area_end == self.head_magic_offset(), - InvalidBlobAreaEndSnafu { - offset: blob_area_end - } - ); - - Ok(()) - } + fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { + let suffix_len = suffix.len(); + ensure!(suffix_len >= 4, InvalidPuffinFooterSnafu); + let mut bytes = [0; 4]; + bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]); - fn foot_magic_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE + Ok(bytes) } - fn flags_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE - FLAGS_SIZE + fn decode_flags(&self, suffix: &[u8]) -> Result { + let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?); + Ok(Flags::from_bits_truncate(flags)) } - fn payload_size_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - } + fn decode_payload_size(&self, suffix: &[u8]) -> Result { + let payload_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?); - fn payload_offset(&self) -> u64 { - // `validate_payload_size` ensures that this subtraction will not overflow - self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size - } + ensure!( + payload_size >= 0, + UnexpectedFooterPayloadSizeSnafu { size: payload_size } + ); + let payload_size = payload_size as u64; + ensure!( + payload_size <= self.file_size - MIN_FILE_SIZE, + UnexpectedFooterPayloadSizeSnafu { + size: self.file_size as i32 + } + ); - fn head_magic_offset(&self) -> u64 { - // `validate_payload_size` ensures that this subtraction will not overflow - self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size + Ok(payload_size) } } diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index a152d4124bd6..a3bb48587924 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -13,26 +13,14 @@ // limitations under the License. use std::collections::HashMap; -use std::fs::File; -use std::io::{Cursor, Read}; use std::vec; use common_base::range_read::{FileReader, RangeReader}; use futures::io::Cursor as AsyncCursor; -use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader}; -use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; - -#[test] -fn test_read_empty_puffin_sync() { - let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - assert_eq!(metadata.properties.len(), 0); - assert_eq!(metadata.blobs.len(), 0); -} +use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader, PuffinFileReader}; +use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; +use crate::file_metadata::FileMetadata; #[tokio::test] async fn test_read_empty_puffin_async() { @@ -45,39 +33,37 @@ async fn test_read_empty_puffin_async() { assert_eq!(metadata.blobs.len(), 0); } -#[test] -fn test_sample_metric_data_puffin_sync() { - let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); +async fn test_read_puffin_file_metadata( + path: &str, + file_size: u64, + expeccted_metadata: FileMetadata, +) { + for prefetch_size in [0, file_size / 2, file_size, file_size + 10] { + let reader = FileReader::new(path).await.unwrap(); + let mut footer_reader = PuffinFileFooterReader::new(reader, file_size); + if prefetch_size > 0 { + footer_reader = footer_reader.with_prefetch_size(prefetch_size); + } + let metadata = footer_reader.metadata().await.unwrap(); + assert_eq!(metadata.properties, expeccted_metadata.properties,); + assert_eq!(metadata.blobs, expeccted_metadata.blobs); + } +} - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, "abcdefghi"); +#[tokio::test] +async fn test_read_puffin_file_metadata_async() { + let paths = vec![ + "src/tests/resources/empty-puffin-uncompressed.puffin", + "src/tests/resources/sample-metric-data-uncompressed.puffin", + ]; + for path in paths { + let mut reader = FileReader::new(path).await.unwrap(); + let file_size = reader.metadata().await.unwrap().content_length; + let mut reader = PuffinFileReader::new(reader); + let metadata = reader.metadata().await.unwrap(); - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - let expected = include_bytes!("tests/resources/sample-metric-data.blob"); - assert_eq!(buf, expected); + test_read_puffin_file_metadata(path, file_size, metadata).await; + } } #[tokio::test] @@ -113,38 +99,6 @@ async fn test_sample_metric_data_puffin_async() { assert_eq!(buf, expected); } -#[test] -fn test_writer_reader_with_empty_sync() { - fn test_writer_reader_with_empty_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 0); - } - - test_writer_reader_with_empty_sync(false); - test_writer_reader_with_empty_sync(true); -} - #[tokio::test] async fn test_writer_reader_empty_async() { async fn test_writer_reader_empty_async(footer_compressed: bool) { @@ -176,76 +130,6 @@ async fn test_writer_reader_empty_async() { test_writer_reader_empty_async(true).await; } -#[test] -fn test_writer_reader_sync() { - fn test_writer_reader_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - - let blob1 = "abcdefghi"; - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob1), - blob_type: "some-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob2), - blob_type: "some-other-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); - - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, blob1); - - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - assert_eq!(buf, blob2); - } - - test_writer_reader_sync(false); - test_writer_reader_sync(true); -} - #[tokio::test] async fn test_writer_reader_async() { async fn test_writer_reader_async(footer_compressed: bool) {