diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index c7eb4ef1f3a..62f86e872c2 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5960,6 +5960,7 @@ dependencies = [ "quickwit-common", "quickwit-datetime", "quickwit-macros", + "quickwit-proto", "quickwit-query", "regex", "serde", diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 574c68dea45..126c478545b 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -32,7 +32,7 @@ use cron::Schedule; use humantime::parse_duration; use quickwit_common::uri::Uri; use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode}; -use quickwit_proto::types::IndexId; +use quickwit_proto::types::{DocMappingUid, IndexId}; use serde::{Deserialize, Serialize}; pub use serialize::load_index_config_from_user_config; use tracing::warn; @@ -386,6 +386,7 @@ impl TestableForRegression for IndexConfig { ) .unwrap(); let doc_mapping = DocMapping { + doc_mapping_uid: DocMappingUid::default(), mode: Mode::default(), field_mappings: vec![ tenant_id_mapping, diff --git a/quickwit/quickwit-doc-mapper/Cargo.toml b/quickwit/quickwit-doc-mapper/Cargo.toml index cbc3cba567a..f48cc1072d6 100644 --- a/quickwit/quickwit-doc-mapper/Cargo.toml +++ b/quickwit/quickwit-doc-mapper/Cargo.toml @@ -33,6 +33,7 @@ utoipa = { workspace = true } quickwit-common = { workspace = true } quickwit-datetime = { workspace = true } quickwit-macros = { workspace = true } +quickwit-proto = { workspace = true } quickwit-query = { workspace = true } [dev-dependencies] diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 537ba1f460e..7fceda0edf4 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ 
-23,6 +23,7 @@ use std::num::NonZeroU32; use anyhow::{bail, Context}; use fnv::FnvHashSet; use quickwit_common::PathHasher; +use quickwit_proto::types::DocMappingUid; use quickwit_query::create_default_quickwit_tokenizer_manager; use quickwit_query::query_ast::QueryAst; use quickwit_query::tokenizers::TokenizerManager; @@ -59,6 +60,8 @@ const FIELD_PRESENCE_FIELD: Field = Field::from_field_id(0u32); #[derive(Clone, Serialize, Deserialize)] #[serde(into = "DefaultDocMapperBuilder", try_from = "DefaultDocMapperBuilder")] pub struct DefaultDocMapper { + /// The UID of the doc mapping. + doc_mapping_uid: DocMappingUid, /// Field in which the source should be stored. /// This field is only valid when using the schema associated with the default /// doc mapper, and therefore cannot be used in the `query` method. @@ -98,13 +101,6 @@ pub struct DefaultDocMapper { tokenizer_manager: TokenizerManager, } -impl DefaultDocMapper { - /// Default maximum number of partitions. - pub fn default_max_num_partitions() -> NonZeroU32 { - DocMapping::default_max_num_partitions() - } -} - fn validate_timestamp_field( timestamp_field_path: &str, mapping_root_node: &MappingNode, @@ -142,6 +138,7 @@ impl From for DefaultDocMapperBuilder { None }; let doc_mapping = DocMapping { + doc_mapping_uid: default_doc_mapper.doc_mapping_uid, mode: default_doc_mapper.mode, field_mappings: default_doc_mapper.field_mappings.into(), timestamp_field: default_doc_mapper.timestamp_field_name, @@ -281,6 +278,7 @@ impl TryFrom for DefaultDocMapper { } } Ok(DefaultDocMapper { + doc_mapping_uid: doc_mapping.doc_mapping_uid, schema, index_field_presence: doc_mapping.index_field_presence, source_field, @@ -556,6 +554,10 @@ impl, U: Clone> Iterator for ZipCloneable { #[typetag::serde(name = "default")] impl DocMapper for DefaultDocMapper { + fn doc_mapping_uid(&self) -> DocMappingUid { + self.doc_mapping_uid + } + fn doc_from_json_obj( &self, json_obj: JsonObject, diff --git 
a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs index 37fcc385fa1..e7b75012cb2 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs @@ -24,6 +24,7 @@ use std::ops::Bound; use anyhow::Context; use dyn_clone::{clone_trait_object, DynClone}; +use quickwit_proto::types::DocMappingUid; use quickwit_query::query_ast::QueryAst; use quickwit_query::tokenizers::TokenizerManager; use serde_json::Value as JsonValue; @@ -47,6 +48,9 @@ use crate::{DocParsingError, QueryParserError}; /// - supplying a tantivy [`Schema`] #[typetag::serde(tag = "type")] pub trait DocMapper: Send + Sync + Debug + DynClone + 'static { + /// Returns the unique identifier of the doc mapper. + fn doc_mapping_uid(&self) -> DocMappingUid; + /// Transforms a JSON object into a tantivy [`Document`] according to the rules /// defined for the `DocMapper`. fn doc_from_json_obj( diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapping.rs b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs index 75400c7a206..ae2e984f747 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapping.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs @@ -20,6 +20,7 @@ use std::collections::BTreeSet; use std::num::NonZeroU32; +use quickwit_proto::types::DocMappingUid; use serde::{Deserialize, Serialize}; use crate::{FieldMappingEntry, QuickwitJsonOptions, TokenizerEntry}; @@ -99,6 +100,10 @@ impl Default for Mode { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct DocMapping { + /// UID of the doc mapping. + #[serde(default = "DocMappingUid::new")] + pub doc_mapping_uid: DocMappingUid, + /// Defines how unmapped fields should be handled. 
#[serde_multikey( deserializer = Mode::from_parts, @@ -177,6 +182,7 @@ mod tests { #[test] fn test_doc_mapping_serde_roundtrip() { let doc_mapping = DocMapping { + doc_mapping_uid: DocMappingUid::new(), mode: Mode::Strict, field_mappings: vec![ FieldMappingEntry { diff --git a/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs new file mode 100644 index 00000000000..029d331d88c --- /dev/null +++ b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs @@ -0,0 +1,167 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +use std::borrow::Cow; +use std::fmt; + +use serde::de::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +pub use ulid::Ulid; + +use crate::types::pipeline_uid::ULID_SIZE; + +/// A doc mapping UID identifies a doc mapping.
+#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] +pub struct DocMappingUid(Ulid); + +impl fmt::Debug for DocMappingUid { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "DocMapping({})", self.0) + } +} + +impl fmt::Display for DocMappingUid { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl From<Ulid> for DocMappingUid { + fn from(ulid: Ulid) -> Self { + Self(ulid) + } +} + +impl DocMappingUid { + /// Creates a new random doc mapping UID. + pub fn new() -> Self { + Self(Ulid::new()) + } + + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test(ulid_u128: u128) -> DocMappingUid { + Self(Ulid::from(ulid_u128)) + } +} + +impl<'de> Deserialize<'de> for DocMappingUid { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where D: Deserializer<'de> { + let ulid_str: Cow<'de, str> = Cow::deserialize(deserializer)?; + let ulid = Ulid::from_string(&ulid_str).map_err(D::Error::custom)?; + Ok(Self(ulid)) + } +} + +impl Serialize for DocMappingUid { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where S: Serializer { + serializer.collect_str(&self.0) + } +} + +impl prost::Message for DocMappingUid { + fn encode_raw<B>(&self, buf: &mut B) + where B: prost::bytes::BufMut { + // TODO: when `bytes::encode` supports `&[u8]`, we can remove this allocation.
+ prost::encoding::bytes::encode(1u32, &self.0.to_bytes().to_vec(), buf); + } + + fn merge_field<B>( + &mut self, + tag: u32, + wire_type: prost::encoding::WireType, + buf: &mut B, + ctx: prost::encoding::DecodeContext, + ) -> ::core::result::Result<(), prost::DecodeError> + where + B: prost::bytes::Buf, + { + const STRUCT_NAME: &str = "DocMappingUid"; + + match tag { + 1u32 => { + let mut buffer = Vec::with_capacity(ULID_SIZE); + + prost::encoding::bytes::merge(wire_type, &mut buffer, buf, ctx).map_err( + |mut error| { + error.push(STRUCT_NAME, "doc_mapping_uid"); + error + }, + )?; + let ulid_bytes: [u8; ULID_SIZE] = + buffer.try_into().map_err(|buffer: Vec<u8>| { + prost::DecodeError::new(format!( + "invalid length for field `doc_mapping_uid`, expected 16 bytes, got {}", + buffer.len() + )) + })?; + self.0 = Ulid::from_bytes(ulid_bytes); + Ok(()) + } + _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + } + } + + #[inline] + fn encoded_len(&self) -> usize { + prost::encoding::key_len(1u32) + + prost::encoding::encoded_len_varint(ULID_SIZE as u64) + + ULID_SIZE + } + + fn clear(&mut self) { + self.0 = Ulid::nil(); + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use prost::Message; + + use super::*; + + #[test] + fn test_doc_mapping_uid_json_serde_roundtrip() { + let doc_mapping_uid = DocMappingUid::default(); + let serialized = serde_json::to_string(&doc_mapping_uid).unwrap(); + assert_eq!(serialized, r#""00000000000000000000000000""#); + + let deserialized: DocMappingUid = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, doc_mapping_uid); + } + + #[test] + fn test_doc_mapping_uid_prost_serde_roundtrip() { + let doc_mapping_uid = DocMappingUid::new(); + + let encoded = doc_mapping_uid.encode_to_vec(); + assert_eq!( + DocMappingUid::decode(Bytes::from(encoded)).unwrap(), + doc_mapping_uid + ); + + let encoded = doc_mapping_uid.encode_length_delimited_to_vec(); + assert_eq!(
DocMappingUid::decode_length_delimited(Bytes::from(encoded)).unwrap(), + doc_mapping_uid + ); + } +} diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs index f1fa01a852a..f45f3efbc55 100644 --- a/quickwit/quickwit-proto/src/types/mod.rs +++ b/quickwit/quickwit-proto/src/types/mod.rs @@ -28,11 +28,13 @@ use serde::{Deserialize, Serialize}; use tracing::warn; pub use ulid::Ulid; +mod doc_mapping_uid; mod index_uid; mod pipeline_uid; mod position; mod shard_id; +pub use doc_mapping_uid::DocMappingUid; pub use index_uid::IndexUid; pub use pipeline_uid::PipelineUid; pub use position::Position; diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 97a4a3c75d4..4c44cd25be6 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1267,6 +1267,7 @@ mod tests { LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder, SortValue, SplitSearchError, }; + use quickwit_proto::types::DocMappingUid; use tantivy::collector::Collector; use tantivy::TantivyDocument; @@ -1338,6 +1339,10 @@ mod tests { #[typetag::serde(name = "mock")] impl quickwit_doc_mapper::DocMapper for MockDocMapper { + fn doc_mapping_uid(&self) -> DocMappingUid { + DocMappingUid::default() + } + // Required methods fn doc_from_json_obj( &self,