Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use a partition level map to look up pk index #3400

Merged
merged 5 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,4 +459,73 @@ mod tests {
}
assert_eq!(expect, v0_all);
}

#[test]
fn test_write_iter_multi_keys() {
write_iter_multi_keys(1, 100);
write_iter_multi_keys(2, 100);
write_iter_multi_keys(4, 100);
write_iter_multi_keys(8, 5);
write_iter_multi_keys(2, 10);
}

fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let memtable = MergeTreeMemtable::new(
1,
metadata.clone(),
None,
&MergeTreeConfig {
index_max_keys_per_shard: max_keys,
data_freeze_threshold: freeze_threshold,
..Default::default()
},
);

let mut data = Vec::new();
// 4 partitions, each partition 4 pks.
for i in 0..4 {
for j in 0..4 {
// key: i, a{j}
let timestamps = [11, 13, 1, 5, 3, 7, 9];
let key = format!("a{j}");
let kvs =
memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
memtable.write(&kvs).unwrap();
for ts in timestamps {
data.push((i, key.clone(), ts));
}
}
for j in 0..4 {
// key: i, a{j}
let timestamps = [10, 2, 4, 8, 6];
let key = format!("a{j}");
let kvs =
memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
memtable.write(&kvs).unwrap();
for ts in timestamps {
data.push((i, key.clone(), ts));
}
}
}
data.sort_unstable();

let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(expect, read);
}
}
21 changes: 6 additions & 15 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,17 @@ impl KeyDictBuilder {
}

/// Finishes the builder.
pub fn finish(&mut self) -> Option<KeyDict> {
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
if self.key_buffer.is_empty() {
return None;
}

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Takes the pk to index map.
let mut pk_to_index = std::mem::take(&mut self.pk_to_index);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; pk_to_index.len()];
for (i, pk_index) in pk_to_index.values_mut().enumerate() {
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
Expand All @@ -129,9 +127,9 @@ impl KeyDictBuilder {
self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
pk_to_index,
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
Expand Down Expand Up @@ -214,8 +212,6 @@ fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>)
#[derive(Default)]
pub struct KeyDict {
// TODO(yingwen): We can use key_positions to do a binary search.
/// Key map to find a key in the dict.
pk_to_index: PkIndexMap,
/// Unsorted key blocks.
dict_blocks: Vec<DictBlock>,
/// Maps pk index to position of the key in [Self::dict_blocks].
Expand All @@ -237,11 +233,6 @@ impl KeyDict {
self.dict_blocks[block_index as usize].key_by_pk_index(position)
}

/// Gets the pk index by the key.
pub fn get_pk_index(&self, key: &[u8]) -> Option<PkIndex> {
self.pk_to_index.get(key).copied()
}

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
let mut pk_weights = Vec::with_capacity(self.key_positions.len());
Expand Down Expand Up @@ -442,7 +433,7 @@ mod tests {
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish().unwrap();
let dict = builder.finish(&mut BTreeMap::new()).unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
Expand All @@ -458,7 +449,7 @@ mod tests {
builder.insert_key(key.as_bytes(), &mut metrics);
}
assert!(builder.is_full());
builder.finish();
builder.finish(&mut BTreeMap::new());

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
Expand Down
67 changes: 40 additions & 27 deletions src/mito2/src/memtable/merge_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//!
//! We only support partitioning the tree by pre-defined internal columns.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use api::v1::SemanticType;
Expand Down Expand Up @@ -67,17 +67,18 @@ impl Partition {
metrics: &mut WriteMetrics,
) -> Result<()> {
let mut inner = self.inner.write().unwrap();
// Now we ensure one key only exists in one shard.
// Freeze the shard builder if needed.
if inner.shard_builder.should_freeze() {
inner.freeze_active_shard()?;
}

// Finds key in shards, now we ensure one key only exists in one shard.
if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
// Key already in shards.
inner.write_to_shard(pk_id, key_value);
return Ok(());
}

if inner.shard_builder.should_freeze() {
inner.freeze_active_shard()?;
}

// Write to the shard builder.
inner
.shard_builder
Expand Down Expand Up @@ -142,26 +143,35 @@ impl Partition {
///
/// Must freeze the partition before fork.
pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
// TODO(yingwen): TTL or evict shards.
let shard_builder = ShardBuilder::new(
metadata.clone(),
config,
inner.shard_builder.current_shard_id(),
);
let shards = inner
.shards
.iter()
.map(|shard| shard.fork(metadata.clone()))
.collect();
let (shards, shard_builder) = {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
let shard_builder = ShardBuilder::new(
metadata.clone(),
config,
inner.shard_builder.current_shard_id(),
);
let shards = inner
.shards
.iter()
.map(|shard| shard.fork(metadata.clone()))
.collect();

(shards, shard_builder)
};
let pk_to_pk_id = {
let mut inner = self.inner.write().unwrap();
std::mem::take(&mut inner.pk_to_pk_id)
};

Partition {
inner: RwLock::new(Inner {
metadata: metadata.clone(),
shard_builder,
shards,
num_rows: 0,
pk_to_pk_id,
frozen: false,
}),
dedup: self.dedup,
}
Expand Down Expand Up @@ -461,11 +471,14 @@ fn data_batch_to_batch(
/// A key only exists in one shard.
struct Inner {
metadata: RegionMetadataRef,
/// Map to index pk to pk id.
pk_to_pk_id: HashMap<Vec<u8>, PkId>,
/// Shard whose dictionary is active.
shard_builder: ShardBuilder,
/// Shards with frozen dictionary.
shards: Vec<Shard>,
num_rows: usize,
frozen: bool,
}

impl Inner {
Expand All @@ -479,20 +492,17 @@ impl Inner {
let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
Self {
metadata,
pk_to_pk_id: HashMap::new(),
shard_builder,
shards,
num_rows: 0,
frozen: false,
}
}

fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
for shard in &self.shards {
if let Some(pkid) = shard.find_id_by_key(primary_key) {
return Some(pkid);
}
}

None
assert!(!self.frozen);
self.pk_to_pk_id.get(primary_key).copied()
}

fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) {
Expand All @@ -506,7 +516,10 @@ impl Inner {
}

fn freeze_active_shard(&mut self) -> Result<()> {
if let Some(shard) = self.shard_builder.finish(self.metadata.clone())? {
if let Some(shard) = self
.shard_builder
.finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
{
self.shards.push(shard);
}
Ok(())
Expand Down
Loading
Loading