Skip to content

Commit

Permalink
add doc mapping uid to splits
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Jul 12, 2024
1 parent 4de8d7a commit fd89a54
Show file tree
Hide file tree
Showing 17 changed files with 576 additions and 528 deletions.
5 changes: 4 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics};
use quickwit_proto::metastore::{
LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::PublishToken;
use quickwit_proto::types::{DocMappingUid, PublishToken};
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use serde::Serialize;
use tantivy::schema::Schema;
Expand Down Expand Up @@ -98,6 +98,7 @@ struct IndexerState {
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
schema: Schema,
doc_mapping_uid: DocMappingUid,
tokenizer_manager: TokenizerManager,
max_num_partitions: NonZeroU32,
index_settings: IndexSettings,
Expand Down Expand Up @@ -130,6 +131,7 @@ impl IndexerState {
self.pipeline_id.clone(),
partition_id,
last_delete_opstamp,
self.doc_mapping_uid,
self.indexing_directory.clone(),
index_builder,
io_controls,
Expand Down Expand Up @@ -572,6 +574,7 @@ impl Indexer {
publish_lock: PublishLock::default(),
publish_token_opt: None,
schema,
doc_mapping_uid: doc_mapper.doc_mapping_uid(),
tokenizer_manager: tokenizer_manager.tantivy_manager().clone(),
index_settings,
max_num_partitions: doc_mapper.max_num_partitions(),
Expand Down
20 changes: 16 additions & 4 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub fn merge_split_attrs(
pipeline_id: MergePipelineId,
merge_split_id: SplitId,
splits: &[SplitMetadata],
) -> SplitAttrs {
) -> anyhow::Result<SplitAttrs> {
let partition_id = combine_partition_ids_aux(splits.iter().map(|split| split.partition_id));
let time_range: Option<RangeInclusive<DateTime>> = merge_time_range(splits);
let uncompressed_docs_size_in_bytes = sum_doc_sizes_in_bytes(splits);
Expand All @@ -250,10 +250,21 @@ pub fn merge_split_attrs(
.map(|split| split.delete_opstamp)
.min()
.unwrap_or(0);
SplitAttrs {
let doc_mapping_uid = splits
.first()
.ok_or_else(|| anyhow::anyhow!("attempted to merge zero splits"))?
.doc_mapping_uid;
if splits
.iter()
.any(|split| split.doc_mapping_uid != doc_mapping_uid)
{
anyhow::bail!("attempted to merge splits with different doc mapping uid");
}
Ok(SplitAttrs {
node_id: pipeline_id.node_id.clone(),
index_uid: pipeline_id.index_uid.clone(),
source_id: pipeline_id.source_id.clone(),
doc_mapping_uid,
split_id: merge_split_id,
partition_id,
replaced_split_ids,
Expand All @@ -262,7 +273,7 @@ pub fn merge_split_attrs(
uncompressed_docs_size_in_bytes,
delete_opstamp,
num_merge_ops: max_merge_ops(splits) + 1,
}
})
}

fn max_merge_ops(splits: &[SplitMetadata]) -> usize {
Expand Down Expand Up @@ -324,7 +335,7 @@ impl MergeExecutor {
)?;
ctx.record_progress();

let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits);
let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits)?;
Ok(IndexedSplit {
split_attrs,
index: merged_index,
Expand Down Expand Up @@ -436,6 +447,7 @@ impl MergeExecutor {
node_id: NodeId::new(split.node_id),
index_uid: split.index_uid,
source_id: split.source_id,
doc_mapping_uid: split.doc_mapping_uid,
split_id: merge_split_id,
partition_id: split.partition_id,
replaced_split_ids: vec![split.split_id.clone()],
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ mod tests {
use quickwit_actors::{ObservationType, Universe};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::search::{deserialize_split_fields, ListFieldsEntryResponse};
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
use tantivy::directory::MmapDirectory;
use tantivy::schema::{NumericOptions, Schema, Type, FAST, STRING, TEXT};
use tantivy::{doc, DateTime, IndexBuilder, IndexSettings};
Expand Down Expand Up @@ -519,6 +519,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
split_id: "test-split".to_string(),
partition_id: 17u64,
num_docs,
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ mod tests {
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_proto::metastore::{EmptyResponse, MockMetastoreService};
use quickwit_proto::types::NodeId;
use quickwit_proto::types::{DocMappingUid, NodeId};
use quickwit_storage::RamStorage;
use tantivy::DateTime;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -552,6 +552,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
partition_id: 3u64,
time_range: Some(
DateTime::from_timestamp_secs(1_628_203_589)
Expand Down Expand Up @@ -661,6 +662,7 @@ mod tests {
node_id: node_id.clone(),
index_uid: index_uid.clone(),
source_id: source_id.clone(),
doc_mapping_uid: DocMappingUid::default(),
split_id: "test-split-1".to_string(),
partition_id: 3u64,
num_docs: 10,
Expand All @@ -687,6 +689,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
split_id: "test-split-2".to_string(),
partition_id: 3u64,
num_docs: 10,
Expand Down Expand Up @@ -811,6 +814,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
split_id: "test-split".to_string(),
partition_id: 3u64,
time_range: None,
Expand Down Expand Up @@ -987,6 +991,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
partition_id: 3u64,
time_range: Some(
DateTime::from_timestamp_secs(1_628_203_589)
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub mod tests {
index_uid: IndexUid::new_with_random_ulid("test_index"),
source_id: "test_source".to_string(),
};
let split_attrs = merge_split_attrs(pipeline_id, merged_split_id, splits);
let split_attrs = merge_split_attrs(pipeline_id, merged_split_id, splits).unwrap();
create_split_metadata(merge_policy, &split_attrs, tags, 0..0)
}

Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/models/indexed_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_common::metrics::GaugeGuard;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::types::{IndexUid, PublishToken};
use quickwit_proto::types::{DocMappingUid, IndexUid, PublishToken};
use tantivy::directory::MmapDirectory;
use tantivy::IndexBuilder;
use tracing::{instrument, Span};
Expand Down Expand Up @@ -82,6 +82,7 @@ impl IndexedSplitBuilder {
pipeline_id: IndexingPipelineId,
partition_id: u64,
last_delete_opstamp: u64,
doc_mapping_uid: DocMappingUid,
scratch_directory: TempDirectory,
index_builder: IndexBuilder,
io_controls: IoControls,
Expand All @@ -105,6 +106,7 @@ impl IndexedSplitBuilder {
node_id: pipeline_id.node_id,
index_uid: pipeline_id.index_uid,
source_id: pipeline_id.source_id,
doc_mapping_uid,
partition_id,
split_id,
num_docs: 0,
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-indexing/src/models/split_attrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::ops::{Range, RangeInclusive};
use std::sync::Arc;

use quickwit_metastore::SplitMetadata;
use quickwit_proto::types::{IndexUid, NodeId, SourceId, SplitId};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId};
use tantivy::DateTime;
use time::OffsetDateTime;

Expand All @@ -37,6 +37,9 @@ pub struct SplitAttrs {
/// Source ID to which the split belongs.
pub source_id: SourceId,

/// Doc mapping UID used to produce this split.
pub doc_mapping_uid: DocMappingUid,

/// Split ID. Joined with the index URI (<index URI>/<split ID>), this ID
/// should be enough to uniquely identify a split.
/// In reality, some information may be implicitly configured
Expand Down Expand Up @@ -100,6 +103,7 @@ pub fn create_split_metadata(
node_id: split_attrs.node_id.to_string(),
index_uid: split_attrs.index_uid.clone(),
source_id: split_attrs.source_id.clone(),
doc_mapping_uid: split_attrs.doc_mapping_uid,
split_id: split_attrs.split_id.clone(),
partition_id: split_attrs.partition_id,
num_docs: split_attrs.num_docs as usize,
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-metastore/src/split_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::str::FromStr;
use std::time::Duration;

use bytesize::ByteSize;
use quickwit_proto::types::{IndexUid, SourceId, SplitId};
use quickwit_proto::types::{DocMappingUid, IndexUid, SourceId, SplitId};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationMilliSeconds};
use time::OffsetDateTime;
Expand Down Expand Up @@ -133,6 +133,10 @@ pub struct SplitMetadata {
/// Number of merge operations that were involved in creating
/// this split.
pub num_merge_ops: usize,

/// Doc mapping UID used when creating this split. This split may only be merged with other
/// splits using the same doc mapping UID.
pub doc_mapping_uid: DocMappingUid,
}

impl fmt::Debug for SplitMetadata {
Expand Down Expand Up @@ -281,6 +285,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata {
tags: ["234".to_string(), "aaa".to_string()].into_iter().collect(),
footer_offsets: 1000..2000,
num_merge_ops: 3,
doc_mapping_uid: DocMappingUid::default(),
}
}

Expand Down Expand Up @@ -420,6 +425,7 @@ mod tests {
footer_offsets: 0..1024,
delete_opstamp: 0,
num_merge_ops: 0,
doc_mapping_uid: DocMappingUid::default(),
};

let expected_output = "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { \
Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-metastore/src/split_metadata_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::collections::BTreeSet;
use std::ops::{Range, RangeInclusive};

use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::types::{DocMappingUid, IndexUid, SplitId};
use serde::{Deserialize, Serialize};

use crate::split_metadata::{utc_now_timestamp, SplitMaturity};
Expand Down Expand Up @@ -92,6 +92,11 @@ pub(crate) struct SplitMetadataV0_8 {

#[serde(default)]
num_merge_ops: usize,

// We default-fill with zero: we don't know the right uid, and it is correct to assume that
// all splits created before updates first appeared are compatible with each other.
#[serde(default)]
doc_mapping_uid: DocMappingUid,
}

impl From<SplitMetadataV0_8> for SplitMetadata {
Expand Down Expand Up @@ -127,6 +132,7 @@ impl From<SplitMetadataV0_8> for SplitMetadata {
tags: v8.tags,
footer_offsets: v8.footer_offsets,
num_merge_ops: v8.num_merge_ops,
doc_mapping_uid: v8.doc_mapping_uid,
}
}
}
Expand All @@ -148,6 +154,7 @@ impl From<SplitMetadata> for SplitMetadataV0_8 {
tags: split.tags,
footer_offsets: split.footer_offsets,
num_merge_ops: split.num_merge_ops,
doc_mapping_uid: split.doc_mapping_uid,
}
}
}
Expand Down
Loading

0 comments on commit fd89a54

Please sign in to comment.