Skip to content

Commit

Permalink
partition merges by doc mapper version
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Jul 12, 2024
1 parent fd89a54 commit 069d7a2
Showing 1 changed file with 82 additions and 25 deletions.
107 changes: 82 additions & 25 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_metastore::SplitMetadata;
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::DocMappingUid;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
Expand All @@ -37,11 +38,26 @@ use crate::merge_policy::MergeOperation;
use crate::models::NewSplits;
use crate::MergePolicy;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MergePartition {
partition_id: u64,
doc_mapping_uid: DocMappingUid,
}

impl MergePartition {
fn from_split_meta(split_meta: &SplitMetadata) -> MergePartition {
MergePartition {
partition_id: split_meta.partition_id,
doc_mapping_uid: split_meta.doc_mapping_uid,
}
}
}

/// The merge planner decides when to start a merge task.
pub struct MergePlanner {
/// A young split is a split that has not reached maturity
/// yet and can be candidate to merge operations.
partitioned_young_splits: HashMap<u64, Vec<SplitMetadata>>,
partitioned_young_splits: HashMap<MergePartition, Vec<SplitMetadata>>,

/// This set contains all of the split ids that we "acknowledged".
/// The point of this set is to rapidly dismiss redundant `NewSplit` message.
Expand Down Expand Up @@ -228,7 +244,7 @@ impl MergePlanner {
fn record_split(&mut self, new_split: SplitMetadata) {
let splits_for_partition: &mut Vec<SplitMetadata> = self
.partitioned_young_splits
.entry(new_split.partition_id)
.entry(MergePartition::from_split_meta(&new_split))
.or_default();
splits_for_partition.push(new_split);
}
Expand Down Expand Up @@ -326,7 +342,7 @@ mod tests {
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
use time::OffsetDateTime;

use crate::actors::MergePlanner;
Expand All @@ -339,6 +355,7 @@ mod tests {
index_uid: &IndexUid,
split_id: &str,
partition_id: u64,
doc_mapping_uid: DocMappingUid,
num_docs: usize,
num_merge_ops: usize,
) -> SplitMetadata {
Expand All @@ -354,6 +371,7 @@ mod tests {
maturity: SplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
},
doc_mapping_uid,
..Default::default()
}
}
Expand All @@ -363,6 +381,8 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid1 = DocMappingUid::random();
let doc_mapping_uid2 = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand Down Expand Up @@ -394,8 +414,9 @@ mod tests {
// send one split
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "1_1", 1, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, 3000, 0),
split_metadata_for_test(&index_uid, "1_1", 1, doc_mapping_uid1, 2500, 0),
split_metadata_for_test(&index_uid, "1v2_1", 1, doc_mapping_uid2, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, doc_mapping_uid1, 3000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
Expand All @@ -406,8 +427,9 @@ mod tests {
// send two splits with a duplicate
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "2_1", 1, 2000, 0),
split_metadata_for_test(&index_uid, "1_2", 2, 3000, 0),
split_metadata_for_test(&index_uid, "2_1", 1, doc_mapping_uid1, 2000, 0),
split_metadata_for_test(&index_uid, "2v2_1", 1, doc_mapping_uid2, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, doc_mapping_uid1, 3000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
Expand All @@ -418,27 +440,41 @@ mod tests {
// send four more splits to generate merge
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "3_1", 1, 1500, 0),
split_metadata_for_test(&index_uid, "4_1", 1, 1000, 0),
split_metadata_for_test(&index_uid, "2_2", 2, 2000, 0),
split_metadata_for_test(&index_uid, "3_2", 2, 4000, 0),
split_metadata_for_test(&index_uid, "3_1", 1, doc_mapping_uid1, 1500, 0),
split_metadata_for_test(&index_uid, "4_1", 1, doc_mapping_uid1, 1000, 0),
split_metadata_for_test(&index_uid, "3v2_1", 1, doc_mapping_uid2, 1500, 0),
split_metadata_for_test(&index_uid, "2_2", 2, doc_mapping_uid1, 2000, 0),
split_metadata_for_test(&index_uid, "3_2", 2, doc_mapping_uid1, 4000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
merge_planner_handle.process_pending_and_observe().await;
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
assert_eq!(operations.len(), 2);
let mut merge_operations = operations.into_iter().sorted_by(|left_op, right_op| {
left_op.splits[0]
.partition_id
.cmp(&right_op.splits[0].partition_id)
});
assert_eq!(operations.len(), 3);
let mut merge_operations = operations
.into_iter()
.sorted_by_key(|op| (op.splits[0].partition_id, op.splits[0].doc_mapping_uid));

let first_merge_operation = merge_operations.next().unwrap();
assert_eq!(first_merge_operation.splits.len(), 4);
assert!(first_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 1 && split.doc_mapping_uid == doc_mapping_uid1));

let second_merge_operation = merge_operations.next().unwrap();
assert_eq!(second_merge_operation.splits.len(), 3);
assert!(second_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 1 && split.doc_mapping_uid == doc_mapping_uid2));

let third_merge_operation = merge_operations.next().unwrap();
assert_eq!(third_merge_operation.splits.len(), 3);
assert!(third_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 2 && split.doc_mapping_uid == doc_mapping_uid1));
}
universe.assert_quit().await;

Expand All @@ -451,6 +487,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand All @@ -472,12 +509,20 @@ mod tests {
};
let immature_splits = vec![
split_metadata_for_test(
&index_uid, "a_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&index_uid, "b_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
Expand Down Expand Up @@ -527,6 +572,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid,
Expand Down Expand Up @@ -558,13 +604,15 @@ mod tests {
&other_index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&other_index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
Expand Down Expand Up @@ -595,6 +643,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand All @@ -617,12 +666,20 @@ mod tests {
};
let immature_splits = vec![
split_metadata_for_test(
&index_uid, "a_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&index_uid, "b_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
Expand Down

0 comments on commit 069d7a2

Please sign in to comment.