Skip to content

Commit

Permalink
Add doc_mapping_uid field to DocMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jun 6, 2024
1 parent e69d3c5 commit 330ea55
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 8 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use cron::Schedule;
use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode};
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;
Expand Down Expand Up @@ -386,6 +386,7 @@ impl TestableForRegression for IndexConfig {
)
.unwrap();
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::default(),
mode: Mode::default(),
field_mappings: vec![
tenant_id_mapping,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-doc-mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ utoipa = { workspace = true }
quickwit-common = { workspace = true }
quickwit-datetime = { workspace = true }
quickwit-macros = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-query = { workspace = true }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::num::NonZeroU32;
use anyhow::{bail, Context};
use fnv::FnvHashSet;
use quickwit_common::PathHasher;
use quickwit_proto::types::DocMappingUid;
use quickwit_query::create_default_quickwit_tokenizer_manager;
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
Expand Down Expand Up @@ -59,6 +60,8 @@ const FIELD_PRESENCE_FIELD: Field = Field::from_field_id(0u32);
#[derive(Clone, Serialize, Deserialize)]
#[serde(into = "DefaultDocMapperBuilder", try_from = "DefaultDocMapperBuilder")]
pub struct DefaultDocMapper {
/// The UID of the doc mapping.
doc_mapping_uid: DocMappingUid,
/// Field in which the source should be stored.
/// This field is only valid when using the schema associated with the default
/// doc mapper, and therefore cannot be used in the `query` method.
Expand Down Expand Up @@ -98,13 +101,6 @@ pub struct DefaultDocMapper {
tokenizer_manager: TokenizerManager,
}

impl DefaultDocMapper {
/// Default maximum number of partitions.
pub fn default_max_num_partitions() -> NonZeroU32 {
DocMapping::default_max_num_partitions()
}
}

fn validate_timestamp_field(
timestamp_field_path: &str,
mapping_root_node: &MappingNode,
Expand Down Expand Up @@ -142,6 +138,7 @@ impl From<DefaultDocMapper> for DefaultDocMapperBuilder {
None
};
let doc_mapping = DocMapping {
doc_mapping_uid: default_doc_mapper.doc_mapping_uid,
mode: default_doc_mapper.mode,
field_mappings: default_doc_mapper.field_mappings.into(),
timestamp_field: default_doc_mapper.timestamp_field_name,
Expand Down Expand Up @@ -281,6 +278,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
}
}
Ok(DefaultDocMapper {
doc_mapping_uid: doc_mapping.doc_mapping_uid,
schema,
index_field_presence: doc_mapping.index_field_presence,
source_field,
Expand Down Expand Up @@ -556,6 +554,10 @@ impl<T, I: Iterator<Item = T>, U: Clone> Iterator for ZipCloneable<T, I, U> {

#[typetag::serde(name = "default")]
impl DocMapper for DefaultDocMapper {
fn doc_mapping_uid(&self) -> DocMappingUid {
self.doc_mapping_uid
}

fn doc_from_json_obj(
&self,
json_obj: JsonObject,
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::ops::Bound;

use anyhow::Context;
use dyn_clone::{clone_trait_object, DynClone};
use quickwit_proto::types::DocMappingUid;
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
use serde_json::Value as JsonValue;
Expand All @@ -47,6 +48,9 @@ use crate::{DocParsingError, QueryParserError};
/// - supplying a tantivy [`Schema`]
#[typetag::serde(tag = "type")]
pub trait DocMapper: Send + Sync + Debug + DynClone + 'static {
/// Returns the unique identifier of the doc mapper.
fn doc_mapping_uid(&self) -> DocMappingUid;

/// Transforms a JSON object into a tantivy [`Document`] according to the rules
/// defined for the `DocMapper`.
fn doc_from_json_obj(
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::BTreeSet;
use std::num::NonZeroU32;

use quickwit_proto::types::DocMappingUid;
use serde::{Deserialize, Serialize};

use crate::{FieldMappingEntry, QuickwitJsonOptions, TokenizerEntry};
Expand Down Expand Up @@ -99,6 +100,10 @@ impl Default for Mode {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct DocMapping {
/// UID of the doc mapping.
#[serde(default = "DocMappingUid::new")]
pub doc_mapping_uid: DocMappingUid,

/// Defines how unmapped fields should be handled.
#[serde_multikey(
deserializer = Mode::from_parts,
Expand Down Expand Up @@ -177,6 +182,7 @@ mod tests {
#[test]
fn test_doc_mapping_serde_roundtrip() {
let doc_mapping = DocMapping {
doc_mapping_uid: DocMappingUid::new(),
mode: Mode::Strict,
field_mappings: vec![
FieldMappingEntry {
Expand Down
167 changes: 167 additions & 0 deletions quickwit/quickwit-proto/src/types/doc_mapping_uid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Cow;
use std::fmt;

use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use ulid::Ulid;

use crate::types::pipeline_uid::ULID_SIZE;

/// A doc mapping UID identifies a document across segments, splits, and indexes.
#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct DocMappingUid(Ulid);

impl fmt::Debug for DocMappingUid {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "DocMapping({})", self.0)
}
}

impl fmt::Display for DocMappingUid {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
}
}

impl From<Ulid> for DocMappingUid {
fn from(ulid: Ulid) -> Self {
Self(ulid)
}
}

impl DocMappingUid {
/// Creates a new random doc mapping UID.
pub fn new() -> Self {
Self(Ulid::new())
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(ulid_u128: u128) -> DocMappingUid {
Self(Ulid::from(ulid_u128))
}
}

impl<'de> Deserialize<'de> for DocMappingUid {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
let ulid_str: Cow<'de, str> = Cow::deserialize(deserializer)?;
let ulid = Ulid::from_string(&ulid_str).map_err(D::Error::custom)?;
Ok(Self(ulid))
}
}

impl Serialize for DocMappingUid {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
serializer.collect_str(&self.0)
}
}

impl prost::Message for DocMappingUid {
fn encode_raw<B>(&self, buf: &mut B)
where B: prost::bytes::BufMut {
// TODO: when `bytes::encode` supports `&[u8]`, we can remove this allocation.
prost::encoding::bytes::encode(1u32, &self.0.to_bytes().to_vec(), buf);
}

fn merge_field<B>(
&mut self,
tag: u32,
wire_type: prost::encoding::WireType,
buf: &mut B,
ctx: prost::encoding::DecodeContext,
) -> ::core::result::Result<(), prost::DecodeError>
where
B: prost::bytes::Buf,
{
const STRUCT_NAME: &str = "DocMappingUid";

match tag {
1u32 => {
let mut buffer = Vec::with_capacity(ULID_SIZE);

prost::encoding::bytes::merge(wire_type, &mut buffer, buf, ctx).map_err(
|mut error| {
error.push(STRUCT_NAME, "doc_mapping_uid");
error
},
)?;
let ulid_bytes: [u8; ULID_SIZE] =
buffer.try_into().map_err(|buffer: Vec<u8>| {
prost::DecodeError::new(format!(
"invalid length for field `doc_mapping_uid`, expected 16 bytes, got {}",
buffer.len()
))
})?;
self.0 = Ulid::from_bytes(ulid_bytes);
Ok(())
}
_ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
}
}

#[inline]
fn encoded_len(&self) -> usize {
prost::encoding::key_len(1u32)
+ prost::encoding::encoded_len_varint(ULID_SIZE as u64)
+ ULID_SIZE
}

fn clear(&mut self) {
self.0 = Ulid::nil();
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use prost::Message;

use super::*;

#[test]
fn test_doc_mapping_uid_json_serde_roundtrip() {
let doc_mapping_uid = DocMappingUid::default();
let serialized = serde_json::to_string(&doc_mapping_uid).unwrap();
assert_eq!(serialized, r#""00000000000000000000000000""#);

let deserialized: DocMappingUid = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, doc_mapping_uid);
}

#[test]
fn test_doc_mapping_uid_prost_serde_roundtrip() {
let doc_mapping_uid = DocMappingUid::new();

let encoded = doc_mapping_uid.encode_to_vec();
assert_eq!(
DocMappingUid::decode(Bytes::from(encoded)).unwrap(),
doc_mapping_uid
);

let encoded = doc_mapping_uid.encode_length_delimited_to_vec();
assert_eq!(
DocMappingUid::decode_length_delimited(Bytes::from(encoded)).unwrap(),
doc_mapping_uid
);
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-proto/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use serde::{Deserialize, Serialize};
use tracing::warn;
pub use ulid::Ulid;

mod doc_mapping_uid;
mod index_uid;
mod pipeline_uid;
mod position;
mod shard_id;

pub use doc_mapping_uid::DocMappingUid;
pub use index_uid::IndexUid;
pub use pipeline_uid::PipelineUid;
pub use position::Position;
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,7 @@ mod tests {
LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder,
SortValue, SplitSearchError,
};
use quickwit_proto::types::DocMappingUid;
use tantivy::collector::Collector;
use tantivy::TantivyDocument;

Expand Down Expand Up @@ -1338,6 +1339,10 @@ mod tests {

#[typetag::serde(name = "mock")]
impl quickwit_doc_mapper::DocMapper for MockDocMapper {
fn doc_mapping_uid(&self) -> DocMappingUid {
DocMappingUid::default()
}

// Required methods
fn doc_from_json_obj(
&self,
Expand Down

0 comments on commit 330ea55

Please sign in to comment.