Skip to content

Commit

Permalink
Bump config file format version
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jun 10, 2024
1 parent 80e68d4 commit a879287
Show file tree
Hide file tree
Showing 33 changed files with 1,028 additions and 72 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -611,7 +611,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
23 changes: 15 additions & 8 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

pub(crate) mod serialize;

use std::collections::BTreeSet;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -31,15 +29,14 @@ use chrono::Utc;
use cron::Schedule;
use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode};
use quickwit_proto::types::{DocMappingUid, IndexId};
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
use crate::TestableForRegression;
use crate::merge_policy_config::MergePolicyConfig;

#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -260,6 +257,7 @@ impl IndexConfig {
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000000",
"mode": "lenient",
"field_mappings": [
{
Expand Down Expand Up @@ -342,8 +340,17 @@ impl IndexConfig {
}
}

impl TestableForRegression for IndexConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexConfig {
fn sample_for_regression() -> Self {
use std::collections::BTreeSet;
use std::num::NonZeroU32;

use quickwit_doc_mapper::Mode;
use quickwit_proto::types::DocMappingUid;

use crate::merge_policy_config::StableLogMergePolicyConfig;

let tenant_id_mapping = serde_json::from_str(
r#"{
"name": "tenant_id",
Expand Down Expand Up @@ -551,7 +558,7 @@ mod tests {
assert_eq!(index_config.indexing_settings.commit_timeout_secs, 61);
assert_eq!(
index_config.indexing_settings.merge_policy,
MergePolicyConfig::StableLog(StableLogMergePolicyConfig {
MergePolicyConfig::StableLog(crate::StableLogMergePolicyConfig {
merge_factor: 9,
max_merge_factor: 11,
maturation_period: Duration::from_secs(48 * 3600),
Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use anyhow::Context;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
use tracing::info;

Expand All @@ -35,8 +35,10 @@ type IndexConfigForSerialization = IndexConfigV0_8;
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub(crate) enum VersionedIndexConfig {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
V0_9(IndexConfigV0_8),
// Backward compatibility: configs tagged "0.8" or "0.7" still deserialize into `V0_8`.
#[serde(rename = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexConfigV0_8),
}
Expand All @@ -45,6 +47,7 @@ impl From<VersionedIndexConfig> for IndexConfigForSerialization {
/// Unwraps the versioned envelope and returns the inner serialization struct.
///
/// Both variants carry the same payload type, so they share a single arm.
fn from(versioned_config: VersionedIndexConfig) -> IndexConfigForSerialization {
    match versioned_config {
        VersionedIndexConfig::V0_8(inner) | VersionedIndexConfig::V0_9(inner) => inner,
    }
}
}
Expand Down Expand Up @@ -108,7 +111,7 @@ impl IndexConfigForSerialization {

impl From<IndexConfig> for VersionedIndexConfig {
fn from(index_config: IndexConfig) -> Self {
VersionedIndexConfig::V0_8(index_config.into())
VersionedIndexConfig::V0_9(index_config.into())
}
}

Expand All @@ -117,7 +120,11 @@ impl TryFrom<VersionedIndexConfig> for IndexConfig {

fn try_from(versioned_index_config: VersionedIndexConfig) -> anyhow::Result<Self> {
match versioned_index_config {
VersionedIndexConfig::V0_8(v0_8) => v0_8.build_and_validate(None),
VersionedIndexConfig::V0_8(mut v0_8) => {
v0_8.doc_mapping.doc_mapping_uid = DocMappingUid::default();
v0_8.build_and_validate(None)
}
VersionedIndexConfig::V0_9(v0_8) => v0_8.build_and_validate(None),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};
use crate::index_config::validate_index_config;
use crate::{
validate_identifier, validate_index_id_pattern, DocMapping, IndexConfig, IndexingSettings,
RetentionPolicy, SearchSettings, TestableForRegression,
RetentionPolicy, SearchSettings,
};

pub type IndexTemplateId = String;
Expand Down Expand Up @@ -135,7 +135,8 @@ impl IndexTemplate {
}
}

impl TestableForRegression for IndexTemplate {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for IndexTemplate {
fn sample_for_regression() -> Self {
let template_id = "test-template".to_string();
let index_id_patterns = vec![
Expand All @@ -144,6 +145,7 @@ impl TestableForRegression for IndexTemplate {
];

let doc_mapping_json = r#"{
"doc_mapping_uid": "00000000000000000000000000",
"field_mappings": [
{
"name": "ts",
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_template/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{DocMapping, IndexingSettings, RetentionPolicy, SearchSettings};
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "version")]
pub enum VersionedIndexTemplate {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
#[serde(alias = "0.7")]
V0_8(IndexTemplateV0_8),
}
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression};
use crate::{disable_ingest_v1, enable_ingest_v2};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source";
Expand Down Expand Up @@ -153,7 +153,8 @@ impl SourceConfig {
}
}

impl TestableForRegression for SourceConfig {
#[cfg(any(test, feature = "testsuite"))]
impl crate::TestableForRegression for SourceConfig {
fn sample_for_regression() -> Self {
SourceConfig {
source_id: "kafka-source".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type SourceConfigForSerialization = SourceConfigV0_8;
#[serde(deny_unknown_fields)]
#[serde(tag = "version")]
pub enum VersionedSourceConfig {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
#[serde(alias = "0.8")]
V0_8(SourceConfigV0_8),
// Backward compatibility.
#[serde(rename = "0.7")]
Expand Down
10 changes: 6 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1681,10 +1681,12 @@ mod tests {
.unwrap();
assert!(indexing_tasks.is_empty());

let results: Vec<ApplyIndexingPlanRequest> =
client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert_eq!(results.len(), 1);
assert!(results[0].indexing_tasks.is_empty());
let apply_plan_requests = client_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
assert!(!apply_plan_requests.is_empty());

for apply_plan_request in &apply_plan_requests {
assert!(apply_plan_request.indexing_tasks.is_empty());
}

universe.assert_quit().await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}
});
Expand All @@ -283,7 +283,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
vec![IndexingTask {
index_uid: Some(source.source_uid.index_uid.clone()),
source_id: source.source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: Vec::new(),
}]
}
Expand Down Expand Up @@ -551,7 +551,7 @@ fn add_shard_to_indexer(
indexer_tasks.push(IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.clone(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![missing_shard],
});
}
Expand Down Expand Up @@ -1189,13 +1189,13 @@ mod tests {
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![ShardId::from(1), ShardId::from(4), ShardId::from(5)],
};
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(PipelineUid::new()),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec![
ShardId::from(6),
ShardId::from(7),
Expand Down Expand Up @@ -1259,14 +1259,14 @@ mod tests {
index_uid: IndexUid::new_with_random_ulid("testindex"),
source_id: "testsource".to_string(),
};
let pipeline_uid1 = PipelineUid::new();
let pipeline_uid1 = PipelineUid::random();
let previous_task1 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
pipeline_uid: Some(pipeline_uid1),
shard_ids: Vec::new(),
};
let pipeline_uid2 = PipelineUid::new();
let pipeline_uid2 = PipelineUid::random();
let previous_task2 = IndexingTask {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.to_string(),
Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-doc-mapper/src/doc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ impl Default for Mode {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct DocMapping {
/// UID of the doc mapping.
#[serde(default = "DocMappingUid::new")]
/// Doc mapping UID.
///
/// Splits with the same doc mapping UID share the same schema and should use the same doc
/// mapper during indexing and querying.
#[serde(default = "DocMappingUid::random")]
pub doc_mapping_uid: DocMappingUid,

/// Defines how unmapped fields should be handled.
Expand Down Expand Up @@ -182,7 +185,7 @@ mod tests {
#[test]
fn test_doc_mapping_serde_roundtrip() {
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::new(),
doc_mapping_uid: DocMappingUid::random(),
mode: Mode::Strict,
field_mappings: vec![
FieldMappingEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fs;
use std::path::Path;

use anyhow::{bail, Context};
use quickwit_config::TestableForRegression;
use quickwit_config::{IndexConfig, IndexTemplate, SourceConfig, TestableForRegression};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

Expand All @@ -46,7 +46,7 @@ use crate::{IndexMetadata, SplitMetadata};
/// #[serde(rename="0.2")]
/// V0_2(MyResourceV1) //< there was no change in this version.
/// }
const GLOBAL_QUICKWIT_RESOURCE_VERSION: &str = "0.8";
const GLOBAL_QUICKWIT_RESOURCE_VERSION: &str = "0.9";

/// This test makes sure that the resource is using the current `GLOBAL_QUICKWIT_RESOURCE_VERSION`.
fn test_global_version<T: Serialize>(serializable: &T) -> anyhow::Result<()> {
Expand Down Expand Up @@ -197,10 +197,12 @@ where for<'a> T: Serialize {
pub(crate) fn test_json_backward_compatibility_helper<T>(test_name: &str) -> anyhow::Result<()>
where T: TestableForRegression + std::fmt::Debug {
let sample_instance: T = T::sample_for_regression();
test_global_version(&sample_instance).unwrap();

let test_dir = Path::new("test-data").join(test_name);
test_global_version(&sample_instance).context("version is not the global version")?;
test_backward_compatibility::<T>(&test_dir).context("backward-compatibility")?;
test_and_update_expected_files::<T>(&test_dir).context("test-and-update")?;

test_and_create_new_test::<T>(&test_dir, sample_instance)
.context("test-and-create-new-test")?;
Ok(())
Expand All @@ -216,6 +218,18 @@ fn test_index_metadata_backward_compatibility() {
test_json_backward_compatibility_helper::<IndexMetadata>("index-metadata").unwrap();
}

/// Runs `test_global_version` on the `IndexConfig` regression sample and panics if it
/// reports an error.
#[test]
fn test_index_config_global_version() {
    test_global_version(&IndexConfig::sample_for_regression()).unwrap();
}

/// Runs `test_global_version` on the `SourceConfig` regression sample and panics if it
/// reports an error.
#[test]
fn test_source_config_global_version() {
    test_global_version(&SourceConfig::sample_for_regression()).unwrap();
}

#[test]
fn test_file_backed_index_backward_compatibility() {
test_json_backward_compatibility_helper::<FileBackedIndex>("file-backed-index").unwrap();
Expand All @@ -225,3 +239,9 @@ fn test_file_backed_index_backward_compatibility() {
fn test_file_backed_metastore_manifest_backward_compatibility() {
    // Run the shared JSON backward-compatibility helper for `Manifest`, using the
    // "manifest" test name, and fail the test if the helper returns an error.
    let outcome = test_json_backward_compatibility_helper::<Manifest>("manifest");
    outcome.unwrap();
}

/// Runs `test_global_version` on the `IndexTemplate` regression sample and panics if it
/// reports an error.
#[test]
fn test_index_template_global_version() {
    test_global_version(&IndexTemplate::sample_for_regression()).unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use crate::{IndexMetadata, Split};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "version")]
pub(crate) enum VersionedFileBackedIndex {
#[serde(rename = "0.8")]
#[serde(rename = "0.9")]
// Backward compatibility: also accept the "0.8" and "0.7" version tags.
#[serde(alias = "0.8")]
#[serde(alias = "0.7")]
V0_8(FileBackedIndexV0_8),
}
Expand Down
Loading

0 comments on commit a879287

Please sign in to comment.