From 2bc99f30918637dff62f97c6d7ef499ad421bddf Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Mon, 3 Jun 2024 17:12:35 -0400 Subject: [PATCH] Clean up `DefaultDocMapperBuilder` --- quickwit/Cargo.lock | 1 - quickwit/quickwit-config/Cargo.toml | 1 - .../quickwit-config/src/index_config/mod.rs | 99 +------- quickwit/quickwit-config/src/lib.rs | 9 +- .../src/default_doc_mapper/default_mapper.rs | 148 +++++------ .../default_mapper_builder.rs | 154 ++---------- .../src/default_doc_mapper/mod.rs | 2 +- .../quickwit-doc-mapper/src/doc_mapper.rs | 101 +++----- .../quickwit-doc-mapper/src/doc_mapping.rs | 229 ++++++++++++++++++ quickwit/quickwit-doc-mapper/src/lib.rs | 11 +- 10 files changed, 382 insertions(+), 373 deletions(-) create mode 100644 quickwit/quickwit-doc-mapper/src/doc_mapping.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 2d0b51d90b3..c7eb4ef1f3a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5867,7 +5867,6 @@ dependencies = [ "once_cell", "quickwit-common", "quickwit-doc-mapper", - "quickwit-macros", "quickwit-proto", "regex", "serde", diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index f32393cd39f..b488ead891b 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -36,7 +36,6 @@ vrl = { workspace = true, optional = true } quickwit-common = { workspace = true } quickwit-doc-mapper = { workspace = true } -quickwit-macros = { workspace = true } quickwit-proto = { workspace = true } [dev-dependencies] diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index bdb99d59bef..2ec65bdc040 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -31,10 +31,7 @@ use chrono::Utc; use cron::Schedule; use humantime::parse_duration; use quickwit_common::uri::Uri; -use quickwit_doc_mapper::{ - DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, FieldMappingEntry, Mode, ModeType, - QuickwitJsonOptions, TokenizerEntry, -}; +use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode}; use quickwit_proto::types::IndexId; use serde::{Deserialize, Serialize}; pub use serialize::load_index_config_from_user_config; @@ -44,57 +41,6 @@ use crate::index_config::serialize::VersionedIndexConfig; use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; use crate::TestableForRegression; -// Note(fmassot): `DocMapping` is a struct only used for -// serialization/deserialization of `DocMapper` parameters. -// This is partly a duplicate of the `DefaultDocMapper` and -// can be viewed as a temporary hack for 0.2 release before -// refactoring. -#[quickwit_macros::serde_multikey] -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] -pub struct DocMapping { - #[serde(default)] - #[schema(value_type = Vec)] - /// The mapping of the index schema fields. - /// - /// This defines the name, type and other information about the field(s). - /// - /// Properties are determined by the specified type, for more information - /// please see: - pub field_mappings: Vec, - #[schema(value_type = Vec)] - #[serde(default)] - pub tag_fields: BTreeSet, - #[serde(default)] - pub store_source: bool, - #[serde(default)] - pub index_field_presence: bool, - #[serde(default)] - pub timestamp_field: Option, - #[serde_multikey( - deserializer = Mode::from_parts, - serializer = Mode::into_parts, - fields = ( - #[serde(default)] - mode: ModeType, - #[serde(skip_serializing_if = "Option::is_none")] - dynamic_mapping: Option - ), - )] - pub mode: Mode, - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - pub partition_key: Option, - #[schema(value_type = u32)] - #[serde(default = "DefaultDocMapper::default_max_num_partitions")] - pub max_num_partitions: NonZeroU32, - #[serde(default)] - pub tokenizers: Vec, - /// Record document length - #[serde(default)] - pub document_length: bool, -} - #[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct IndexingResources { @@ -440,24 +386,24 @@ impl TestableForRegression for IndexConfig { ) .unwrap(); let doc_mapping = DocMapping { - index_field_presence: true, + mode: Mode::default(), field_mappings: vec![ tenant_id_mapping, timestamp_mapping, log_level_mapping, message_mapping, ], + timestamp_field: Some("timestamp".to_string()), tag_fields: ["tenant_id", "log_level"] .into_iter() .map(|tag_field| tag_field.to_string()) .collect::>(), - store_source: true, - mode: Mode::default(), partition_key: Some("tenant_id".to_string()), max_num_partitions: NonZeroU32::new(100).unwrap(), - timestamp_field: Some("timestamp".to_string()), + index_field_presence: true, + store_document_size: false, + store_source: true, tokenizers: vec![tokenizer], - document_length: false, }; let retention_policy = Some(RetentionPolicy { retention_period: "90 days".to_string(), @@ -496,46 +442,20 @@ impl TestableForRegression for IndexConfig { fn assert_equality(&self, other: &Self) { assert_eq!(self.index_id, other.index_id); assert_eq!(self.index_uri, other.index_uri); - assert_eq!( - self.doc_mapping - .field_mappings - .iter() - .map(|field_mapping| &field_mapping.name) - .collect::>(), - other - .doc_mapping - .field_mappings - .iter() - .map(|field_mapping| &field_mapping.name) - .collect::>(), - ); - assert_eq!(self.doc_mapping.tag_fields, other.doc_mapping.tag_fields,); - assert_eq!( - self.doc_mapping.store_source, - other.doc_mapping.store_source, - ); + assert_eq!(self.doc_mapping, other.doc_mapping); assert_eq!(self.indexing_settings, other.indexing_settings); assert_eq!(self.search_settings, other.search_settings); } } -/// Builds and returns the doc mapper associated with index. +/// Builds and returns the doc mapper associated with an index. pub fn build_doc_mapper( doc_mapping: &DocMapping, search_settings: &SearchSettings, ) -> anyhow::Result> { let builder = DefaultDocMapperBuilder { - store_source: doc_mapping.store_source, - index_field_presence: doc_mapping.index_field_presence, + doc_mapping: doc_mapping.clone(), default_search_fields: search_settings.default_search_fields.clone(), - timestamp_field: doc_mapping.timestamp_field.clone(), - field_mappings: doc_mapping.field_mappings.clone(), - tag_fields: doc_mapping.tag_fields.iter().cloned().collect(), - mode: doc_mapping.mode.clone(), - partition_key: doc_mapping.partition_key.clone(), - max_num_partitions: doc_mapping.max_num_partitions, - tokenizers: doc_mapping.tokenizers.clone(), - document_length: doc_mapping.document_length, }; Ok(Arc::new(builder.try_build()?)) } @@ -571,6 +491,7 @@ pub(super) fn validate_index_config( mod tests { use cron::TimeUnitSpec; + use quickwit_doc_mapper::ModeType; use super::*; use crate::merge_policy_config::MergePolicyConfig; diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index bd58a139b6c..82ee8f7b0c5 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -48,9 +48,10 @@ pub use cluster_config::ClusterConfig; // See #2048 use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ - build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, - IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, + build_doc_mapper, load_index_config_from_user_config, IndexConfig, IndexingResources, + IndexingSettings, RetentionPolicy, SearchSettings, }; +pub use quickwit_doc_mapper::DocMapping; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::Value as JsonValue; @@ -225,7 +226,7 @@ impl ConfigFormat { serde_json::from_reader(StripComments::new(payload))?; let version_value = json_value.get_mut("version").context("missing version")?; if let Some(version_number) = version_value.as_u64() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); + warn!(version_value=?version_value, "`version` should be a string"); *version_value = JsonValue::String(version_number.to_string()); } serde_json::from_value(json_value).context("failed to parse JSON file") @@ -237,7 +238,7 @@ impl ConfigFormat { toml::from_str(payload_str).context("failed to parse TOML file")?; let version_value = toml_value.get_mut("version").context("missing version")?; if let Some(version_number) = version_value.as_integer() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); + warn!(version_value=?version_value, "`version` should be a string"); *version_value = toml::Value::String(version_number.to_string()); let reserialized = toml::to_string(version_value) .context("failed to reserialize toml config")?; diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 3daf65462d0..2713865192c 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -45,8 +45,9 @@ use crate::doc_mapper::{JsonObject, Partition}; use crate::query_builder::build_query; use crate::routing_expression::RoutingExpr; use crate::{ - Cardinality, DocMapper, DocParsingError, Mode, QueryParserError, TokenizerEntry, WarmupInfo, - DOCUMENT_LEN_FIELD_NAME, DYNAMIC_FIELD_NAME, FIELD_PRESENCE_FIELD_NAME, SOURCE_FIELD_NAME, + Cardinality, DocMapper, DocMapping, DocParsingError, Mode, QueryParserError, TokenizerEntry, + WarmupInfo, DOCUMENT_SIZE_FIELD_NAME, DYNAMIC_FIELD_NAME, FIELD_PRESENCE_FIELD_NAME, + SOURCE_FIELD_NAME, }; const FIELD_PRESENCE_FIELD: Field = Field::from_field_id(0u32); @@ -55,8 +56,8 @@ const FIELD_PRESENCE_FIELD: Field = Field::from_field_id(0u32); /// to tantivy index fields. /// /// The mains rules are defined by the field mappings. -#[derive(Serialize, Deserialize, Clone)] -#[serde(try_from = "DefaultDocMapperBuilder", into = "DefaultDocMapperBuilder")] +#[derive(Clone, Serialize, Deserialize)] +#[serde(into = "DefaultDocMapperBuilder", try_from = "DefaultDocMapperBuilder")] pub struct DefaultDocMapper { /// Field in which the source should be stored. /// This field is only valid when using the schema associated with the default @@ -70,7 +71,7 @@ pub struct DefaultDocMapper { /// doc mapper, and therefore cannot be used in the `query` method. dynamic_field: Option, /// Field in which the len of the source document is stored as a fast field. - document_len_field: Option, + document_size_field: Option, /// Default list of field names used for search. default_search_field_names: Vec, /// Timestamp field name. @@ -89,8 +90,6 @@ pub struct DefaultDocMapper { partition_key: RoutingExpr, /// Maximum number of partitions max_num_partitions: NonZeroU32, - /// List of required fields. Right now this is unused. - required_fields: Vec, /// Defines how unmapped fields should be handle. mode: Mode, /// User-defined tokenizers. @@ -100,21 +99,9 @@ pub struct DefaultDocMapper { } impl DefaultDocMapper { - fn check_missing_required_fields(&self, doc: &Document) -> Result<(), DocParsingError> { - for &required_field in &self.required_fields { - if doc.get_first(required_field).is_none() { - let missing_field_name = self.schema.get_field_name(required_field); - return Err(DocParsingError::RequiredField( - missing_field_name.to_string(), - )); - } - } - Ok(()) - } - /// Default maximum number of partitions. pub fn default_max_num_partitions() -> NonZeroU32 { - NonZeroU32::new(200).unwrap() + DocMapping::default_max_num_partitions() } } @@ -146,50 +133,79 @@ fn validate_timestamp_field( Ok(()) } +impl From for DefaultDocMapperBuilder { + fn from(default_doc_mapper: DefaultDocMapper) -> Self { + let partition_key_str = default_doc_mapper.partition_key.to_string(); + let partition_key_opt: Option = if !partition_key_str.is_empty() { + Some(partition_key_str) + } else { + None + }; + let doc_mapping = DocMapping { + mode: default_doc_mapper.mode, + field_mappings: default_doc_mapper.field_mappings.into(), + timestamp_field: default_doc_mapper.timestamp_field_name, + tag_fields: default_doc_mapper.tag_field_names, + partition_key: partition_key_opt, + max_num_partitions: default_doc_mapper.max_num_partitions, + index_field_presence: default_doc_mapper.index_field_presence, + store_document_size: default_doc_mapper.document_size_field.is_some(), + store_source: default_doc_mapper.source_field.is_some(), + tokenizers: default_doc_mapper.tokenizer_entries, + }; + Self { + doc_mapping, + default_search_fields: default_doc_mapper.default_search_field_names, + } + } +} + impl TryFrom for DefaultDocMapper { type Error = anyhow::Error; fn try_from(builder: DefaultDocMapperBuilder) -> anyhow::Result { let mut schema_builder = Schema::builder(); + + // We want the field ID of the field presence field to be 0, so we add it to the schema + // first. let field_presence_field = schema_builder.add_u64_field(FIELD_PRESENCE_FIELD_NAME, INDEXED); assert_eq!(field_presence_field, FIELD_PRESENCE_FIELD); - let dynamic_field = if let Mode::Dynamic(json_options) = &builder.mode { + let doc_mapping = builder.doc_mapping; + + let dynamic_field = if let Mode::Dynamic(json_options) = &doc_mapping.mode { Some(schema_builder.add_json_field(DYNAMIC_FIELD_NAME, json_options.clone())) } else { None }; - - let document_len_field = if builder.document_length { - let document_len_field_options = tantivy::schema::NumericOptions::default().set_fast(); - Some(schema_builder.add_u64_field(DOCUMENT_LEN_FIELD_NAME, document_len_field_options)) + let document_size_field = if doc_mapping.store_document_size { + let document_size_field_options = tantivy::schema::NumericOptions::default().set_fast(); + Some( + schema_builder.add_u64_field(DOCUMENT_SIZE_FIELD_NAME, document_size_field_options), + ) + } else { + None + }; + let source_field = if doc_mapping.store_source { + Some(schema_builder.add_json_field(SOURCE_FIELD_NAME, STORED)) } else { None }; - - // Adding regular fields. let MappingNodeRoot { field_mappings, concatenate_dynamic_fields, - } = build_mapping_tree(&builder.field_mappings, &mut schema_builder)?; + } = build_mapping_tree(&doc_mapping.field_mappings, &mut schema_builder)?; if !concatenate_dynamic_fields.is_empty() && dynamic_field.is_none() { bail!("concatenate field has `include_dynamic_fields` set, but index isn't dynamic"); } - let source_field = if builder.store_source { - Some(schema_builder.add_json_field(SOURCE_FIELD_NAME, STORED)) - } else { - None - }; - - if let Some(timestamp_field_path) = builder.timestamp_field.as_ref() { + if let Some(timestamp_field_path) = &doc_mapping.timestamp_field { validate_timestamp_field(timestamp_field_path, &field_mappings)?; }; - let schema = schema_builder.build(); let tokenizer_manager = create_default_quickwit_tokenizer_manager(); let mut custom_tokenizer_names = HashSet::new(); - for tokenizer_config_entry in builder.tokenizers.iter() { + for tokenizer_config_entry in &doc_mapping.tokenizers { if custom_tokenizer_names.contains(&tokenizer_config_entry.name) { bail!( "duplicated custom tokenizer: `{}`", @@ -247,40 +263,38 @@ impl TryFrom for DefaultDocMapper { } // Resolve tag fields - let mut tag_field_names: BTreeSet = builder.tag_fields.iter().cloned().collect(); - for tag_field_name in &builder.tag_fields { + for tag_field_name in &doc_mapping.tag_fields { validate_tag(tag_field_name, &schema)?; } - let partition_key_expr: &str = builder.partition_key.as_deref().unwrap_or(""); + let partition_key_expr: &str = doc_mapping.partition_key.as_deref().unwrap_or(""); let partition_key = RoutingExpr::new(partition_key_expr).with_context(|| { format!("failed to interpret the partition key: `{partition_key_expr}`") })?; // If valid, partition key fields should be considered as tags. + let mut tag_field_names = doc_mapping.tag_fields; + for partition_key in partition_key.field_names() { if validate_tag(&partition_key, &schema).is_ok() { tag_field_names.insert(partition_key); } } - - let required_fields = Vec::new(); Ok(DefaultDocMapper { schema, - index_field_presence: builder.index_field_presence, + index_field_presence: doc_mapping.index_field_presence, source_field, dynamic_field, - document_len_field, + document_size_field, default_search_field_names, - timestamp_field_name: builder.timestamp_field, + timestamp_field_name: doc_mapping.timestamp_field, field_mappings, concatenate_dynamic_fields, tag_field_names, - required_fields, partition_key, - max_num_partitions: builder.max_num_partitions, - mode: builder.mode, - tokenizer_entries: builder.tokenizers, + max_num_partitions: doc_mapping.max_num_partitions, + mode: doc_mapping.mode, + tokenizer_entries: doc_mapping.tokenizers, tokenizer_manager, }) } @@ -366,32 +380,6 @@ fn validate_fields_tokenizers( Ok(()) } -impl From for DefaultDocMapperBuilder { - fn from(default_doc_mapper: DefaultDocMapper) -> Self { - let partition_key_str = default_doc_mapper.partition_key.to_string(); - let partition_key_opt: Option = if partition_key_str.is_empty() { - None - } else { - Some(partition_key_str) - }; - Self { - store_source: default_doc_mapper.source_field.is_some(), - index_field_presence: default_doc_mapper.index_field_presence, - timestamp_field: default_doc_mapper - .timestamp_field_name() - .map(ToString::to_string), - field_mappings: default_doc_mapper.field_mappings.into(), - tag_fields: default_doc_mapper.tag_field_names.into_iter().collect(), - default_search_fields: default_doc_mapper.default_search_field_names, - mode: default_doc_mapper.mode, - partition_key: partition_key_opt, - max_num_partitions: default_doc_mapper.max_num_partitions, - tokenizers: default_doc_mapper.tokenizer_entries, - document_length: false, - } - } -} - impl std::fmt::Debug for DefaultDocMapper { fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { formatter @@ -624,8 +612,8 @@ impl DocMapper for DefaultDocMapper { } } - if let Some(document_len_field) = self.document_len_field { - document.add_u64(document_len_field, document_len); + if let Some(document_size_field) = self.document_size_field { + document.add_u64(document_size_field, document_len); } // The capacity is inexact here. @@ -664,8 +652,6 @@ impl DocMapper for DefaultDocMapper { document.add_field_value(FIELD_PRESENCE_FIELD, field_presence_hash); } } - - self.check_missing_required_fields(&document)?; Ok((partition, document)) } @@ -745,7 +731,7 @@ mod tests { use crate::default_doc_mapper::field_mapping_entry::DEFAULT_TOKENIZER_NAME; use crate::default_doc_mapper::mapping_tree::value_to_pretokenized; use crate::{ - DefaultDocMapperBuilder, DocMapper, DocParsingError, DOCUMENT_LEN_FIELD_NAME, + DefaultDocMapperBuilder, DocMapper, DocParsingError, DOCUMENT_SIZE_FIELD_NAME, DYNAMIC_FIELD_NAME, FIELD_PRESENCE_FIELD_NAME, SOURCE_FIELD_NAME, }; @@ -2048,7 +2034,7 @@ mod tests { "document_length": true, "mode": "dynamic" }"#, - DOCUMENT_LEN_FIELD_NAME, + DOCUMENT_SIZE_FIELD_NAME, raw_doc, vec![(raw_doc.len() as u64).into()], ); diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper_builder.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper_builder.rs index 2c60d137384..f2778c730f9 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper_builder.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper_builder.rs @@ -17,14 +17,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::num::NonZeroU32; - use serde::{Deserialize, Serialize}; -use super::tokenizer_entry::TokenizerEntry; -use super::FieldMappingEntry; -use crate::default_doc_mapper::QuickwitJsonOptions; -use crate::DefaultDocMapper; +use crate::{DefaultDocMapper, DocMapping}; /// DefaultDocMapperBuilder is here /// to create a valid DocMapper. @@ -32,126 +27,15 @@ use crate::DefaultDocMapper; /// It is also used to serialize/deserialize a DocMapper. /// note that this is not the way is the DocMapping is deserialized /// from the configuration. -#[quickwit_macros::serde_multikey] -#[derive(Serialize, Deserialize, Clone)] +#[derive(Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct DefaultDocMapperBuilder { - /// Stores the original source document when set to true. - #[serde(default)] - pub store_source: bool, - /// Indexes field presence. - #[serde(default)] - pub index_field_presence: bool, - /// Name of the fields that are searched by default, unless overridden. + /// Doc mapping. + #[serde(flatten)] + pub doc_mapping: DocMapping, + /// Default search field names. #[serde(default)] pub default_search_fields: Vec, - /// Name of the field storing the timestamp of the event for time series data. - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - pub timestamp_field: Option, - /// Describes which fields are indexed and how. - #[serde(default)] - pub field_mappings: Vec, - /// Name of the fields that are tagged. - #[serde(default)] - pub tag_fields: Vec, - /// The partition key is a DSL used to route documents - /// into specific splits. - #[serde(default)] - pub partition_key: Option, - /// Maximum number of partitions. - #[serde(default = "DefaultDocMapper::default_max_num_partitions")] - pub max_num_partitions: NonZeroU32, - #[serde_multikey( - deserializer = Mode::from_parts, - serializer = Mode::into_parts, - fields = ( - /// Defines the indexing mode. - #[serde(default)] - mode: ModeType, - /// If mode is set to dynamic, `dynamic_mapping` defines - /// how the unmapped fields should be handled. - #[serde(default)] - dynamic_mapping: Option, - ), - )] - /// Defines how the unmapped fields should be handled. - pub mode: Mode, - /// User-defined tokenizers. - #[serde(default)] - pub tokenizers: Vec, - /// Record document length - #[serde(default)] - pub document_length: bool, -} - -/// Defines how an unmapped field should be handled. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub enum Mode { - /// Lenient mode: unmapped fields are just ignored. - Lenient, - /// Strict mode: when parsing a document with an unmapped field, an error is yielded. - Strict, - /// Dynamic mode: unmapped fields are captured and handled according to the provided - /// configuration. - Dynamic(QuickwitJsonOptions), -} - -impl Mode { - /// Extact the `ModeType` of this `Mode` - pub fn mode_type(&self) -> ModeType { - match self { - Mode::Lenient => ModeType::Lenient, - Mode::Strict => ModeType::Strict, - Mode::Dynamic(_) => ModeType::Dynamic, - } - } - - /// Build a Mode from its type and optional dynamic mapping options - pub fn from_parts( - mode: ModeType, - dynamic_mapping: Option, - ) -> anyhow::Result { - Ok(match (mode, dynamic_mapping) { - (ModeType::Lenient, None) => Mode::Lenient, - (ModeType::Strict, None) => Mode::Strict, - (ModeType::Dynamic, Some(dynamic_mapping)) => Mode::Dynamic(dynamic_mapping), - (ModeType::Dynamic, None) => Mode::default(), // Dynamic with default options - (_, Some(_)) => anyhow::bail!( - "`dynamic_mapping` is only allowed with mode=dynamic. (here mode=`{:?}`)", - mode - ), - }) - } - - /// Obtain the mode type and dynamic options from a Mode - pub fn into_parts(self) -> (ModeType, Option) { - match self { - Mode::Lenient => (ModeType::Lenient, None), - Mode::Strict => (ModeType::Strict, None), - Mode::Dynamic(json_options) => (ModeType::Dynamic, Some(json_options)), - } - } -} - -impl Default for Mode { - fn default() -> Self { - Mode::Dynamic(QuickwitJsonOptions::default_dynamic()) - } -} - -/// `Mode` describing how the unmapped field should be handled. -#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] -#[serde(rename_all = "lowercase")] -pub enum ModeType { - /// Lenient mode: unmapped fields are just ignored. - Lenient, - /// Strict mode: when parsing a document with an unmapped field, an error is yielded. - Strict, - /// Dynamic mode: unmapped fields are captured and handled according to the - /// `dynamic_mapping` configuration. - #[default] - Dynamic, } #[cfg(test)] @@ -172,17 +56,27 @@ impl DefaultDocMapperBuilder { #[cfg(test)] mod tests { use super::*; + use crate::ModeType; #[test] fn test_default_mapper_builder_deserialize_from_empty_object() { - let default_mapper_builder: DefaultDocMapperBuilder = - serde_json::from_str::("{}").unwrap(); - assert!(default_mapper_builder.default_search_fields.is_empty()); - assert!(default_mapper_builder.field_mappings.is_empty()); - assert!(default_mapper_builder.tag_fields.is_empty()); - assert_eq!(default_mapper_builder.mode.mode_type(), ModeType::Dynamic); - assert_eq!(default_mapper_builder.store_source, false); - assert!(default_mapper_builder.timestamp_field.is_none()); + let default_doc_mapper_builder: DefaultDocMapperBuilder = + serde_json::from_str("{}").unwrap(); + assert_eq!( + default_doc_mapper_builder.doc_mapping.mode.mode_type(), + ModeType::Dynamic + ); + assert!(default_doc_mapper_builder + .doc_mapping + .field_mappings + .is_empty()); + assert!(default_doc_mapper_builder + .doc_mapping + .timestamp_field + .is_none()); + assert!(default_doc_mapper_builder.doc_mapping.tag_fields.is_empty()); + assert_eq!(default_doc_mapper_builder.doc_mapping.store_source, false); + assert!(default_doc_mapper_builder.default_search_fields.is_empty()); } #[test] diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mod.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mod.rs index d36acff16fc..4a2262f0023 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mod.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mod.rs @@ -30,7 +30,7 @@ use once_cell::sync::Lazy; use regex::Regex; pub use self::default_mapper::DefaultDocMapper; -pub use self::default_mapper_builder::{DefaultDocMapperBuilder, Mode, ModeType}; +pub use self::default_mapper_builder::DefaultDocMapperBuilder; pub use self::field_mapping_entry::{ BinaryFormat, FastFieldOptions, FieldMappingEntry, QuickwitBytesOptions, QuickwitJsonOptions, QuickwitTextNormalizer, diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs index 04860c47c97..c7d4fb2998b 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs @@ -249,8 +249,8 @@ mod tests { use crate::default_doc_mapper::{FieldMappingType, QuickwitJsonOptions}; use crate::{ - Cardinality, DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, DocParsingError, - FieldMappingEntry, Mode, TermRange, WarmupInfo, DYNAMIC_FIELD_NAME, + Cardinality, DefaultDocMapperBuilder, DocMapper, DocParsingError, FieldMappingEntry, + TermRange, WarmupInfo, DYNAMIC_FIELD_NAME, }; const JSON_DEFAULT_DOC_MAPPER: &str = r#" @@ -332,36 +332,19 @@ mod tests { } } - #[test] - fn test_serdeserialize_doc_mapper() -> anyhow::Result<()> { - let deserialized_default_doc_mapper = - serde_json::from_str::>(JSON_DEFAULT_DOC_MAPPER)?; - let expected_default_doc_mapper = DefaultDocMapperBuilder::default().try_build()?; - assert_eq!( - format!("{deserialized_default_doc_mapper:?}"), - format!("{expected_default_doc_mapper:?}"), - ); - - let serialized_doc_mapper = serde_json::to_string(&deserialized_default_doc_mapper)?; - let deserialized_default_doc_mapper = - serde_json::from_str::>(&serialized_doc_mapper)?; - let serialized_doc_mapper_2 = serde_json::to_string(&deserialized_default_doc_mapper)?; - - assert_eq!(serialized_doc_mapper, serialized_doc_mapper_2); - - Ok(()) - } - #[test] fn test_doc_mapper_query_with_json_field() { let mut doc_mapper_builder = DefaultDocMapperBuilder::default(); - doc_mapper_builder.field_mappings.push(FieldMappingEntry { - name: "json_field".to_string(), - mapping_type: FieldMappingType::Json( - QuickwitJsonOptions::default(), - Cardinality::SingleValue, - ), - }); + doc_mapper_builder + .doc_mapping + .field_mappings + .push(FieldMappingEntry { + name: "json_field".to_string(), + mapping_type: FieldMappingType::Json( + QuickwitJsonOptions::default(), + Cardinality::SingleValue, + ), + }); let doc_mapper = doc_mapper_builder.try_build().unwrap(); let schema = doc_mapper.schema(); let query_ast = UserInputQuery { @@ -380,12 +363,7 @@ mod tests { #[test] fn test_doc_mapper_query_with_json_field_default_search_fields() { - let doc_mapper: DefaultDocMapper = DefaultDocMapperBuilder { - mode: Mode::default(), - ..Default::default() - } - .try_build() - .unwrap(); + let doc_mapper = DefaultDocMapperBuilder::default().try_build().unwrap(); let schema = doc_mapper.schema(); let query_ast = query_ast_from_user_text("toto.titi:hello", None) .parse_user_query(doc_mapper.default_search_fields()) @@ -399,12 +377,7 @@ mod tests { #[test] fn test_doc_mapper_query_with_json_field_ambiguous_term() { - let doc_mapper: DefaultDocMapper = DefaultDocMapperBuilder { - mode: Mode::default(), - ..Default::default() - } - .try_build() - .unwrap(); + let doc_mapper = DefaultDocMapperBuilder::default().try_build().unwrap(); let schema = doc_mapper.schema(); let query_ast = query_ast_from_user_text("toto:5", None) .parse_user_query(&[]) @@ -582,27 +555,33 @@ mod tests { }; use crate::{TokenizerConfig, TokenizerEntry}; let mut doc_mapper_builder = DefaultDocMapperBuilder::default(); - doc_mapper_builder.field_mappings.push(FieldMappingEntry { - name: "multilang".to_string(), - mapping_type: FieldMappingType::Text( - QuickwitTextOptions { - indexing_options: Some(TextIndexingOptions { - tokenizer: QuickwitTextTokenizer::from_static("multilang"), - record: IndexRecordOption::Basic, - fieldnorms: false, - }), - ..Default::default() + doc_mapper_builder + .doc_mapping + .field_mappings + .push(FieldMappingEntry { + name: "multilang".to_string(), + mapping_type: FieldMappingType::Text( + QuickwitTextOptions { + indexing_options: Some(TextIndexingOptions { + tokenizer: QuickwitTextTokenizer::from_static("multilang"), + record: IndexRecordOption::Basic, + fieldnorms: false, + }), + ..Default::default() + }, + Cardinality::SingleValue, + ), + }); + doc_mapper_builder + .doc_mapping + .tokenizers + .push(TokenizerEntry { + name: "multilang".to_string(), + config: TokenizerConfig { + tokenizer_type: TokenizerType::Multilang, + filters: Vec::new(), }, - Cardinality::SingleValue, - ), - }); - doc_mapper_builder.tokenizers.push(TokenizerEntry { - name: "multilang".to_string(), - config: TokenizerConfig { - tokenizer_type: TokenizerType::Multilang, - filters: Vec::new(), - }, - }); + }); let doc_mapper = doc_mapper_builder.try_build().unwrap(); let schema = doc_mapper.schema(); let query_ast = quickwit_query::query_ast::QueryAst::Term(TermQuery { diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapping.rs b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs new file mode 100644 index 00000000000..b2bc83c83ce --- /dev/null +++ b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs @@ -0,0 +1,229 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::BTreeSet; +use std::num::NonZeroU32; + +use serde::{Deserialize, Serialize}; + +use crate::{FieldMappingEntry, QuickwitJsonOptions, TokenizerEntry}; + +/// Defines how unmapped fields should be handled. +#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum ModeType { + /// Lenient mode: ignores unmapped fields. + Lenient, + /// Strict mode: returns an error when an unmapped field is encountered. + Strict, + /// Dynamic mode: captures and handles unmapped fields according to the dynamic field + /// configuration. + #[default] + Dynamic, +} + +/// Defines how unmapped fields should be handled. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum Mode { + /// Lenient mode: ignores unmapped fields. + Lenient, + /// Strict mode: returns an error when an unmapped field is encountered. + Strict, + /// Dynamic mode: captures and handles unmapped fields according to the dynamic field + /// configuration. + Dynamic(QuickwitJsonOptions), +} + +impl Mode { + /// Extracts the [`ModeType`] of this [`Mode`] + pub fn mode_type(&self) -> ModeType { + match self { + Self::Lenient => ModeType::Lenient, + Self::Strict => ModeType::Strict, + Self::Dynamic(_) => ModeType::Dynamic, + } + } + + /// Builds a [`Mode`] from its type and optional dynamic mapping options. + pub fn from_parts( + mode: ModeType, + dynamic_mapping: Option, + ) -> anyhow::Result { + Ok(match (mode, dynamic_mapping) { + (ModeType::Lenient, None) => Self::Lenient, + (ModeType::Strict, None) => Self::Strict, + (ModeType::Dynamic, Some(dynamic_mapping)) => Self::Dynamic(dynamic_mapping), + (ModeType::Dynamic, None) => Self::default(), // Dynamic with default options + (_, Some(_)) => anyhow::bail!( + "`dynamic_mapping` is only allowed with mode=dynamic. (here mode=`{:?}`)", + mode + ), + }) + } + + /// Obtains the mode type and dynamic options from a [`Mode`]. + pub fn into_parts(self) -> (ModeType, Option) { + match self { + Self::Lenient => (ModeType::Lenient, None), + Self::Strict => (ModeType::Strict, None), + Self::Dynamic(json_options) => (ModeType::Dynamic, Some(json_options)), + } + } +} + +impl Default for Mode { + fn default() -> Self { + Self::Dynamic(QuickwitJsonOptions::default_dynamic()) + } +} + +/// Defines how the document of an index should be parsed, tokenized, partitioned, indexed, and +/// stored. +#[quickwit_macros::serde_multikey] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct DocMapping { + /// Defines how unmapped fields should be handled. + #[serde_multikey( + deserializer = Mode::from_parts, + serializer = Mode::into_parts, + fields = ( + #[serde(default)] + mode: ModeType, + #[serde(skip_serializing_if = "Option::is_none")] + dynamic_mapping: Option + ), + )] + pub mode: Mode, + + /// Defines the schema of ingested documents and describes how each field value should be + /// parsed, tokenized, indexed, and stored. + #[serde(default)] + #[schema(value_type = Vec)] + pub field_mappings: Vec, + + /// Declares the field which contains the date or timestamp at which the document + /// was emitted. + #[serde(default)] + pub timestamp_field: Option, + + /// Declares the low cardinality fields for which the values ​​are recorded directly in the + /// splits metadata. + #[schema(value_type = Vec)] + #[serde(default)] + pub tag_fields: BTreeSet, + + /// Expresses via a "mini-DSL" how to route documents to split partitions. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_key: Option, + + /// The maximum number of partitions that an indexer can generate. + #[schema(value_type = u32)] + #[serde(default = "DocMapping::default_max_num_partitions")] + pub max_num_partitions: NonZeroU32, + + /// Whether to record the presence of the fields of each indexed document to allow `exists` + /// queries. + #[serde(default)] + pub index_field_presence: bool, + + /// Whether to record and store the size (bytes) of each ingested document in a fast field. + #[serde(alias = "document_length")] + #[serde(default)] + pub store_document_size: bool, + + /// Whether to store the original source documents in the doc store. + #[serde(default)] + pub store_source: bool, + + /// A set of additional user-defined tokenizers to be used during indexing. + #[serde(default)] + pub tokenizers: Vec, +} + +impl DocMapping { + /// Returns the default value for `max_num_partitions`. + pub fn default_max_num_partitions() -> NonZeroU32 { + NonZeroU32::new(200).unwrap() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::default_doc_mapper::{RegexTokenizerOption, TokenFilterType, TokenizerType}; + use crate::{FieldMappingType, TokenizerConfig}; + + #[test] + fn test_doc_mapping_serde_roundtrip() { + let doc_mapping = DocMapping { + mode: Mode::Strict, + field_mappings: vec![ + FieldMappingEntry { + name: "timestamp".to_string(), + mapping_type: FieldMappingType::Text((), ()), + }, + FieldMappingEntry { + name: "message".to_string(), + mapping_type: FieldMappingType::Text((), ()), + }, + ], + timestamp_field: Some("timestamp".to_string()), + tag_fields: BTreeSet::from_iter(["level"]), + partition_key: Some("tenant_id".to_string()), + max_num_partitions: NonZeroU32::new(100).unwrap(), + index_field_presence: true, + store_document_size: true, + store_source: true, + tokenizers: vec![TokenizerEntry { + name: "whitespace".to_string(), + config: TokenizerConfig { + tokenizer_type: TokenizerType::Regex(RegexTokenizerOption { + pattern: r"\s+".to_string(), + }), + filters: vec![TokenFilterType::LowerCaser], + }, + }], + }; + let serialized = serde_json::to_string(&doc_mapping).unwrap(); + let deserialized: DocMapping = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, doc_mapping); + } + + #[test] + fn test_doc_mapping_serde_default_values() { + let doc_mapping: DocMapping = serde_json::from_str("{}").unwrap(); + assert_eq!( + doc_mapping.mode, + Mode::Dynamic(QuickwitJsonOptions::default_dynamic()) + ); + assert!(doc_mapping.field_mappings.is_empty()); + assert_eq!(doc_mapping.timestamp_field, None); + assert!(doc_mapping.tag_fields.is_empty()); + assert_eq!(doc_mapping.partition_key, None); + assert_eq!( + doc_mapping.max_num_partitions, + NonZeroU32::new(200).unwrap() + ); + assert_eq!(doc_mapping.index_field_presence, false); + assert_eq!(doc_mapping.store_document_size, false); + assert_eq!(doc_mapping.store_source, false); + } +} diff --git a/quickwit/quickwit-doc-mapper/src/lib.rs b/quickwit/quickwit-doc-mapper/src/lib.rs index 161732b28f5..435a56e0796 100644 --- a/quickwit/quickwit-doc-mapper/src/lib.rs +++ b/quickwit/quickwit-doc-mapper/src/lib.rs @@ -27,6 +27,7 @@ mod default_doc_mapper; mod doc_mapper; +mod doc_mapping; mod error; mod query_builder; mod routing_expression; @@ -36,8 +37,7 @@ pub mod tag_pruning; pub use default_doc_mapper::{ analyze_text, BinaryFormat, DefaultDocMapper, DefaultDocMapperBuilder, FieldMappingEntry, - FieldMappingType, Mode, ModeType, QuickwitBytesOptions, QuickwitJsonOptions, TokenizerConfig, - TokenizerEntry, + FieldMappingType, QuickwitBytesOptions, QuickwitJsonOptions, TokenizerConfig, TokenizerEntry, }; use default_doc_mapper::{ FastFieldOptions, FieldMappingEntryForSerialization, IndexRecordOptionSchema, @@ -45,6 +45,7 @@ use default_doc_mapper::{ TokenFilterType, TokenizerType, }; pub use doc_mapper::{DocMapper, JsonObject, NamedField, TermRange, WarmupInfo}; +pub use doc_mapping::{DocMapping, Mode, ModeType}; pub use error::{DocParsingError, QueryParserError}; use quickwit_common::shared_consts::FIELD_PRESENCE_FIELD_NAME; pub use routing_expression::RoutingExpr; @@ -56,14 +57,14 @@ pub const SOURCE_FIELD_NAME: &str = "_source"; pub const DYNAMIC_FIELD_NAME: &str = "_dynamic"; /// Field name reserved for storing the length of source document. -pub const DOCUMENT_LEN_FIELD_NAME: &str = "_doc_length"; +pub const DOCUMENT_SIZE_FIELD_NAME: &str = "_doc_length"; /// Quickwit reserved field names. const QW_RESERVED_FIELD_NAMES: &[&str] = &[ - SOURCE_FIELD_NAME, + DOCUMENT_SIZE_FIELD_NAME, DYNAMIC_FIELD_NAME, FIELD_PRESENCE_FIELD_NAME, - DOCUMENT_LEN_FIELD_NAME, + SOURCE_FIELD_NAME, ]; /// Cardinality of a field.