diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 3f90a43b3e4..41033fa244e 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5900,6 +5900,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_json_borrow", "serde_yaml", "siphasher", "tantivy", @@ -6016,6 +6017,7 @@ dependencies = [ "rand_distr", "serde", "serde_json", + "serde_json_borrow", "tempfile", "thiserror", "tokio", @@ -7336,6 +7338,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_json_borrow" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a256097ff1654ce1975402cb5a2bfa2cad3cc3199e1d704bf303b386fc971f3c" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "serde_path_to_error" version = "0.1.16" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 7fcd89e8fbe..777e4f19dc7 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -211,6 +211,7 @@ sea-query-binder = { version = "0.5", features = [ # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } serde_json = "1.0" +serde_json_borrow = "0.5" serde_qs = { version = "0.12", features = ["warp"] } serde_with = "3.8.0" serde_yaml = "0.9" diff --git a/quickwit/quickwit-doc-mapper/Cargo.toml b/quickwit/quickwit-doc-mapper/Cargo.toml index f48cc1072d6..abb391f3ebb 100644 --- a/quickwit/quickwit-doc-mapper/Cargo.toml +++ b/quickwit/quickwit-doc-mapper/Cargo.toml @@ -23,6 +23,7 @@ once_cell = { workspace = true } regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_json_borrow = { workspace = true } siphasher = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs index 47913c30794..6ebdcc2d960 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs @@ -71,6 +71,38 @@ impl Default for QuickwitDateTimeOptions { } impl QuickwitDateTimeOptions { + pub(crate) fn validate_json( + &self, + json_value: &serde_json_borrow::Value, + ) -> Result<(), String> { + match json_value { + serde_json_borrow::Value::Number(timestamp) => { + // `.as_f64()` actually converts floats to integers, so we must check for integers + // first. + if let Some(timestamp_i64) = timestamp.as_i64() { + quickwit_datetime::parse_timestamp_int(timestamp_i64, &self.input_formats.0)?; + Ok(()) + } else if let Some(timestamp_f64) = timestamp.as_f64() { + quickwit_datetime::parse_timestamp_float(timestamp_f64, &self.input_formats.0)?; + Ok(()) + } else { + Err(format!( + "failed to convert timestamp to f64 ({:?}). this should never happen", + serde_json::Number::from(*timestamp) + )) + } + } + serde_json_borrow::Value::Str(date_time_str) => { + quickwit_datetime::parse_date_time_str(date_time_str, &self.input_formats.0)?; + Ok(()) + } + _ => Err(format!( + "failed to parse datetime: expected a float, integer, or string, got \ + `{json_value}`" + )), + } + } + pub(crate) fn parse_json(&self, json_value: &JsonValue) -> Result { let date_time = match json_value { JsonValue::Number(timestamp) => { diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index ca640d6fae2..0dfc283dea4 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -29,6 +29,7 @@ use quickwit_query::query_ast::QueryAst; use quickwit_query::tokenizers::TokenizerManager; use serde::{Deserialize, Serialize}; use serde_json::{self, Value as JsonValue}; +use serde_json_borrow::Map as BorrowedJsonMap; use tantivy::query::Query; use tantivy::schema::document::{ReferenceValue, ReferenceValueLeaf}; use tantivy::schema::{ @@ -507,7 +508,7 @@ impl DocMapper for DefaultDocMapper { self.doc_mapping_uid } - fn validate_json_obj(&self, json_obj: &JsonObject) -> Result<(), DocParsingError> { + fn validate_json_obj(&self, json_obj: &BorrowedJsonMap) -> Result<(), DocParsingError> { let is_strict = self.mode.mode_type() == ModeType::Strict; let mut field_path = Vec::new(); self.field_mappings diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs index 262fab89d3d..817667d6b60 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs @@ -206,15 +206,7 @@ impl BinaryFormat { } /// Parses the `serde_json::Value` into `tantivy::schema::Value`. - pub fn parse_json(&self, json_val: &JsonValue) -> Result { - let byte_str = if let JsonValue::String(byte_str) = json_val { - byte_str - } else { - return Err(format!( - "expected {} string, got `{json_val}`", - self.as_str() - )); - }; + pub fn parse_str(&self, byte_str: &str) -> Result, String> { let payload = match self { Self::Base64 => BASE64_STANDARD .decode(byte_str) @@ -225,6 +217,20 @@ impl BinaryFormat { format!("expected hex string, got `{byte_str}`: {hex_decode_err}") })?, }; + Ok(payload) + } + + /// Parses the `serde_json::Value` into `tantivy::schema::Value`. + pub fn parse_json(&self, json_val: &JsonValue) -> Result { + let byte_str = if let JsonValue::String(byte_str) = json_val { + byte_str + } else { + return Err(format!( + "expected {} string, got `{json_val}`", + self.as_str() + )); + }; + let payload = self.parse_str(byte_str)?; Ok(TantivyValue::Bytes(payload)) } } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 3b53d9a6592..d235630d262 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -25,6 +25,7 @@ use std::str::FromStr; use anyhow::bail; use itertools::Itertools; use serde_json::Value as JsonValue; +use serde_json_borrow::{Map as BorrowedJsonMap, Value as BorrowedJsonValue}; use tantivy::schema::{ BytesOptions, DateOptions, Field, IntoIpv6Addr, IpAddrOptions, JsonObjectOptions, NumericOptions, OwnedValue as TantivyValue, SchemaBuilder, TextOptions, @@ -158,47 +159,55 @@ pub(crate) fn map_primitive_json_to_tantivy(value: JsonValue) -> Option Result<(), String> { + fn validate_from_json(&self, json_val: &BorrowedJsonValue) -> Result<(), String> { match self { LeafType::Text(_) => { - if let JsonValue::String(_) = json_val { + if json_val.is_string() { Ok(()) } else { Err(format!("expected string, got `{json_val}`")) } } LeafType::I64(numeric_options) => { - i64::from_json_to_self(json_val, numeric_options.coerce).map(|_| ()) + i64::validate_json(json_val, numeric_options.coerce).map(|_| ()) } LeafType::U64(numeric_options) => { - u64::from_json_to_self(json_val, numeric_options.coerce).map(|_| ()) + u64::validate_json(json_val, numeric_options.coerce).map(|_| ()) } LeafType::F64(numeric_options) => { - f64::from_json_to_self(json_val, numeric_options.coerce).map(|_| ()) + f64::validate_json(json_val, numeric_options.coerce).map(|_| ()) } LeafType::Bool(_) => { - if let JsonValue::Bool(_) = json_val { + if json_val.is_bool() { Ok(()) } else { Err(format!("expected boolean, got `{json_val}`")) } } LeafType::IpAddr(_) => { - let JsonValue::String(ip_address) = json_val else { + let Some(ip_address) = json_val.as_str() else { return Err(format!("expected string, got `{json_val}`")); }; - IpAddr::from_str(ip_address.as_str()) + IpAddr::from_str(ip_address) .map_err(|err| format!("failed to parse IP address `{ip_address}`: {err}"))?; Ok(()) } LeafType::DateTime(date_time_options) => { - date_time_options.parse_json(json_val).map(|_| ()) + date_time_options.validate_json(json_val).map(|_| ()) } LeafType::Bytes(binary_options) => { - binary_options.input_format.parse_json(json_val).map(|_| ()) + if let Some(byte_str) = json_val.as_str() { + binary_options.input_format.parse_str(byte_str)?; + Ok(()) + } else { + Err(format!( + "expected {} string, got `{json_val}`", + binary_options.input_format.as_str() + )) + } } LeafType::Json(_) => { - if let JsonValue::Object(_) = json_val { + if json_val.is_object() { Ok(()) } else { Err(format!("expected object, got `{json_val}`")) @@ -327,14 +336,14 @@ pub(crate) struct MappingLeaf { impl MappingLeaf { fn validate_from_json( &self, - json_value: &JsonValue, + json_value: &BorrowedJsonValue, path: &[&str], ) -> Result<(), DocParsingError> { if json_value.is_null() { // We just ignore `null`. return Ok(()); } - if let JsonValue::Array(els) = json_value { + if let BorrowedJsonValue::Array(els) = json_value { if self.cardinality == Cardinality::SingleValued { return Err(DocParsingError::MultiValuesNotSupported(path.join("."))); } @@ -691,6 +700,47 @@ fn insert_json_val( trait NumVal: Sized + FromStr + ToString + Into { fn from_json_number(num: &serde_json::Number) -> Option; + fn validate_json(json_val: &BorrowedJsonValue, coerce: bool) -> Result<(), String> { + match json_val { + BorrowedJsonValue::Number(num_val) => { + let num_val = serde_json::Number::from(*num_val); + Self::from_json_number(&num_val).ok_or_else(|| { + format!( + "expected {}, got inconvertible JSON number `{}`", + type_name::(), + num_val + ) + })?; + Ok(()) + } + BorrowedJsonValue::Str(str_val) => { + if coerce { + str_val.parse::().map_err(|_| { + format!( + "failed to coerce JSON string `\"{str_val}\"` to {}", + type_name::() + ) + })?; + Ok(()) + } else { + Err(format!( + "expected JSON number, got string `\"{str_val}\"`. enable coercion to {} \ + with the `coerce` parameter in the field mapping", + type_name::() + )) + } + } + _ => { + let message = if coerce { + format!("expected JSON number or string, got `{json_val}`") + } else { + format!("expected JSON number, got `{json_val}`") + }; + Err(message) + } + } + } + fn from_json_to_self(json_val: &JsonValue, coerce: bool) -> Result { match json_val { JsonValue::Number(num_val) => Self::from_json_number(num_val).ok_or_else(|| { @@ -873,13 +923,13 @@ impl MappingNode { pub fn validate_from_json<'a>( &self, - json_obj: &'a serde_json::Map, + json_obj: &'a BorrowedJsonMap, strict_mode: bool, path: &mut Vec<&'a str>, ) -> Result<(), DocParsingError> { - for (field_name, json_val) in json_obj { + for (field_name, json_val) in json_obj.iter() { if let Some(child_tree) = self.branches.get(field_name) { - path.push(field_name.as_str()); + path.push(field_name); child_tree.validate_from_json(json_val, path, strict_mode)?; path.pop(); } else if strict_mode { @@ -981,7 +1031,7 @@ pub(crate) enum MappingTree { impl MappingTree { fn validate_from_json<'a>( &self, - json_value: &'a JsonValue, + json_value: &'a BorrowedJsonValue<'a>, field_path: &mut Vec<&'a str>, strict_mode: bool, ) -> Result<(), DocParsingError> { @@ -990,7 +1040,7 @@ impl MappingTree { mapping_leaf.validate_from_json(json_value, field_path) } MappingTree::Node(mapping_node) => { - if let JsonValue::Object(json_obj) = json_value { + if let Some(json_obj) = json_value.as_object() { mapping_node.validate_from_json(json_obj, strict_mode, field_path) } else { Err(DocParsingError::ValueError( diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs index 6f2e44794be..52e99aa586f 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs @@ -28,6 +28,7 @@ use quickwit_proto::types::DocMappingUid; use quickwit_query::query_ast::QueryAst; use quickwit_query::tokenizers::TokenizerManager; use serde_json::Value as JsonValue; +use serde_json_borrow::Map as BorrowedJsonMap; use tantivy::query::Query; use tantivy::schema::{Field, FieldType, OwnedValue as Value, Schema}; use tantivy::{TantivyDocument as Document, Term}; @@ -52,7 +53,9 @@ pub trait DocMapper: Send + Sync + Debug + DynClone + 'static { fn doc_mapping_uid(&self) -> DocMappingUid; /// Validates a JSON object according to the doc mapper. - fn validate_json_obj(&self, json_obj: &JsonObject) -> Result<(), DocParsingError>; + fn validate_json_obj(&self, _json_obj: &BorrowedJsonMap) -> Result<(), DocParsingError> { + Ok(()) + } /// Transforms a JSON object into a tantivy [`Document`] according to the rules /// defined for the `DocMapper`. @@ -396,6 +399,16 @@ mod tests { ); } + #[track_caller] + fn test_validate_doc_aux( + doc_mapper: &dyn DocMapper, + doc_json: &str, + ) -> Result<(), DocParsingError> { + let json_val: serde_json_borrow::Value = serde_json::from_str(doc_json).unwrap(); + let json_obj = json_val.as_object().unwrap(); + doc_mapper.validate_json_obj(json_obj) + } + #[test] fn test_validate_doc() { const JSON_CONFIG_VALUE: &str = r#"{ @@ -449,74 +462,58 @@ mod tests { }"#; let doc_mapper = serde_json::from_str::(JSON_CONFIG_VALUE).unwrap(); { - let valid_doc_value = serde_json::json!({ "body": "toto" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); - assert!(doc_mapper.validate_json_obj(valid_doc_json).is_ok()); + assert!(test_validate_doc_aux(&doc_mapper, r#"{ "body": "toto"}"#).is_ok()); } { - let valid_doc_value = serde_json::json!({ "response_time": "toto" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); assert!(matches!( - doc_mapper.validate_json_obj(valid_doc_json).unwrap_err(), + test_validate_doc_aux(&doc_mapper, r#"{ "response_time": "toto"}"#).unwrap_err(), DocParsingError::ValueError(_, _) )); } { - // coercion is supported - let valid_doc_value = serde_json::json!({ "response_time": "2.3" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); - assert!(doc_mapper.validate_json_obj(valid_doc_json).is_ok()); + assert!(test_validate_doc_aux(&doc_mapper, r#"{ "response_time": "2.3"}"#).is_ok(),); } { // coercion disabled - let valid_doc_value = serde_json::json!({ "response_time_no_coercion": "2.3" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); assert!(matches!( - doc_mapper.validate_json_obj(valid_doc_json).unwrap_err(), + test_validate_doc_aux(&doc_mapper, r#"{"response_time_no_coercion": "2.3"}"#) + .unwrap_err(), DocParsingError::ValueError(_, _) )); } { - // coercion disabled - let valid_doc_value = serde_json::json!({ "response_time": [2.3] }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); assert!(matches!( - doc_mapper.validate_json_obj(valid_doc_json).unwrap_err(), + test_validate_doc_aux(&doc_mapper, r#"{"response_time": [2.3]}"#).unwrap_err(), DocParsingError::MultiValuesNotSupported(_) )); } { - let valid_doc_value = serde_json::json!({ "attributes": { "numbers": [-2] }}); - let valid_doc_json = valid_doc_value.as_object().unwrap(); - assert!(doc_mapper.validate_json_obj(valid_doc_json).is_ok()); + assert!( + test_validate_doc_aux(&doc_mapper, r#"{"attributes": {"numbers": [-2]}}"#).is_ok() + ); } } #[test] fn test_validate_doc_mode() { + const DOC: &str = r#"{ "whatever": "blop" }"#; { const JSON_CONFIG_VALUE: &str = r#"{ "mode": "strict", "field_mappings": [] }"#; let doc_mapper = serde_json::from_str::(JSON_CONFIG_VALUE).unwrap(); - let valid_doc_value = serde_json::json!({ "response_time": "toto" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); assert!(matches!( - doc_mapper.validate_json_obj(valid_doc_json).unwrap_err(), + test_validate_doc_aux(&doc_mapper, DOC).unwrap_err(), DocParsingError::NoSuchFieldInSchema(_) )); } { const JSON_CONFIG_VALUE: &str = r#"{ "mode": "lenient", "field_mappings": [] }"#; let doc_mapper = serde_json::from_str::(JSON_CONFIG_VALUE).unwrap(); - let valid_doc_value = serde_json::json!({ "response_time": "toto" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); - assert!(doc_mapper.validate_json_obj(valid_doc_json).is_ok()); + assert!(test_validate_doc_aux(&doc_mapper, DOC).is_ok()); } { const JSON_CONFIG_VALUE: &str = r#"{ "mode": "dynamic", "field_mappings": [] }"#; let doc_mapper = serde_json::from_str::(JSON_CONFIG_VALUE).unwrap(); - let valid_doc_value = serde_json::json!({ "response_time": "toto" }); - let valid_doc_json = valid_doc_value.as_object().unwrap(); - assert!(doc_mapper.validate_json_obj(valid_doc_json).is_ok()); + assert!(test_validate_doc_aux(&doc_mapper, DOC).is_ok()); } } diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index f8370c3d3b4..495a6cee2df 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -26,6 +26,7 @@ prost = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_json_borrow = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs index 2ae098e6937..2748828131b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs @@ -28,7 +28,7 @@ use quickwit_proto::ingest::{ DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason, }; use quickwit_proto::types::DocMappingUid; -use serde_json::Value as JsonValue; +use serde_json_borrow::Value as JsonValue; use tracing::info; /// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from @@ -77,7 +77,7 @@ fn validate_doc_batch_impl( ) -> (DocBatchV2, Vec) { let mut parse_failures: Vec = Vec::new(); for (doc_uid, doc) in doc_batch.docs() { - let Ok(json_doc) = serde_json::from_slice::(&doc) else { + let Ok(json_doc) = serde_json::from_slice::(&doc) else { let parse_failure = ParseFailure { doc_uid: Some(doc_uid), reason: ParseFailureReason::InvalidJson as i32, diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 903417f7168..215ea7e146e 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1337,13 +1337,6 @@ mod tests { #[typetag::serde(name = "mock")] impl quickwit_doc_mapper::DocMapper for MockDocMapper { - fn validate_json_obj( - &self, - _: &quickwit_doc_mapper::JsonObject, - ) -> Result<(), quickwit_doc_mapper::DocParsingError> { - unimplemented!() - } - fn doc_mapping_uid(&self) -> DocMappingUid { DocMappingUid::default() }