From ad010b97a0c74590c6b8db94032c3cecd3ef68cc Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Wed, 11 Dec 2024 13:49:04 +0000
Subject: [PATCH] feat: introduce `PuffinFileFooterReader`

---
 src/puffin/src/file_format/reader.rs        |   1 +
 src/puffin/src/file_format/reader/footer.rs | 138 ++++++++++++++++++++
 src/puffin/src/tests.rs                     |  46 ++++++-
 3 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs
index 3f48bf4b105e..779098567efe 100644
--- a/src/puffin/src/file_format/reader.rs
+++ b/src/puffin/src/file_format/reader.rs
@@ -21,6 +21,7 @@ use common_base::range_read::RangeReader;
 use crate::blob_metadata::BlobMetadata;
 use crate::error::Result;
 pub use crate::file_format::reader::file::PuffinFileReader;
+pub use crate::file_format::reader::footer::PuffinFileFooterReader;
 use crate::file_metadata::FileMetadata;
 
 /// `SyncReader` defines a synchronous reader for puffin data.
diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs
index aa764fd32a21..0ad875ed8981 100644
--- a/src/puffin/src/file_format/reader/footer.rs
+++ b/src/puffin/src/file_format/reader/footer.rs
@@ -305,3 +305,141 @@ impl StageParser {
         self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size
     }
 }
+
+/// Reader for the footer of a Puffin data file.
+///
+/// The footer has a specific layout that needs to be read and parsed to
+/// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type.
+///
+/// This reader supports prefetching, allowing for more efficient reading
+/// of the footer by fetching additional data ahead of time.
+///
+/// ```text
+/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic
+///                [4]       [?]     [4]         [4]   [4]
+/// ```
+pub struct PuffinFileFooterReader<R> {
+    /// The source of the puffin file
+    source: R,
+    /// The content length of the puffin file
+    file_size: u64,
+    /// The prefetch footer size; `None` falls back to `MIN_FILE_SIZE`
+    prefetch_size: Option<u64>,
+}
+
+impl<'a, R: RangeReader + 'a> PuffinFileFooterReader<R> {
+    /// Creates a reader over `source`, whose total length is `content_len` bytes.
+    pub fn new(source: R, content_len: u64) -> Self {
+        Self {
+            source,
+            file_size: content_len,
+            prefetch_size: None,
+        }
+    }
+
+    /// Sets how many trailing bytes the first read fetches.
+    ///
+    /// Values below `MIN_FILE_SIZE` are raised to `MIN_FILE_SIZE`.
+    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
+        self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE));
+        self
+    }
+
+    /// Reads and parses the footer, returning the [`FileMetadata`] it carries.
+    ///
+    /// # Errors
+    ///
+    /// Fails if reading from the source fails, if the fetched suffix is too short to
+    /// hold the footer trailer or does not end with the magic, if the payload size is
+    /// out of range, or if the payload cannot be decoded.
+    pub async fn metadata(&'a mut self) -> Result<FileMetadata> {
+        // Without an explicit prefetch size, fall back to the same lower bound that
+        // `with_prefetch_size` enforces; a zero-length read would leave nothing to parse.
+        let prefetch_size = self.prefetch_size.unwrap_or(MIN_FILE_SIZE);
+        // Note: prefetch > content_len is allowed, since we're using saturating_sub.
+        let footer_start = self.file_size.saturating_sub(prefetch_size);
+
+        let suffix = self
+            .source
+            .read(footer_start..self.file_size)
+            .await
+            .context(ReadSnafu)?;
+        let suffix_len = suffix.len();
+        let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE;
+
+        // A suffix shorter than the trailer cannot end with a valid magic; reject it
+        // up front so the slicing below never goes out of bounds.
+        ensure!(suffix_len as u64 >= footer_size, MagicNotMatchedSnafu);
+
+        // check the magic
+        let magic = Self::read_trailing_four_bytes(&suffix);
+        ensure!(magic == MAGIC, MagicNotMatchedSnafu);
+
+        let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize]);
+        let length = self.decode_payload_size(
+            &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize],
+        )?;
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request.
+        if length > suffix_len as u64 - footer_size {
+            let metadata_start = self.file_size - length - footer_size;
+            let meta = self
+                .source
+                .read(metadata_start..self.file_size - footer_size)
+                .await
+                .context(ReadSnafu)?;
+            self.parse_payload(&flags, &meta)
+        } else {
+            // The payload sits right before the trailer inside the fetched suffix.
+            let metadata_end = suffix_len - footer_size as usize;
+            let metadata_start = metadata_end - length as usize;
+            self.parse_payload(&flags, &suffix[metadata_start..metadata_end])
+        }
+    }
+
+    /// Deserializes the JSON payload, decompressing it first when `flags` marks it
+    /// as LZ4-compressed.
+    fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result<FileMetadata> {
+        if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
+            let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
+            let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
+            Ok(res)
+        } else {
+            serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
+        }
+    }
+
+    /// Returns the last four bytes of `suffix`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `suffix` holds fewer than four bytes; `metadata` checks the length
+    /// before calling.
+    fn read_trailing_four_bytes(suffix: &[u8]) -> [u8; 4] {
+        let mut bytes = [0; 4];
+        bytes.copy_from_slice(&suffix[suffix.len() - 4..]);
+        bytes
+    }
+
+    /// Decodes the little-endian flags word at the end of `suffix` via `from_bits_truncate`.
+    fn decode_flags(&self, suffix: &[u8]) -> Flags {
+        let flags = u32::from_le_bytes(Self::read_trailing_four_bytes(suffix));
+        Flags::from_bits_truncate(flags)
+    }
+
+    /// Decodes the little-endian payload size at the end of `suffix` and checks that
+    /// it is non-negative and no larger than `file_size - MIN_FILE_SIZE`.
+    fn decode_payload_size(&self, suffix: &[u8]) -> Result<u64> {
+        let size = i32::from_le_bytes(Self::read_trailing_four_bytes(suffix));
+        ensure!(size >= 0, UnexpectedFooterPayloadSizeSnafu { size });
+
+        let payload_size = size as u64;
+        // Compare against the file size first so the subtraction cannot underflow.
+        ensure!(
+            self.file_size >= MIN_FILE_SIZE && payload_size <= self.file_size - MIN_FILE_SIZE,
+            UnexpectedFooterPayloadSizeSnafu { size }
+        );
+
+        Ok(payload_size)
+    }
+}
diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs
index a152d4124bd6..af8476404c36 100644
--- a/src/puffin/src/tests.rs
+++ b/src/puffin/src/tests.rs
@@ -20,8 +20,11 @@ use std::vec;
 use common_base::range_read::{FileReader, RangeReader};
 use futures::io::Cursor as AsyncCursor;
 
-use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader};
+use crate::file_format::reader::{
+    AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader,
+};
 use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter};
+use crate::file_metadata::FileMetadata;
 
 #[test]
 fn test_read_empty_puffin_sync() {
@@ -45,6 +48,47 @@ async fn test_read_empty_puffin_async() {
     assert_eq!(metadata.blobs.len(), 0);
 }
 
+async fn test_read_puffin_file_metadata(
+    path: &str,
+    file_size: u64,
+    expected_metadata: FileMetadata,
+) {
+    // `None` exercises the reader's default prefetch size.
+    let prefetch_sizes = [
+        None,
+        Some(0),
+        Some(file_size / 2),
+        Some(file_size),
+        Some(file_size + 10),
+    ];
+    for prefetch_size in prefetch_sizes {
+        let reader = FileReader::new(path).await.unwrap();
+        let mut footer_reader = PuffinFileFooterReader::new(reader, file_size);
+        if let Some(prefetch_size) = prefetch_size {
+            footer_reader = footer_reader.with_prefetch_size(prefetch_size);
+        }
+        let metadata = footer_reader.metadata().await.unwrap();
+        assert_eq!(metadata.properties, expected_metadata.properties);
+        assert_eq!(metadata.blobs, expected_metadata.blobs);
+    }
+}
+
+#[tokio::test]
+async fn test_read_puffin_file_metadata_async() {
+    let paths = vec![
+        "src/tests/resources/empty-puffin-uncompressed.puffin",
+        "src/tests/resources/sample-metric-data-uncompressed.puffin",
+    ];
+    for path in paths {
+        let mut reader = FileReader::new(path).await.unwrap();
+        let file_size = reader.metadata().await.unwrap().content_length;
+        let mut reader = PuffinFileReader::new(reader);
+        let metadata = reader.metadata().await.unwrap();
+
+        test_read_puffin_file_metadata(path, file_size, metadata).await;
+    }
+}
+
 #[test]
 fn test_sample_metric_data_puffin_sync() {
     let path = "src/tests/resources/sample-metric-data-uncompressed.puffin";