Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: convert FileMetaData to ParquetMetaData #2980

Merged
merged 7 commits into the base branch from the feature branch
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,13 @@ pub enum Error {
error: ArrowError,
location: Location,
},

#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -459,6 +466,7 @@ impl ErrorExt for Error {
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
ConvertMetaData { .. } => StatusCode::Internal,
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
Expand Down
68 changes: 68 additions & 0 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
//! SST in parquet format.

mod format;
mod helper;
mod page_reader;
pub mod reader;
pub mod row_group;
mod stats;
pub mod writer;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::FileTimeRange;

Expand Down Expand Up @@ -59,6 +61,8 @@ pub struct SstInfo {
pub file_size: u64,
/// Number of rows.
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option<ParquetMetaData>,
}

#[cfg(test)]
Expand Down Expand Up @@ -195,4 +199,68 @@ mod tests {
};
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
}

#[tokio::test]
async fn test_parquet_metadata_eq() {
    // Set up the test environment and the batches to write.
    let mut test_env = crate::test_util::TestEnv::new();
    let object_store = test_env.init_object_store_manager();
    let handle = sst_file_handle(0, 1000);
    let file_path = handle.file_path(FILE_DIR);
    let metadata = Arc::new(sst_region_metadata());
    let batch_source = new_source(&[
        new_batch_by_range(&["a", "d"], 0, 60),
        new_batch_by_range(&["b", "f"], 0, 40),
        new_batch_by_range(&["b", "h"], 100, 200),
    ]);
    let opts = WriteOptions {
        row_group_size: 50,
        ..Default::default()
    };

    // Write the SST file and keep the ParquetMetaData the writer produced
    // (converted from the thrift FileMetaData when the writer closed).
    let mut writer =
        ParquetWriter::new(file_path, metadata.clone(), batch_source, object_store.clone());
    let written_meta = writer
        .write_all(&opts)
        .await
        .unwrap()
        .expect("write_all should return sst info")
        .file_metadata
        .unwrap();

    // Re-open the same file and load the metadata from its footer.
    let reader = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
        .build()
        .await
        .unwrap();
    let read_meta = reader.parquet_metadata();

    // ParquetMetaData does not implement PartialEq, so compare it
    // accessor by accessor.
    macro_rules! check_field {
        ( $left:expr, $right:expr, $($getter:ident,)+ ) => {
            $(
                assert_eq!($left.$getter(), $right.$getter());
            )+
        }
    }

    check_field!(
        written_meta.file_metadata(),
        read_meta.file_metadata(),
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );

    check_field!(
        written_meta,
        read_meta,
        row_groups,
        column_index,
        offset_index,
    );
}
}
86 changes: 86 additions & 0 deletions src/mito2/src/sst/parquet/helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
use parquet::schema::types::{from_thrift, SchemaDescriptor};
use snafu::ResultExt;

use crate::error;
use crate::error::Result;

// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));

let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
for rg in t_file_metadata.row_groups {
row_groups.push(
RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
.context(error::ConvertMetaDataSnafu)?,
);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);

let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
schema_desc_ptr,
column_orders,
);
// There may be a problem owing to lacking of column_index and offset_index,
// if we open page index in the future.
Ok(ParquetMetaData::new(file_metadata, row_groups))
}

// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
    t_column_orders: Option<Vec<format::ColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
    let orders = t_column_orders?;
    // Should always be the case
    assert_eq!(
        orders.len(),
        schema_descr.num_columns(),
        "Column order length mismatch"
    );
    // Pair each column with its thrift-level order and map it to the
    // in-memory representation.
    let res = schema_descr
        .columns()
        .iter()
        .zip(orders.iter())
        .map(|(column, order)| match order {
            format::ColumnOrder::TYPEORDER(_) => {
                let sort_order = ColumnOrder::get_sort_order(
                    column.logical_type(),
                    column.converted_type(),
                    column.physical_type(),
                );
                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
            }
        })
        .collect();
    Some(res)
}
5 changes: 5 additions & 0 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,4 +450,9 @@ impl ParquetReader {

Ok(None)
}

#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
    // Test-only accessor for the parquet metadata cached by the reader builder.
    Arc::clone(&self.reader_builder.parquet_meta)
}
}
8 changes: 7 additions & 1 deletion src/mito2/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;

use super::helper::parse_parquet_metadata;
use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
use crate::read::{Batch, Source};
use crate::sst::parquet::format::WriteFormat;
Expand Down Expand Up @@ -107,15 +108,20 @@ impl ParquetWriter {
return Ok(None);
}

let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;
let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;

// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();

// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;

// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
time_range,
file_size,
num_rows: stats.num_rows,
file_metadata: Some(parquet_metadata),
}))
}

Expand Down
Loading