From 0cdfc54a4cbbf7f1fe74b454e0ea53236e053644 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 18 Jan 2024 15:51:52 +0800 Subject: [PATCH 1/7] feat: MetadataLoader --- src/mito2/src/sst/file.rs | 4 + src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/sst/parquet/metadata.rs | 150 ++++++++++++++++++++++++++ src/mito2/src/sst/parquet/reader.rs | 25 ++--- 4 files changed, 164 insertions(+), 16 deletions(-) create mode 100644 src/mito2/src/sst/parquet/metadata.rs diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index f86faa81ef78..930a91f26ff2 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -178,6 +178,10 @@ impl FileHandle { pub fn meta(&self) -> FileMeta { self.inner.meta.clone() } + + pub fn file_size(&self) -> u64 { + self.inner.meta.file_size + } } /// Inner data of [FileHandle]. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index d0334690cc1c..8c88aa219acd 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -16,6 +16,7 @@ mod format; pub(crate) mod helper; +mod metadata; mod page_reader; pub mod reader; pub mod row_group; diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs new file mode 100644 index 000000000000..96ef33d8b00a --- /dev/null +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -0,0 +1,150 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use object_store::ObjectStore; +use parquet::file::footer::{decode_footer, decode_metadata}; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::FOOTER_SIZE; +use snafu::ResultExt; + +use crate::error::{self, Result}; + +/// The estimated size of the footer and metadata need to read from the end of parquet file. +const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024; + +pub(crate) struct MetadataLoader<'a> { + object_store: ObjectStore, + + file_path: &'a str, + + file_size: Option, +} + +impl<'a> MetadataLoader<'a> { + pub fn new( + object_store: ObjectStore, + file_path: &'a str, + file_size: Option, + ) -> MetadataLoader { + Self { + object_store, + file_path, + file_size, + } + } + + /// Load the metadata of parquet file. + /// + /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the + /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata. + /// + /// Parquet File Format: + /// ```text + /// ┌───────────────────────────────────┐ + /// |4-byte magic number "PAR1" | + /// |───────────────────────────────────| + /// |Column 1 Chunk 1 + Column Metadata | + /// |Column 2 Chunk 1 + Column Metadata | + /// |... | + /// |Column N Chunk M + Column Metadata | + /// |───────────────────────────────────| + /// |File Metadata | + /// |───────────────────────────────────| + /// |4-byte length of file metadata | + /// |4-byte magic number "PAR1" | + /// └───────────────────────────────────┘ + /// ``` + /// + pub async fn load(&self) -> Result { + let object_store = &self.object_store; + let path = self.file_path; + let file_size = match self.file_size { + Some(n) => n, + None => object_store + .stat(path) + .await + .context(error::OpenDalSnafu)? + .content_length(), + }; + + if file_size < FOOTER_SIZE as u64 { + return error::InvalidParquetSnafu { + file: path.to_string(), + reason: "file size is smaller than footer size".to_string(), + } + .fail(); + } + + // Prefetch bytes for metadata from the end and process the footer + let prefetch_size = DEFAULT_PREFETCH_SIZE.min(file_size); + let buffer = object_store + .read_with(path) + .range((file_size - prefetch_size)..file_size) + .await + .context(error::OpenDalSnafu)?; + let buffer_len = buffer.len(); + + let mut footer = [0; 8]; + footer.copy_from_slice(&buffer[(buffer_len - FOOTER_SIZE as usize)..]); + let metadata_len = decode_footer(&footer).map_err(|_| { + error::InvalidParquetSnafu { + file: path.to_string(), + reason: "failed to decode footer".to_string(), + } + .build() + })? as u64; + + if metadata_len + FOOTER_SIZE as u64 > file_size { + return error::InvalidParquetSnafu { + file: path.to_string(), + reason: format!( + "the sum of Metadata length {} and Footer size {} is larger than file size {}", + metadata_len, FOOTER_SIZE, file_size + ), + } + .fail(); + } + + let footer_len = metadata_len + FOOTER_SIZE as u64; + if (footer_len as usize) <= buffer_len { + // The whole metadata is in the first read + let offset = buffer_len - footer_len as usize; + let metadata = decode_metadata(&buffer[offset..]).map_err(|_| { + error::InvalidParquetSnafu { + file: path.to_string(), + reason: "failed to decode metadata".to_string(), + } + .build() + })?; + Ok(metadata) + } else { + // The metadata is out of buffer, need to read the rest + let mut data = object_store + .read_with(path) + .range((file_size - footer_len)..(file_size - buffer_len as u64)) + .await + .context(error::OpenDalSnafu)?; + data.extend(buffer); + + let metadata = decode_metadata(&data).map_err(|_| { + error::InvalidParquetSnafu { + file: path.to_string(), + reason: "failed to decode metadata".to_string(), + } + .build() + })?; + Ok(metadata) + } + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index af1fcdc68a1e..0d31d30c3bbf 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -29,7 +29,6 @@ use datafusion_common::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; -use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; @@ -37,7 +36,6 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; -use tokio::io::BufReader; use crate::cache::CacheManagerRef; use crate::error::{ @@ -52,6 +50,7 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -136,15 +135,9 @@ impl ParquetReaderBuilder { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); - // Now we create a reader to read the whole file. - let reader = self - .object_store - .reader(&file_path) - .await - .context(OpenDalSnafu)?; - let mut reader = BufReader::new(reader); + let file_size = self.file_handle.file_size(); // Loads parquet metadata of the file. - let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?; + let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; @@ -247,8 +240,8 @@ impl ParquetReaderBuilder { /// Reads parquet metadata of specific file. async fn read_parquet_metadata( &self, - reader: &mut impl AsyncFileReader, file_path: &str, + file_size: u64, ) -> Result> { // Tries to get from global cache. if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| { @@ -257,11 +250,11 @@ impl ParquetReaderBuilder { return Ok(metadata); } - // Cache miss, get from the reader. - let metadata = reader - .get_metadata() - .await - .context(ReadParquetSnafu { path: file_path })?; + // Cache miss, read directly. + let metadata_loader = + MetadataLoader::new(self.object_store.clone(), file_path, Some(file_size)); + let metadata = metadata_loader.load().await?; + let metadata = Arc::new(metadata); // Cache the metadata. if let Some(cache) = &self.cache_manager { cache.put_parquet_meta_data( From c679cbf036b4f480efc1e747fed7a690aab6b719 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 19 Jan 2024 11:58:44 +0800 Subject: [PATCH 2/7] refactor code --- src/mito2/src/sst/file.rs | 4 --- src/mito2/src/sst/parquet/metadata.rs | 46 ++++++++++++++++----------- src/mito2/src/sst/parquet/reader.rs | 5 ++- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 930a91f26ff2..f86faa81ef78 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -178,10 +178,6 @@ impl FileHandle { pub fn meta(&self) -> FileMeta { self.inner.meta.clone() } - - pub fn file_size(&self) -> u64 { - self.inner.meta.file_size - } } /// Inner data of [FileHandle]. diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 96ef33d8b00a..8133d5968316 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -23,20 +23,19 @@ use crate::error::{self, Result}; /// The estimated size of the footer and metadata need to read from the end of parquet file. const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024; +/// Load the metadata of parquet file in an async way. pub(crate) struct MetadataLoader<'a> { + // An object store that supports async read object_store: ObjectStore, - + // The path of parquet file file_path: &'a str, - - file_size: Option, + // The size of parquet file + file_size: u64, } impl<'a> MetadataLoader<'a> { - pub fn new( - object_store: ObjectStore, - file_path: &'a str, - file_size: Option, - ) -> MetadataLoader { + /// Create a new parquet metadata loader. + pub fn new(object_store: ObjectStore, file_path: &'a str, file_size: u64) -> MetadataLoader { Self { object_store, file_path, @@ -44,7 +43,7 @@ impl<'a> MetadataLoader<'a> { } } - /// Load the metadata of parquet file. + /// Async load the metadata of parquet file. /// /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata. @@ -66,17 +65,11 @@ impl<'a> MetadataLoader<'a> { /// └───────────────────────────────────┘ /// ``` /// + /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106 pub async fn load(&self) -> Result { let object_store = &self.object_store; let path = self.file_path; - let file_size = match self.file_size { - Some(n) => n, - None => object_store - .stat(path) - .await - .context(error::OpenDalSnafu)? - .content_length(), - }; + let file_size = self.get_file_size().await?; if file_size < FOOTER_SIZE as u64 { return error::InvalidParquetSnafu { @@ -96,7 +89,8 @@ impl<'a> MetadataLoader<'a> { let buffer_len = buffer.len(); let mut footer = [0; 8]; - footer.copy_from_slice(&buffer[(buffer_len - FOOTER_SIZE as usize)..]); + footer.copy_from_slice(&buffer[(buffer_len - FOOTER_SIZE)..]); + let metadata_len = decode_footer(&footer).map_err(|_| { error::InvalidParquetSnafu { file: path.to_string(), @@ -129,7 +123,7 @@ impl<'a> MetadataLoader<'a> { })?; Ok(metadata) } else { - // The metadata is out of buffer, need to read the rest + // The metadata is out of buffer, need to make a second read let mut data = object_store .read_with(path) .range((file_size - footer_len)..(file_size - buffer_len as u64)) @@ -147,4 +141,18 @@ impl<'a> MetadataLoader<'a> { Ok(metadata) } } + + /// Get the size of parquet file. + async fn get_file_size(&self) -> Result { + let file_size = match self.file_size { + 0 => self + .object_store + .stat(self.file_path) + .await + .context(error::OpenDalSnafu)? + .content_length(), + n => n, + }; + Ok(file_size) + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 0d31d30c3bbf..1902caa626e7 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -135,7 +135,7 @@ impl ParquetReaderBuilder { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); - let file_size = self.file_handle.file_size(); + let file_size = self.file_handle.meta().file_size; // Loads parquet metadata of the file. let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. @@ -251,8 +251,7 @@ impl ParquetReaderBuilder { } // Cache miss, read directly. - let metadata_loader = - MetadataLoader::new(self.object_store.clone(), file_path, Some(file_size)); + let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size); let metadata = metadata_loader.load().await?; let metadata = Arc::new(metadata); // Cache the metadata. From c1663a403e73c4b4f505db70ef85cb7d09fc4eef Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 19 Jan 2024 14:08:52 +0800 Subject: [PATCH 3/7] chore: clippy --- src/mito2/src/sst/parquet/metadata.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 8133d5968316..195acb7a09bf 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -151,7 +151,7 @@ impl<'a> MetadataLoader<'a> { .await .context(error::OpenDalSnafu)? .content_length(), - n => n, + other => other, }; Ok(file_size) } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1902caa626e7..77cbf18b559f 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -40,7 +40,7 @@ use table::predicate::Predicate; use crate::cache::CacheManagerRef; use crate::error::{ ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu, - InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, + InvalidParquetSnafu, ReadParquetSnafu, Result, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, @@ -250,7 +250,7 @@ impl ParquetReaderBuilder { return Ok(metadata); } - // Cache miss, read directly. + // Cache miss, load metadata directly. let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size); let metadata = metadata_loader.load().await?; let metadata = Arc::new(metadata); From 64e3d7f150eaaed9194938f325db1eb9b0fffb0b Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 19 Jan 2024 16:32:28 +0800 Subject: [PATCH 4/7] chore: cr comment --- src/mito2/src/sst/parquet/metadata.rs | 50 +++++++++++++-------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 195acb7a09bf..2ca75771de72 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -73,35 +73,35 @@ impl<'a> MetadataLoader<'a> { if file_size < FOOTER_SIZE as u64 { return error::InvalidParquetSnafu { - file: path.to_string(), - reason: "file size is smaller than footer size".to_string(), + file: path, + reason: "file size is smaller than footer size", } .fail(); } // Prefetch bytes for metadata from the end and process the footer - let prefetch_size = DEFAULT_PREFETCH_SIZE.min(file_size); + let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE); let buffer = object_store .read_with(path) - .range((file_size - prefetch_size)..file_size) + .range(buffer_start..file_size) .await .context(error::OpenDalSnafu)?; let buffer_len = buffer.len(); let mut footer = [0; 8]; - footer.copy_from_slice(&buffer[(buffer_len - FOOTER_SIZE)..]); + footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]); - let metadata_len = decode_footer(&footer).map_err(|_| { + let metadata_len = decode_footer(&footer).map_err(|e| { error::InvalidParquetSnafu { - file: path.to_string(), - reason: "failed to decode footer".to_string(), + file: path, + reason: format!("failed to decode footer, {e}"), } .build() })? as u64; if metadata_len + FOOTER_SIZE as u64 > file_size { return error::InvalidParquetSnafu { - file: path.to_string(), + file: path, reason: format!( "the sum of Metadata length {} and Footer size {} is larger than file size {}", metadata_len, FOOTER_SIZE, file_size @@ -110,31 +110,31 @@ impl<'a> MetadataLoader<'a> { .fail(); } - let footer_len = metadata_len + FOOTER_SIZE as u64; - if (footer_len as usize) <= buffer_len { + if (metadata_len as usize) <= buffer_len - FOOTER_SIZE { // The whole metadata is in the first read - let offset = buffer_len - footer_len as usize; - let metadata = decode_metadata(&buffer[offset..]).map_err(|_| { - error::InvalidParquetSnafu { - file: path.to_string(), - reason: "failed to decode metadata".to_string(), - } - .build() - })?; + let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE; + let metadata = decode_metadata(&buffer[metadata_start..buffer_len - FOOTER_SIZE]) + .map_err(|e| { + error::InvalidParquetSnafu { + file: path, + reason: format!("failed to decode metadata, {e}"), + } + .build() + })?; Ok(metadata) } else { // The metadata is out of buffer, need to make a second read - let mut data = object_store + let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64; + let data = object_store .read_with(path) - .range((file_size - footer_len)..(file_size - buffer_len as u64)) + .range(metadata_start..(file_size - FOOTER_SIZE as u64)) .await .context(error::OpenDalSnafu)?; - data.extend(buffer); - let metadata = decode_metadata(&data).map_err(|_| { + let metadata = decode_metadata(&data).map_err(|e| { error::InvalidParquetSnafu { - file: path.to_string(), - reason: "failed to decode metadata".to_string(), + file: path, + reason: format!("failed to decode metadata, {e}"), } .build() })?; From 0284ca42ace61c9ad8eebb1675e46880f2721454 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 19 Jan 2024 17:47:19 +0800 Subject: [PATCH 5/7] chore: add TODO --- src/mito2/src/sst/parquet/reader.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 77cbf18b559f..5edc04cdc60e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -250,6 +250,8 @@ impl ParquetReaderBuilder { return Ok(metadata); } + // TODO(QuenKar): should also check write cache to get parquet metadata. + // Cache miss, load metadata directly. let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size); let metadata = metadata_loader.load().await?; From 303d4aa5e823416dcd1e2aea7643df622f950f67 Mon Sep 17 00:00:00 2001 From: Wei <15172118655@163.com> Date: Fri, 19 Jan 2024 18:38:00 +0800 Subject: [PATCH 6/7] chore: cr comment Co-authored-by: Zhenchi --- src/mito2/src/sst/parquet/metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 2ca75771de72..e88680905bf3 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -99,7 +99,7 @@ impl<'a> MetadataLoader<'a> { .build() })? as u64; - if metadata_len + FOOTER_SIZE as u64 > file_size { + if file_size - FOOTER_SIZE as u64 < metadata_len { return error::InvalidParquetSnafu { file: path, reason: format!( From a8c347b18c36a312d9670fc8f66fda55839b19c4 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 19 Jan 2024 18:40:07 +0800 Subject: [PATCH 7/7] chore: clippy --- src/mito2/src/sst/parquet/metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index e88680905bf3..e0db7b40b889 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -99,7 +99,7 @@ impl<'a> MetadataLoader<'a> { .build() })? as u64; - if file_size - FOOTER_SIZE as u64 < metadata_len { + if file_size - (FOOTER_SIZE as u64) < metadata_len { return error::InvalidParquetSnafu { file: path, reason: format!(