diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index e60771784297..7d977a328ca1 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -80,18 +80,15 @@ impl CacheManager { CacheManagerBuilder::default() } - /// Gets cached [ParquetMetaData]. + /// Gets cached [ParquetMetaData] from in-memory cache first. + /// If not found, tries to get it from write cache and fill the in-memory cache. pub async fn get_parquet_meta_data( &self, region_id: RegionId, file_id: FileId, ) -> Option> { // Try to get metadata from sst meta cache - let metadata = self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { - let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id)); - update_hit_miss(value, SST_META_TYPE) - }); - + let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id); if metadata.is_some() { return metadata; } @@ -110,6 +107,20 @@ impl CacheManager { None } + /// Gets cached [ParquetMetaData] from in-memory cache. + /// This method does not perform I/O. + pub fn get_parquet_meta_data_from_mem_cache( + &self, + region_id: RegionId, + file_id: FileId, + ) -> Option> { + // Try to get metadata from sst meta cache + self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { + let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id)); + update_hit_miss(value, SST_META_TYPE) + }) + } + /// Puts [ParquetMetaData] into the cache. pub fn put_parquet_meta_data( &self, diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 50cda4aabb83..677b37354d5b 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -18,15 +18,17 @@ use common_time::Timestamp; use smallvec::{smallvec, SmallVec}; use store_api::region_engine::PartitionRange; +use crate::cache::CacheManager; use crate::memtable::MemtableRef; use crate::read::scan_region::ScanInput; use crate::sst::file::{overlaps, FileHandle, FileTimeRange}; +use crate::sst::parquet::format::parquet_row_group_time_range; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; const ALL_ROW_GROUPS: i64 = -1; /// Index to access a row group. -#[derive(Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] pub(crate) struct RowGroupIndex { /// Index to the memtable/file. pub(crate) index: usize, @@ -38,6 +40,7 @@ pub(crate) struct RowGroupIndex { /// Meta data of a partition range. /// If the scanner is [UnorderedScan], each meta only has one row group or memtable. /// If the scanner is [SeqScan], each meta may have multiple row groups and memtables. +#[derive(Debug, PartialEq)] pub(crate) struct RangeMeta { /// The time range of the range. pub(crate) time_range: FileTimeRange, @@ -84,7 +87,12 @@ impl RangeMeta { pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec { let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len()); Self::push_unordered_mem_ranges(&input.memtables, &mut ranges); - Self::push_unordered_file_ranges(input.memtables.len(), &input.files, &mut ranges); + Self::push_unordered_file_ranges( + input.memtables.len(), + &input.files, + input.cache_manager.as_deref(), + &mut ranges, + ); ranges } @@ -164,12 +172,36 @@ impl RangeMeta { fn push_unordered_file_ranges( num_memtables: usize, files: &[FileHandle], + cache: Option<&CacheManager>, ranges: &mut Vec, ) { // For append mode, we can parallelize reading row groups. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; - if file.meta_ref().num_row_groups > 0 { + // Get parquet meta from the cache. + let parquet_meta = cache.and_then(|c| { + c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id()) + }); + if let Some(parquet_meta) = parquet_meta { + // Scans each row group. + for row_group_index in 0..file.meta_ref().num_row_groups { + let time_range = parquet_row_group_time_range( + file.meta_ref(), + &parquet_meta, + row_group_index as usize, + ); + let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows(); + ranges.push(RangeMeta { + time_range: time_range.unwrap_or_else(|| file.time_range()), + indices: smallvec![file_index], + row_group_indices: smallvec![RowGroupIndex { + index: file_index, + row_group_index: row_group_index as i64, + }], + num_rows: num_rows as usize, + }); + } + } else if file.meta_ref().num_row_groups > 0 { // Scans each row group. for row_group_index in 0..file.meta_ref().num_row_groups { ranges.push(RangeMeta { @@ -217,7 +249,6 @@ impl RangeMeta { } } - // TODO(yingwen): Support multiple row groups in a range so we can split them later. fn push_seq_file_ranges( num_memtables: usize, files: &[FileHandle], @@ -226,15 +257,31 @@ impl RangeMeta { // For non append-only mode, each range only contains one file. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; - ranges.push(RangeMeta { - time_range: file.time_range(), - indices: smallvec![file_index], - row_group_indices: smallvec![RowGroupIndex { - index: file_index, - row_group_index: ALL_ROW_GROUPS, - }], - num_rows: file.meta_ref().num_rows as usize, - }); + if file.meta_ref().num_row_groups > 0 { + // All row groups share the same time range. + let row_group_indices = (0..file.meta_ref().num_row_groups) + .map(|row_group_index| RowGroupIndex { + index: file_index, + row_group_index: row_group_index as i64, + }) + .collect(); + ranges.push(RangeMeta { + time_range: file.time_range(), + indices: smallvec![file_index], + row_group_indices, + num_rows: file.meta_ref().num_rows as usize, + }); + } else { + ranges.push(RangeMeta { + time_range: file.time_range(), + indices: smallvec![file_index], + row_group_indices: smallvec![RowGroupIndex { + index: file_index, + row_group_index: ALL_ROW_GROUPS, + }], + num_rows: file.meta_ref().num_rows as usize, + }); + } } } } @@ -366,4 +413,212 @@ mod tests { &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)], ); } + + #[test] + fn test_merge_range() { + let mut left = RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![ + RowGroupIndex { + index: 1, + row_group_index: 1 + }, + RowGroupIndex { + index: 1, + row_group_index: 2 + } + ], + num_rows: 5, + }; + let right = RangeMeta { + time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)), + indices: smallvec![2], + row_group_indices: smallvec![ + RowGroupIndex { + index: 2, + row_group_index: 1 + }, + RowGroupIndex { + index: 2, + row_group_index: 2 + } + ], + num_rows: 4, + }; + left.merge(right); + + assert_eq!( + left, + RangeMeta { + time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)), + indices: smallvec![1, 2], + row_group_indices: smallvec![ + RowGroupIndex { + index: 1, + row_group_index: 1 + }, + RowGroupIndex { + index: 1, + row_group_index: 2 + }, + RowGroupIndex { + index: 2, + row_group_index: 1 + }, + RowGroupIndex { + index: 2, + row_group_index: 2 + }, + ], + num_rows: 9, + } + ); + } + + #[test] + fn test_split_range() { + let range = RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![ + RowGroupIndex { + index: 1, + row_group_index: 1 + }, + RowGroupIndex { + index: 1, + row_group_index: 2 + } + ], + num_rows: 5, + }; + + assert!(range.can_split_preserve_order()); + let mut output = Vec::new(); + range.maybe_split(&mut output); + + assert_eq!( + output, + &[ + RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: 1 + },], + num_rows: 2, + }, + RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: 2 + }], + num_rows: 2, + } + ] + ); + } + + #[test] + fn test_not_split_range() { + let range = RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1, 2], + row_group_indices: smallvec![ + RowGroupIndex { + index: 1, + row_group_index: 1 + }, + RowGroupIndex { + index: 2, + row_group_index: 1 + } + ], + num_rows: 5, + }; + + assert!(!range.can_split_preserve_order()); + let mut output = Vec::new(); + range.maybe_split(&mut output); + assert_eq!(1, output.len()); + } + + #[test] + fn test_maybe_split_ranges() { + let ranges = vec![ + RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![ + RowGroupIndex { + index: 1, + row_group_index: 0 + }, + RowGroupIndex { + index: 1, + row_group_index: 1 + } + ], + num_rows: 4, + }, + RangeMeta { + time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)), + indices: smallvec![2, 3], + row_group_indices: smallvec![ + RowGroupIndex { + index: 2, + row_group_index: 0 + }, + RowGroupIndex { + index: 3, + row_group_index: 0 + } + ], + num_rows: 5, + }, + ]; + let output = maybe_split_ranges_for_seq_scan(ranges); + assert_eq!( + output, + vec![ + RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: 0 + },], + num_rows: 2, + }, + RangeMeta { + time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), + indices: smallvec![1], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: 1 + }], + num_rows: 2, + }, + RangeMeta { + time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)), + indices: smallvec![2, 3], + row_group_indices: smallvec![ + RowGroupIndex { + index: 2, + row_group_index: 0 + }, + RowGroupIndex { + index: 3, + row_group_index: 0 + } + ], + num_rows: 5, + }, + ] + ) + } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 50d7a57cc189..451ec44f1cd2 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -111,7 +111,8 @@ pub struct FileMeta { pub region_id: RegionId, /// Compared to normal file names, FileId ignore the extension pub file_id: FileId, - /// Timestamp range of file. + /// Timestamp range of file. The timestamps have the same time unit as the + /// data in the SST. pub time_range: FileTimeRange, /// SST level of the file. pub level: Level, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index cc0cff605b5c..ae51a0d37c29 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -62,7 +62,8 @@ impl Default for WriteOptions { /// Parquet SST info returned by the writer. pub struct SstInfo { - /// Time range of the SST. + /// Time range of the SST. The timestamps have the same time unit as the + /// data in the SST. pub time_range: FileTimeRange, /// File size in bytes. pub file_size: u64, diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 9f18a3390cb2..f2d6c7614b1e 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -31,13 +31,14 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use api::v1::SemanticType; +use common_time::Timestamp; use datafusion_common::ScalarValue; use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array}; use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::{Helper, Vector}; -use parquet::file::metadata::RowGroupMetaData; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; @@ -48,6 +49,7 @@ use crate::error::{ }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::sst::file::{FileMeta, FileTimeRange}; use crate::sst::to_sst_arrow_schema; /// Arrow array type for the primary key dictionary. @@ -558,6 +560,50 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef { Arc::new(DictionaryArray::new(keys, values)) } +/// Gets the min/max time index of the row group from the parquet meta. +/// It assumes the parquet is created by the mito engine. +pub(crate) fn parquet_row_group_time_range( + file_meta: &FileMeta, + parquet_meta: &ParquetMetaData, + row_group_idx: usize, +) -> Option { + let row_group_meta = parquet_meta.row_group(row_group_idx); + let num_columns = parquet_meta.file_metadata().schema_descr().num_columns(); + assert!( + num_columns >= FIXED_POS_COLUMN_NUM, + "file only has {} columns", + num_columns + ); + let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM; + + let stats = row_group_meta.column(time_index_pos).statistics()?; + if stats.has_min_max_set() { + // The physical type for the timestamp should be i64. + let (min, max) = match stats { + Statistics::Int64(value_stats) => (*value_stats.min(), *value_stats.max()), + Statistics::Int32(_) + | Statistics::Boolean(_) + | Statistics::Int96(_) + | Statistics::Float(_) + | Statistics::Double(_) + | Statistics::ByteArray(_) + | Statistics::FixedLenByteArray(_) => return None, + }; + + debug_assert!( + min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value() + ); + debug_assert!( + max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value() + ); + let unit = file_meta.time_range.0.unit(); + + Some((Timestamp::new(min, unit), Timestamp::new(max, unit))) + } else { + None + } +} + #[cfg(test)] mod tests { use api::v1::OpType;