Skip to content

Commit

Permalink
feat: convert FileMetaData to ParquetMetaData (#2980)
Browse files Browse the repository at this point in the history
* feat: can convert Format FileMetaData to ParquetMetaData

* test: parquet metadata equal

* chore: test

* chore: cr comment

Co-authored-by: fys <[email protected]>

* chore: cr comment

* refactor: type name

* chore: cr comment

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: fys <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
3 people authored Dec 26, 2023
1 parent 3bd2f79 commit 95f172e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ pub enum Error {
error: ArrowError,
location: Location,
},

#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -477,6 +484,7 @@ impl ErrorExt for Error {
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
ConvertMetaData { .. } => StatusCode::Internal,
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
Expand Down
68 changes: 68 additions & 0 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
//! SST in parquet format.
mod format;
mod helper;
mod page_reader;
pub mod reader;
pub mod row_group;
mod stats;
pub mod writer;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::FileTimeRange;

Expand Down Expand Up @@ -59,6 +61,8 @@ pub struct SstInfo {
pub file_size: u64,
/// Number of rows.
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option<ParquetMetaData>,
}

#[cfg(test)]
Expand Down Expand Up @@ -195,4 +199,68 @@ mod tests {
};
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
}

#[tokio::test]
async fn test_parquet_metadata_eq() {
// create test env
let mut env = crate::test_util::TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};

// write the sst file and get sst info
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut writer =
ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
let sst_info = writer
.write_all(&write_opts)
.await
.unwrap()
.expect("write_all should return sst info");
let writer_metadata = sst_info.file_metadata.unwrap();

// read the sst file metadata
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let reader = builder.build().await.unwrap();
let reader_metadata = reader.parquet_metadata();

// Because ParquetMetaData doesn't implement PartialEq,
// check all fields manually
macro_rules! assert_metadata {
( $writer:expr, $reader:expr, $($method:ident,)+ ) => {
$(
assert_eq!($writer.$method(), $reader.$method());
)+
}
}

assert_metadata!(
writer_metadata.file_metadata(),
reader_metadata.file_metadata(),
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
);

assert_metadata!(
writer_metadata,
reader_metadata,
row_groups,
column_index,
offset_index,
);
}
}
86 changes: 86 additions & 0 deletions src/mito2/src/sst/parquet/helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
use parquet::schema::types::{from_thrift, SchemaDescriptor};
use snafu::ResultExt;

use crate::error;
use crate::error::Result;

// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));

let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
for rg in t_file_metadata.row_groups {
row_groups.push(
RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
.context(error::ConvertMetaDataSnafu)?,
);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);

let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
schema_desc_ptr,
column_orders,
);
// There may be a problem owing to lacking of column_index and offset_index,
// if we open page index in the future.
Ok(ParquetMetaData::new(file_metadata, row_groups))
}

// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
t_column_orders: Option<Vec<format::ColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
let mut res = Vec::with_capacity(schema_descr.num_columns());
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
format::ColumnOrder::TYPEORDER(_) => {
let sort_order = ColumnOrder::get_sort_order(
column.logical_type(),
column.converted_type(),
column.physical_type(),
);
res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
}
}
}
Some(res)
}
None => None,
}
}
5 changes: 5 additions & 0 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,4 +452,9 @@ impl ParquetReader {

Ok(None)
}

#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.reader_builder.parquet_meta.clone()
}
}
8 changes: 7 additions & 1 deletion src/mito2/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;

use super::helper::parse_parquet_metadata;
use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
use crate::read::{Batch, Source};
use crate::sst::parquet::format::WriteFormat;
Expand Down Expand Up @@ -107,15 +108,20 @@ impl ParquetWriter {
return Ok(None);
}

let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;
let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;

// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();

// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;

// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
time_range,
file_size,
num_rows: stats.num_rows,
file_metadata: Some(parquet_metadata),
}))
}

Expand Down

0 comments on commit 95f172e

Please sign in to comment.