From 980be0cf54ec4b7808a479d425d0f5b4906edee4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 16:59:18 +0000 Subject: [PATCH] refactor: remove `SyncReader` trait and impl --- src/puffin/src/file_format/reader.rs | 13 -- src/puffin/src/file_format/reader/file.rs | 56 +------ src/puffin/src/file_format/reader/footer.rs | 28 +--- src/puffin/src/tests.rs | 156 +------------------- 4 files changed, 6 insertions(+), 247 deletions(-) diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs index 779098567efe..162d7116a578 100644 --- a/src/puffin/src/file_format/reader.rs +++ b/src/puffin/src/file_format/reader.rs @@ -24,19 +24,6 @@ pub use crate::file_format::reader::file::PuffinFileReader; pub use crate::file_format::reader::footer::PuffinFileFooterReader; use crate::file_metadata::FileMetadata; -/// `SyncReader` defines a synchronous reader for puffin data. -pub trait SyncReader<'a> { - type Reader: std::io::Read + std::io::Seek; - - /// Fetches the FileMetadata. - fn metadata(&'a mut self) -> Result; - - /// Reads particular blob data based on given metadata. - /// - /// Data read from the reader is compressed leaving the caller to decompress the data. - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; -} - /// `AsyncReader` defines an asynchronous reader for puffin data. #[async_trait] pub trait AsyncReader<'a> { diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 3736ed5d2d8d..3fba48bdb32c 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -12,19 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, SeekFrom}; - use async_trait::async_trait; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::blob_metadata::BlobMetadata; -use crate::error::{ - MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu, - UnsupportedDecompressionSnafu, -}; +use crate::error::{MagicNotMatchedSnafu, ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu}; use crate::file_format::reader::footer::FooterParser; -use crate::file_format::reader::{AsyncReader, SyncReader}; +use crate::file_format::reader::AsyncReader; use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE}; use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; @@ -72,45 +67,6 @@ impl PuffinFileReader { } } -impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader { - type Reader = PartialReader<&'a mut R>; - - fn metadata(&mut self) -> Result { - if let Some(metadata) = &self.metadata { - return Ok(metadata.clone()); - } - - // check the magic - let mut magic = [0; MAGIC_SIZE as usize]; - self.source.read_exact(&mut magic).context(ReadSnafu)?; - ensure!(magic == MAGIC, MagicNotMatchedSnafu); - - let file_size = self.get_file_size_sync()?; - - // parse the footer - let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?; - self.metadata = Some(metadata.clone()); - Ok(metadata) - } - - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { - // TODO(zhongzc): support decompression - let compression = blob_metadata.compression_codec.as_ref(); - ensure!( - compression.is_none(), - UnsupportedDecompressionSnafu { - decompression: compression.unwrap().to_string() - } - ); - - Ok(PartialReader::new( - &mut self.source, - blob_metadata.offset as _, - blob_metadata.length as _, - )) - } -} - #[async_trait] impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { type Reader = PartialReader<&'a mut R>; @@ -143,14 +99,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { } } -impl PuffinFileReader { - fn get_file_size_sync(&mut self) -> Result { - let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?; - Self::validate_file_size(file_size)?; - Ok(file_size) - } -} - impl PuffinFileReader { async fn get_file_size_async(&mut self) -> Result { let file_size = self diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 63c1cd34dd14..95d33251e30f 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, Cursor, SeekFrom}; +use std::io::Cursor; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; @@ -20,7 +20,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{ BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, - ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; @@ -48,30 +48,6 @@ impl FooterParser { } } -impl FooterParser { - /// Parses the footer from the IO source in a synchronous manner. - pub fn parse_sync(&mut self) -> Result { - let mut parser = StageParser::new(self.file_size); - - let mut buf = vec![]; - while let Some(byte_to_read) = parser.next_to_read() { - self.source - .seek(SeekFrom::Start(byte_to_read.offset)) - .context(SeekSnafu)?; - let size = byte_to_read.size as usize; - - buf.resize(size, 0); - let buf = &mut buf[..size]; - - self.source.read_exact(buf).context(ReadSnafu)?; - - parser.consume_bytes(buf)?; - } - - parser.finish() - } -} - impl FooterParser { /// Parses the footer from the IO source in a asynchronous manner. pub async fn parse_async(&mut self) -> Result { diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index 279371b74f30..a3bb48587924 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -13,30 +13,15 @@ // limitations under the License. use std::collections::HashMap; -use std::fs::File; -use std::io::{Cursor, Read}; use std::vec; use common_base::range_read::{FileReader, RangeReader}; use futures::io::Cursor as AsyncCursor; -use crate::file_format::reader::{ - AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader, -}; -use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; +use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader, PuffinFileReader}; +use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; use crate::file_metadata::FileMetadata; -#[test] -fn test_read_empty_puffin_sync() { - let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - assert_eq!(metadata.properties.len(), 0); - assert_eq!(metadata.blobs.len(), 0); -} - #[tokio::test] async fn test_read_empty_puffin_async() { let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; @@ -81,41 +66,6 @@ async fn test_read_puffin_file_metadata_async() { } } -#[test] -fn test_sample_metric_data_puffin_sync() { - let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); - - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, "abcdefghi"); - - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - let expected = include_bytes!("tests/resources/sample-metric-data.blob"); - assert_eq!(buf, expected); -} - #[tokio::test] async fn test_sample_metric_data_puffin_async() { let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; @@ -149,38 +99,6 @@ async fn test_sample_metric_data_puffin_async() { assert_eq!(buf, expected); } -#[test] -fn test_writer_reader_with_empty_sync() { - fn test_writer_reader_with_empty_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 0); - } - - test_writer_reader_with_empty_sync(false); - test_writer_reader_with_empty_sync(true); -} - #[tokio::test] async fn test_writer_reader_empty_async() { async fn test_writer_reader_empty_async(footer_compressed: bool) { @@ -212,76 +130,6 @@ async fn test_writer_reader_empty_async() { test_writer_reader_empty_async(true).await; } -#[test] -fn test_writer_reader_sync() { - fn test_writer_reader_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - - let blob1 = "abcdefghi"; - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob1), - blob_type: "some-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob2), - blob_type: "some-other-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); - - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, blob1); - - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - assert_eq!(buf, blob2); - } - - test_writer_reader_sync(false); - test_writer_reader_sync(true); -} - #[tokio::test] async fn test_writer_reader_async() { async fn test_writer_reader_async(footer_compressed: bool) {