diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 81750de4db4a..c5cd32bf0ec5 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -15,7 +15,6 @@ //! Memtables are write buffers for regions. pub mod key_values; -#[allow(dead_code)] pub mod merge_tree; pub mod time_series; pub(crate) mod version; diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 11e0e676e8db..343ed8e2884a 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -65,10 +65,6 @@ impl DataBatchRange { pub(crate) fn len(&self) -> usize { self.end - self.start } - - pub(crate) fn is_empty(&self) -> bool { - self.len() == 0 - } } /// Data part batches returns by `DataParts::read`. @@ -89,10 +85,6 @@ impl<'a> DataBatch<'a> { self.range } - pub(crate) fn is_empty(&self) -> bool { - self.range.is_empty() - } - pub(crate) fn slice_record_batch(&self) -> RecordBatch { self.rb.slice(self.range.start, self.range.len()) } @@ -525,12 +517,6 @@ impl DataBufferReader { } } - /// # Panics - /// If Current reader is exhausted. - pub(crate) fn current_pk_index(&self) -> PkIndex { - self.current_range.as_ref().unwrap().pk_index - } - /// Advances reader to next data batch. pub(crate) fn next(&mut self) -> Result<()> { if self.offset >= self.batch.num_rows() { @@ -751,12 +737,6 @@ pub enum DataPart { } impl DataPart { - fn is_empty(&self) -> bool { - match self { - DataPart::Parquet(p) => p.data.is_empty(), - } - } - /// Reads frozen data part and yields [DataBatch]es. pub fn read(&self) -> Result { match self { @@ -801,14 +781,6 @@ impl DataPartReader { self.current_range.is_some() } - /// Returns current pk index. - /// - /// # Panics - /// If reader is exhausted. - pub(crate) fn current_pk_index(&self) -> PkIndex { - self.current_range.as_ref().unwrap().pk_index - } - /// Returns current data batch of reader. /// # Panics /// If reader is exhausted. @@ -891,12 +863,6 @@ impl DataParts { self.active.write_row(pk_index, kv) } - /// Freezes the active data buffer into frozen data parts. - pub fn freeze(&mut self) -> Result<()> { - self.frozen.push(self.active.freeze(None, false)?); - Ok(()) - } - /// Reads data from all parts including active and frozen parts. /// The returned iterator yields a record batch of one primary key at a time. /// The order of yielding primary keys is determined by provided weights. @@ -913,10 +879,6 @@ impl DataParts { let merger = Merger::try_new(nodes)?; Ok(DataPartsReader { merger }) } - - pub(crate) fn is_empty(&self) -> bool { - self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty()) - } } /// Reader for all parts inside a `DataParts`. diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index 543dca3d1c8e..989a59d007ac 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -56,11 +56,6 @@ impl KeyDictBuilder { } } - /// Gets the pk index by the key. - pub fn get_pk_index(&self, key: &[u8]) -> Option { - self.pk_to_index.get(key).copied() - } - /// Returns true if the builder is full. pub fn is_full(&self) -> bool { self.num_keys >= self.capacity @@ -97,6 +92,7 @@ impl KeyDictBuilder { } /// Memory size of the builder. + #[cfg(test)] pub fn memory_size(&self) -> usize { self.key_bytes_in_index + self.key_buffer.buffer_memory_size() @@ -152,8 +148,6 @@ impl KeyDictBuilder { pub struct DictBuilderReader { blocks: Vec, sorted_pk_indices: Vec, - /// Current offset in the `sorted_pk_indices`. - offset: usize, } impl DictBuilderReader { @@ -161,21 +155,23 @@ impl DictBuilderReader { Self { blocks, sorted_pk_indices, - offset: 0, } } /// Returns the number of keys. + #[cfg(test)] pub fn num_keys(&self) -> usize { self.sorted_pk_indices.len() } /// Gets the i-th pk index. + #[cfg(test)] pub fn pk_index(&self, offset: usize) -> PkIndex { self.sorted_pk_indices[offset] } /// Gets the i-th key. + #[cfg(test)] pub fn key(&self, offset: usize) -> &[u8] { let pk_index = self.pk_index(offset); self.key_by_pk_index(pk_index) @@ -191,11 +187,6 @@ impl DictBuilderReader { pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec) { compute_pk_weights(&self.sorted_pk_indices, pk_weights) } - - /// Returns pk indices sorted by keys. - pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] { - &self.sorted_pk_indices - } } /// Returns pk weights to sort a data part and replaces pk indices. @@ -290,23 +281,8 @@ impl KeyBuffer { self.key_builder.is_empty() } - /// Gets the primary key by its index. - /// - /// # Panics - /// Panics if the index is invalid. - fn get_key(&self, index: PkIndex) -> &[u8] { - let values = self.key_builder.values_slice(); - let offsets = self.key_builder.offsets_slice(); - // Casting index to usize is safe. - let start = offsets[index as usize]; - let end = offsets[index as usize + 1]; - - // We ensure no null in the builder so we don't check validity. - // The builder offset should be positive. - &values[start as usize..end as usize] - } - /// Returns the buffer size of the builder. + #[cfg(test)] fn buffer_memory_size(&self) -> usize { self.key_builder.values_slice().len() + std::mem::size_of_val(self.key_builder.offsets_slice()) @@ -351,15 +327,12 @@ impl DictBlock { Self { keys } } - fn len(&self) -> usize { - self.keys.len() - } - fn key_by_pk_index(&self, index: PkIndex) -> &[u8] { let pos = index % MAX_KEYS_PER_BLOCK; self.keys.value(pos as usize) } + #[cfg(test)] fn buffer_memory_size(&self) -> usize { self.keys.get_buffer_memory_size() } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 0fdf61b714d7..e0dcad9989f7 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -136,10 +136,6 @@ pub struct ShardReader { } impl ShardReader { - fn shard_id(&self) -> ShardId { - self.shard_id - } - fn is_valid(&self) -> bool { self.parts_reader.is_valid() } diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index bb277eeee691..c63a1b9f261c 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -132,10 +132,6 @@ pub struct ShardBuilderReader { } impl ShardBuilderReader { - pub fn shard_id(&self) -> ShardId { - self.shard_id - } - pub fn is_valid(&self) -> bool { self.data_reader.is_valid() } @@ -164,14 +160,12 @@ impl ShardBuilderReader { #[cfg(test)] mod tests { - use std::sync::Arc; use super::*; - use crate::memtable::merge_tree::dict::KeyDictBuilder; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::KeyValues; use crate::test_util::memtable_util::{ - build_key_values_with_ts_seq_values, encode_key_by_kv, encode_keys, metadata_for_test, + build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test, }; fn input_with_key(metadata: &RegionMetadataRef) -> Vec { @@ -203,27 +197,6 @@ mod tests { ] } - fn new_shard_builder( - shard_id: ShardId, - metadata: RegionMetadataRef, - input: &[KeyValues], - ) -> Shard { - let mut dict_builder = KeyDictBuilder::new(1024); - let mut metrics = WriteMetrics::default(); - let mut keys = Vec::with_capacity(input.len()); - for kvs in input { - encode_keys(&metadata, kvs, &mut keys); - } - for key in &keys { - dict_builder.insert_key(key, &mut metrics); - } - - let dict = dict_builder.finish().unwrap(); - let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true); - - Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true) - } - #[test] fn test_write_shard_builder() { let metadata = metadata_for_test(); diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index afa79463e591..7e19a6945432 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -21,8 +21,6 @@ use api::v1::OpType; use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datafusion_common::ScalarValue; -use datatypes::arrow; -use datatypes::data_type::ConcreteDataType; use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -35,7 +33,6 @@ use crate::memtable::merge_tree::partition::{ Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext, }; use crate::memtable::merge_tree::MergeTreeConfig; -use crate::memtable::time_series::primary_key_schema; use crate::memtable::{BoxedBatchIterator, KeyValues}; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -144,18 +141,8 @@ impl MergeTree { .unwrap_or_default(); let partitions = self.prune_partitions(&filters); - let pk_schema = primary_key_schema(&self.metadata); - let pk_datatypes = self - .metadata - .primary_key_columns() - .map(|pk| pk.column_schema.data_type.clone()) - .collect(); let mut iter = TreeIter { - metadata: self.metadata.clone(), - pk_schema, - pk_datatypes, - row_codec: self.row_codec.clone(), partitions, current_reader: None, }; @@ -283,10 +270,6 @@ impl MergeTree { } struct TreeIter { - metadata: RegionMetadataRef, - pk_schema: arrow::datatypes::SchemaRef, - pk_datatypes: Vec, - row_codec: Arc, partitions: VecDeque, current_reader: Option, }