Skip to content

Commit

Permalink
feat: introduce PuffinFileFooterReader
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 11, 2024
1 parent 1a8e77a commit ad010b9
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/puffin/src/file_format/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_base::range_read::RangeReader;
use crate::blob_metadata::BlobMetadata;
use crate::error::Result;
pub use crate::file_format::reader::file::PuffinFileReader;
pub use crate::file_format::reader::footer::PuffinFileFooterReader;
use crate::file_metadata::FileMetadata;

/// `SyncReader` defines a synchronous reader for puffin data.
Expand Down
116 changes: 116 additions & 0 deletions src/puffin/src/file_format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,119 @@ impl StageParser {
self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size
}
}

/// Reads the footer of a Puffin data file and decodes it into a [`FileMetadata`].
///
/// The footer sits at the end of the file and follows a fixed layout (see
/// below). To reduce the number of reads, the reader can prefetch a
/// configurable number of trailing bytes in its first request; a second
/// request is only issued when the payload is not fully covered by them.
///
/// ```text
/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic
///                [4]       [?]     [4]         [4]   [4]
/// ```
pub struct PuffinFileFooterReader<R> {
    /// Underlying reader the Puffin file bytes are fetched from.
    source: R,
    /// Total content length of the Puffin file.
    file_size: u64,
    /// Number of trailing bytes fetched by the first read, if configured.
    prefetch_size: Option<u64>,
}

impl<'a, R: RangeReader + 'a> PuffinFileFooterReader<R> {
    /// Creates a footer reader over `source`, whose total length is `content_len`.
    ///
    /// No prefetch size is configured; [`Self::metadata`] then fetches
    /// `MIN_FILE_SIZE` trailing bytes in its first read.
    pub fn new(source: R, content_len: u64) -> Self {
        Self {
            source,
            file_size: content_len,
            prefetch_size: None,
        }
    }

    /// Sets the number of trailing bytes fetched by the first read.
    ///
    /// Values smaller than `MIN_FILE_SIZE` are raised to `MIN_FILE_SIZE`.
    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
        self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE));
        self
    }

    /// Reads the footer and decodes the file metadata stored in its payload.
    ///
    /// The tail of the file is fetched first. If the payload is not fully
    /// contained in those bytes, a second read fetches exactly the payload.
    ///
    /// # Errors
    ///
    /// - `Read` if the underlying source fails.
    /// - `MagicNotMatched` if the fetched tail is too short to hold the fixed
    ///   footer fields, or does not end with the expected magic.
    /// - `UnexpectedFooterPayloadSize` if the recorded payload size is negative
    ///   or cannot fit in a file of this size.
    /// - A decompression/deserialization error if the payload is malformed.
    pub async fn metadata(&'a mut self) -> Result<FileMetadata> {
        // Size of the fixed fields that follow the payload.
        let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE;

        // Without an explicit prefetch size we must still fetch the fixed
        // footer fields; defaulting to 0 would read an empty range and leave
        // nothing to decode.
        // Note: prefetch > content_len is allowed, since we're using saturating_sub.
        let prefetch_size = self.prefetch_size.unwrap_or(MIN_FILE_SIZE);
        let footer_start = self.file_size.saturating_sub(prefetch_size);

        let suffix = self
            .source
            .read(footer_start..self.file_size)
            .await
            .context(ReadSnafu)?;
        let suffix_len = suffix.len();

        // A tail shorter than the fixed fields (tiny file or short read)
        // cannot end with a valid footer; report it instead of panicking on
        // the slicing below.
        ensure!(suffix_len as u64 >= footer_size, MagicNotMatchedSnafu);

        // check the magic
        let magic = Self::read_tailing_four_bytes(&suffix);
        ensure!(magic == MAGIC, MagicNotMatchedSnafu);

        let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize]);
        let length = self.decode_payload_size(
            &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize],
        )?;

        // Offset within `suffix` at which the payload ends.
        let payload_end = suffix_len - footer_size as usize;

        if length > payload_end as u64 {
            // Did not fetch the entire file metadata in the initial read, need to make a second request.
            // `decode_payload_size` bounds `length` by `file_size - MIN_FILE_SIZE`;
            // NOTE(review): this relies on MIN_FILE_SIZE >= footer_size — confirm.
            let metadata_start = self.file_size - length - footer_size;
            let meta = self
                .source
                .read(metadata_start..self.file_size - footer_size)
                .await
                .context(ReadSnafu)?;
            self.parse_payload(&flags, &meta)
        } else {
            // Locate the payload relative to the end of the fetched bytes so
            // the slice stays in bounds even if fewer bytes than requested
            // were returned.
            let payload_start = payload_end - length as usize;
            self.parse_payload(&flags, &suffix[payload_start..payload_end])
        }
    }

    /// Decodes the footer payload `bytes` as JSON, decompressing it with LZ4
    /// first when `flags` marks the payload as LZ4-compressed.
    fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result<FileMetadata> {
        if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
            let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
            let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
            Ok(res)
        } else {
            serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
        }
    }

    /// Returns the last four bytes of `suffix`.
    ///
    /// # Panics
    ///
    /// Panics if `suffix` is shorter than four bytes; callers check the
    /// length beforehand.
    fn read_tailing_four_bytes(suffix: &[u8]) -> [u8; 4] {
        let suffix_len = suffix.len();
        let mut bytes = [0; 4];
        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);

        bytes
    }

    /// Decodes the flags from the last four bytes of `suffix` (little-endian),
    /// dropping any unknown bits.
    fn decode_flags(&self, suffix: &[u8]) -> Flags {
        let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix));
        Flags::from_bits_truncate(flags)
    }

    /// Decodes the payload size from the last four bytes of `suffix`
    /// (little-endian `i32`) and validates it against the file size.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedFooterPayloadSize` carrying the decoded size when it
    /// is negative or larger than `file_size - MIN_FILE_SIZE`.
    fn decode_payload_size(&self, suffix: &[u8]) -> Result<u64> {
        let raw_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix));

        ensure!(
            raw_size >= 0,
            UnexpectedFooterPayloadSizeSnafu { size: raw_size }
        );
        let payload_size = raw_size as u64;

        // `checked_sub` avoids an underflow when the file is smaller than
        // `MIN_FILE_SIZE`; such a file cannot hold any payload at all.
        let fits_in_file = matches!(
            self.file_size.checked_sub(MIN_FILE_SIZE),
            Some(max_payload_size) if payload_size <= max_payload_size
        );
        ensure!(
            fits_in_file,
            UnexpectedFooterPayloadSizeSnafu { size: raw_size }
        );

        Ok(payload_size)
    }
}
36 changes: 35 additions & 1 deletion src/puffin/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use std::vec;
use common_base::range_read::{FileReader, RangeReader};
use futures::io::Cursor as AsyncCursor;

use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader};
use crate::file_format::reader::{
AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader,
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter};
use crate::file_metadata::FileMetadata;

#[test]
fn test_read_empty_puffin_sync() {
Expand All @@ -45,6 +48,37 @@ async fn test_read_empty_puffin_async() {
assert_eq!(metadata.blobs.len(), 0);
}

/// Reads the footer of the Puffin file at `path` with several prefetch sizes
/// (none, half the file, the whole file, and more than the file) and asserts
/// that each run yields the properties and blobs of `expected_metadata`.
async fn test_read_puffin_file_metadata(
    path: &str,
    file_size: u64,
    expected_metadata: FileMetadata,
) {
    let prefetch_sizes = [0, file_size / 2, file_size, file_size + 10];

    for prefetch_size in prefetch_sizes {
        // A fresh reader per run so every prefetch size starts from scratch.
        let source = FileReader::new(path).await.unwrap();
        let mut footer_reader =
            PuffinFileFooterReader::new(source, file_size).with_prefetch_size(prefetch_size);

        let actual = footer_reader.metadata().await.unwrap();
        assert_eq!(actual.properties, expected_metadata.properties);
        assert_eq!(actual.blobs, expected_metadata.blobs);
    }
}

/// For each sample Puffin file, obtains the reference metadata through
/// `PuffinFileReader` and checks that `PuffinFileFooterReader` agrees with it.
#[tokio::test]
async fn test_read_puffin_file_metadata_async() {
    for path in [
        "src/tests/resources/empty-puffin-uncompressed.puffin",
        "src/tests/resources/sample-metric-data-uncompressed.puffin",
    ] {
        let mut file_reader = FileReader::new(path).await.unwrap();
        let file_size = file_reader.metadata().await.unwrap().content_length;

        let mut puffin_reader = PuffinFileReader::new(file_reader);
        let expected = puffin_reader.metadata().await.unwrap();

        test_read_puffin_file_metadata(path, file_size, expected).await;
    }
}

#[test]
fn test_sample_metric_data_puffin_sync() {
let path = "src/tests/resources/sample-metric-data-uncompressed.puffin";
Expand Down

0 comments on commit ad010b9

Please sign in to comment.