refactor: remove allow(dead_code) in merge tree
v0y4g3r committed Feb 26, 2024
1 parent 8a949e6 commit ae54a37
Showing 6 changed files with 7 additions and 121 deletions.
1 change: 0 additions & 1 deletion src/mito2/src/memtable.rs
@@ -15,7 +15,6 @@
 //! Memtables are write buffers for regions.
 pub mod key_values;
-#[allow(dead_code)]
 pub mod merge_tree;
 pub mod time_series;
 pub(crate) mod version;
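With the module-level #[allow(dead_code)] gone, the dead-code lint applies to every item under merge_tree again; the per-item deletions and #[cfg(test)] gates in the files below are what keep the build warning-free after this change. A minimal, self-contained sketch (hypothetical module and helper names, not the actual mito2 layout) of what the blanket attribute used to suppress:

    // Minimal sketch, not the actual mito2 module layout.
    mod merge_tree {
        // Without a module-level #[allow(dead_code)], rustc now reports:
        // warning: function `unused_helper` is never used
        fn unused_helper() {}

        pub fn used() {}
    }

    fn main() {
        merge_tree::used();
    }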
38 changes: 0 additions & 38 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -65,10 +65,6 @@ impl DataBatchRange {
     pub(crate) fn len(&self) -> usize {
         self.end - self.start
     }
-
-    pub(crate) fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
 }

 /// Data part batches returns by `DataParts::read`.
@@ -89,10 +85,6 @@ impl<'a> DataBatch<'a> {
         self.range
     }

-    pub(crate) fn is_empty(&self) -> bool {
-        self.range.is_empty()
-    }
-
     pub(crate) fn slice_record_batch(&self) -> RecordBatch {
         self.rb.slice(self.range.start, self.range.len())
     }
@@ -525,12 +517,6 @@ impl DataBufferReader {
         }
     }

-    /// # Panics
-    /// If Current reader is exhausted.
-    pub(crate) fn current_pk_index(&self) -> PkIndex {
-        self.current_range.as_ref().unwrap().pk_index
-    }
-
     /// Advances reader to next data batch.
     pub(crate) fn next(&mut self) -> Result<()> {
         if self.offset >= self.batch.num_rows() {
@@ -751,12 +737,6 @@ pub enum DataPart {
 }

 impl DataPart {
-    fn is_empty(&self) -> bool {
-        match self {
-            DataPart::Parquet(p) => p.data.is_empty(),
-        }
-    }
-
     /// Reads frozen data part and yields [DataBatch]es.
     pub fn read(&self) -> Result<DataPartReader> {
         match self {
@@ -801,14 +781,6 @@ impl DataPartReader {
         self.current_range.is_some()
     }

-    /// Returns current pk index.
-    ///
-    /// # Panics
-    /// If reader is exhausted.
-    pub(crate) fn current_pk_index(&self) -> PkIndex {
-        self.current_range.as_ref().unwrap().pk_index
-    }
-
     /// Returns current data batch of reader.
     /// # Panics
     /// If reader is exhausted.
@@ -891,12 +863,6 @@ impl DataParts {
         self.active.write_row(pk_index, kv)
     }

-    /// Freezes the active data buffer into frozen data parts.
-    pub fn freeze(&mut self) -> Result<()> {
-        self.frozen.push(self.active.freeze(None, false)?);
-        Ok(())
-    }
-
     /// Reads data from all parts including active and frozen parts.
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
@@ -913,10 +879,6 @@
         let merger = Merger::try_new(nodes)?;
         Ok(DataPartsReader { merger })
     }
-
-    pub(crate) fn is_empty(&self) -> bool {
-        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
-    }
 }

 /// Reader for all parts inside a `DataParts`.
39 changes: 6 additions & 33 deletions src/mito2/src/memtable/merge_tree/dict.rs
@@ -56,11 +56,6 @@ impl KeyDictBuilder {
         }
     }

-    /// Gets the pk index by the key.
-    pub fn get_pk_index(&self, key: &[u8]) -> Option<PkIndex> {
-        self.pk_to_index.get(key).copied()
-    }
-
     /// Returns true if the builder is full.
     pub fn is_full(&self) -> bool {
         self.num_keys >= self.capacity
@@ -97,6 +92,7 @@
     }

     /// Memory size of the builder.
+    #[cfg(test)]
     pub fn memory_size(&self) -> usize {
         self.key_bytes_in_index
             + self.key_buffer.buffer_memory_size()
@@ -152,30 +148,30 @@ impl KeyDictBuilder {
 pub struct DictBuilderReader {
     blocks: Vec<DictBlock>,
     sorted_pk_indices: Vec<PkIndex>,
-    /// Current offset in the `sorted_pk_indices`.
-    offset: usize,
 }

 impl DictBuilderReader {
     fn new(blocks: Vec<DictBlock>, sorted_pk_indices: Vec<PkIndex>) -> Self {
         Self {
             blocks,
             sorted_pk_indices,
-            offset: 0,
         }
     }

     /// Returns the number of keys.
+    #[cfg(test)]
     pub fn num_keys(&self) -> usize {
         self.sorted_pk_indices.len()
     }

     /// Gets the i-th pk index.
+    #[cfg(test)]
     pub fn pk_index(&self, offset: usize) -> PkIndex {
         self.sorted_pk_indices[offset]
     }

     /// Gets the i-th key.
+    #[cfg(test)]
     pub fn key(&self, offset: usize) -> &[u8] {
         let pk_index = self.pk_index(offset);
         self.key_by_pk_index(pk_index)
@@ -191,11 +187,6 @@ impl DictBuilderReader {
     pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
         compute_pk_weights(&self.sorted_pk_indices, pk_weights)
     }
-
-    /// Returns pk indices sorted by keys.
-    pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] {
-        &self.sorted_pk_indices
-    }
 }

 /// Returns pk weights to sort a data part and replaces pk indices.
@@ -290,23 +281,8 @@ impl KeyBuffer {
         self.key_builder.is_empty()
     }

-    /// Gets the primary key by its index.
-    ///
-    /// # Panics
-    /// Panics if the index is invalid.
-    fn get_key(&self, index: PkIndex) -> &[u8] {
-        let values = self.key_builder.values_slice();
-        let offsets = self.key_builder.offsets_slice();
-        // Casting index to usize is safe.
-        let start = offsets[index as usize];
-        let end = offsets[index as usize + 1];
-
-        // We ensure no null in the builder so we don't check validity.
-        // The builder offset should be positive.
-        &values[start as usize..end as usize]
-    }
-
     /// Returns the buffer size of the builder.
+    #[cfg(test)]
     fn buffer_memory_size(&self) -> usize {
         self.key_builder.values_slice().len()
             + std::mem::size_of_val(self.key_builder.offsets_slice())
@@ -351,15 +327,12 @@ impl DictBlock {
         Self { keys }
     }

-    fn len(&self) -> usize {
-        self.keys.len()
-    }
-
     fn key_by_pk_index(&self, index: PkIndex) -> &[u8] {
         let pos = index % MAX_KEYS_PER_BLOCK;
         self.keys.value(pos as usize)
     }

+    #[cfg(test)]
     fn buffer_memory_size(&self) -> usize {
         self.keys.get_buffer_memory_size()
     }
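A recurring pattern in this file: helpers such as memory_size and buffer_memory_size are only called from unit tests, so they are now gated with #[cfg(test)] instead of relying on a blanket lint waiver; release builds compile neither the helpers nor any warning for them. A simplified, self-contained sketch of the pattern (it borrows the KeyBuffer name for illustration and is not the actual mito2 type):

    // Illustrative sketch only; reuses the KeyBuffer name but is not
    // the actual mito2 type.
    pub struct KeyBuffer {
        bytes: Vec<u8>,
    }

    impl KeyBuffer {
        pub fn push(&mut self, key: &[u8]) {
            self.bytes.extend_from_slice(key);
        }

        /// Accounting helper used only by unit tests; compiled only
        /// for test builds, so it can never trip the dead-code lint.
        #[cfg(test)]
        fn buffer_memory_size(&self) -> usize {
            self.bytes.len()
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn memory_size_grows_with_keys() {
            let mut buf = KeyBuffer { bytes: Vec::new() };
            buf.push(b"key");
            assert_eq!(buf.buffer_memory_size(), 3);
        }
    }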
4 changes: 0 additions & 4 deletions src/mito2/src/memtable/merge_tree/shard.rs
@@ -136,10 +136,6 @@ pub struct ShardReader {
 }

 impl ShardReader {
-    fn shard_id(&self) -> ShardId {
-        self.shard_id
-    }
-
     fn is_valid(&self) -> bool {
         self.parts_reader.is_valid()
     }
29 changes: 1 addition & 28 deletions src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -132,10 +132,6 @@ pub struct ShardBuilderReader {
 }

 impl ShardBuilderReader {
-    pub fn shard_id(&self) -> ShardId {
-        self.shard_id
-    }
-
     pub fn is_valid(&self) -> bool {
         self.data_reader.is_valid()
     }
@@ -164,14 +160,12 @@ impl ShardBuilderReader {

 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use super::*;
     use crate::memtable::merge_tree::dict::KeyDictBuilder;
     use crate::memtable::merge_tree::metrics::WriteMetrics;
     use crate::memtable::KeyValues;
     use crate::test_util::memtable_util::{
-        build_key_values_with_ts_seq_values, encode_key_by_kv, encode_keys, metadata_for_test,
+        build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
     };

     fn input_with_key(metadata: &RegionMetadataRef) -> Vec<KeyValues> {
Expand Down Expand Up @@ -203,27 +197,6 @@ mod tests {
]
}

fn new_shard_builder(
shard_id: ShardId,
metadata: RegionMetadataRef,
input: &[KeyValues],
) -> Shard {
let mut dict_builder = KeyDictBuilder::new(1024);
let mut metrics = WriteMetrics::default();
let mut keys = Vec::with_capacity(input.len());
for kvs in input {
encode_keys(&metadata, kvs, &mut keys);
}
for key in &keys {
dict_builder.insert_key(key, &mut metrics);
}

let dict = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
}

#[test]
fn test_write_shard_builder() {
let metadata = metadata_for_test();
17 changes: 0 additions & 17 deletions src/mito2/src/memtable/merge_tree/tree.rs
@@ -21,8 +21,6 @@ use api::v1::OpType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
-use datatypes::arrow;
-use datatypes::data_type::ConcreteDataType;
 use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
@@ -35,7 +33,6 @@ use crate::memtable::merge_tree::partition::{
     Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
 };
 use crate::memtable::merge_tree::MergeTreeConfig;
-use crate::memtable::time_series::primary_key_schema;
 use crate::memtable::{BoxedBatchIterator, KeyValues};
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -144,18 +141,8 @@ impl MergeTree {
             .unwrap_or_default();

         let partitions = self.prune_partitions(&filters);
-        let pk_schema = primary_key_schema(&self.metadata);
-        let pk_datatypes = self
-            .metadata
-            .primary_key_columns()
-            .map(|pk| pk.column_schema.data_type.clone())
-            .collect();

         let mut iter = TreeIter {
-            metadata: self.metadata.clone(),
-            pk_schema,
-            pk_datatypes,
-            row_codec: self.row_codec.clone(),
             partitions,
             current_reader: None,
         };
@@ -283,10 +270,6 @@ impl MergeTree {
 }

 struct TreeIter {
-    metadata: RegionMetadataRef,
-    pk_schema: arrow::datatypes::SchemaRef,
-    pk_datatypes: Vec<ConcreteDataType>,
-    row_codec: Arc<McmpRowCodec>,
     partitions: VecDeque<PartitionRef>,
     current_reader: Option<PartitionReader>,
 }
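The two TreeIter hunks above change in lockstep for a reason: deleting unused fields from a struct forces the matching initializers out of every construction site, or the build fails with a missing-field error. A simplified sketch (placeholder types stand in for PartitionRef and PartitionReader; not the actual mito2 definitions):

    use std::collections::VecDeque;

    // Placeholder element types; the real fields hold PartitionRef and
    // PartitionReader values.
    struct TreeIter {
        partitions: VecDeque<u32>,
        current_reader: Option<u32>,
    }

    fn read_iter() -> TreeIter {
        TreeIter {
            partitions: VecDeque::new(),
            current_reader: None,
        }
    }

    fn main() {
        let iter = read_iter();
        assert!(iter.partitions.is_empty() && iter.current_reader.is_none());
    }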
