Skip to content

Commit

Permalink
Add doc mapping UID attribute to DocMapping (#5087)
Browse files Browse the repository at this point in the history
* Add `doc_mapping_uid` field to `DocMapper`

* Bump config file format version

* Leave doc mapping UID out of doc mapping comparison
  • Loading branch information
guilload authored Jun 19, 2024
1 parent 3758434 commit 5b85f7c
Show file tree
Hide file tree
Showing 44 changed files with 1,256 additions and 81 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ttl_cache = "0.5"
typetag = "0.2"
ulid = "1.1"
username = "0.2"
utoipa = "4.2.0"
utoipa = { version = "4.2", features = ["time", "ulid"] }
uuid = { version = "1.8", features = ["v4", "serde"] }
vrl = { version = "0.8.1", default-features = false, features = [
"compiler",
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -611,7 +611,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ quickwit-proto = { workspace = true }
[dev-dependencies]
tokio = { workspace = true }

quickwit-proto = { workspace = true, features = ["testsuite"] }

[features]
testsuite = []
vrl = ["dep:vrl"]
22 changes: 15 additions & 7 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

pub(crate) mod serialize;

use std::collections::BTreeSet;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -31,15 +29,14 @@ use chrono::Utc;
use cron::Schedule;
use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode};
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::{load_index_config_from_user_config, load_index_config_update};
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
use crate::TestableForRegression;
use crate::merge_policy_config::MergePolicyConfig;

#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -260,6 +257,7 @@ impl IndexConfig {
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000000",
"mode": "lenient",
"field_mappings": [
{
Expand Down Expand Up @@ -342,8 +340,17 @@ impl IndexConfig {
}
}

impl TestableForRegression for IndexConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexConfig {
fn sample_for_regression() -> Self {
use std::collections::BTreeSet;
use std::num::NonZeroU32;

use quickwit_doc_mapper::Mode;
use quickwit_proto::types::DocMappingUid;

use crate::merge_policy_config::StableLogMergePolicyConfig;

let tenant_id_mapping = serde_json::from_str(
r#"{
"name": "tenant_id",
Expand Down Expand Up @@ -386,6 +393,7 @@ impl TestableForRegression for IndexConfig {
)
.unwrap();
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::for_test(1),
mode: Mode::default(),
field_mappings: vec![
tenant_id_mapping,
Expand Down Expand Up @@ -550,7 +558,7 @@ mod tests {
assert_eq!(index_config.indexing_settings.commit_timeout_secs, 61);
assert_eq!(
index_config.indexing_settings.merge_policy,
MergePolicyConfig::StableLog(StableLogMergePolicyConfig {
MergePolicyConfig::StableLog(crate::StableLogMergePolicyConfig {
merge_factor: 9,
max_merge_factor: 11,
maturation_period: Duration::from_secs(48 * 3600),
Expand Down
24 changes: 18 additions & 6 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use anyhow::{ensure, Context};
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
use tracing::info;

Expand All @@ -35,8 +35,12 @@ type IndexConfigForSerialization = IndexConfigV0_8;
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub(crate) enum VersionedIndexConfig {
#[serde(rename = "0.8")]
// The two versions use the same format but for v0.8 and below, we need to set the
// `doc_mapping_uid` to the nil value upon deserialization.
#[serde(rename = "0.9")]
V0_9(IndexConfigV0_8),
// Retro compatibility
#[serde(rename = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexConfigV0_8),
}
Expand All @@ -45,6 +49,7 @@ impl From<VersionedIndexConfig> for IndexConfigForSerialization {
fn from(versioned_config: VersionedIndexConfig) -> IndexConfigForSerialization {
match versioned_config {
VersionedIndexConfig::V0_8(v0_8) => v0_8,
VersionedIndexConfig::V0_9(v0_8) => v0_8,
}
}
}
Expand Down Expand Up @@ -75,7 +80,7 @@ pub fn load_index_config_update(
let current_index_parent_dir = &current_index_config
.index_uri
.parent()
.context("Unexpected `index_uri` format on current configuration")?;
.expect("index URI should have a parent");
let new_index_config = load_index_config_from_user_config(
config_format,
index_config_bytes,
Expand All @@ -94,7 +99,9 @@ pub fn load_index_config_update(
new_index_config.index_uri
);
ensure!(
current_index_config.doc_mapping == new_index_config.doc_mapping,
current_index_config
.doc_mapping
.eq_ignore_doc_mapping_uid(&new_index_config.doc_mapping),
"`doc_mapping` cannot be updated"
);
Ok(new_index_config)
Expand Down Expand Up @@ -147,7 +154,7 @@ impl IndexConfigForSerialization {

impl From<IndexConfig> for VersionedIndexConfig {
fn from(index_config: IndexConfig) -> Self {
VersionedIndexConfig::V0_8(index_config.into())
VersionedIndexConfig::V0_9(index_config.into())
}
}

Expand All @@ -156,7 +163,12 @@ impl TryFrom<VersionedIndexConfig> for IndexConfig {

fn try_from(versioned_index_config: VersionedIndexConfig) -> anyhow::Result<Self> {
match versioned_index_config {
VersionedIndexConfig::V0_8(v0_8) => v0_8.build_and_validate(None),
VersionedIndexConfig::V0_8(mut v0_8) => {
// Override the randomly generated doc mapping UID with the nil value.
v0_8.doc_mapping.doc_mapping_uid = DocMappingUid::default();
v0_8.build_and_validate(None)
}
VersionedIndexConfig::V0_9(v0_8) => v0_8.build_and_validate(None),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};
use crate::index_config::validate_index_config;
use crate::{
validate_identifier, validate_index_id_pattern, DocMapping, IndexConfig, IndexingSettings,
RetentionPolicy, SearchSettings, TestableForRegression,
RetentionPolicy, SearchSettings,
};

pub type IndexTemplateId = String;
Expand Down Expand Up @@ -135,7 +135,8 @@ impl IndexTemplate {
}
}

impl TestableForRegression for IndexTemplate {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexTemplate {
fn sample_for_regression() -> Self {
let template_id = "test-template".to_string();
let index_id_patterns = vec![
Expand All @@ -144,6 +145,7 @@ impl TestableForRegression for IndexTemplate {
];

let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000001",
"field_mappings": [
{
"name": "ts",
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_template/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{DocMapping, IndexingSettings, RetentionPolicy, SearchSettings};
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub enum VersionedIndexTemplate {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexTemplateV0_8),
}
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression};
use crate::{disable_ingest_v1, enable_ingest_v2};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source";
Expand Down Expand Up @@ -153,7 +153,8 @@ impl SourceConfig {
}
}

impl TestableForRegression for SourceConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for SourceConfig {
fn sample_for_regression() -> Self {
SourceConfig {
source_id: "kafka-source".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type SourceConfigForSerialization = SourceConfigV0_8;
#[serde(deny_unknown_fields)]
#[serde(tag = "version")]
pub enum VersionedSourceConfig {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
V0_8(SourceConfigV0_8),
// Retro compatibility.
#[serde(rename = "0.7")]
Expand Down
10 changes: 6 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1690,10 +1690,12 @@ mod tests {
.unwrap();
assert!(indexing_tasks.is_empty());

let results: Vec<ApplyIndexingPlanRequest> =
client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert_eq!(results.len(), 1);
assert!(results[0].indexing_tasks.is_empty());
let apply_plan_requests = client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert!(!apply_plan_requests.is_empty());

for apply_plan_request in &apply_plan_requests {
assert!(apply_plan_request.indexing_tasks.is_empty());
}

universe.assert_quit().await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}
});
Expand All @@ -283,7 +283,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
vec![IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}]
}
Expand Down Expand Up @@ -551,7 +551,7 @@ fn add_shard_to_indexer(
indexer_tasks.push(IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![missing_shard],
});
}
Expand Down Expand Up @@ -1189,13 +1189,13 @@ mod tests {
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![ShardId::from(1), ShardId::from(4), ShardId::from(5)],
};
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![
ShardId::from(6),
ShardId::from(7),
Expand Down Expand Up @@ -1259,14 +1259,14 @@ mod tests {
index_uid: IndexUid::new_with_random_ulid("testindex"),
source_id: "testsource".to_string(),
};
let pipeline_uid1 = PipelineUid::new();
let pipeline_uid1 = PipelineUid::random();
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(pipeline_uid1),
shard_ids: Vec::new(),
};
let pipeline_uid2 = PipelineUid::new();
let pipeline_uid2 = PipelineUid::random();
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-doc-mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ utoipa = { workspace = true }
quickwit-common = { workspace = true }
quickwit-datetime = { workspace = true }
quickwit-macros = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-query = { workspace = true }

[dev-dependencies]
Expand Down
Loading

0 comments on commit 5b85f7c

Please sign in to comment.