From b46386d52aecf88a71b09c4f29719e4840d5b93b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 19 Feb 2024 22:57:25 +0800 Subject: [PATCH 1/6] feat: data buffer and related structs --- src/mito2/src/error.rs | 12 +- src/mito2/src/memtable/merge_tree/data.rs | 686 +++++++++++++++++++++- src/mito2/src/test_util/memtable_util.rs | 119 +++- 3 files changed, 811 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 468c1f8ed921..f141857a5322 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -37,8 +37,8 @@ use crate::worker::WorkerId; #[stack_trace_debug] pub enum Error { #[snafu(display( - "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", - region_id, expected_last_entry_id, replayed_last_entry_id + "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", + region_id, expected_last_entry_id, replayed_last_entry_id ))] UnexpectedReplay { location: Location, @@ -559,6 +559,13 @@ pub enum Error { #[snafu(display("Encode null value"))] IndexEncodeNull { location: Location }, + + #[snafu(display("Failed to encode memtable to Parquet bytes"))] + EncodeMemtable { + #[snafu(source)] + error: parquet::errors::ParquetError, + location: Location, + }, } pub type Result = std::result::Result; @@ -662,6 +669,7 @@ impl ErrorExt for Error { FilterRecordBatch { source, .. } => source.status_code(), Upload { .. } => StatusCode::StorageUnavailable, BiError { .. } => StatusCode::Internal, + EncodeMemtable { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 3f2627e9d46c..79029fcb680d 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -14,8 +14,690 @@ //! Data part of a shard. -/// Buffer to store columns not in the primary key. -pub struct DataBuffer {} +use std::cmp::{Ordering, Reverse}; +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use datatypes::arrow; +use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array}; +use datatypes::arrow::datatypes::{Field, Schema, SchemaRef}; +use datatypes::data_type::DataType; +use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef}; +use datatypes::schema::ColumnSchema; +use datatypes::types::TimestampType; +use datatypes::vectors::{ + TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, + TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder, + UInt8VectorBuilder, +}; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; + +use crate::error; +use crate::error::Result; +use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; + +const PK_INDEX_COLUMN_NAME: &str = "pk_index"; + +/// Data part batches returns by `DataParts::read`. +#[derive(Debug, Clone)] +pub struct DataBatch { + /// Primary key index of this batch. + pk_index: PkIndex, + /// Record batch of data. + rb: RecordBatch, + /// Range of current primary key inside record batch + range: Range, +} + +impl DataBatch { + pub(crate) fn pk_index(&self) -> PkIndex { + self.pk_index + } + + pub(crate) fn record_batch(&self) -> &RecordBatch { + &self.rb + } + + pub(crate) fn range(&self) -> Range { + self.range.clone() + } + + pub(crate) fn as_record_batch(&self) -> RecordBatch { + self.rb.slice(self.range.start, self.range.len()) + } +} + +/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard. +pub struct DataBuffer { + shard_id: ShardId, + metadata: RegionMetadataRef, + /// Schema for data part (primary keys are replaced with pk_index) + data_part_schema: SchemaRef, + /// Data types for field columns. + field_types: Vec, + /// Builder for primary key index. + pk_index_builder: UInt16VectorBuilder, + /// Builder for timestamp column. + ts_builder: Box, + /// Builder for sequence column. + sequence_builder: UInt64VectorBuilder, + /// Builder for op_type column. + op_type_builder: UInt8VectorBuilder, + /// Builders for field columns. + field_builders: Vec>>, + /// Threshold for freezing data buffer. + freeze_threshold: usize, +} + +impl DataBuffer { + pub fn with_capacity( + metadata: RegionMetadataRef, + init_capacity: usize, + freeze_threshold: usize, + ) -> Self { + let ts_builder = metadata + .time_index_column() + .column_schema + .data_type + .create_mutable_vector(init_capacity); + + let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity); + let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity); + let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity); + + let field_types = metadata + .field_columns() + .map(|c| c.column_schema.data_type.clone()) + .collect::>(); + let field_builders = (0..field_types.len()).map(|_| None).collect(); + + let data_part_schema = memtable_schema_to_encoded_schema(&metadata); + Self { + shard_id: 0, + metadata, + data_part_schema, + field_types, + pk_index_builder: pk_id_builder, + ts_builder, + sequence_builder, + op_type_builder, + field_builders, + freeze_threshold, + } + } + + /// Writes a row to data buffer. + pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) -> bool { + self.ts_builder.push_value_ref(kv.timestamp()); + self.pk_index_builder.push(Some(pk_id.pk_index)); + self.sequence_builder.push(Some(kv.sequence())); + self.op_type_builder.push(Some(kv.op_type() as u8)); + + debug_assert_eq!(self.field_builders.len(), kv.num_fields()); + + for (idx, field) in kv.fields().enumerate() { + self.field_builders[idx] + .get_or_insert_with(|| { + let mut builder = + self.field_types[idx].create_mutable_vector(self.ts_builder.len()); + builder.push_nulls(self.ts_builder.len() - 1); + builder + }) + .push_value_ref(field); + } + + self.ts_builder.len() >= self.freeze_threshold + } + + /// Freezes `DataBuffer` to bytes. Use `pk_weights` to convert pk_id to pk sort order. + /// `freeze` clears the buffers of builders. + pub fn freeze(&mut self, _pk_weights: &[u16]) -> Result { + // we need distinguish between `freeze` in `ShardWriter` And `Shard`. + todo!() + } + + /// Reads batches from data buffer without resetting builder's buffers. + pub fn iter(&mut self, pk_weights: &[u16]) -> Result { + let batch = + data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?; + DataBufferIter::new(batch) + } + + /// Returns num of rows in data buffer. + pub fn num_rows(&self) -> usize { + self.ts_builder.len() + } + + /// Returns whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } +} + +/// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights. +fn data_buffer_to_record_batches( + schema: SchemaRef, + buffer: &mut DataBuffer, + pk_weights: &[u16], + keep_data: bool, +) -> Result { + let num_rows = buffer.ts_builder.len(); + + let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data { + ( + buffer.pk_index_builder.finish_cloned(), + buffer.ts_builder.to_vector_cloned(), + buffer.sequence_builder.finish_cloned(), + buffer.op_type_builder.finish_cloned(), + ) + } else { + ( + buffer.pk_index_builder.finish(), + buffer.ts_builder.to_vector(), + buffer.sequence_builder.finish(), + buffer.op_type_builder.finish(), + ) + }; + + let mut rows = build_rows_to_sort(pk_weights, &pk_index_v, &ts_v, &sequence_v); + + // sort and dedup + rows.sort_unstable_by(|l, r| l.1.cmp(&r.1)); + rows.dedup_by(|l, r| l.1.timestamp == r.1.timestamp); + let indices_to_take = UInt32Array::from_iter_values(rows.into_iter().map(|v| v.0 as u32)); + + let mut columns = Vec::with_capacity(4 + buffer.field_builders.len()); + + columns.push( + arrow::compute::take(&pk_index_v.as_arrow(), &indices_to_take, None) + .context(error::ComputeArrowSnafu)?, + ); + + columns.push( + arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None) + .context(error::ComputeArrowSnafu)?, + ); + + columns.push( + arrow::compute::take(&sequence_v.as_arrow(), &indices_to_take, None) + .context(error::ComputeArrowSnafu)?, + ); + + columns.push( + arrow::compute::take(&op_type_v.as_arrow(), &indices_to_take, None) + .context(error::ComputeArrowSnafu)?, + ); + + for (idx, c) in buffer.field_builders.iter_mut().enumerate() { + let array = match c { + None => { + let mut single_null = buffer.field_types[idx].create_mutable_vector(num_rows); + single_null.push_nulls(num_rows); + single_null.to_vector().to_arrow_array() + } + Some(v) => { + if keep_data { + v.to_vector_cloned().to_arrow_array() + } else { + v.to_vector().to_arrow_array() + } + } + }; + + columns.push( + arrow::compute::take(&array, &indices_to_take, None) + .context(error::ComputeArrowSnafu)?, + ); + } + + RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu) +} + +#[derive(Debug)] +pub(crate) struct DataBufferIter { + batch: RecordBatch, + offset: usize, + current_data_batch: Option, +} + +impl DataBufferIter { + pub(crate) fn new(batch: RecordBatch) -> Result { + let mut iter = Self { + batch, + offset: 0, + current_data_batch: None, + }; + iter.next()?; // fill data batch for comparison and merge. + Ok(iter) + } + + pub(crate) fn is_valid(&self) -> bool { + self.current_data_batch.is_some() + } + + /// # Panics + /// If Current iterator is not exhausted. + pub(crate) fn current_data_batch(&self) -> DataBatch { + self.current_data_batch.as_ref().unwrap().clone() + } + + /// # Panics + /// If Current iterator is not exhausted. + pub(crate) fn current_pk_index(&self) -> PkIndex { + self.current_data_batch.as_ref().unwrap().pk_index + } + + /// Advances iterator to next data batch. + pub(crate) fn next(&mut self) -> Result<()> { + if self.offset >= self.batch.num_rows() { + self.current_data_batch = None; + return Ok(()); + } + let pk_index_array = pk_index_array(&self.batch); + if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) { + self.offset = range.end; + self.current_data_batch = Some(DataBatch { + pk_index: next_pk, + rb: self.batch.clone(), + range, + }) + } else { + self.current_data_batch = None; + } + Ok(()) + } +} + +/// Gets `pk_index` array from record batch. +/// # Panics +/// If pk index column is not the first column or the type is not `UInt16Array`. +fn pk_index_array(batch: &RecordBatch) -> &UInt16Array { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() +} + +/// Searches for next pk index, and it's offset range in a sorted `UInt16Array`. +fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range)> { + let num_rows = array.len(); + if start >= num_rows { + return None; + } + + let next_pk = array.value(start); + for idx in start..num_rows { + if array.value(idx) != next_pk { + return Some((next_pk, start..idx)); + } + } + Some((next_pk, start..num_rows)) +} + +#[derive(Eq, PartialEq)] +struct InnerKey { + pk_weight: u16, + timestamp: i64, + sequence: u64, +} + +impl PartialOrd for InnerKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for InnerKey { + fn cmp(&self, other: &Self) -> Ordering { + (self.pk_weight, self.timestamp, Reverse(self.sequence)).cmp(&( + other.pk_weight, + other.timestamp, + Reverse(other.sequence), + )) + } +} + +fn build_rows_to_sort( + pk_weights: &[u16], + pk_index: &UInt16Vector, + ts: &VectorRef, + sequence: &UInt64Vector, +) -> Vec<(usize, InnerKey)> { + let ts_values = match ts.data_type() { + ConcreteDataType::Timestamp(t) => match t { + TimestampType::Second(_) => ts + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + TimestampType::Millisecond(_) => ts + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + TimestampType::Microsecond(_) => ts + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + TimestampType::Nanosecond(_) => ts + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + }, + other => unreachable!("Unexpected type {:?}", other), + }; + let pk_index_values = pk_index.as_arrow().values(); + let sequence_values = sequence.as_arrow().values(); + debug_assert_eq!(ts_values.len(), pk_index_values.len()); + debug_assert_eq!(ts_values.len(), sequence_values.len()); + + ts_values + .iter() + .zip(pk_index_values.iter()) + .zip(sequence_values.iter()) + .enumerate() + .map(|(idx, ((timestamp, pk_index), sequence))| { + ( + idx, + InnerKey { + timestamp: *timestamp, + pk_weight: pk_weights[*pk_index as usize], + sequence: *sequence, + }, + ) + }) + .collect() +} + +fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef { + use datatypes::arrow::datatypes::DataType; + let ColumnSchema { + name: ts_name, + data_type: ts_type, + .. + } = &schema.time_index_column().column_schema; + + let mut fields = vec![ + Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false), + Field::new(ts_name, ts_type.as_arrow_type(), false), + Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false), + Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false), + ]; + + fields.extend(schema.field_columns().map(|c| { + Field::new( + &c.column_schema.name, + c.column_schema.data_type.as_arrow_type(), + c.column_schema.is_nullable(), + ) + })); + + Arc::new(Schema::new(fields)) +} + +struct DataPartEncoder<'a> { + schema: SchemaRef, + pk_weights: &'a [u16], + row_group_size: Option, +} + +impl<'a> DataPartEncoder<'a> { + pub fn new( + metadata: &RegionMetadataRef, + pk_weights: &'a [u16], + row_group_size: Option, + ) -> DataPartEncoder<'a> { + let schema = memtable_schema_to_encoded_schema(metadata); + Self { + schema, + pk_weights, + row_group_size, + } + } + + fn writer_props(&self) -> Option { + self.row_group_size.map(|size| { + WriterProperties::builder() + .set_max_row_group_size(size) + .build() + }) + } + pub fn write(&self, source: &mut DataBuffer) -> Result { + let mut bytes = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props()) + .context(error::EncodeMemtableSnafu)?; + let rb = + data_buffer_to_record_batches(self.schema.clone(), source, self.pk_weights, false)?; + writer.write(&rb).context(error::EncodeMemtableSnafu)?; + let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?; + Ok(Bytes::from(bytes)) + } +} + +/// Format of immutable data part. +pub enum DataPart { + Parquet(Bytes), +} + +impl DataPart { + fn is_empty(&self) -> bool { + match self { + DataPart::Parquet(data) => data.is_empty(), + } + } +} /// Data parts under a shard. pub struct DataParts {} + +#[cfg(test)] +mod tests { + use datafusion::arrow::array::Float64Array; + use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array}; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::data_type::AsBytes; + + use super::*; + use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; + + fn check_test_data_buffer_to_record_batches(keep_data: bool) { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + + write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1); + write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2); + write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3); + assert_eq!(5, buffer.num_rows()); + let schema = memtable_schema_to_encoded_schema(&meta); + let batch = data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data).unwrap(); + + assert_eq!( + vec![1, 2, 1, 2], + batch + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + + assert_eq!( + vec![1, 1, 0, 0], + batch + .column_by_name(PK_INDEX_COLUMN_NAME) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + + assert_eq!( + vec![Some(1.1), None, Some(0.1), Some(1.1)], + batch + .column_by_name("v1") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect::>() + ); + + if keep_data { + assert_eq!(5, buffer.num_rows()); + } else { + assert_eq!(0, buffer.num_rows()); + } + } + + #[test] + fn test_data_buffer_to_record_batches() { + check_test_data_buffer_to_record_batches(true); + check_test_data_buffer_to_record_batches(false); + } + + fn write_rows_to_buffer( + buffer: &mut DataBuffer, + schema: &RegionMetadataRef, + pk_index: u16, + ts: Vec, + v0: Vec>, + sequence: u64, + ) { + let kvs = build_key_values_with_ts_seq_values( + schema, + "whatever".to_string(), + 1, + ts.into_iter(), + v0.into_iter(), + sequence, + ); + + for kv in kvs.iter() { + buffer.write_row( + PkId { + shard_id: 0, + pk_index, + }, + kv, + ); + } + } + + #[test] + fn test_encode_data_buffer() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + + // write rows with null values. + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![0, 1, 2], + vec![Some(1.0), None, Some(3.0)], + 2, + ); + + assert_eq!(3, buffer.num_rows()); + + write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3); + + assert_eq!(4, buffer.num_rows()); + + let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); + let encoded = encoder.write(&mut buffer).unwrap(); + let s = String::from_utf8_lossy(encoded.as_bytes()); + assert!(s.starts_with("PAR1")); + assert!(s.ends_with("PAR1")); + + let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); + let mut reader = builder.build().unwrap(); + let batch = reader.next().unwrap().unwrap(); + assert_eq!(3, batch.num_rows()); + } + + fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec]) { + let mut output = Vec::with_capacity(expected_values.len()); + while iter.is_valid() { + let batch = iter.current_data_batch().as_record_batch(); + let values = batch + .column_by_name("v1") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>(); + output.push(values); + iter.next().unwrap(); + } + assert_eq!(expected_values, output); + } + + #[test] + fn test_search_next_pk_range() { + let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]); + assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap()); + assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap()); + assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap()); + assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap()); + + assert_eq!(None, search_next_pk_range(&a, 6)); + } + + #[test] + fn test_iter_data_buffer() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + + write_rows_to_buffer( + &mut buffer, + &meta, + 3, + vec![1, 2, 3], + vec![Some(1.1), Some(2.1), Some(3.1)], + 3, + ); + + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![0, 1, 2], + vec![Some(1.0), Some(2.0), Some(3.0)], + 2, + ); + + let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); + check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]); + } + + #[test] + fn test_iter_empty_data_buffer() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); + check_buffer_values_equal(&mut iter, &[]); + } +} diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 22dca01156e7..7e761cad771a 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -17,8 +17,13 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use api::helper::ColumnDataTypeWrapper; +use api::v1::value::ValueData; +use api::v1::{Row, Rows, SemanticType}; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; @@ -83,3 +88,113 @@ impl MemtableBuilder for EmptyMemtableBuilder { )) } } + +/// Creates a region metadata to test memtable with default pk. +/// +/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`. +pub(crate) fn metadata_for_test() -> RegionMetadataRef { + metadata_with_primary_key(vec![0, 1]) +} + +/// Creates a region metadata to test memtable and specific primary key. +/// +/// The schema is `k0, k1, ts, v0, v1`. +pub(crate) fn metadata_with_primary_key(primary_key: Vec) -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), + semantic_type: semantic_type_of_column(0, &primary_key), + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false), + semantic_type: semantic_type_of_column(1, &primary_key), + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true), + semantic_type: semantic_type_of_column(3, &primary_key), + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true), + semantic_type: semantic_type_of_column(4, &primary_key), + column_id: 4, + }) + .primary_key(primary_key); + let region_metadata = builder.build().unwrap(); + Arc::new(region_metadata) +} + +fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType { + if primary_key.contains(&column_id) { + SemanticType::Tag + } else { + SemanticType::Field + } +} + +/// Builds key values with timestamps (ms) and sequences for test. +pub(crate) fn build_key_values_with_ts_seq_values( + schema: &RegionMetadataRef, + k0: String, + k1: i64, + timestamps: impl Iterator, + values: impl Iterator>, + sequence: SequenceNumber, +) -> KeyValues { + let column_schema = schema + .column_metadatas + .iter() + .map(|c| api::v1::ColumnSchema { + column_name: c.column_schema.name.clone(), + datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) + .unwrap() + .datatype() as i32, + semantic_type: c.semantic_type as i32, + ..Default::default() + }) + .collect(); + + let rows = timestamps + .zip(values) + .map(|(ts, v)| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(k0.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::I64Value(k1)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts)), + }, + api::v1::Value { + value_data: Some(ValueData::I64Value(ts)), + }, + api::v1::Value { + value_data: v.map(ValueData::F64Value), + }, + ], + }) + .collect(); + let mutation = api::v1::Mutation { + op_type: 1, + sequence, + rows: Some(Rows { + schema: column_schema, + rows, + }), + }; + KeyValues::new(schema.as_ref(), mutation).unwrap() +} From 2ed98ff558e2480d1861ad42bcc7b4225cf03668 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 20 Feb 2024 14:10:57 +0800 Subject: [PATCH 2/6] fix: some cr comments --- src/mito2/src/memtable/merge_tree/data.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 79029fcb680d..31465a9c67fa 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -40,9 +40,9 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; +use crate::memtable::merge_tree::{PkId, PkIndex}; -const PK_INDEX_COLUMN_NAME: &str = "pk_index"; +const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] @@ -50,7 +50,7 @@ pub struct DataBatch { /// Primary key index of this batch. pk_index: PkIndex, /// Record batch of data. - rb: RecordBatch, + rb: Arc, /// Range of current primary key inside record batch range: Range, } @@ -68,14 +68,13 @@ impl DataBatch { self.range.clone() } - pub(crate) fn as_record_batch(&self) -> RecordBatch { + pub(crate) fn slice_record_batch(&self) -> RecordBatch { self.rb.slice(self.range.start, self.range.len()) } } /// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard. pub struct DataBuffer { - shard_id: ShardId, metadata: RegionMetadataRef, /// Schema for data part (primary keys are replaced with pk_index) data_part_schema: SchemaRef, @@ -119,7 +118,6 @@ impl DataBuffer { let data_part_schema = memtable_schema_to_encoded_schema(&metadata); Self { - shard_id: 0, metadata, data_part_schema, field_types, @@ -261,7 +259,7 @@ fn data_buffer_to_record_batches( #[derive(Debug)] pub(crate) struct DataBufferIter { - batch: RecordBatch, + batch: Arc, offset: usize, current_data_batch: Option, } @@ -269,7 +267,7 @@ pub(crate) struct DataBufferIter { impl DataBufferIter { pub(crate) fn new(batch: RecordBatch) -> Result { let mut iter = Self { - batch, + batch: Arc::new(batch), offset: 0, current_data_batch: None, }; @@ -639,7 +637,7 @@ mod tests { fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec]) { let mut output = Vec::with_capacity(expected_values.len()); while iter.is_valid() { - let batch = iter.current_data_batch().as_record_batch(); + let batch = iter.current_data_batch().slice_record_batch(); let values = batch .column_by_name("v1") .unwrap() From 60ec810ea8e377ad49754aa16493bca41b7298c8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 20 Feb 2024 14:41:06 +0800 Subject: [PATCH 3/6] chore: remove freeze_threshold in DataBuffer --- src/mito2/src/memtable/merge_tree/data.rs | 24 ++++++++++------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 31465a9c67fa..a2dd1e2b2f08 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -90,16 +90,11 @@ pub struct DataBuffer { op_type_builder: UInt8VectorBuilder, /// Builders for field columns. field_builders: Vec>>, - /// Threshold for freezing data buffer. - freeze_threshold: usize, } impl DataBuffer { - pub fn with_capacity( - metadata: RegionMetadataRef, - init_capacity: usize, - freeze_threshold: usize, - ) -> Self { + /// Creates a `DataBuffer` instance with given schema and capacity. + pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize) -> Self { let ts_builder = metadata .time_index_column() .column_schema @@ -126,7 +121,6 @@ impl DataBuffer { sequence_builder, op_type_builder, field_builders, - freeze_threshold, } } @@ -330,9 +324,11 @@ fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, R return None; } - let next_pk = array.value(start); + let values = array.values(); + let next_pk = values[start]; + for idx in start..num_rows { - if array.value(idx) != next_pk { + if values[idx] != next_pk { return Some((next_pk, start..idx)); } } @@ -513,7 +509,7 @@ mod tests { fn check_test_data_buffer_to_record_batches(keep_data: bool) { let meta = metadata_for_test(); - let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1); write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2); @@ -604,7 +600,7 @@ mod tests { #[test] fn test_encode_data_buffer() { let meta = metadata_for_test(); - let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); // write rows with null values. write_rows_to_buffer( @@ -667,7 +663,7 @@ mod tests { #[test] fn test_iter_data_buffer() { let meta = metadata_for_test(); - let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); write_rows_to_buffer( &mut buffer, @@ -694,7 +690,7 @@ mod tests { #[test] fn test_iter_empty_data_buffer() { let meta = metadata_for_test(); - let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); check_buffer_values_equal(&mut iter, &[]); } From c8b78d661796e0ab630ca1373dbec757ab6c6c7f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 20 Feb 2024 15:01:59 +0800 Subject: [PATCH 4/6] fix: use LazyMutableVectorBuilder instead of two vector; add option to control dedup --- src/mito2/src/memtable/merge_tree/data.rs | 145 +++++++++++++++++----- 1 file changed, 115 insertions(+), 30 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index a2dd1e2b2f08..e2b83b464652 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -78,8 +78,6 @@ pub struct DataBuffer { metadata: RegionMetadataRef, /// Schema for data part (primary keys are replaced with pk_index) data_part_schema: SchemaRef, - /// Data types for field columns. - field_types: Vec, /// Builder for primary key index. pk_index_builder: UInt16VectorBuilder, /// Builder for timestamp column. @@ -89,7 +87,7 @@ pub struct DataBuffer { /// Builder for op_type column. op_type_builder: UInt8VectorBuilder, /// Builders for field columns. - field_builders: Vec>>, + field_builders: Vec, } impl DataBuffer { @@ -105,17 +103,15 @@ impl DataBuffer { let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity); let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity); - let field_types = metadata + let field_builders = metadata .field_columns() - .map(|c| c.column_schema.data_type.clone()) + .map(|c| LazyMutableVectorBuilder::new(c.column_schema.data_type.clone())) .collect::>(); - let field_builders = (0..field_types.len()).map(|_| None).collect(); let data_part_schema = memtable_schema_to_encoded_schema(&metadata); Self { metadata, data_part_schema, - field_types, pk_index_builder: pk_id_builder, ts_builder, sequence_builder, @@ -125,7 +121,7 @@ impl DataBuffer { } /// Writes a row to data buffer. - pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) -> bool { + pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { self.ts_builder.push_value_ref(kv.timestamp()); self.pk_index_builder.push(Some(pk_id.pk_index)); self.sequence_builder.push(Some(kv.sequence())); @@ -135,16 +131,9 @@ impl DataBuffer { for (idx, field) in kv.fields().enumerate() { self.field_builders[idx] - .get_or_insert_with(|| { - let mut builder = - self.field_types[idx].create_mutable_vector(self.ts_builder.len()); - builder.push_nulls(self.ts_builder.len() - 1); - builder - }) + .get_or_create_builder(self.ts_builder.len()) .push_value_ref(field); } - - self.ts_builder.len() >= self.freeze_threshold } /// Freezes `DataBuffer` to bytes. Use `pk_weights` to convert pk_id to pk sort order. @@ -156,8 +145,14 @@ impl DataBuffer { /// Reads batches from data buffer without resetting builder's buffers. pub fn iter(&mut self, pk_weights: &[u16]) -> Result { - let batch = - data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?; + // todo(hl): control whether to dedup while invoking `iter`. + let batch = data_buffer_to_record_batches( + self.data_part_schema.clone(), + self, + pk_weights, + true, + true, + )?; DataBufferIter::new(batch) } @@ -172,12 +167,35 @@ impl DataBuffer { } } +enum LazyMutableVectorBuilder { + Type(ConcreteDataType), + Builder(Box), +} + +impl LazyMutableVectorBuilder { + fn new(ty: ConcreteDataType) -> Self { + Self::Type(ty) + } + + fn get_or_create_builder(&mut self, init_capacity: usize) -> &mut Box { + match self { + LazyMutableVectorBuilder::Type(ty) => { + let builder = ty.create_mutable_vector(init_capacity); + *self = LazyMutableVectorBuilder::Builder(builder); + self.get_or_create_builder(init_capacity) + } + LazyMutableVectorBuilder::Builder(builder) => builder, + } + } +} + /// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights. fn data_buffer_to_record_batches( schema: SchemaRef, buffer: &mut DataBuffer, pk_weights: &[u16], keep_data: bool, + dedup: bool, ) -> Result { let num_rows = buffer.ts_builder.len(); @@ -201,7 +219,9 @@ fn data_buffer_to_record_batches( // sort and dedup rows.sort_unstable_by(|l, r| l.1.cmp(&r.1)); - rows.dedup_by(|l, r| l.1.timestamp == r.1.timestamp); + if dedup { + rows.dedup_by(|l, r| l.1.timestamp == r.1.timestamp); + } let indices_to_take = UInt32Array::from_iter_values(rows.into_iter().map(|v| v.0 as u32)); let mut columns = Vec::with_capacity(4 + buffer.field_builders.len()); @@ -226,22 +246,21 @@ fn data_buffer_to_record_batches( .context(error::ComputeArrowSnafu)?, ); - for (idx, c) in buffer.field_builders.iter_mut().enumerate() { - let array = match c { - None => { - let mut single_null = buffer.field_types[idx].create_mutable_vector(num_rows); + for b in buffer.field_builders.iter_mut() { + let array = match b { + LazyMutableVectorBuilder::Type(ty) => { + let mut single_null = ty.create_mutable_vector(num_rows); single_null.push_nulls(num_rows); single_null.to_vector().to_arrow_array() } - Some(v) => { + LazyMutableVectorBuilder::Builder(builder) => { if keep_data { - v.to_vector_cloned().to_arrow_array() + builder.to_vector_cloned().to_arrow_array() } else { - v.to_vector().to_arrow_array() + builder.to_vector().to_arrow_array() } } }; - columns.push( arrow::compute::take(&array, &indices_to_take, None) .context(error::ComputeArrowSnafu)?, @@ -473,8 +492,13 @@ impl<'a> DataPartEncoder<'a> { let mut bytes = Vec::with_capacity(1024); let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props()) .context(error::EncodeMemtableSnafu)?; - let rb = - data_buffer_to_record_batches(self.schema.clone(), source, self.pk_weights, false)?; + let rb = data_buffer_to_record_batches( + self.schema.clone(), + source, + self.pk_weights, + false, + true, + )?; writer.write(&rb).context(error::EncodeMemtableSnafu)?; let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?; Ok(Bytes::from(bytes)) @@ -507,6 +531,26 @@ mod tests { use super::*; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; + #[test] + fn test_lazy_mutable_vector_builder() { + let mut builder = LazyMutableVectorBuilder::new(ConcreteDataType::boolean_datatype()); + match builder { + LazyMutableVectorBuilder::Type(ref t) => { + assert_eq!(&ConcreteDataType::boolean_datatype(), t); + } + LazyMutableVectorBuilder::Builder(_) => { + unreachable!() + } + } + builder.get_or_create_builder(1); + match builder { + LazyMutableVectorBuilder::Type(_) => { + unreachable!() + } + LazyMutableVectorBuilder::Builder(_) => {} + } + } + fn check_test_data_buffer_to_record_batches(keep_data: bool) { let meta = metadata_for_test(); let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); @@ -516,7 +560,8 @@ mod tests { write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3); assert_eq!(5, buffer.num_rows()); let schema = memtable_schema_to_encoded_schema(&meta); - let batch = data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data).unwrap(); + let batch = + data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true).unwrap(); assert_eq!( vec![1, 2, 1, 2], @@ -569,6 +614,46 @@ mod tests { check_test_data_buffer_to_record_batches(false); } + #[test] + fn test_data_buffer_to_record_batches_without_dedup() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1); + write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2); + write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3); + assert_eq!(5, buffer.num_rows()); + let schema = memtable_schema_to_encoded_schema(&meta); + let batch = + data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false).unwrap(); + + assert_eq!( + vec![1, 1, 0, 0, 0], + batch + .column_by_name(PK_INDEX_COLUMN_NAME) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + + assert_eq!( + vec![1, 2, 1, 2, 2], + batch + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + } + fn write_rows_to_buffer( buffer: &mut DataBuffer, schema: &RegionMetadataRef, From 2746af88d2783d54f2bb80ae0b545f4b6fd9499a Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 20 Feb 2024 15:55:20 +0800 Subject: [PATCH 5/6] fix: dedup rows according to both pk weights and timestamps --- src/mito2/src/memtable/merge_tree/data.rs | 58 ++++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index e2b83b464652..e0315f764237 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -220,7 +220,7 @@ fn data_buffer_to_record_batches( // sort and dedup rows.sort_unstable_by(|l, r| l.1.cmp(&r.1)); if dedup { - rows.dedup_by(|l, r| l.1.timestamp == r.1.timestamp); + rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp); } let indices_to_take = UInt32Array::from_iter_values(rows.into_iter().map(|v| v.0 as u32)); @@ -524,7 +524,7 @@ pub struct DataParts {} #[cfg(test)] mod tests { use datafusion::arrow::array::Float64Array; - use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array}; + use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array, UInt64Array}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::data_type::AsBytes; @@ -614,6 +614,60 @@ mod tests { check_test_data_buffer_to_record_batches(false); } + #[test] + fn test_data_buffer_to_record_batches_with_dedup() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1); + write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2); + write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3); + assert_eq!(4, buffer.num_rows()); + let schema = memtable_schema_to_encoded_schema(&meta); + let batch = + data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true).unwrap(); + + assert_eq!(3, batch.num_rows()); + assert_eq!( + vec![0, 0, 1], + batch + .column_by_name(PK_INDEX_COLUMN_NAME) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + + assert_eq!( + vec![1, 2, 2], + batch + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + + assert_eq!( + vec![1, 3, 2], + batch + .column_by_name(SEQUENCE_COLUMN_NAME) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + ); + } + #[test] fn test_data_buffer_to_record_batches_without_dedup() { let meta = metadata_for_test(); From 6e778fa37b6937a47db024f98e49fe01a21ae580 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 20 Feb 2024 17:12:21 +0800 Subject: [PATCH 6/6] fix: assembly DataBatch on demand --- src/mito2/src/memtable/merge_tree/data.rs | 40 ++++++++++++----------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index e0315f764237..2e903519e618 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -46,22 +46,22 @@ const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] -pub struct DataBatch { +pub struct DataBatch<'a> { /// Primary key index of this batch. pk_index: PkIndex, /// Record batch of data. - rb: Arc, + rb: &'a RecordBatch, /// Range of current primary key inside record batch range: Range, } -impl DataBatch { +impl<'a> DataBatch<'a> { pub(crate) fn pk_index(&self) -> PkIndex { self.pk_index } pub(crate) fn record_batch(&self) -> &RecordBatch { - &self.rb + self.rb } pub(crate) fn range(&self) -> Range { @@ -272,54 +272,56 @@ fn data_buffer_to_record_batches( #[derive(Debug)] pub(crate) struct DataBufferIter { - batch: Arc, + batch: RecordBatch, offset: usize, - current_data_batch: Option, + current_batch: Option<(PkIndex, Range)>, } impl DataBufferIter { pub(crate) fn new(batch: RecordBatch) -> Result { let mut iter = Self { - batch: Arc::new(batch), + batch, offset: 0, - current_data_batch: None, + current_batch: None, }; iter.next()?; // fill data batch for comparison and merge. Ok(iter) } pub(crate) fn is_valid(&self) -> bool { - self.current_data_batch.is_some() + self.current_batch.is_some() } /// # Panics /// If Current iterator is not exhausted. pub(crate) fn current_data_batch(&self) -> DataBatch { - self.current_data_batch.as_ref().unwrap().clone() + let (pk_index, range) = self.current_batch.as_ref().unwrap(); + DataBatch { + pk_index: *pk_index, + rb: &self.batch, + range: range.clone(), + } } /// # Panics - /// If Current iterator is not exhausted. + /// If Current iterator is exhausted. pub(crate) fn current_pk_index(&self) -> PkIndex { - self.current_data_batch.as_ref().unwrap().pk_index + let (pk_index, _) = self.current_batch.as_ref().unwrap(); + *pk_index } /// Advances iterator to next data batch. pub(crate) fn next(&mut self) -> Result<()> { if self.offset >= self.batch.num_rows() { - self.current_data_batch = None; + self.current_batch = None; return Ok(()); } let pk_index_array = pk_index_array(&self.batch); if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) { self.offset = range.end; - self.current_data_batch = Some(DataBatch { - pk_index: next_pk, - rb: self.batch.clone(), - range, - }) + self.current_batch = Some((next_pk, range)) } else { - self.current_data_batch = None; + self.current_batch = None; } Ok(()) }