Skip to content

Commit

Permalink
refactor: remove SyncReader trait and impl
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 11, 2024
1 parent 87dda32 commit 980be0c
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 247 deletions.
13 changes: 0 additions & 13 deletions src/puffin/src/file_format/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,6 @@ pub use crate::file_format::reader::file::PuffinFileReader;
pub use crate::file_format::reader::footer::PuffinFileFooterReader;
use crate::file_metadata::FileMetadata;

/// `SyncReader` defines a synchronous reader for puffin data.
pub trait SyncReader<'a> {
type Reader: std::io::Read + std::io::Seek;

/// Fetches the FileMetadata.
fn metadata(&'a mut self) -> Result<FileMetadata>;

/// Reads particular blob data based on given metadata.
///
/// Data read from the reader is compressed leaving the caller to decompress the data.
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}

/// `AsyncReader` defines an asynchronous reader for puffin data.
#[async_trait]
pub trait AsyncReader<'a> {
Expand Down
56 changes: 2 additions & 54 deletions src/puffin/src/file_format/reader/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::{self, SeekFrom};

use async_trait::async_trait;
use common_base::range_read::RangeReader;
use snafu::{ensure, ResultExt};

use crate::blob_metadata::BlobMetadata;
use crate::error::{
MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu,
UnsupportedDecompressionSnafu,
};
use crate::error::{MagicNotMatchedSnafu, ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu};
use crate::file_format::reader::footer::FooterParser;
use crate::file_format::reader::{AsyncReader, SyncReader};
use crate::file_format::reader::AsyncReader;
use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
use crate::file_metadata::FileMetadata;
use crate::partial_reader::PartialReader;
Expand Down Expand Up @@ -72,45 +67,6 @@ impl<R> PuffinFileReader<R> {
}
}

impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;

fn metadata(&mut self) -> Result<FileMetadata> {
if let Some(metadata) = &self.metadata {
return Ok(metadata.clone());
}

// check the magic
let mut magic = [0; MAGIC_SIZE as usize];
self.source.read_exact(&mut magic).context(ReadSnafu)?;
ensure!(magic == MAGIC, MagicNotMatchedSnafu);

let file_size = self.get_file_size_sync()?;

// parse the footer
let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?;
self.metadata = Some(metadata.clone());
Ok(metadata)
}

fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
// TODO(zhongzc): support decompression
let compression = blob_metadata.compression_codec.as_ref();
ensure!(
compression.is_none(),
UnsupportedDecompressionSnafu {
decompression: compression.unwrap().to_string()
}
);

Ok(PartialReader::new(
&mut self.source,
blob_metadata.offset as _,
blob_metadata.length as _,
))
}
}

#[async_trait]
impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;
Expand Down Expand Up @@ -143,14 +99,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
}
}

impl<R: io::Read + io::Seek> PuffinFileReader<R> {
fn get_file_size_sync(&mut self) -> Result<u64> {
let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?;
Self::validate_file_size(file_size)?;
Ok(file_size)
}
}

impl<R: RangeReader> PuffinFileReader<R> {
async fn get_file_size_async(&mut self) -> Result<u64> {
let file_size = self
Expand Down
28 changes: 2 additions & 26 deletions src/puffin/src/file_format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::{self, Cursor, SeekFrom};
use std::io::Cursor;

use common_base::range_read::RangeReader;
use snafu::{ensure, ResultExt};

use crate::error::{
BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu,
InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu,
ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
};
use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
use crate::file_metadata::FileMetadata;
Expand Down Expand Up @@ -48,30 +48,6 @@ impl<R> FooterParser<R> {
}
}

impl<R: io::Read + io::Seek> FooterParser<R> {
/// Parses the footer from the IO source in a synchronous manner.
pub fn parse_sync(&mut self) -> Result<FileMetadata> {
let mut parser = StageParser::new(self.file_size);

let mut buf = vec![];
while let Some(byte_to_read) = parser.next_to_read() {
self.source
.seek(SeekFrom::Start(byte_to_read.offset))
.context(SeekSnafu)?;
let size = byte_to_read.size as usize;

buf.resize(size, 0);
let buf = &mut buf[..size];

self.source.read_exact(buf).context(ReadSnafu)?;

parser.consume_bytes(buf)?;
}

parser.finish()
}
}

impl<R: RangeReader> FooterParser<R> {
/// Parses the footer from the IO source in a asynchronous manner.
pub async fn parse_async(&mut self) -> Result<FileMetadata> {
Expand Down
156 changes: 2 additions & 154 deletions src/puffin/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,15 @@
// limitations under the License.

use std::collections::HashMap;
use std::fs::File;
use std::io::{Cursor, Read};
use std::vec;

use common_base::range_read::{FileReader, RangeReader};
use futures::io::Cursor as AsyncCursor;

use crate::file_format::reader::{
AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader,
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter};
use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader, PuffinFileReader};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::file_metadata::FileMetadata;

#[test]
fn test_read_empty_puffin_sync() {
let path = "src/tests/resources/empty-puffin-uncompressed.puffin";

let file = File::open(path).unwrap();
let mut reader = PuffinFileReader::new(file);
let metadata = reader.metadata().unwrap();
assert_eq!(metadata.properties.len(), 0);
assert_eq!(metadata.blobs.len(), 0);
}

#[tokio::test]
async fn test_read_empty_puffin_async() {
let path = "src/tests/resources/empty-puffin-uncompressed.puffin";
Expand Down Expand Up @@ -81,41 +66,6 @@ async fn test_read_puffin_file_metadata_async() {
}
}

#[test]
fn test_sample_metric_data_puffin_sync() {
let path = "src/tests/resources/sample-metric-data-uncompressed.puffin";

let file = File::open(path).unwrap();
let mut reader = PuffinFileReader::new(file);
let metadata = reader.metadata().unwrap();

assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);

assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);

assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);

let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = String::new();
some_blob.read_to_string(&mut buf).unwrap();
assert_eq!(buf, "abcdefghi");

let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).unwrap();
let expected = include_bytes!("tests/resources/sample-metric-data.blob");
assert_eq!(buf, expected);
}

#[tokio::test]
async fn test_sample_metric_data_puffin_async() {
let path = "src/tests/resources/sample-metric-data-uncompressed.puffin";
Expand Down Expand Up @@ -149,38 +99,6 @@ async fn test_sample_metric_data_puffin_async() {
assert_eq!(buf, expected);
}

#[test]
fn test_writer_reader_with_empty_sync() {
fn test_writer_reader_with_empty_sync(footer_compressed: bool) {
let mut buf = Cursor::new(vec![]);

let mut writer = PuffinFileWriter::new(&mut buf);
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));

writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);

let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();

assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);

assert_eq!(metadata.blobs.len(), 0);
}

test_writer_reader_with_empty_sync(false);
test_writer_reader_with_empty_sync(true);
}

#[tokio::test]
async fn test_writer_reader_empty_async() {
async fn test_writer_reader_empty_async(footer_compressed: bool) {
Expand Down Expand Up @@ -212,76 +130,6 @@ async fn test_writer_reader_empty_async() {
test_writer_reader_empty_async(true).await;
}

#[test]
fn test_writer_reader_sync() {
fn test_writer_reader_sync(footer_compressed: bool) {
let mut buf = Cursor::new(vec![]);

let mut writer = PuffinFileWriter::new(&mut buf);

let blob1 = "abcdefghi";
writer
.add_blob(Blob {
compressed_data: Cursor::new(&blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.unwrap();

let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
compressed_data: Cursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.unwrap();

writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));

writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);

let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();

assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);

assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);

assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);

let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = String::new();
some_blob.read_to_string(&mut buf).unwrap();
assert_eq!(buf, blob1);

let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).unwrap();
assert_eq!(buf, blob2);
}

test_writer_reader_sync(false);
test_writer_reader_sync(true);
}

#[tokio::test]
async fn test_writer_reader_async() {
async fn test_writer_reader_async(footer_compressed: bool) {
Expand Down

0 comments on commit 980be0c

Please sign in to comment.