refactor: change the receivers of merge tree components (#3378)

* refactor: change the receivers of Shard::read/DataBuffer::read/DataParts::read to &self instead of &mut self

* refactor: remove allow(dead_code) in merge tree
v0y4g3r authored Feb 26, 2024
1 parent e481f07 commit 8087822
Showing 8 changed files with 131 additions and 195 deletions.
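
The core of the change: the read paths used to drain the mutable vector builders via `finish()` / `to_vector()`, which forced `&mut self` receivers on `Shard::read`, `DataBuffer::read`, and `DataParts::read`. They now snapshot the builders via `finish_cloned()` / `to_vector_cloned()`, so `&self` suffices. A minimal sketch of the pattern with hypothetical toy types (not the actual mito2 API):

/// Toy stand-in for the datatypes vector builders (e.g. `UInt16VectorBuilder`);
/// hypothetical, for illustration only.
struct ToyBuilder {
    values: Vec<u16>,
}

impl ToyBuilder {
    /// Drains the buffered values; requires `&mut self`.
    fn finish(&mut self) -> Vec<u16> {
        std::mem::take(&mut self.values)
    }

    /// Snapshots the buffered values without draining them, so `&self` suffices.
    fn finish_cloned(&self) -> Vec<u16> {
        self.values.clone()
    }
}

struct ToyDataBuffer {
    builder: ToyBuilder,
}

impl ToyDataBuffer {
    /// Before this commit the read path drained builders, forcing `&mut self`;
    /// snapshotting makes reading non-destructive, so `&self` is enough.
    fn read(&self) -> Vec<u16> {
        self.builder.finish_cloned()
    }

    /// Freezing still consumes the builder and keeps `&mut self`.
    fn freeze(&mut self) -> Vec<u16> {
        self.builder.finish()
    }
}

fn main() {
    let mut buf = ToyDataBuffer { builder: ToyBuilder { values: vec![2, 1] } };
    assert_eq!(buf.read(), vec![2, 1]); // shared borrow, data retained
    assert_eq!(buf.read(), vec![2, 1]); // can read repeatedly
    assert_eq!(buf.freeze(), vec![2, 1]); // exclusive borrow, data drained
    assert!(buf.read().is_empty());
}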
1 change: 0 additions & 1 deletion src/mito2/src/memtable.rs
@@ -15,7 +15,6 @@
//! Memtables are write buffers for regions.
pub mod key_values;
#[allow(dead_code)]
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;
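
Dropping the module-level `#[allow(dead_code)]` means unused items in `merge_tree` produce warnings again, which is what lets the unused helpers below (`is_empty`, `current_pk_index`, `freeze`) be spotted and deleted. A small illustration of the attribute's scoping (hypothetical module, not the mito2 source):

// With the attribute, unused items in the module compile silently.
#[allow(dead_code)]
mod permissive {
    fn never_called() {} // no warning
}

mod strict {
    fn never_called() {} // warning: function `never_called` is never used
}

fn main() {}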
225 changes: 119 additions & 106 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -30,7 +30,7 @@ use datatypes::types::TimestampType;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8VectorBuilder,
UInt8Vector, UInt8VectorBuilder,
};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
@@ -65,10 +65,6 @@ impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
self.end - self.start
}

pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// Data part batches returned by `DataParts::read`.
@@ -89,10 +85,6 @@ impl<'a> DataBatch<'a> {
self.range
}

pub(crate) fn is_empty(&self) -> bool {
self.range.is_empty()
}

pub(crate) fn slice_record_batch(&self) -> RecordBatch {
self.rb.slice(self.range.start, self.range.len())
}
@@ -258,12 +250,11 @@ impl DataBuffer {
/// Reads batches from data buffer without resetting builder's buffers.
/// If `pk_weights` is present, yielded rows are sorted according to those weights;
/// otherwise rows are sorted by `pk_index` values, which themselves act as the weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
let batch = data_buffer_to_record_batches(
pub fn read(&self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
let batch = read_data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, pk dict is not frozen
@@ -307,44 +298,123 @@ impl LazyMutableVectorBuilder {
}

/// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
/// `keep_data`: whether to keep the original data inside `DataBuffer`.
/// `dedup`: whether to remove duplicated rows inside `DataBuffer`.
/// `replace_pk_index`: whether to replace pk_index values with their corresponding pk weights.
fn data_buffer_to_record_batches(
fn drain_data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &mut DataBuffer,
pk_weights: Option<&[u16]>,
keep_data: bool,
dedup: bool,
replace_pk_index: bool,
) -> Result<RecordBatch> {
let num_rows = buffer.ts_builder.len();

let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data {
(
buffer.pk_index_builder.finish_cloned(),
buffer.ts_builder.to_vector_cloned(),
buffer.sequence_builder.finish_cloned(),
buffer.op_type_builder.finish_cloned(),
)
} else {
(
buffer.pk_index_builder.finish(),
buffer.ts_builder.to_vector(),
buffer.sequence_builder.finish(),
buffer.op_type_builder.finish(),
)
};
let (pk_index_v, ts_v, sequence_v, op_type_v) = (
buffer.pk_index_builder.finish(),
buffer.ts_builder.to_vector(),
buffer.sequence_builder.finish(),
buffer.op_type_builder.finish(),
);

let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
pk_weights,
pk_index_v,
ts_v,
sequence_v,
op_type_v,
replace_pk_index,
dedup,
buffer.field_builders.len() + 4,
)?;

for b in buffer.field_builders.iter_mut() {
let array = match b {
LazyMutableVectorBuilder::Type(ty) => {
let mut single_null = ty.create_mutable_vector(num_rows);
single_null.push_nulls(num_rows);
single_null.to_vector().to_arrow_array()
}
LazyMutableVectorBuilder::Builder(builder) => builder.to_vector().to_arrow_array(),
};
columns.push(
arrow::compute::take(&array, &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
}

let mut rows = build_rows_to_sort(pk_weights, &pk_index_v, &ts_v, &sequence_v);
RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

/// Reads `DataBuffer` to record batches, with rows sorted according to pk_weights without resetting `DataBuffer`.
/// `dedup`: whether to remove duplicated rows inside `DataBuffer`.
/// `replace_pk_index`: whether to replace pk_index values with their corresponding pk weights.
fn read_data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &DataBuffer,
pk_weights: Option<&[u16]>,
dedup: bool,
replace_pk_index: bool,
) -> Result<RecordBatch> {
let num_rows = buffer.ts_builder.len();

let (pk_index_v, ts_v, sequence_v, op_type_v) = (
buffer.pk_index_builder.finish_cloned(),
buffer.ts_builder.to_vector_cloned(),
buffer.sequence_builder.finish_cloned(),
buffer.op_type_builder.finish_cloned(),
);

let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
pk_weights,
pk_index_v,
ts_v,
sequence_v,
op_type_v,
replace_pk_index,
dedup,
buffer.field_builders.len() + 4,
)?;

for b in buffer.field_builders.iter() {
let array = match b {
LazyMutableVectorBuilder::Type(ty) => {
let mut single_null = ty.create_mutable_vector(num_rows);
single_null.push_nulls(num_rows);
single_null.to_vector().to_arrow_array()
}
LazyMutableVectorBuilder::Builder(builder) => {
builder.to_vector_cloned().to_arrow_array()
}
};
columns.push(
arrow::compute::take(&array, &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
}

RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

#[allow(clippy::too_many_arguments)]
fn build_row_sort_indices_and_columns(
pk_weights: Option<&[u16]>,
pk_index: UInt16Vector,
ts: VectorRef,
sequence: UInt64Vector,
op_type: UInt8Vector,
replace_pk_index: bool,
dedup: bool,
column_num: usize,
) -> Result<(UInt32Array, Vec<ArrayRef>)> {
let mut rows = build_rows_to_sort(pk_weights, &pk_index, &ts, &sequence);

let pk_array = if replace_pk_index {
// replace pk index values with pk weights.
Arc::new(UInt16Array::from_iter_values(
rows.iter().map(|(_, key)| key.pk_weight),
)) as Arc<_>
} else {
pk_index_v.to_arrow_array()
pk_index.to_arrow_array()
};

// sort and dedup
@@ -355,50 +425,29 @@ fn data_buffer_to_record_batches(

let indices_to_take = UInt32Array::from_iter_values(rows.iter().map(|(idx, _)| *idx as u32));

let mut columns = Vec::with_capacity(4 + buffer.field_builders.len());
let mut columns = Vec::with_capacity(column_num);

columns.push(
arrow::compute::take(&pk_array, &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);

columns.push(
arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None)
arrow::compute::take(&ts.to_arrow_array(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);

columns.push(
arrow::compute::take(&sequence_v.as_arrow(), &indices_to_take, None)
arrow::compute::take(&sequence.as_arrow(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);

columns.push(
arrow::compute::take(&op_type_v.as_arrow(), &indices_to_take, None)
arrow::compute::take(&op_type.as_arrow(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);

for b in buffer.field_builders.iter_mut() {
let array = match b {
LazyMutableVectorBuilder::Type(ty) => {
let mut single_null = ty.create_mutable_vector(num_rows);
single_null.push_nulls(num_rows);
single_null.to_vector().to_arrow_array()
}
LazyMutableVectorBuilder::Builder(builder) => {
if keep_data {
builder.to_vector_cloned().to_arrow_array()
} else {
builder.to_vector().to_arrow_array()
}
}
};
columns.push(
arrow::compute::take(&array, &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
}

RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
Ok((indices_to_take, columns))
}

pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
@@ -468,12 +517,6 @@ impl DataBufferReader {
}
}

/// # Panics
/// If Current reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_range.as_ref().unwrap().pk_index
}

/// Advances reader to next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
@@ -673,11 +716,10 @@ impl<'a> DataPartEncoder<'a> {
let mut bytes = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
.context(error::EncodeMemtableSnafu)?;
let rb = data_buffer_to_record_batches(
let rb = drain_data_buffer_to_record_batches(
self.schema.clone(),
source,
self.pk_weights,
false,
self.dedup,
self.replace_pk_index,
)?;
@@ -695,12 +737,6 @@ pub enum DataPart {
}

impl DataPart {
fn is_empty(&self) -> bool {
match self {
DataPart::Parquet(p) => p.data.is_empty(),
}
}

/// Reads frozen data part and yields [DataBatch]es.
pub fn read(&self) -> Result<DataPartReader> {
match self {
@@ -745,14 +781,6 @@ impl DataPartReader {
self.current_range.is_some()
}

/// Returns current pk index.
///
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_range.as_ref().unwrap().pk_index
}

/// Returns current data batch of reader.
/// # Panics
/// If reader is exhausted.
@@ -835,16 +863,10 @@ impl DataParts {
self.active.write_row(pk_index, kv)
}

/// Freezes the active data buffer into frozen data parts.
pub fn freeze(&mut self) -> Result<()> {
self.frozen.push(self.active.freeze(None, false)?);
Ok(())
}

/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
pub fn read(&mut self) -> Result<DataPartsReader> {
pub fn read(&self) -> Result<DataPartsReader> {
let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
nodes.push(DataNode::new(DataSource::Buffer(
// `DataParts::read` ensures that all pk_index inside `DataBuffer` are replaced by weights.
Expand All @@ -857,10 +879,6 @@ impl DataParts {
let merger = Merger::try_new(nodes)?;
Ok(DataPartsReader { merger })
}

pub(crate) fn is_empty(&self) -> bool {
self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
}
}

/// Reader for all parts inside a `DataParts`.
@@ -924,15 +942,12 @@ mod tests {
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch = data_buffer_to_record_batches(
schema,
&mut buffer,
Some(&[3, 1]),
keep_data,
true,
true,
)
.unwrap();
let batch = if keep_data {
read_data_buffer_to_record_batches(schema, &buffer, Some(&[3, 1]), true, true).unwrap()
} else {
drain_data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, true)
.unwrap()
};

assert_eq!(
vec![1, 2, 1, 2],
@@ -1036,8 +1051,7 @@ mod tests {
assert_eq!(4, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, Some(&[0, 1]), true, true, true)
.unwrap();
read_data_buffer_to_record_batches(schema, &buffer, Some(&[0, 1]), true, true).unwrap();

assert_eq!(3, batch.num_rows());
assert_eq!(
@@ -1090,9 +1104,8 @@ mod tests {
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, false, true)
.unwrap();
let batch = read_data_buffer_to_record_batches(schema, &buffer, Some(&[3, 1]), false, true)
.unwrap();

assert_eq!(
vec![1, 1, 3, 3, 3],
@@ -1289,7 +1302,7 @@ mod tests {
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
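
Besides the receiver change, data.rs replaces the `keep_data: bool` mode flag of the old `data_buffer_to_record_batches` with two intent-revealing functions: `read_data_buffer_to_record_batches` (clones, takes `&DataBuffer`) and `drain_data_buffer_to_record_batches` (consumes, takes `&mut DataBuffer`), which share the sort/dedup tail via `build_row_sort_indices_and_columns`. The shape of that refactor, reduced to a sketch over plain vectors (hypothetical names, not the real signatures):

// Before: one function, behavior toggled by a flag.
fn to_batches_before(buffer: &mut Vec<u32>, keep_data: bool) -> Vec<u32> {
    let rows = if keep_data {
        buffer.clone() // read path: copy the data
    } else {
        std::mem::take(buffer) // drain path: consume the data
    };
    sort_rows(rows)
}

// After: two entry points with honest receivers...
fn read_to_batches(buffer: &[u32]) -> Vec<u32> {
    sort_rows(buffer.to_vec())
}

fn drain_to_batches(buffer: &mut Vec<u32>) -> Vec<u32> {
    sort_rows(std::mem::take(buffer))
}

// ...sharing the common tail, analogous to
// `build_row_sort_indices_and_columns` in this commit.
fn sort_rows(mut rows: Vec<u32>) -> Vec<u32> {
    rows.sort_unstable();
    rows.dedup();
    rows
}

fn main() {
    let mut buf = vec![3u32, 1, 2, 2];
    assert_eq!(read_to_batches(&buf), [1, 2, 3]); // buffer untouched
    assert_eq!(buf, [3, 1, 2, 2]);
    assert_eq!(drain_to_batches(&mut buf), [1, 2, 3]); // buffer emptied
    assert!(buf.is_empty());

    let mut buf2 = vec![5u32, 4, 4];
    assert_eq!(to_batches_before(&mut buf2, true), [4, 5]); // keep_data path
    assert_eq!(buf2, [5, 4, 4]); // original retained
}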
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/merge_tree/dedup.rs
@@ -176,7 +176,7 @@ mod tests {
frozens.push(part1);
}

let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);
let parts = DataParts::new(meta, 10, true).with_frozen(frozens);

let mut res = Vec::with_capacity(expected.len());
let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
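
The dedup.rs test change is the downstream effect: once `DataParts::read` takes `&self`, the `mut` on the binding is unnecessary and Rust's `unused_mut` lint would flag it. A shared-access receiver also allows simultaneous readers, which `&mut self` would reject at compile time. A toy sketch of such a call site (hypothetical `Parts` type, not the real `DataParts`):

struct Parts {
    rows: Vec<u32>,
}

impl Parts {
    // With `read(&self)`, multiple readers can borrow concurrently.
    fn read(&self) -> std::slice::Iter<'_, u32> {
        self.rows.iter()
    }
}

fn main() {
    let parts = Parts { rows: vec![1, 2, 3] }; // no `mut` needed anymore
    let mut r1 = parts.read();
    let mut r2 = parts.read(); // a second simultaneous reader: OK
    assert_eq!(r1.next(), Some(&1));
    assert_eq!(r2.next(), Some(&1));
}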