diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 82b86a21554c..1baffd4a7fa1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -723,10 +723,20 @@ pub enum Error { #[snafu(display("Failed to iter data part"))] ReadDataPart { + #[snafu(implicit)] + location: Location, #[snafu(source)] error: parquet::errors::ParquetError, }, + #[snafu(display("Failed to read row group in memtable"))] + DecodeArrowRowGroup { + #[snafu(source)] + error: ArrowError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid region options, {}", reason))] InvalidRegionOptions { reason: String, @@ -1029,6 +1039,7 @@ impl ErrorExt for Error { RegionBusy { .. } => StatusCode::RegionBusy, GetSchemaMetadata { source, .. } => source.status_code(), Timeout { .. } => StatusCode::Cancelled, + DecodeArrowRowGroup { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 96e6c70acdf9..8bbbda8ca367 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -27,8 +27,12 @@ use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats, }; +#[allow(unused)] +mod context; #[allow(unused)] pub(crate) mod part; +mod part_reader; +mod row_group_reader; #[derive(Debug)] pub struct BulkMemtable { diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs new file mode 100644 index 000000000000..8929ae4383b2 --- /dev/null +++ b/src/mito2/src/memtable/bulk/context.rs @@ -0,0 +1,117 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Context for iterating bulk memtable. + +use std::collections::VecDeque; +use std::sync::Arc; + +use parquet::file::metadata::ParquetMetaData; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; +use table::predicate::Predicate; + +use crate::row_converter::McmpRowCodec; +use crate::sst::parquet::file_range::RangeBase; +use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::reader::SimpleFilterContext; +use crate::sst::parquet::stats::RowGroupPruningStats; + +pub(crate) type BulkIterContextRef = Arc; + +pub(crate) struct BulkIterContext { + pub(crate) base: RangeBase, + pub(crate) predicate: Option, +} + +impl BulkIterContext { + pub(crate) fn new( + region_metadata: RegionMetadataRef, + projection: &Option<&[ColumnId]>, + predicate: Option, + ) -> Self { + let codec = McmpRowCodec::new_with_primary_keys(®ion_metadata); + + let simple_filters = predicate + .as_ref() + .iter() + .flat_map(|predicate| { + predicate + .exprs() + .iter() + .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr)) + }) + .collect(); + + let read_format = build_read_format(region_metadata, projection); + + Self { + base: RangeBase { + filters: simple_filters, + read_format, + codec, + // we don't need to compat batch since all batch in memtable have the same schema. + compat_batch: None, + }, + predicate, + } + } + + /// Prunes row groups by stats. + pub(crate) fn row_groups_to_read(&self, file_meta: &Arc) -> VecDeque { + let region_meta = self.base.read_format.metadata(); + let row_groups = file_meta.row_groups(); + // expected_metadata is set to None since we always expect region metadata of memtable is up-to-date. + let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None); + if let Some(predicate) = self.predicate.as_ref() { + predicate + .prune_with_stats(&stats, region_meta.schema.arrow_schema()) + .iter() + .zip(0..file_meta.num_row_groups()) + .filter_map(|(selected, row_group)| { + if !*selected { + return None; + } + Some(row_group) + }) + .collect::>() + } else { + (0..file_meta.num_row_groups()).collect() + } + } + + pub(crate) fn read_format(&self) -> &ReadFormat { + &self.base.read_format + } +} + +fn build_read_format( + region_metadata: RegionMetadataRef, + projection: &Option<&[ColumnId]>, +) -> ReadFormat { + let read_format = if let Some(column_ids) = &projection { + ReadFormat::new(region_metadata, column_ids.iter().copied()) + } else { + // No projection, lists all column ids to read. + ReadFormat::new( + region_metadata.clone(), + region_metadata + .column_metadatas + .iter() + .map(|col| col.column_id), + ) + }; + + read_format +} diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 9e806a46bfd9..89147abf1e8d 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -13,10 +13,12 @@ // limitations under the License. //! Bulk part encoder/decoder. + use std::collections::VecDeque; use std::sync::Arc; use api::v1::Mutation; +use bytes::Bytes; use common_time::timestamp::TimeUnit; use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder}; use datatypes::arrow; @@ -26,93 +28,145 @@ use datatypes::arrow::array::{ UInt8Builder, }; use datatypes::arrow::compute::TakeOptions; -use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; +use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; -use datatypes::types::TimestampType; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::ArrowWriter; use parquet::data_type::AsBytes; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; +use table::predicate::Predicate; +use crate::error; use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result}; +use crate::memtable::bulk::context::BulkIterContextRef; +use crate::memtable::bulk::part_reader::BulkPartIter; use crate::memtable::key_values::KeyValuesRef; -use crate::read::Batch; +use crate::memtable::BoxedBatchIterator; use crate::row_converter::{McmpRowCodec, RowCodec}; -use crate::sst::parquet::format::PrimaryKeyArray; +use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; +use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::to_sst_arrow_schema; #[derive(Debug)] pub struct BulkPart { - data: Vec, + data: Bytes, metadata: BulkPartMeta, } impl BulkPart { - pub fn new(data: Vec, metadata: BulkPartMeta) -> Self { + pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self { Self { data, metadata } } pub(crate) fn metadata(&self) -> &BulkPartMeta { &self.metadata } + + pub(crate) fn read(&self, context: BulkIterContextRef) -> Result> { + // use predicate to find row groups to read. + let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata); + + if row_groups_to_read.is_empty() { + // All row groups are filtered. + return Ok(None); + } + + let iter = BulkPartIter::try_new( + context, + row_groups_to_read, + self.metadata.parquet_metadata.clone(), + self.data.clone(), + )?; + Ok(Some(Box::new(iter) as BoxedBatchIterator)) + } } #[derive(Debug)] pub struct BulkPartMeta { + /// Total rows in part. pub num_rows: usize, + /// Max timestamp in part. pub max_timestamp: i64, + /// Min timestamp in part. pub min_timestamp: i64, -} - -impl Default for BulkPartMeta { - fn default() -> Self { - Self { - num_rows: 0, - max_timestamp: i64::MIN, - min_timestamp: i64::MAX, - } - } + /// Part file metadata. + pub parquet_metadata: Arc, + /// Part region schema. + pub region_metadata: RegionMetadataRef, } pub struct BulkPartEncoder { metadata: RegionMetadataRef, - arrow_schema: SchemaRef, pk_encoder: McmpRowCodec, + row_group_size: usize, dedup: bool, + writer_props: Option, +} + +impl BulkPartEncoder { + pub(crate) fn new( + metadata: RegionMetadataRef, + dedup: bool, + row_group_size: usize, + ) -> BulkPartEncoder { + let codec = McmpRowCodec::new_with_primary_keys(&metadata); + let writer_props = Some( + WriterProperties::builder() + .set_write_batch_size(row_group_size) + .set_max_row_group_size(row_group_size) + .build(), + ); + Self { + metadata, + pk_encoder: codec, + row_group_size, + dedup, + writer_props, + } + } } impl BulkPartEncoder { /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`. - fn encode_mutations(&self, mutations: &[Mutation], dest: &mut BulkPart) -> Result { + fn encode_mutations(&self, mutations: &[Mutation]) -> Result> { let Some((arrow_record_batch, min_ts, max_ts)) = - mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, false)? + mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)? else { - return Ok(false); + return Ok(None); }; + let mut buf = Vec::with_capacity(4096); let arrow_schema = arrow_record_batch.schema(); - { - let mut writer = ArrowWriter::try_new(&mut dest.data, arrow_schema, None) - .context(EncodeMemtableSnafu)?; + + let file_metadata = { + let mut writer = + ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone()) + .context(EncodeMemtableSnafu)?; writer .write(&arrow_record_batch) .context(EncodeMemtableSnafu)?; - let _metadata = writer.finish().context(EncodeMemtableSnafu)?; - } - - dest.metadata = BulkPartMeta { - num_rows: arrow_record_batch.num_rows(), - max_timestamp: max_ts, - min_timestamp: min_ts, + writer.finish().context(EncodeMemtableSnafu)? }; - Ok(true) - } - /// Decodes [BulkPart] to [Batch]es. - fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque) -> Result<()> { - todo!() + let buf = Bytes::from(buf); + let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?); + + Ok(Some(BulkPart { + data: buf, + metadata: BulkPartMeta { + num_rows: arrow_record_batch.num_rows(), + max_timestamp: max_ts, + min_timestamp: min_ts, + parquet_metadata, + region_metadata: self.metadata.clone(), + }, + })) } } @@ -379,10 +433,12 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result { mod tests { use std::collections::VecDeque; + use datafusion_common::ScalarValue; use datatypes::prelude::{ScalarVector, Value}; use datatypes::vectors::{Float64Vector, TimestampMillisecondVector}; use super::*; + use crate::memtable::bulk::context::BulkIterContext; use crate::sst::parquet::format::ReadFormat; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; @@ -444,7 +500,7 @@ mod tests { k0: &'a str, k1: u32, timestamps: &'a [i64], - v0: &'a [Option], + v1: &'a [Option], sequence: u64, } @@ -452,7 +508,7 @@ mod tests { struct BatchOutput<'a> { pk_values: &'a [Value], timestamps: &'a [i64], - v0: &'a [Option], + v1: &'a [Option], } fn check_mutations_to_record_batches( @@ -470,7 +526,7 @@ mod tests { m.k0.to_string(), m.k1, m.timestamps.iter().copied(), - m.v0.iter().copied(), + m.v1.iter().copied(), m.sequence, ) .mutation @@ -526,7 +582,7 @@ mod tests { for idx in 0..expected.len() { assert_eq!(expected[idx].pk_values, &batch_values[idx].0); assert_eq!(expected[idx].timestamps, &batch_values[idx].1); - assert_eq!(expected[idx].v0, &batch_values[idx].2); + assert_eq!(expected[idx].v1, &batch_values[idx].2); } } @@ -537,13 +593,13 @@ mod tests { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.1)], + v1: &[Some(0.1)], sequence: 0, }], &[BatchOutput { pk_values: &[Value::String("a".into()), Value::UInt32(0)], timestamps: &[0], - v0: &[Some(0.1)], + v1: &[Some(0.1)], }], (0, 0), true, @@ -555,28 +611,28 @@ mod tests { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.1)], + v1: &[Some(0.1)], sequence: 0, }, MutationInput { k0: "b", k1: 0, timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], sequence: 0, }, MutationInput { k0: "a", k1: 0, timestamps: &[1], - v0: &[Some(0.2)], + v1: &[Some(0.2)], sequence: 1, }, MutationInput { k0: "a", k1: 1, timestamps: &[1], - v0: &[Some(0.3)], + v1: &[Some(0.3)], sequence: 2, }, ], @@ -584,17 +640,17 @@ mod tests { BatchOutput { pk_values: &[Value::String("a".into()), Value::UInt32(0)], timestamps: &[0, 1], - v0: &[Some(0.1), Some(0.2)], + v1: &[Some(0.1), Some(0.2)], }, BatchOutput { pk_values: &[Value::String("a".into()), Value::UInt32(1)], timestamps: &[1], - v0: &[Some(0.3)], + v1: &[Some(0.3)], }, BatchOutput { pk_values: &[Value::String("b".into()), Value::UInt32(0)], timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], }, ], (0, 1), @@ -607,21 +663,21 @@ mod tests { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.1)], + v1: &[Some(0.1)], sequence: 0, }, MutationInput { k0: "b", k1: 0, timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], sequence: 0, }, MutationInput { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.2)], + v1: &[Some(0.2)], sequence: 1, }, ], @@ -629,12 +685,12 @@ mod tests { BatchOutput { pk_values: &[Value::String("a".into()), Value::UInt32(0)], timestamps: &[0], - v0: &[Some(0.2)], + v1: &[Some(0.2)], }, BatchOutput { pk_values: &[Value::String("b".into()), Value::UInt32(0)], timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], }, ], (0, 0), @@ -646,21 +702,21 @@ mod tests { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.1)], + v1: &[Some(0.1)], sequence: 0, }, MutationInput { k0: "b", k1: 0, timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], sequence: 0, }, MutationInput { k0: "a", k1: 0, timestamps: &[0], - v0: &[Some(0.2)], + v1: &[Some(0.2)], sequence: 1, }, ], @@ -668,16 +724,194 @@ mod tests { BatchOutput { pk_values: &[Value::String("a".into()), Value::UInt32(0)], timestamps: &[0, 0], - v0: &[Some(0.2), Some(0.1)], + v1: &[Some(0.2), Some(0.1)], }, BatchOutput { pk_values: &[Value::String("b".into()), Value::UInt32(0)], timestamps: &[0], - v0: &[Some(0.0)], + v1: &[Some(0.0)], }, ], (0, 0), false, ); } + + fn encode(input: &[MutationInput]) -> BulkPart { + let metadata = metadata_for_test(); + let mutations = input + .iter() + .map(|m| { + build_key_values_with_ts_seq_values( + &metadata, + m.k0.to_string(), + m.k1, + m.timestamps.iter().copied(), + m.v1.iter().copied(), + m.sequence, + ) + .mutation + }) + .collect::>(); + let encoder = BulkPartEncoder::new(metadata, true, 1024); + encoder.encode_mutations(&mutations).unwrap().unwrap() + } + + #[test] + fn test_write_and_read_part_projection() { + let part = encode(&[ + MutationInput { + k0: "a", + k1: 0, + timestamps: &[1], + v1: &[Some(0.1)], + sequence: 0, + }, + MutationInput { + k0: "b", + k1: 0, + timestamps: &[1], + v1: &[Some(0.0)], + sequence: 0, + }, + MutationInput { + k0: "a", + k1: 0, + timestamps: &[2], + v1: &[Some(0.2)], + sequence: 1, + }, + ]); + + let projection = &[4u32]; + + let mut reader = part + .read(Arc::new(BulkIterContext::new( + part.metadata.region_metadata.clone(), + &Some(projection.as_slice()), + None, + ))) + .unwrap() + .expect("expect at least one row group"); + + let mut total_rows_read = 0; + let mut field = vec![]; + for res in reader { + let batch = res.unwrap(); + assert_eq!(1, batch.fields().len()); + assert_eq!(4, batch.fields()[0].column_id); + field.extend( + batch.fields()[0] + .data + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .map(|v| v.unwrap()), + ); + total_rows_read += batch.num_rows(); + } + assert_eq!(3, total_rows_read); + assert_eq!(vec![0.1, 0.2, 0.0], field); + } + + fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart { + let metadata = metadata_for_test(); + let mutations = key_values + .into_iter() + .map(|(k0, k1, (start, end), sequence)| { + let ts = (start..end); + let v1 = (start..end).map(|_| None); + build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence) + .mutation + }) + .collect::>(); + let encoder = BulkPartEncoder::new(metadata, true, 100); + encoder.encode_mutations(&mutations).unwrap().unwrap() + } + + fn check_prune_row_group(part: &BulkPart, predicate: Option, expected_rows: usize) { + let context = Arc::new(BulkIterContext::new( + part.metadata.region_metadata.clone(), + &None, + predicate, + )); + let mut reader = part + .read(context) + .unwrap() + .expect("expect at least one row group"); + let mut total_rows_read = 0; + for res in reader { + let batch = res.unwrap(); + total_rows_read += batch.num_rows(); + } + // Should only read row group 1. + assert_eq!(expected_rows, total_rows_read); + } + + #[test] + fn test_prune_row_groups() { + let part = prepare(vec![ + ("a", 0, (0, 40), 1), + ("a", 1, (0, 60), 1), + ("b", 0, (0, 100), 2), + ("b", 1, (100, 180), 3), + ("b", 1, (180, 210), 4), + ]); + + let context = Arc::new(BulkIterContext::new( + part.metadata.region_metadata.clone(), + &None, + Some(Predicate::new(vec![datafusion_expr::col("ts").eq( + datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)), + )])), + )); + assert!(part.read(context).unwrap().is_none()); + + check_prune_row_group(&part, None, 310); + + check_prune_row_group( + &part, + Some(Predicate::new(vec![ + datafusion_expr::col("k0").eq(datafusion_expr::lit("a")), + datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)), + ])), + 40, + ); + + check_prune_row_group( + &part, + Some(Predicate::new(vec![ + datafusion_expr::col("k0").eq(datafusion_expr::lit("a")), + datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)), + ])), + 60, + ); + + check_prune_row_group( + &part, + Some(Predicate::new(vec![ + datafusion_expr::col("k0").eq(datafusion_expr::lit("a")) + ])), + 100, + ); + + check_prune_row_group( + &part, + Some(Predicate::new(vec![ + datafusion_expr::col("k0").eq(datafusion_expr::lit("b")), + datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)), + ])), + 100, + ); + + /// Predicates over field column can do precise filtering. + check_prune_row_group( + &part, + Some(Predicate::new(vec![ + datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)) + ])), + 1, + ); + } } diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs new file mode 100644 index 000000000000..fdf3f81f5e11 --- /dev/null +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use bytes::Bytes; +use parquet::arrow::ProjectionMask; +use parquet::file::metadata::ParquetMetaData; + +use crate::error; +use crate::memtable::bulk::context::BulkIterContextRef; +use crate::memtable::bulk::row_group_reader::{ + MemtableRowGroupReader, MemtableRowGroupReaderBuilder, +}; +use crate::read::Batch; + +/// Iterator for reading data inside a bulk part. +pub struct BulkPartIter { + row_groups_to_read: VecDeque, + current_reader: Option, + builder: MemtableRowGroupReaderBuilder, +} + +impl BulkPartIter { + /// Creates a new [BulkPartIter]. + pub(crate) fn try_new( + context: BulkIterContextRef, + mut row_groups_to_read: VecDeque, + parquet_meta: Arc, + data: Bytes, + ) -> error::Result { + let projection_mask = ProjectionMask::roots( + parquet_meta.file_metadata().schema_descr(), + context.read_format().projection_indices().iter().copied(), + ); + + let builder = MemtableRowGroupReaderBuilder::try_new( + context.clone(), + projection_mask, + parquet_meta, + data, + )?; + + let init_reader = row_groups_to_read + .pop_front() + .map(|first_row_group| builder.build_row_group_reader(first_row_group, None)) + .transpose()? + .map(|r| PruneReader::new(context, r)); + Ok(Self { + row_groups_to_read, + current_reader: init_reader, + builder, + }) + } + + pub(crate) fn next_batch(&mut self) -> error::Result> { + let Some(current) = &mut self.current_reader else { + // All row group exhausted. + return Ok(None); + }; + + if let Some(batch) = current.next_batch()? { + return Ok(Some(batch)); + } + + // Previous row group exhausted, read next row group + while let Some(next_row_group) = self.row_groups_to_read.pop_front() { + current.reset(self.builder.build_row_group_reader(next_row_group, None)?); + if let Some(next_batch) = current.next_batch()? { + return Ok(Some(next_batch)); + } + } + Ok(None) + } +} + +impl Iterator for BulkPartIter { + type Item = error::Result; + + fn next(&mut self) -> Option { + self.next_batch().transpose() + } +} + +struct PruneReader { + context: BulkIterContextRef, + row_group_reader: MemtableRowGroupReader, +} + +//todo(hl): maybe we also need to support lastrow mode here. +impl PruneReader { + fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self { + Self { + context, + row_group_reader: reader, + } + } + + /// Iterates current inner reader until exhausted. + fn next_batch(&mut self) -> error::Result> { + while let Some(b) = self.row_group_reader.next_inner()? { + match self.prune(b)? { + Some(b) => { + return Ok(Some(b)); + } + None => { + continue; + } + } + } + Ok(None) + } + + /// Prunes batch according to filters. + fn prune(&mut self, batch: Batch) -> error::Result> { + //todo(hl): add metrics. + + // fast path + if self.context.base.filters.is_empty() { + return Ok(Some(batch)); + } + + let Some(batch_filtered) = self.context.base.precise_filter(batch)? else { + // the entire batch is filtered out + return Ok(None); + }; + if !batch_filtered.is_empty() { + Ok(Some(batch_filtered)) + } else { + Ok(None) + } + } + + fn reset(&mut self, reader: MemtableRowGroupReader) { + self.row_group_reader = reader; + } +} diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs new file mode 100644 index 000000000000..14c3fbe68b83 --- /dev/null +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -0,0 +1,189 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use datatypes::arrow::array::RecordBatch; +use datatypes::arrow::error::ArrowError; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection}; +use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; +use parquet::column::page::{PageIterator, PageReader}; +use parquet::file::metadata::ParquetMetaData; +use snafu::ResultExt; + +use crate::error; +use crate::error::ReadDataPartSnafu; +use crate::memtable::bulk::context::BulkIterContextRef; +use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext}; +use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase}; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; + +/// Helper for reading specific row group inside Memtable Parquet parts. +// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since +// it's a workaround for lacking of keyword generics. +pub struct MemtableRowGroupPageFetcher<'a> { + /// Shared structs for reading row group. + base: RowGroupBase<'a>, + bytes: Bytes, +} + +impl<'a> MemtableRowGroupPageFetcher<'a> { + pub(crate) fn create( + row_group_idx: usize, + parquet_meta: &'a ParquetMetaData, + bytes: Bytes, + ) -> Self { + let metadata = parquet_meta.row_group(row_group_idx); + let row_count = metadata.num_rows() as usize; + let page_locations = parquet_meta + .offset_index() + .map(|x| x[row_group_idx].as_slice()); + + Self { + base: RowGroupBase { + metadata, + page_locations, + row_count, + column_chunks: vec![None; metadata.columns().len()], + // the cached `column_uncompressed_pages` would never be used in Memtable readers. + column_uncompressed_pages: vec![None; metadata.columns().len()], + }, + bytes, + } + } + + /// Fetches column pages from memory file. + pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) { + if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) { + // Selection provided. + let (fetch_ranges, page_start_offsets) = + self.base + .calc_sparse_read_ranges(projection, page_locations, selection); + if fetch_ranges.is_empty() { + return; + } + let chunk_data = self.fetch_bytes(&fetch_ranges); + + self.base + .assign_sparse_chunk(projection, chunk_data, page_start_offsets); + } else { + let fetch_ranges = self.base.calc_dense_read_ranges(projection); + if fetch_ranges.is_empty() { + // Nothing to fetch. + return; + } + let chunk_data = self.fetch_bytes(&fetch_ranges); + self.base.assign_dense_chunk(projection, chunk_data); + } + } + + fn fetch_bytes(&self, ranges: &[Range]) -> Vec { + ranges + .iter() + .map(|range| self.bytes.slice(range.start as usize..range.end as usize)) + .collect() + } + + /// Creates a page reader to read column at `i`. + fn column_page_reader(&self, i: usize) -> parquet::errors::Result> { + let reader = self.base.column_reader(i)?; + Ok(Box::new(reader)) + } +} + +impl RowGroups for MemtableRowGroupPageFetcher<'_> { + fn num_rows(&self) -> usize { + self.base.row_count + } + + fn column_chunks(&self, i: usize) -> parquet::errors::Result> { + Ok(Box::new(ColumnChunkIterator { + reader: Some(self.column_page_reader(i)), + })) + } +} + +impl RowGroupReaderContext for BulkIterContextRef { + fn map_result( + &self, + result: Result, ArrowError>, + ) -> error::Result> { + result.context(error::DecodeArrowRowGroupSnafu) + } + + fn read_format(&self) -> &ReadFormat { + self.as_ref().read_format() + } +} + +pub(crate) type MemtableRowGroupReader = RowGroupReaderBase; + +pub(crate) struct MemtableRowGroupReaderBuilder { + context: BulkIterContextRef, + projection: ProjectionMask, + parquet_metadata: Arc, + field_levels: FieldLevels, + data: Bytes, +} + +impl MemtableRowGroupReaderBuilder { + pub(crate) fn try_new( + context: BulkIterContextRef, + projection: ProjectionMask, + parquet_metadata: Arc, + data: Bytes, + ) -> error::Result { + let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr(); + let hint = Some(context.read_format().arrow_schema().fields()); + let field_levels = + parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint) + .context(ReadDataPartSnafu)?; + Ok(Self { + context, + projection, + parquet_metadata, + field_levels, + data, + }) + } + + /// Builds a reader to read the row group at `row_group_idx` from memory. + pub(crate) fn build_row_group_reader( + &self, + row_group_idx: usize, + row_selection: Option, + ) -> error::Result { + let mut row_group = MemtableRowGroupPageFetcher::create( + row_group_idx, + &self.parquet_metadata, + self.data.clone(), + ); + // Fetches data from memory part. Currently, row selection is not supported. + row_group.fetch(&self.projection, row_selection.as_ref()); + + // Builds the parquet reader. + // Now the row selection is None. + let reader = ParquetRecordBatchReader::try_new_with_row_groups( + &self.field_levels, + &row_group, + DEFAULT_READ_BATCH_SIZE, + row_selection, + ) + .context(ReadDataPartSnafu)?; + Ok(MemtableRowGroupReader::create(self.context.clone(), reader)) + } +} diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 1e2a6a5844c6..de96f8881e46 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -99,11 +99,8 @@ impl RowGroupLastRowCachedReader { return Self::new_miss(key, row_group_reader, None); }; if let Some(value) = cache_manager.get_selector_result(&key) { - let schema_matches = value.projection - == row_group_reader - .context() - .read_format() - .projection_indices(); + let schema_matches = + value.projection == row_group_reader.read_format().projection_indices(); if schema_matches { // Schema matches, use cache batches. Self::new_hit(value) @@ -218,29 +215,23 @@ impl RowGroupLastRowReader { }; // All last rows in row group are yielded, update cache. - self.update_cache(); + self.maybe_update_cache(); Ok(last_batch) } /// Updates row group's last row cache if cache manager is present. - fn update_cache(&mut self) { - if self.yielded_batches.is_empty() { - // we always expect that row groups yields batches. - return; + fn maybe_update_cache(&mut self) { + if let Some(cache) = &self.cache_manager { + if self.yielded_batches.is_empty() { + // we always expect that row groups yields batches. + return; + } + let value = Arc::new(SelectorResultValue { + result: std::mem::take(&mut self.yielded_batches), + projection: self.reader.read_format().projection_indices().to_vec(), + }); + cache.put_selector_result(self.key, value) } - let Some(cache) = &self.cache_manager else { - return; - }; - let value = Arc::new(SelectorResultValue { - result: std::mem::take(&mut self.yielded_batches), - projection: self - .reader - .context() - .read_format() - .projection_indices() - .to_vec(), - }); - cache.put_selector_result(self.key, value); } fn metrics(&self) -> &ReaderMetrics { diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 3dd53ba645f2..280d46b500df 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -27,11 +27,11 @@ pub(crate) mod file_range; pub mod format; pub(crate) mod helper; pub(crate) mod metadata; -mod page_reader; +pub(crate) mod page_reader; pub mod reader; pub mod row_group; mod row_selection; -mod stats; +pub(crate) mod stats; pub mod writer; /// Key of metadata in parquet SST. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 39153fce8d96..5527752a8885 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use datafusion_expr::Expr; +use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -39,7 +40,8 @@ use table::predicate::Predicate; use crate::cache::CacheManagerRef; use crate::error::{ - ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, + ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, + ReadParquetSnafu, Result, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, @@ -207,8 +209,7 @@ impl ParquetReaderBuilder { let hint = Some(read_format.arrow_schema().fields()); let field_levels = parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) - .context(ReadParquetSnafu { path: &file_path })?; - + .context(ReadDataPartSnafu)?; let row_groups = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; @@ -871,7 +872,7 @@ impl SimpleFilterContext { /// /// Returns None if the column to filter doesn't exist in the SST metadata or the /// expected metadata. - fn new_opt( + pub(crate) fn new_opt( sst_meta: &RegionMetadataRef, expected_meta: Option<&RegionMetadata>, expr: &Expr, @@ -1035,10 +1036,51 @@ impl ParquetReader { } } +/// RowGroupReaderContext represents the fields that cannot be shared +/// between different `RowGroupReader`s. +pub(crate) trait RowGroupReaderContext: Send { + fn map_result( + &self, + result: std::result::Result, ArrowError>, + ) -> Result>; + + fn read_format(&self) -> &ReadFormat; +} + +impl RowGroupReaderContext for FileRangeContextRef { + fn map_result( + &self, + result: std::result::Result, ArrowError>, + ) -> Result> { + result.context(ArrowReaderSnafu { + path: self.file_path(), + }) + } + + fn read_format(&self) -> &ReadFormat { + self.as_ref().read_format() + } +} + +/// [RowGroupReader] that reads from [FileRange]. +pub(crate) type RowGroupReader = RowGroupReaderBase; + +impl RowGroupReader { + /// Creates a new reader from file range. + pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + Self { + context, + reader, + batches: VecDeque::new(), + metrics: ReaderMetrics::default(), + } + } +} + /// Reader to read a row group of a parquet file. -pub struct RowGroupReader { - /// Context for file ranges. - context: FileRangeContextRef, +pub(crate) struct RowGroupReaderBase { + /// Context of [RowGroupReader] so adapts to different underlying implementation. + context: T, /// Inner parquet reader. reader: ParquetRecordBatchReader, /// Buffered batches to return. @@ -1047,9 +1089,12 @@ pub struct RowGroupReader { metrics: ReaderMetrics, } -impl RowGroupReader { +impl RowGroupReaderBase +where + T: RowGroupReaderContext, +{ /// Creates a new reader. - pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self { Self { context, reader, @@ -1062,21 +1107,19 @@ impl RowGroupReader { pub(crate) fn metrics(&self) -> &ReaderMetrics { &self.metrics } - pub(crate) fn context(&self) -> &FileRangeContextRef { - &self.context + + /// Gets [ReadFormat] of underlying reader. + pub(crate) fn read_format(&self) -> &ReadFormat { + self.context.read_format() } /// Tries to fetch next [RecordBatch] from the reader. fn fetch_next_record_batch(&mut self) -> Result> { - self.reader.next().transpose().context(ArrowReaderSnafu { - path: self.context.file_path(), - }) + self.context.map_result(self.reader.next().transpose()) } -} -#[async_trait::async_trait] -impl BatchReader for RowGroupReader { - async fn next_batch(&mut self) -> Result> { + /// Returns the next [Batch]. + pub(crate) fn next_inner(&mut self) -> Result> { let scan_start = Instant::now(); if let Some(batch) = self.batches.pop_front() { self.metrics.num_rows += batch.num_rows(); @@ -1104,6 +1147,16 @@ impl BatchReader for RowGroupReader { } } +#[async_trait::async_trait] +impl BatchReader for RowGroupReaderBase +where + T: RowGroupReaderContext, +{ + async fn next_batch(&mut self) -> Result> { + self.next_inner() + } +} + #[cfg(test)] mod tests { use parquet::arrow::arrow_reader::RowSelector; diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 73382c06d9b3..dde78f39e4ee 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -38,25 +38,196 @@ use crate::sst::file::FileId; use crate::sst::parquet::helper::fetch_byte_ranges; use crate::sst::parquet::page_reader::RowGroupCachedReader; +pub(crate) struct RowGroupBase<'a> { + pub(crate) metadata: &'a RowGroupMetaData, + pub(crate) page_locations: Option<&'a [Vec]>, + /// Compressed page of each column. + pub(crate) column_chunks: Vec>>, + pub(crate) row_count: usize, + /// Row group level cached pages for each column. + /// + /// These pages are uncompressed pages of a row group. + /// `column_uncompressed_pages.len()` equals to `column_chunks.len()`. + pub(crate) column_uncompressed_pages: Vec>>, +} + +impl<'a> RowGroupBase<'a> { + pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self { + let metadata = parquet_meta.row_group(row_group_idx); + // `page_locations` is always `None` if we don't set + // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index) + // to `true`. + let page_locations = parquet_meta + .offset_index() + .map(|x| x[row_group_idx].as_slice()); + + Self { + metadata, + page_locations, + column_chunks: vec![None; metadata.columns().len()], + row_count: metadata.num_rows() as usize, + column_uncompressed_pages: vec![None; metadata.columns().len()], + } + } + + pub(crate) fn calc_sparse_read_ranges( + &self, + projection: &ProjectionMask, + page_locations: &[Vec], + selection: &RowSelection, + ) -> (Vec>, Vec>) { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut page_start_offsets: Vec> = vec![]; + let ranges = self + .column_chunks + .iter() + .zip(self.metadata.columns()) + .enumerate() + .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx)) + .flat_map(|(idx, (_chunk, chunk_meta))| { + // If the first page does not start at the beginning of the column, + // then we need to also fetch a dictionary page. + let mut ranges = vec![]; + let (start, _len) = chunk_meta.byte_range(); + match page_locations[idx].first() { + Some(first) if first.offset as u64 != start => { + ranges.push(start..first.offset as u64); + } + _ => (), + } + + ranges.extend( + selection + .scan_ranges(&page_locations[idx]) + .iter() + .map(|range| range.start as u64..range.end as u64), + ); + page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect()); + + ranges + }) + .collect::>(); + (ranges, page_start_offsets) + } + + pub(crate) fn assign_sparse_chunk( + &mut self, + projection: &ProjectionMask, + data: Vec, + page_start_offsets: Vec>, + ) { + let mut page_start_offsets = page_start_offsets.into_iter(); + let mut chunk_data = data.into_iter(); + + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(offsets) = page_start_offsets.next() { + let mut chunks = Vec::with_capacity(offsets.len()); + for _ in 0..offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(Arc::new(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: offsets.into_iter().zip(chunks).collect(), + })) + } + } + } + + pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec> { + self.column_chunks + .iter() + .zip(&self.column_uncompressed_pages) + .enumerate() + .filter(|&(idx, (chunk, uncompressed_pages))| { + // Don't need to fetch column data if we already cache the column's pages. + chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none() + }) + .map(|(idx, (_chunk, _pages))| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start..(start + length) + }) + .collect::>() + } + + /// Assigns uncompressed chunk binary data to [RowGroupBase::column_chunks] + /// and returns the chunk offset and binary data assigned. + pub(crate) fn assign_dense_chunk( + &mut self, + projection: &ProjectionMask, + chunk_data: Vec, + ) -> Vec<(usize, Bytes)> { + let mut chunk_data = chunk_data.into_iter(); + let mut res = vec![]; + + for (idx, (chunk, row_group_pages)) in self + .column_chunks + .iter_mut() + .zip(&self.column_uncompressed_pages) + .enumerate() + { + if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() { + continue; + } + + // Get the fetched page. + let Some(data) = chunk_data.next() else { + continue; + }; + + let column = self.metadata.column(idx); + res.push((idx, data.clone())); + *chunk = Some(Arc::new(ColumnChunkData::Dense { + offset: column.byte_range().0 as usize, + data, + })); + } + res + } + + /// Create [PageReader] from [RowGroupBase::column_chunks] + pub(crate) fn column_reader( + &self, + col_idx: usize, + ) -> Result> { + let page_reader = match &self.column_chunks[col_idx] { + None => { + return Err(ParquetError::General(format!( + "Invalid column index {col_idx}, column was not fetched" + ))) + } + Some(data) => { + let page_locations = self.page_locations.map(|index| index[col_idx].clone()); + SerializedPageReader::new( + data.clone(), + self.metadata.column(col_idx), + self.row_count, + page_locations, + )? + } + }; + + // This column don't cache uncompressed pages. + Ok(page_reader) + } +} + /// An in-memory collection of column chunks pub struct InMemoryRowGroup<'a> { - metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, - /// Compressed page of each column. - column_chunks: Vec>>, - row_count: usize, region_id: RegionId, file_id: FileId, row_group_idx: usize, cache_manager: Option, - /// Row group level cached pages for each column. - /// - /// These pages are uncompressed pages of a row group. - /// `column_uncompressed_pages.len()` equals to `column_chunks.len()`. - column_uncompressed_pages: Vec>>, file_path: &'a str, /// Object store. object_store: ObjectStore, + base: RowGroupBase<'a>, } impl<'a> InMemoryRowGroup<'a> { @@ -73,24 +244,12 @@ impl<'a> InMemoryRowGroup<'a> { file_path: &'a str, object_store: ObjectStore, ) -> Self { - let metadata = parquet_meta.row_group(row_group_idx); - // `page_locations` is always `None` if we don't set - // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index) - // to `true`. - let page_locations = parquet_meta - .offset_index() - .map(|x| x[row_group_idx].as_slice()); - Self { - metadata, - row_count: metadata.num_rows() as usize, - column_chunks: vec![None; metadata.columns().len()], - page_locations, + base: RowGroupBase::new(parquet_meta, row_group_idx), region_id, file_id, row_group_idx, cache_manager, - column_uncompressed_pages: vec![None; metadata.columns().len()], file_path, object_store, } @@ -102,65 +261,15 @@ impl<'a> InMemoryRowGroup<'a> { projection: &ProjectionMask, selection: Option<&RowSelection>, ) -> Result<()> { - if let Some((selection, page_locations)) = selection.zip(self.page_locations) { - // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the - // `RowSelection` - let mut page_start_offsets: Vec> = vec![]; - - let fetch_ranges = self - .column_chunks - .iter() - .zip(self.metadata.columns()) - .enumerate() - .filter(|&(idx, (chunk, _chunk_meta))| { - chunk.is_none() && projection.leaf_included(idx) - }) - .flat_map(|(idx, (_chunk, chunk_meta))| { - // If the first page does not start at the beginning of the column, - // then we need to also fetch a dictionary page. - let mut ranges = vec![]; - let (start, _len) = chunk_meta.byte_range(); - match page_locations[idx].first() { - Some(first) if first.offset as u64 != start => { - ranges.push(start..first.offset as u64); - } - _ => (), - } - - ranges.extend( - selection - .scan_ranges(&page_locations[idx]) - .iter() - .map(|range| range.start as u64..range.end as u64), - ); - page_start_offsets - .push(ranges.iter().map(|range| range.start as usize).collect()); - - ranges - }) - .collect::>(); - - let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter(); - - let mut page_start_offsets = page_start_offsets.into_iter(); - - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; - } - - if let Some(offsets) = page_start_offsets.next() { - let mut chunks = Vec::with_capacity(offsets.len()); - for _ in 0..offsets.len() { - chunks.push(chunk_data.next().unwrap()); - } - - *chunk = Some(Arc::new(ColumnChunkData::Sparse { - length: self.metadata.column(idx).byte_range().1 as usize, - data: offsets.into_iter().zip(chunks).collect(), - })) - } - } + if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) { + let (fetch_ranges, page_start_offsets) = + self.base + .calc_sparse_read_ranges(projection, page_locations, selection); + + let chunk_data = self.fetch_bytes(&fetch_ranges).await?; + // Assign sparse chunk data to base. + self.base + .assign_sparse_chunk(projection, chunk_data, page_start_offsets); } else { // Now we only use cache in dense chunk data. self.fetch_pages_from_cache(projection); @@ -169,46 +278,24 @@ impl<'a> InMemoryRowGroup<'a> { // is a synchronous, CPU-bound operation. yield_now().await; - let fetch_ranges = self - .column_chunks - .iter() - .zip(&self.column_uncompressed_pages) - .enumerate() - .filter(|&(idx, (chunk, uncompressed_pages))| { - // Don't need to fetch column data if we already cache the column's pages. - chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none() - }) - .map(|(idx, (_chunk, _pages))| { - let column = self.metadata.column(idx); - let (start, length) = column.byte_range(); - start..(start + length) - }) - .collect::>(); + // Calculate ranges to read. + let fetch_ranges = self.base.calc_dense_read_ranges(projection); if fetch_ranges.is_empty() { // Nothing to fetch. return Ok(()); } - let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter(); + // Fetch data with ranges + let chunk_data = self.fetch_bytes(&fetch_ranges).await?; - for (idx, (chunk, row_group_pages)) in self - .column_chunks - .iter_mut() - .zip(&self.column_uncompressed_pages) - .enumerate() - { - if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() { - continue; - } - - // Get the fetched page. - let Some(data) = chunk_data.next() else { - continue; - }; + // Assigns fetched data to base. + let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data); - let column = self.metadata.column(idx); - if let Some(cache) = &self.cache_manager { + // Put fetched data to cache if necessary. + if let Some(cache) = &self.cache_manager { + for (col_idx, data) in assigned_columns { + let column = self.base.metadata.column(col_idx); if !cache_uncompressed_pages(column) { // For columns that have multiple uncompressed pages, we only cache the compressed page // to save memory. @@ -216,17 +303,12 @@ impl<'a> InMemoryRowGroup<'a> { self.region_id, self.file_id, self.row_group_idx, - idx, + col_idx, ); cache .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); } } - - *chunk = Some(Arc::new(ColumnChunkData::Dense { - offset: column.byte_range().0 as usize, - data, - })); } } @@ -237,7 +319,8 @@ impl<'a> InMemoryRowGroup<'a> { /// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column. fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) { let _timer = READ_STAGE_FETCH_PAGES.start_timer(); - self.column_chunks + self.base + .column_chunks .iter_mut() .enumerate() .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx)) @@ -245,7 +328,7 @@ impl<'a> InMemoryRowGroup<'a> { let Some(cache) = &self.cache_manager else { return; }; - let column = self.metadata.column(idx); + let column = self.base.metadata.column(idx); if cache_uncompressed_pages(column) { // Fetches uncompressed pages for the row group. let page_key = PageKey::new_uncompressed( @@ -254,7 +337,7 @@ impl<'a> InMemoryRowGroup<'a> { self.row_group_idx, idx, ); - self.column_uncompressed_pages[idx] = cache.get_pages(&page_key); + self.base.column_uncompressed_pages[idx] = cache.get_pages(&page_key); } else { // Fetches the compressed page from the cache. let page_key = PageKey::new_compressed( @@ -308,34 +391,19 @@ impl<'a> InMemoryRowGroup<'a> { /// Creates a page reader to read column at `i`. fn column_page_reader(&self, i: usize) -> Result> { - if let Some(cached_pages) = &self.column_uncompressed_pages[i] { + if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] { debug_assert!(!cached_pages.row_group.is_empty()); // Hits the row group level page cache. return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group))); } - let page_reader = match &self.column_chunks[i] { - None => { - return Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))) - } - Some(data) => { - let page_locations = self.page_locations.map(|index| index[i].clone()); - SerializedPageReader::new( - data.clone(), - self.metadata.column(i), - self.row_count, - page_locations, - )? - } - }; + let page_reader = self.base.column_reader(i)?; let Some(cache) = &self.cache_manager else { return Ok(Box::new(page_reader)); }; - let column = self.metadata.column(i); + let column = self.base.metadata.column(i); if cache_uncompressed_pages(column) { // This column use row group level page cache. // We collect all pages and put them into the cache. @@ -362,7 +430,7 @@ fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool { impl RowGroups for InMemoryRowGroup<'_> { fn num_rows(&self) -> usize { - self.row_count + self.base.row_count } fn column_chunks(&self, i: usize) -> Result> { @@ -430,8 +498,8 @@ impl ChunkReader for ColumnChunkData { } /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] -struct ColumnChunkIterator { - reader: Option>>, +pub(crate) struct ColumnChunkIterator { + pub(crate) reader: Option>>, } impl Iterator for ColumnChunkIterator {