Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: read parquet metadata #3199

Merged
merged 7 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

mod format;
pub(crate) mod helper;
mod metadata;
mod page_reader;
pub mod reader;
pub mod row_group;
Expand Down
158 changes: 158 additions & 0 deletions src/mito2/src/sst/parquet/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use object_store::ObjectStore;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::FOOTER_SIZE;
use snafu::ResultExt;

use crate::error::{self, Result};

/// The estimated size of the footer and metadata need to read from the end of parquet file.
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;

/// Load the metadata of parquet file in an async way.
pub(crate) struct MetadataLoader<'a> {
// An object store that supports async read
object_store: ObjectStore,
// The path of parquet file
file_path: &'a str,
// The size of parquet file
file_size: u64,
}

impl<'a> MetadataLoader<'a> {
/// Create a new parquet metadata loader.
pub fn new(object_store: ObjectStore, file_path: &'a str, file_size: u64) -> MetadataLoader {
Self {
object_store,
file_path,
file_size,
}
}

/// Async load the metadata of parquet file.
///
/// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
/// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
///
/// Parquet File Format:
/// ```text
/// ┌───────────────────────────────────┐
/// |4-byte magic number "PAR1" |
/// |───────────────────────────────────|
/// |Column 1 Chunk 1 + Column Metadata |
/// |Column 2 Chunk 1 + Column Metadata |
/// |... |
/// |Column N Chunk M + Column Metadata |
/// |───────────────────────────────────|
/// |File Metadata |
/// |───────────────────────────────────|
/// |4-byte length of file metadata |
/// |4-byte magic number "PAR1" |
/// └───────────────────────────────────┘
/// ```
///
/// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
pub async fn load(&self) -> Result<ParquetMetaData> {
let object_store = &self.object_store;
let path = self.file_path;
let file_size = self.get_file_size().await?;

if file_size < FOOTER_SIZE as u64 {
return error::InvalidParquetSnafu {
file: path,
reason: "file size is smaller than footer size",
}
.fail();
}

// Prefetch bytes for metadata from the end and process the footer
let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
let buffer = object_store
.read_with(path)
.range(buffer_start..file_size)
.await
.context(error::OpenDalSnafu)?;
let buffer_len = buffer.len();

let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);

let metadata_len = decode_footer(&footer).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode footer, {e}"),
}
.build()
})? as u64;

if file_size - (FOOTER_SIZE as u64) < metadata_len {
return error::InvalidParquetSnafu {
file: path,
reason: format!(
"the sum of Metadata length {} and Footer size {} is larger than file size {}",
metadata_len, FOOTER_SIZE, file_size
),
}
.fail();
}

if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
// The whole metadata is in the first read
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let metadata = decode_metadata(&buffer[metadata_start..buffer_len - FOOTER_SIZE])
.map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
Ok(metadata)
} else {
// The metadata is out of buffer, need to make a second read
let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(file_size - FOOTER_SIZE as u64))
.await
.context(error::OpenDalSnafu)?;

let metadata = decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
Ok(metadata)
}
}

/// Get the size of parquet file.
async fn get_file_size(&self) -> Result<u64> {
let file_size = match self.file_size {
0 => self
.object_store
.stat(self.file_path)
.await
.context(error::OpenDalSnafu)?
.content_length(),
other => other,
};
Ok(file_size)
}
}
28 changes: 11 additions & 17 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,18 @@ use datafusion_common::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::io::BufReader;

use crate::cache::CacheManagerRef;
use crate::error::{
ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu,
InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
InvalidParquetSnafu, ReadParquetSnafu, Result,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
Expand All @@ -52,6 +50,7 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
Expand Down Expand Up @@ -136,15 +135,9 @@ impl ParquetReaderBuilder {
let start = Instant::now();

let file_path = self.file_handle.file_path(&self.file_dir);
// Now we create a reader to read the whole file.
let reader = self
.object_store
.reader(&file_path)
.await
.context(OpenDalSnafu)?;
let mut reader = BufReader::new(reader);
let file_size = self.file_handle.meta().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?;
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
Expand Down Expand Up @@ -247,8 +240,8 @@ impl ParquetReaderBuilder {
/// Reads parquet metadata of specific file.
async fn read_parquet_metadata(
&self,
reader: &mut impl AsyncFileReader,
file_path: &str,
file_size: u64,
) -> Result<Arc<ParquetMetaData>> {
// Tries to get from global cache.
if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
Expand All @@ -257,11 +250,12 @@ impl ParquetReaderBuilder {
return Ok(metadata);
}

// Cache miss, get from the reader.
let metadata = reader
.get_metadata()
.await
.context(ReadParquetSnafu { path: file_path })?;
// TODO(QuenKar): should also check write cache to get parquet metadata.

// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
if let Some(cache) = &self.cache_manager {
cache.put_parquet_meta_data(
Expand Down
Loading