diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index d0334690cc1c..8c88aa219acd 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -16,6 +16,7 @@
 
 mod format;
 pub(crate) mod helper;
+mod metadata;
 mod page_reader;
 pub mod reader;
 pub mod row_group;
diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs
new file mode 100644
index 000000000000..e0db7b40b889
--- /dev/null
+++ b/src/mito2/src/sst/parquet/metadata.rs
@@ -0,0 +1,176 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use object_store::ObjectStore;
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::FOOTER_SIZE;
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+
+/// The estimated number of bytes to read from the end of a parquet file so that one read
+/// usually covers both the footer and the file metadata.
+const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
+
+/// Loads the metadata of a parquet file asynchronously.
+pub(crate) struct MetadataLoader<'a> {
+    /// An object store that supports async read.
+    object_store: ObjectStore,
+    /// The path of the parquet file.
+    file_path: &'a str,
+    /// The size of the parquet file in bytes, `0` means unknown.
+    file_size: u64,
+}
+
+impl<'a> MetadataLoader<'a> {
+    /// Creates a new parquet metadata loader.
+    ///
+    /// If `file_size` is `0`, the size is fetched from the object store on load.
+    pub fn new(object_store: ObjectStore, file_path: &'a str, file_size: u64) -> Self {
+        Self {
+            object_store,
+            file_path,
+            file_size,
+        }
+    }
+
+    /// Asynchronously loads the metadata of the parquet file.
+    ///
+    /// Reads up to [`DEFAULT_PREFETCH_SIZE`] bytes from the end of the parquet file first. If the
+    /// file metadata is within that range, decodes it and returns [`ParquetMetaData`]; otherwise
+    /// issues a second read to fetch the whole metadata.
+    ///
+    /// Parquet File Format:
+    /// ```text
+    /// ┌───────────────────────────────────┐
+    /// |4-byte magic number "PAR1"         |
+    /// |───────────────────────────────────|
+    /// |Column 1 Chunk 1 + Column Metadata |
+    /// |Column 2 Chunk 1 + Column Metadata |
+    /// |...                                |
+    /// |Column N Chunk M + Column Metadata |
+    /// |───────────────────────────────────|
+    /// |File Metadata                      |
+    /// |───────────────────────────────────|
+    /// |4-byte length of file metadata     |
+    /// |4-byte magic number "PAR1"         |
+    /// └───────────────────────────────────┘
+    /// ```
+    ///
+    /// Refer to <https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106>
+    ///
+    /// # Errors
+    ///
+    /// Returns an `InvalidParquet` error if the file is too small or its footer/metadata cannot
+    /// be decoded, and an `OpenDal` error if reading from the object store fails.
+    pub async fn load(&self) -> Result<ParquetMetaData> {
+        let object_store = &self.object_store;
+        let path = self.file_path;
+        let file_size = self.get_file_size().await?;
+
+        if file_size < FOOTER_SIZE as u64 {
+            return error::InvalidParquetSnafu {
+                file: path,
+                reason: "file size is smaller than footer size",
+            }
+            .fail();
+        }
+
+        // Prefetch bytes for metadata from the end and process the footer
+        let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
+        let buffer = object_store
+            .read_with(path)
+            .range(buffer_start..file_size)
+            .await
+            .context(error::OpenDalSnafu)?;
+        let buffer_len = buffer.len();
+
+        // The read may return fewer bytes than requested (e.g. when the given file size is
+        // stale), so make sure the footer is complete before slicing to avoid a panic.
+        if buffer_len < FOOTER_SIZE {
+            return error::InvalidParquetSnafu {
+                file: path,
+                reason: format!(
+                    "read {} bytes from the end of file, less than footer size {}",
+                    buffer_len, FOOTER_SIZE
+                ),
+            }
+            .fail();
+        }
+        let footer_start = buffer_len - FOOTER_SIZE;
+
+        let mut footer = [0; FOOTER_SIZE];
+        footer.copy_from_slice(&buffer[footer_start..]);
+
+        let metadata_len = decode_footer(&footer).map_err(|e| {
+            error::InvalidParquetSnafu {
+                file: path,
+                reason: format!("failed to decode footer, {e}"),
+            }
+            .build()
+        })? as u64;
+
+        if file_size - (FOOTER_SIZE as u64) < metadata_len {
+            return error::InvalidParquetSnafu {
+                file: path,
+                reason: format!(
+                    "the sum of Metadata length {} and Footer size {} is larger than file size {}",
+                    metadata_len, FOOTER_SIZE, file_size
+                ),
+            }
+            .fail();
+        }
+
+        let metadata = if (metadata_len as usize) <= footer_start {
+            // The whole metadata is in the first read
+            let metadata_start = footer_start - metadata_len as usize;
+            decode_metadata(&buffer[metadata_start..footer_start])
+        } else {
+            // The metadata is out of buffer, need to make a second read
+            let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
+            let data = object_store
+                .read_with(path)
+                .range(metadata_start..(file_size - FOOTER_SIZE as u64))
+                .await
+                .context(error::OpenDalSnafu)?;
+            decode_metadata(&data)
+        };
+
+        metadata.map_err(|e| {
+            error::InvalidParquetSnafu {
+                file: path,
+                reason: format!("failed to decode metadata, {e}"),
+            }
+            .build()
+        })
+    }
+
+    /// Gets the size of the parquet file in bytes.
+    ///
+    /// Returns the size given on construction unless it is `0`, in which case the content
+    /// length is queried from the object store.
+    async fn get_file_size(&self) -> Result<u64> {
+        let file_size = match self.file_size {
+            0 => self
+                .object_store
+                .stat(self.file_path)
+                .await
+                .context(error::OpenDalSnafu)?
+                .content_length(),
+            other => other,
+        };
+        Ok(file_size)
+    }
+}
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index af1fcdc68a1e..5edc04cdc60e 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -29,7 +29,6 @@ use datafusion_common::arrow::buffer::BooleanBuffer;
 use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::format::KeyValue;
@@ -37,12 +36,11 @@ use snafu::{OptionExt, ResultExt};
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
-use tokio::io::BufReader;
 
 use crate::cache::CacheManagerRef;
 use crate::error::{
     ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu,
-    InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
+    InvalidParquetSnafu, ReadParquetSnafu, Result,
 };
 use crate::metrics::{
     PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
@@ -52,6 +50,7 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::FileHandle;
 use crate::sst::index::applier::SstIndexApplierRef;
 use crate::sst::parquet::format::ReadFormat;
+use crate::sst::parquet::metadata::MetadataLoader;
 use crate::sst::parquet::row_group::InMemoryRowGroup;
 use crate::sst::parquet::stats::RowGroupPruningStats;
 use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -136,15 +135,9 @@ impl ParquetReaderBuilder {
         let start = Instant::now();
 
         let file_path = self.file_handle.file_path(&self.file_dir);
-        // Now we create a reader to read the whole file.
-        let reader = self
-            .object_store
-            .reader(&file_path)
-            .await
-            .context(OpenDalSnafu)?;
-        let mut reader = BufReader::new(reader);
+        let file_size = self.file_handle.meta().file_size;
         // Loads parquet metadata of the file.
-        let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?;
+        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
         // Decodes region metadata.
         let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
         let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
@@ -247,8 +240,8 @@ impl ParquetReaderBuilder {
     /// Reads parquet metadata of specific file.
     async fn read_parquet_metadata(
         &self,
-        reader: &mut impl AsyncFileReader,
         file_path: &str,
+        file_size: u64,
     ) -> Result<Arc<ParquetMetaData>> {
         // Tries to get from global cache.
         if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
@@ -257,11 +250,12 @@ impl ParquetReaderBuilder {
             return Ok(metadata);
         }
 
-        // Cache miss, get from the reader.
-        let metadata = reader
-            .get_metadata()
-            .await
-            .context(ReadParquetSnafu { path: file_path })?;
+        // TODO(QuenKar): should also check write cache to get parquet metadata.
+
+        // Cache miss, load metadata directly.
+        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
+        let metadata = metadata_loader.load().await?;
+        let metadata = Arc::new(metadata);
         // Cache the metadata.
         if let Some(cache) = &self.cache_manager {
             cache.put_parquet_meta_data(