Skip to content

Commit

Permalink
Bump config file format version
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jun 11, 2024
1 parent 80e68d4 commit 0abb3a2
Show file tree
Hide file tree
Showing 36 changed files with 1,034 additions and 75 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ttl_cache = "0.5"
typetag = "0.2"
ulid = "1.1"
username = "0.2"
utoipa = "4.2.0"
utoipa = { version = "4.2", features = ["time", "ulid"] }
uuid = { version = "1.8", features = ["v4", "serde"] }
vrl = { version = "0.8.1", default-features = false, features = [
"compiler",
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -611,7 +611,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
25 changes: 16 additions & 9 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

pub(crate) mod serialize;

use std::collections::BTreeSet;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -31,15 +29,14 @@ use chrono::Utc;
use cron::Schedule;
use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode};
use quickwit_proto::types::{DocMappingUid, IndexId};
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
use crate::TestableForRegression;
use crate::merge_policy_config::MergePolicyConfig;

#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -260,6 +257,7 @@ impl IndexConfig {
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000000",
"mode": "lenient",
"field_mappings": [
{
Expand Down Expand Up @@ -342,8 +340,17 @@ impl IndexConfig {
}
}

impl TestableForRegression for IndexConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexConfig {
fn sample_for_regression() -> Self {
use std::collections::BTreeSet;
use std::num::NonZeroU32;

use quickwit_doc_mapper::Mode;
use quickwit_proto::types::DocMappingUid;

use crate::merge_policy_config::StableLogMergePolicyConfig;

let tenant_id_mapping = serde_json::from_str(
r#"{
"name": "tenant_id",
Expand Down Expand Up @@ -386,7 +393,7 @@ impl TestableForRegression for IndexConfig {
)
.unwrap();
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::default(),
doc_mapping_uid: DocMappingUid::for_test(1),
mode: Mode::default(),
field_mappings: vec![
tenant_id_mapping,
Expand Down Expand Up @@ -551,7 +558,7 @@ mod tests {
assert_eq!(index_config.indexing_settings.commit_timeout_secs, 61);
assert_eq!(
index_config.indexing_settings.merge_policy,
MergePolicyConfig::StableLog(StableLogMergePolicyConfig {
MergePolicyConfig::StableLog(crate::StableLogMergePolicyConfig {
merge_factor: 9,
max_merge_factor: 11,
maturation_period: Duration::from_secs(48 * 3600),
Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use anyhow::Context;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
use tracing::info;

Expand All @@ -35,8 +35,10 @@ type IndexConfigForSerialization = IndexConfigV0_8;
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub(crate) enum VersionedIndexConfig {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
V0_9(IndexConfigV0_8),
// Retro compatibility
#[serde(rename = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexConfigV0_8),
}
Expand All @@ -45,6 +47,7 @@ impl From<VersionedIndexConfig> for IndexConfigForSerialization {
fn from(versioned_config: VersionedIndexConfig) -> IndexConfigForSerialization {
match versioned_config {
VersionedIndexConfig::V0_8(v0_8) => v0_8,
VersionedIndexConfig::V0_9(v0_8) => v0_8,
}
}
}
Expand Down Expand Up @@ -108,7 +111,7 @@ impl IndexConfigForSerialization {

impl From<IndexConfig> for VersionedIndexConfig {
fn from(index_config: IndexConfig) -> Self {
VersionedIndexConfig::V0_8(index_config.into())
VersionedIndexConfig::V0_9(index_config.into())
}
}

Expand All @@ -117,7 +120,11 @@ impl TryFrom<VersionedIndexConfig> for IndexConfig {

fn try_from(versioned_index_config: VersionedIndexConfig) -> anyhow::Result<Self> {
match versioned_index_config {
VersionedIndexConfig::V0_8(v0_8) => v0_8.build_and_validate(None),
VersionedIndexConfig::V0_8(mut v0_8) => {
v0_8.doc_mapping.doc_mapping_uid = DocMappingUid::default();
v0_8.build_and_validate(None)
}
VersionedIndexConfig::V0_9(v0_8) => v0_8.build_and_validate(None),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};
use crate::index_config::validate_index_config;
use crate::{
validate_identifier, validate_index_id_pattern, DocMapping, IndexConfig, IndexingSettings,
RetentionPolicy, SearchSettings, TestableForRegression,
RetentionPolicy, SearchSettings,
};

pub type IndexTemplateId = String;
Expand Down Expand Up @@ -135,7 +135,8 @@ impl IndexTemplate {
}
}

impl TestableForRegression for IndexTemplate {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexTemplate {
fn sample_for_regression() -> Self {
let template_id = "test-template".to_string();
let index_id_patterns = vec![
Expand All @@ -144,6 +145,7 @@ impl TestableForRegression for IndexTemplate {
];

let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000001",
"field_mappings": [
{
"name": "ts",
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_template/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{DocMapping, IndexingSettings, RetentionPolicy, SearchSettings};
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub enum VersionedIndexTemplate {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexTemplateV0_8),
}
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression};
use crate::{disable_ingest_v1, enable_ingest_v2};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source";
Expand Down Expand Up @@ -153,7 +153,8 @@ impl SourceConfig {
}
}

impl TestableForRegression for SourceConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for SourceConfig {
fn sample_for_regression() -> Self {
SourceConfig {
source_id: "kafka-source".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type SourceConfigForSerialization = SourceConfigV0_8;
#[serde(deny_unknown_fields)]
#[serde(tag = "version")]
pub enum VersionedSourceConfig {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
V0_8(SourceConfigV0_8),
// Retro compatibility.
#[serde(rename = "0.7")]
Expand Down
10 changes: 6 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1681,10 +1681,12 @@ mod tests {
.unwrap();
assert!(indexing_tasks.is_empty());

let results: Vec<ApplyIndexingPlanRequest> =
client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert_eq!(results.len(), 1);
assert!(results[0].indexing_tasks.is_empty());
let apply_plan_requests = client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert!(!apply_plan_requests.is_empty());

for apply_plan_request in &apply_plan_requests {
assert!(apply_plan_request.indexing_tasks.is_empty());
}

universe.assert_quit().await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}
});
Expand All @@ -283,7 +283,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
vec![IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}]
}
Expand Down Expand Up @@ -551,7 +551,7 @@ fn add_shard_to_indexer(
indexer_tasks.push(IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![missing_shard],
});
}
Expand Down Expand Up @@ -1189,13 +1189,13 @@ mod tests {
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![ShardId::from(1), ShardId::from(4), ShardId::from(5)],
};
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![
ShardId::from(6),
ShardId::from(7),
Expand Down Expand Up @@ -1259,14 +1259,14 @@ mod tests {
index_uid: IndexUid::new_with_random_ulid("testindex"),
source_id: "testsource".to_string(),
};
let pipeline_uid1 = PipelineUid::new();
let pipeline_uid1 = PipelineUid::random();
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(pipeline_uid1),
shard_ids: Vec::new(),
};
let pipeline_uid2 = PipelineUid::new();
let pipeline_uid2 = PipelineUid::random();
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-doc-mapper/src/doc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ impl Default for Mode {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct DocMapping {
/// UID of the doc mapping.
#[serde(default = "DocMappingUid::new")]
/// Doc mapping UID.
///
/// Splits with the same doc mapping UID share the same schema and should use the same doc
/// mapper during indexing and querying.
#[serde(default = "DocMappingUid::random")]
pub doc_mapping_uid: DocMappingUid,

/// Defines how unmapped fields should be handled.
Expand Down Expand Up @@ -182,7 +185,7 @@ mod tests {
#[test]
fn test_doc_mapping_serde_roundtrip() {
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::new(),
doc_mapping_uid: DocMappingUid::random(),
mode: Mode::Strict,
field_mappings: vec![
FieldMappingEntry {
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-doc-mapper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub use doc_mapper::{DocMapper, JsonObject, NamedField, TermRange, WarmupInfo};
pub use doc_mapping::{DocMapping, Mode, ModeType};
pub use error::{DocParsingError, QueryParserError};
use quickwit_common::shared_consts::FIELD_PRESENCE_FIELD_NAME;
use quickwit_proto::types::DocMappingUid;
pub use routing_expression::RoutingExpr;

/// Field name reserved for storing the source document.
Expand Down Expand Up @@ -78,6 +79,7 @@ pub enum Cardinality {

#[derive(utoipa::OpenApi)]
#[openapi(components(schemas(
DocMappingUid,
FastFieldOptions,
FieldMappingEntryForSerialization,
IndexRecordOptionSchema,
Expand Down
Loading

0 comments on commit 0abb3a2

Please sign in to comment.