diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index b1d48f8c654e..39457281d76b 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -416,6 +416,13 @@ pub enum Error {
         error: ArrowError,
         location: Location,
     },
+
+    #[snafu(display("Invalid file metadata"))]
+    ConvertMetaData {
+        location: Location,
+        #[snafu(source)]
+        error: parquet::errors::ParquetError,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -477,6 +484,7 @@ impl ErrorExt for Error {
             InvalidBatch { .. } => StatusCode::InvalidArguments,
             InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
             ConvertVector { source, .. } => source.status_code(),
+            ConvertMetaData { .. } => StatusCode::Internal,
             ComputeArrow { .. } => StatusCode::Internal,
             ComputeVector { .. } => StatusCode::Internal,
             PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index af3f8479f39c..584faf1ab964 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -15,6 +15,7 @@
 //! SST in parquet format.
 
 mod format;
+mod helper;
 mod page_reader;
 pub mod reader;
 pub mod row_group;
@@ -22,6 +23,7 @@ mod stats;
 pub mod writer;
 
 use common_base::readable_size::ReadableSize;
+use parquet::file::metadata::ParquetMetaData;
 
 use crate::sst::file::FileTimeRange;
 
@@ -59,6 +61,8 @@ pub struct SstInfo {
     pub file_size: u64,
     /// Number of rows.
    pub num_rows: usize,
+    /// File metadata.
+    pub file_metadata: Option<ParquetMetaData>,
 }
 
 #[cfg(test)]
@@ -195,4 +199,68 @@ mod tests {
         };
         assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
     }
+
+    #[tokio::test]
+    async fn test_parquet_metadata_eq() {
+        // create test env
+        let mut env = crate::test_util::TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+        let metadata = Arc::new(sst_region_metadata());
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        // write the sst file and get sst info
+        // sst info contains the parquet metadata, which is converted from FileMetaData
+        let mut writer =
+            ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
+        let sst_info = writer
+            .write_all(&write_opts)
+            .await
+            .unwrap()
+            .expect("write_all should return sst info");
+        let writer_metadata = sst_info.file_metadata.unwrap();
+
+        // read the sst file metadata
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let reader = builder.build().await.unwrap();
+        let reader_metadata = reader.parquet_metadata();
+
+        // Because ParquetMetaData doesn't implement PartialEq,
+        // check all fields manually.
+        macro_rules! assert_metadata {
+            ( $writer:expr, $reader:expr, $($method:ident,)+ ) => {
+                $(
+                    assert_eq!($writer.$method(), $reader.$method());
+                )+
+            }
+        }
+
+        assert_metadata!(
+            writer_metadata.file_metadata(),
+            reader_metadata.file_metadata(),
+            version,
+            num_rows,
+            created_by,
+            key_value_metadata,
+            schema_descr,
+            column_orders,
+        );
+
+        assert_metadata!(
+            writer_metadata,
+            reader_metadata,
+            row_groups,
+            column_index,
+            offset_index,
+        );
+    }
 }
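Side note on the test: `ParquetMetaData` implements neither `PartialEq` nor `Eq`, so the `assert_metadata!` macro compares the writer-side and reader-side metadata accessor by accessor. For clarity, the first invocation expands to pairwise assertions like the following (illustrative expansion only, not code from this diff):

```rust
// Expansion of assert_metadata!(writer_metadata.file_metadata(), ...):
assert_eq!(
    writer_metadata.file_metadata().version(),
    reader_metadata.file_metadata().version()
);
assert_eq!(
    writer_metadata.file_metadata().num_rows(),
    reader_metadata.file_metadata().num_rows()
);
// ...and likewise for created_by, key_value_metadata, schema_descr,
// and column_orders.
```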
diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs
new file mode 100644
index 000000000000..6e059bd963e5
--- /dev/null
+++ b/src/mito2/src/sst/parquet/helper.rs
@@ -0,0 +1,86 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use parquet::basic::ColumnOrder;
+use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
+use parquet::format;
+use parquet::schema::types::{from_thrift, SchemaDescriptor};
+use snafu::ResultExt;
+
+use crate::error;
+use crate::error::Result;
+
+// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
+/// Converts [format::FileMetaData] to [ParquetMetaData].
+pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
+    let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
+    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
+
+    let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
+    for rg in t_file_metadata.row_groups {
+        row_groups.push(
+            RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
+                .context(error::ConvertMetaDataSnafu)?,
+        );
+    }
+    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
+
+    let file_metadata = FileMetaData::new(
+        t_file_metadata.version,
+        t_file_metadata.num_rows,
+        t_file_metadata.created_by,
+        t_file_metadata.key_value_metadata,
+        schema_desc_ptr,
+        column_orders,
+    );
+    // Note: `column_index` and `offset_index` are left empty here, which
+    // may become a problem if we enable the page index in the future.
+    Ok(ParquetMetaData::new(file_metadata, row_groups))
+}
+
+// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
+/// Parses column orders from Thrift definition.
+/// If no column orders are defined, returns `None`.
+fn parse_column_orders(
+    t_column_orders: Option<Vec<format::ColumnOrder>>,
+    schema_descr: &SchemaDescriptor,
+) -> Option<Vec<ColumnOrder>> {
+    match t_column_orders {
+        Some(orders) => {
+            // Should always be the case
+            assert_eq!(
+                orders.len(),
+                schema_descr.num_columns(),
+                "Column order length mismatch"
+            );
+            let mut res = Vec::with_capacity(schema_descr.num_columns());
+            for (i, column) in schema_descr.columns().iter().enumerate() {
+                match orders[i] {
+                    format::ColumnOrder::TYPEORDER(_) => {
+                        let sort_order = ColumnOrder::get_sort_order(
+                            column.logical_type(),
+                            column.converted_type(),
+                            column.physical_type(),
+                        );
+                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+                    }
+                }
+            }
+            Some(res)
+        }
+        None => None,
+    }
+}
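For orientation, a minimal usage sketch of the new helper (not part of this diff). The wrapper function is hypothetical and would have to live inside the `sst::parquet` tree, since `helper` is a private module; reusing `ConvertMetaDataSnafu` for the `close` error is likewise illustrative only. A synchronous `ArrowWriter` returns the same thrift `format::FileMetaData` from `close` that the buffered writer does:

```rust
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;

use super::helper::parse_parquet_metadata;
use crate::error::{self, Result};

/// Hypothetical wrapper: finish a writer and convert the thrift metadata
/// returned by `close` into the in-memory `ParquetMetaData` form.
fn finish_and_parse(writer: ArrowWriter<Vec<u8>>) -> Result<ParquetMetaData> {
    // `ArrowWriter::close` yields `parquet::format::FileMetaData` (the
    // thrift struct), not a `ParquetMetaData`.
    let thrift_meta = writer.close().context(error::ConvertMetaDataSnafu)?;
    parse_parquet_metadata(thrift_meta)
}
```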
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 0882ef82c7e3..60729c664283 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -452,4 +452,9 @@ impl ParquetReader {
 
         Ok(None)
     }
+
+    #[cfg(test)]
+    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
+        self.reader_builder.parquet_meta.clone()
+    }
 }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index d776b3ac627d..febec27c0d36 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -26,6 +26,7 @@ use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
 
+use super::helper::parse_parquet_metadata;
 use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::parquet::format::WriteFormat;
@@ -107,15 +108,20 @@ impl ParquetWriter {
             return Ok(None);
         }
 
-        let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;
+        let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?;
+
         // Safety: num rows > 0 so we must have min/max.
         let time_range = stats.time_range.unwrap();
 
+        // convert FileMetaData to ParquetMetaData
+        let parquet_metadata = parse_parquet_metadata(file_meta)?;
+
         // object_store.write will make sure all bytes are written or an error is raised.
         Ok(Some(SstInfo {
             time_range,
             file_size,
             num_rows: stats.num_rows,
+            file_metadata: Some(parquet_metadata),
         }))
     }
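Surfacing `file_metadata` in `SstInfo` lets callers hold on to the parsed metadata, for example to hand it to readers so they can skip fetching and decoding the footer again. A caller-side sketch under that assumption; `MetadataCache` and `cache_written_metadata` are hypothetical and not part of this diff:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use parquet::file::metadata::ParquetMetaData;

use crate::sst::parquet::SstInfo;

/// Hypothetical cache keyed by file id.
struct MetadataCache {
    inner: Mutex<HashMap<u64, Arc<ParquetMetaData>>>,
}

impl MetadataCache {
    fn insert(&self, file_id: u64, meta: Arc<ParquetMetaData>) {
        self.inner.lock().unwrap().insert(file_id, meta);
    }
}

/// Sketch: stash the metadata produced at write time so readers can
/// reuse it instead of reading the parquet footer from object storage.
fn cache_written_metadata(cache: &MetadataCache, file_id: u64, info: &SstInfo) {
    if let Some(meta) = &info.file_metadata {
        // ParquetMetaData is Clone; one clone into an Arc lets all
        // subsequent readers share it.
        cache.insert(file_id, Arc::new(meta.clone()));
    }
}
```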