From 9ad451e24ce3ebf73b0161f6cd6e6341b1d2495a Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 20 May 2024 17:28:16 +0800 Subject: [PATCH 1/2] update tantivy --- quickwit/Cargo.lock | 23 +------ quickwit/Cargo.toml | 3 +- .../src/default_doc_mapper/default_mapper.rs | 64 +++++++------------ .../src/default_doc_mapper/mapping_tree.rs | 22 +++++-- .../src/actors/doc_processor.rs | 2 +- quickwit/quickwit-search/src/fetch_docs.rs | 13 ++-- 6 files changed, 53 insertions(+), 74 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index da8d17e0b2a..0b57997f954 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4342,14 +4342,6 @@ dependencies = [ "loom", ] -[[package]] -name = "oneshot" -version = "0.1.6" -source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" -dependencies = [ - "loom", -] - [[package]] name = "onig" version = "6.4.0" @@ -4675,7 +4667,6 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "stable_deref_trait", ] @@ -5884,7 +5875,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "oneshot", "openssl", "proptest", "prost", @@ -8008,8 +7999,7 @@ dependencies = [ [[package]] name = "tantivy" -version = "0.23.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" +version = "0.22.0" dependencies = [ "aho-corasick", "arc-swap", @@ -8033,7 +8023,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", + "oneshot", "rayon", "regex", "rust-stemmers", @@ -8061,7 +8051,6 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source 
= "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "bitpacking", ] @@ -8069,7 +8058,6 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "downcast-rs", "fastdivide", @@ -8084,7 +8072,6 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "async-trait", "byteorder", @@ -8107,7 +8094,6 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "nom", ] @@ -8115,7 +8101,6 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8126,7 +8111,6 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "murmurhash32", "rand_distr", @@ -8136,7 +8120,6 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=5b7cca1#5b7cca13e5136c7e3b86f645be38b08bed2d6f78" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index a207e2a7521..a5191299a40 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -318,7 +318,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev 
= "5b7cca1", default-features = false, features = [ +tantivy = { path = "../../../tantivy/compact_doc", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", @@ -346,4 +346,5 @@ sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "daca921 debug = false [profile.release] +debug = true lto = "thin" diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 3daf65462d0..be886232e95 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -29,8 +29,11 @@ use quickwit_query::tokenizers::TokenizerManager; use serde::{Deserialize, Serialize}; use serde_json::{self, Value as JsonValue}; use tantivy::query::Query; +use tantivy::schema::document::{ + CompactDocObjectIter, CompactDocValue, ReferenceValue, ReferenceValueLeaf, +}; use tantivy::schema::{ - Field, FieldType, FieldValue, OwnedValue as TantivyValue, Schema, INDEXED, STORED, + Field, FieldType, OwnedValue as TantivyValue, Schema, Value, INDEXED, STORED, }; use tantivy::TantivyDocument as Document; @@ -458,27 +461,18 @@ fn tantivy_value_to_json(val: TantivyValue) -> JsonValue { } #[inline] -fn populate_field_presence_for_json_value( - json_value: &TantivyValue, +fn populate_field_presence_for_json_value<'a>( + json_value: CompactDocValue<'a>, path_hasher: &PathHasher, is_expand_dots_enabled: bool, output: &mut FnvHashSet, ) { - match json_value { - TantivyValue::Null => {} - TantivyValue::Bool(_) - | TantivyValue::F64(_) - | TantivyValue::I64(_) - | TantivyValue::U64(_) - | TantivyValue::PreTokStr(_) - | TantivyValue::Date(_) - | TantivyValue::Facet(_) - | TantivyValue::Bytes(_) - | TantivyValue::IpAddr(_) - | TantivyValue::Str(_) => { + match json_value.as_value() { + ReferenceValue::Leaf(ReferenceValueLeaf::Null) => {} + ReferenceValue::Leaf(_) => { 
output.insert(path_hasher.finish()); } - TantivyValue::Array(items) => { + ReferenceValue::Array(items) => { for item in items { populate_field_presence_for_json_value( item, @@ -488,7 +482,7 @@ fn populate_field_presence_for_json_value( ); } } - TantivyValue::Object(json_obj) => { + ReferenceValue::Object(json_obj) => { populate_field_presence_for_json_obj( json_obj, path_hasher.clone(), @@ -500,7 +494,7 @@ fn populate_field_presence_for_json_value( } fn populate_field_presence_for_json_obj( - json_obj: &[(String, TantivyValue)], + json_obj: CompactDocObjectIter, path_hasher: PathHasher, is_expand_dots_enabled: bool, output: &mut FnvHashSet, @@ -577,17 +571,11 @@ impl DocMapper for DefaultDocMapper { let mut dynamic_json_obj = serde_json::Map::default(); let mut field_path = Vec::new(); - let mut document = Document::default(); + let mut document = Document::with_capacity(document_len as usize * 2); + let json_obj = JsonValue::Object(json_obj); if let Some(source_field) = self.source_field { - document.add_object( - source_field, - json_obj - .clone() - .into_iter() - .map(|(key, val)| (key, TantivyValue::from(val))) - .collect(), - ); + document.add_field_value(source_field, &json_obj); } let mode = self.mode.mode_type(); @@ -610,17 +598,11 @@ impl DocMapper for DefaultDocMapper { for (concatenate_dynamic_field, value) in zip_cloneable(self.concatenate_dynamic_fields.iter(), value) { - document.add_field_value(*concatenate_dynamic_field, value); + document.add_field_value(*concatenate_dynamic_field, &value); } } } - document.add_object( - dynamic_field, - dynamic_json_obj - .into_iter() - .map(|(key, val)| (key, TantivyValue::from(val))) - .collect(), - ); + document.add_field_value(dynamic_field, &JsonValue::Object(dynamic_json_obj)); } } @@ -632,18 +614,18 @@ impl DocMapper for DefaultDocMapper { if self.index_field_presence { let mut field_presence_hashes: FnvHashSet = FnvHashSet::with_capacity_and_hasher( - document.field_values().len(), + 
document.field_values().count(), Default::default(), ); - for FieldValue { field, value } in document.field_values() { - let field_entry = self.schema.get_field_entry(*field); + for (field, value) in document.field_values() { + let field_entry = self.schema.get_field_entry(field); if !field_entry.is_indexed() || field_entry.is_fast() { // We are using an tantivy's ExistsQuery for fast fields. continue; } let mut path_hasher: PathHasher = PathHasher::default(); path_hasher.append(&field.field_id().to_le_bytes()[..]); - if let TantivyValue::Object(json_obj) = value { + if let ReferenceValue::Object(json_obj) = value { let is_expand_dots_enabled: bool = if let FieldType::JsonObject(json_options) = field_entry.field_type() { json_options.is_expand_dots_enabled() @@ -661,11 +643,13 @@ impl DocMapper for DefaultDocMapper { } } for field_presence_hash in field_presence_hashes { - document.add_field_value(FIELD_PRESENCE_FIELD, field_presence_hash); + document.add_leaf_field_value(FIELD_PRESENCE_FIELD, field_presence_hash); } } self.check_missing_required_fields(&document)?; + + document.shrink_to_fit(); Ok((partition, document)) } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 4fdf1af030c..8836bfba4e7 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -318,7 +318,7 @@ impl MappingLeaf { .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; for concat_value in concat_values { for field in &self.concatenate { - document.add_field_value(*field, concat_value.clone()); + document.add_field_value(*field, &concat_value); } } } @@ -326,7 +326,7 @@ impl MappingLeaf { .typ .value_from_json(el_json_val) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; - document.add_field_value(self.field, value); + 
document.add_field_value(self.field, &value); } return Ok(()); } @@ -338,7 +338,7 @@ impl MappingLeaf { .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; for concat_value in concat_values { for field in &self.concatenate { - document.add_field_value(*field, concat_value.clone()); + document.add_field_value(*field, &concat_value.clone()); } } } @@ -346,7 +346,7 @@ impl MappingLeaf { .typ .value_from_json(json_val) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; - document.add_field_value(self.field, value); + document.add_field_value(self.field, &value); Ok(()) } @@ -633,12 +633,16 @@ impl MappingNode { pub fn doc_from_json( &self, - json_obj: serde_json::Map, + json_obj: JsonValue, mode: ModeType, document: &mut Document, path: &mut Vec, dynamic_json_obj: &mut serde_json::Map, ) -> Result<(), DocParsingError> { + let json_obj = match json_obj { + JsonValue::Object(json_obj) => json_obj, + _ => panic!("internal error: expected json object"), + }; for (field_name, val) in json_obj { if let Some(child_tree) = self.branches.get(&field_name) { path.push(field_name); @@ -733,7 +737,13 @@ impl MappingTree { } MappingTree::Node(mapping_node) => { if let JsonValue::Object(json_obj) = json_value { - mapping_node.doc_from_json(json_obj, mode, document, path, dynamic_json_obj) + mapping_node.doc_from_json( + JsonValue::Object(json_obj), + mode, + document, + path, + dynamic_json_obj, + ) } else { Err(DocParsingError::ValueError( path.join("."), diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 26b11155d8f..88fa6322209 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -36,7 +36,7 @@ use quickwit_opentelemetry::otlp::{ use quickwit_proto::types::{IndexId, SourceId}; use serde::Serialize; use serde_json::Value as JsonValue; -use tantivy::schema::{Field, Value}; +use 
tantivy::schema::Field; use tantivy::{DateTime, TantivyDocument}; use thiserror::Error; use tokio::runtime::Handle; diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index ed6da6347aa..11a481faf22 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -29,7 +29,8 @@ use quickwit_proto::search::{ }; use quickwit_storage::Storage; use tantivy::query::Query; -use tantivy::schema::{Document as DocumentTrait, Field, OwnedValue, TantivyDocument, Value}; +use tantivy::schema::document::CompactDocValue; +use tantivy::schema::{Document as DocumentTrait, Field, TantivyDocument, Value}; use tantivy::snippet::SnippetGenerator; use tantivy::{ReloadPolicy, Score, Searcher, Term}; use tracing::{error, Instrument}; @@ -184,10 +185,10 @@ async fn fetch_docs_in_split( .context("open-index-for-split")?; // we add an executor here, we could add it in open_index_with_caches, though we should verify // the side-effect before - let tantivy_executor = crate::search_thread_pool() - .get_underlying_rayon_thread_pool() - .into(); - index.set_executor(tantivy_executor); + //let tantivy_executor = crate::search_thread_pool() + //.get_underlying_rayon_thread_pool() + //.into(); + //index.set_executor(tantivy_executor); let index_reader = index .reader_builder() // the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine @@ -274,7 +275,7 @@ impl FieldsSnippetGenerator { fn snippets_from_field_values( &self, field_name: &str, - field_values: Vec<&OwnedValue>, + field_values: Vec>, ) -> Option> { if let Some(snippet_generator) = self.field_generators.get(field_name) { let values = field_values From 6c7c731317592a35374bdef1ac5adc3d25763ced Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 27 May 2024 13:50:05 +0800 Subject: [PATCH 2/2] use serde_json_borrow instead of serde_json::Value --- quickwit/Cargo.lock | 48 ++++- quickwit/Cargo.toml | 5 +- 
quickwit/quickwit-doc-mapper/Cargo.toml | 2 + .../benches/routing_expression_bench.rs | 3 +- .../src/default_doc_mapper/default_mapper.rs | 190 +++++++++++++----- .../src/default_doc_mapper/mapping_tree.rs | 121 ++++++----- .../quickwit-doc-mapper/src/doc_mapper.rs | 32 ++- .../src/routing_expression/mod.rs | 44 ++-- quickwit/quickwit-indexing/Cargo.toml | 1 + .../src/actors/doc_processor.rs | 33 ++- quickwit/quickwit-search/src/collector.rs | 2 +- quickwit/quickwit-search/src/fetch_docs.rs | 8 +- 12 files changed, 326 insertions(+), 163 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0b57997f954..793eb296c57 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3307,6 +3307,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -4342,6 +4351,14 @@ dependencies = [ "loom", ] +[[package]] +name = "oneshot" +version = "0.1.6" +source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" +dependencies = [ + "loom", +] + [[package]] name = "onig" version = "6.4.0" @@ -4667,6 +4684,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "stable_deref_trait", ] @@ -5821,6 +5839,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_json_borrow", "serde_yaml", "siphasher", "tantivy", @@ -5875,7 +5894,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot", + "oneshot 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "openssl", "proptest", "prost", @@ -5899,6 +5918,7 @@ dependencies = [ "reqwest", 
"serde", "serde_json", + "serde_json_borrow", "tantivy", "tempfile", "thiserror", @@ -7252,6 +7272,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_json_borrow" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a256097ff1654ce1975402cb5a2bfa2cad3cc3199e1d704bf303b386fc971f3c" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "serde_path_to_error" version = "0.1.16" @@ -7999,7 +8029,8 @@ dependencies = [ [[package]] name = "tantivy" -version = "0.22.0" +version = "0.23.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "aho-corasick", "arc-swap", @@ -8015,7 +8046,7 @@ dependencies = [ "fs4", "futures-util", "htmlescape", - "itertools 0.12.1", + "itertools 0.13.0", "levenshtein_automata", "log", "lru", @@ -8023,7 +8054,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot", + "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", "rayon", "regex", "rust-stemmers", @@ -8051,6 +8082,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "bitpacking", ] @@ -8058,10 +8090,11 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "downcast-rs", "fastdivide", - "itertools 0.12.1", + "itertools 0.13.0", "serde", "tantivy-bitpacker", "tantivy-common", @@ -8072,6 +8105,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "async-trait", "byteorder", @@ -8094,6 +8128,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = 
"0.22.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "nom", ] @@ -8101,6 +8136,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8111,6 +8147,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "murmurhash32", "rand_distr", @@ -8120,6 +8157,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=2e3641c#2e3641c2ae970e8b78c256da2dac19b12d566139" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index a5191299a40..73827649c88 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -210,6 +210,7 @@ sea-query-binder = { version = "0.5", features = [ # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } serde_json = "1.0" +serde_json_borrow = { version = "0.5" } serde_qs = { version = "0.12", features = ["warp"] } serde_with = "3.8.0" serde_yaml = "0.9" @@ -318,7 +319,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { path = "../../../tantivy/compact_doc", default-features = false, features = [ +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "2e3641c", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", @@ -346,5 +347,5 @@ sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "daca921 debug = false [profile.release] -debug = true +#debug = true lto = "thin" diff --git 
a/quickwit/quickwit-doc-mapper/Cargo.toml b/quickwit/quickwit-doc-mapper/Cargo.toml index cbc3cba567a..1c0ecde1839 100644 --- a/quickwit/quickwit-doc-mapper/Cargo.toml +++ b/quickwit/quickwit-doc-mapper/Cargo.toml @@ -23,7 +23,9 @@ once_cell = { workspace = true } regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_json_borrow = { workspace = true } siphasher = { workspace = true } +time = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/quickwit/quickwit-doc-mapper/benches/routing_expression_bench.rs b/quickwit/quickwit-doc-mapper/benches/routing_expression_bench.rs index a36ae1b048e..82947bb407e 100644 --- a/quickwit/quickwit-doc-mapper/benches/routing_expression_bench.rs +++ b/quickwit/quickwit-doc-mapper/benches/routing_expression_bench.rs @@ -19,7 +19,6 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use quickwit_doc_mapper::{DocMapper, RoutingExpr}; -use serde_json::Value as JsonValue; const JSON_TEST_DATA: &str = include_str!("data/simple-routing-expression-bench.json"); @@ -48,7 +47,7 @@ pub fn simple_routing_expression_benchmark(c: &mut Criterion) { let doc_mapper: Box = serde_json::from_str(DOC_MAPPER_CONF).unwrap(); let lines: Vec<&str> = JSON_TEST_DATA.lines().map(|line| line.trim()).collect(); - let json_lines: Vec> = lines + let json_lines: Vec = lines .iter() .map(|line| serde_json::from_str(line).unwrap()) .collect(); diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index be886232e95..37bf6d17dbe 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -35,7 +35,9 @@ use tantivy::schema::document::{ use tantivy::schema::{ Field, FieldType, OwnedValue as TantivyValue, Schema, Value, INDEXED, 
STORED, }; -use tantivy::TantivyDocument as Document; +use tantivy::{DateTime, TantivyDocument as Document}; +use time::format_description::well_known::Rfc3339; +use time::{OffsetDateTime, UtcOffset}; use super::field_mapping_entry::RAW_TOKENIZER_NAME; use super::DefaultDocMapperBuilder; @@ -461,8 +463,8 @@ fn tantivy_value_to_json(val: TantivyValue) -> JsonValue { } #[inline] -fn populate_field_presence_for_json_value<'a>( - json_value: CompactDocValue<'a>, +fn populate_field_presence_for_json_value( + json_value: CompactDocValue, path_hasher: &PathHasher, is_expand_dots_enabled: bool, output: &mut FnvHashSet, @@ -560,6 +562,91 @@ impl, U: Clone> Iterator for ZipCloneable { } } +#[derive(Debug, Clone)] +/// This wrapper is there to implement the `Value` trait for `serde_json_borrow::Value`. +/// We could move this to tantivy +pub struct BorrowJson<'a>(&'a serde_json_borrow::Value<'a>); +pub fn can_be_rfc3339_date_time(text: &str) -> bool { + if let Some(&first_byte) = text.as_bytes().first() { + if first_byte.is_ascii_digit() { + return true; + } + } + + false +} + +impl<'a> Value<'a> for BorrowJson<'a> { + type ArrayIter = JsonArrayIter<'a>; + type ObjectIter = JsonObjectIter<'a>; + + #[inline] + fn as_value(&self) -> ReferenceValue<'a, Self> { + match self.0 { + serde_json_borrow::Value::Null => ReferenceValueLeaf::Null.into(), + serde_json_borrow::Value::Bool(value) => ReferenceValueLeaf::Bool(*value).into(), + serde_json_borrow::Value::Number(number) => { + if let Some(val) = number.as_i64() { + ReferenceValueLeaf::I64(val).into() + } else if let Some(val) = number.as_u64() { + ReferenceValueLeaf::U64(val).into() + } else if let Some(val) = number.as_f64() { + ReferenceValueLeaf::F64(val).into() + } else { + panic!("Unsupported serde_json_borrow number"); + } + } + serde_json_borrow::Value::Str(text) => { + if can_be_rfc3339_date_time(text) { + match OffsetDateTime::parse(text, &Rfc3339) { + Ok(dt) => { + let dt_utc = dt.to_offset(UtcOffset::UTC); + 
ReferenceValueLeaf::Date(DateTime::from_utc(dt_utc)).into() + } + Err(_) => ReferenceValueLeaf::Str(text).into(), + } + } else { + ReferenceValueLeaf::Str(text).into() + } + } + serde_json_borrow::Value::Array(elements) => ReferenceValue::Array(JsonArrayIter { + iter: elements.iter(), + }), + serde_json_borrow::Value::Object(object) => ReferenceValue::Object(JsonObjectIter { + iter: object.as_vec().iter(), + }), + } + } +} + +/// A wrapper struct for an iterator producing [Value]s. +pub struct JsonArrayIter<'a> { + iter: std::slice::Iter<'a, serde_json_borrow::Value<'a>>, +} + +impl<'a> Iterator for JsonArrayIter<'a> { + type Item = BorrowJson<'a>; + + fn next(&mut self) -> Option { + let value = self.iter.next()?; + Some(BorrowJson(value)) + } +} + +/// A wrapper struct for an iterator producing (key, [Value]) pairs. +pub struct JsonObjectIter<'a> { + iter: std::slice::Iter<'a, (std::borrow::Cow<'a, str>, serde_json_borrow::Value<'a>)>, +} + +impl<'a> Iterator for JsonObjectIter<'a> { + type Item = (&'a str, BorrowJson<'a>); + + fn next(&mut self) -> Option { + let (key, value) = self.iter.next()?; + Some((key.as_ref(), BorrowJson(value))) + } +} + #[typetag::serde(name = "default")] impl DocMapper for DefaultDocMapper { fn doc_from_json_obj( @@ -567,20 +654,20 @@ impl DocMapper for DefaultDocMapper { json_obj: JsonObject, document_len: u64, ) -> Result<(Partition, Document), DocParsingError> { - let partition: Partition = self.partition_key.eval_hash(&json_obj); + let partition: Partition = self.partition_key.eval_hash(json_obj.get_value()); - let mut dynamic_json_obj = serde_json::Map::default(); + let mut dynamic_json_obj = serde_json_borrow::Map::default(); let mut field_path = Vec::new(); - let mut document = Document::with_capacity(document_len as usize * 2); + // CompactDoc in tantivy is typically similar in size to the serialized JSON + let mut document = Document::with_capacity(document_len as usize * 3 / 2); - let json_obj = JsonValue::Object(json_obj); if let
Some(source_field) = self.source_field { - document.add_field_value(source_field, &json_obj); + document.add_field_value(source_field, BorrowJson(&json_obj)); } let mode = self.mode.mode_type(); self.field_mappings.doc_from_json( - json_obj, + json_obj.get_value(), mode, &mut document, &mut field_path, @@ -590,9 +677,11 @@ impl DocMapper for DefaultDocMapper { if let Some(dynamic_field) = self.dynamic_field { if !dynamic_json_obj.is_empty() { if !self.concatenate_dynamic_fields.is_empty() { - let json_obj_values = - JsonValueIterator::new(serde_json::Value::Object(dynamic_json_obj.clone())) - .flat_map(map_primitive_json_to_tantivy); + // TODO: work on serde_json_borrow::Value + let json_obj_values = JsonValueIterator::new(serde_json::Value::Object( + serde_json::Map::from(dynamic_json_obj.clone()), + )) + .flat_map(map_primitive_json_to_tantivy); for value in json_obj_values { for (concatenate_dynamic_field, value) in @@ -602,7 +691,10 @@ impl DocMapper for DefaultDocMapper { } } } - document.add_field_value(dynamic_field, &JsonValue::Object(dynamic_json_obj)); + document.add_field_value( + dynamic_field, + BorrowJson(&serde_json_borrow::Value::Object(dynamic_json_obj)), + ); } } @@ -625,7 +717,7 @@ impl DocMapper for DefaultDocMapper { } let mut path_hasher: PathHasher = PathHasher::default(); path_hasher.append(&field.field_id().to_le_bytes()[..]); - if let ReferenceValue::Object(json_obj) = value { + if let ReferenceValue::Object(json_obj) = value.as_value() { let is_expand_dots_enabled: bool = if let FieldType::JsonObject(json_options) = field_entry.field_type() { json_options.is_expand_dots_enabled() @@ -719,10 +811,10 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use itertools::Itertools; use quickwit_common::PathHasher; use quickwit_query::query_ast::query_ast_from_user_text; use serde_json::{self, json, Value as JsonValue}; + use serde_json_borrow::OwnedValue; use tantivy::schema::{FieldType, IndexRecordOption, 
OwnedValue as TantivyValue, Type, Value}; use super::DefaultDocMapper; @@ -733,8 +825,8 @@ mod tests { DYNAMIC_FIELD_NAME, FIELD_PRESENCE_FIELD_NAME, SOURCE_FIELD_NAME, }; - fn example_json_doc_value() -> JsonValue { - serde_json::json!({ + fn example_json_doc_value() -> OwnedValue { + let val = serde_json::json!({ "timestamp": 1586960586i64, "body": "20200415T072306-0700 INFO This is a great log", "response_date2": "2021-12-19T16:39:57+00:00", @@ -749,7 +841,8 @@ mod tests { "server.status": ["200", "201"], "server.payload": ["YQ==", "Yg=="] } - }) + }); + OwnedValue::from_string(serde_json::to_string(&val).unwrap()).unwrap() } const EXPECTED_JSON_PATHS_AND_VALUES: &str = r#"{ @@ -783,11 +876,9 @@ mod tests { #[test] fn test_parsing_document() { - let json_doc = example_json_doc_value(); + let json_doc: OwnedValue = example_json_doc_value(); let doc_mapper = crate::default_doc_mapper_for_test(); - let (_, document) = doc_mapper - .doc_from_json_obj(json_doc.as_object().unwrap().clone(), 0) - .unwrap(); + let (_, document) = doc_mapper.doc_from_json_obj(json_doc.clone(), 0).unwrap(); let schema = doc_mapper.schema(); // 9 property entry + 1 field "_source" + 2 fields values for "tags" field // + 2 values inf "server.status" field + 2 values in "server.payload" field @@ -796,33 +887,26 @@ mod tests { let expected_json_paths_and_values: HashMap = serde_json::from_str(EXPECTED_JSON_PATHS_AND_VALUES).unwrap(); let mut field_presences: HashSet = HashSet::new(); - for field_value in document.field_values() { - let field_name = schema.get_field_name(field_value.field()); + for (field, value) in document.field_values() { + let field_name = schema.get_field_name(field); if field_name == SOURCE_FIELD_NAME { - // some part of aws-sdk enables `preserve_order` on serde_json. - // to get "normal" equality, we are forced to recreate the json object - // with sorted keys. 
- let sorted_json_values = json_doc - .as_object() - .unwrap() - .clone() - .into_iter() - .sorted_by(|k1, k2| k1.0.cmp(&k2.0)) - .collect::>(); assert_eq!( - tantivy::schema::OwnedValue::from(field_value.value().as_value()), - tantivy::schema::OwnedValue::from(sorted_json_values) + tantivy::schema::OwnedValue::from(value), + tantivy::schema::OwnedValue::from(serde_json::Value::from( + json_doc.get_value() + )) ); } else if field_name == DYNAMIC_FIELD_NAME { assert_eq!( - serde_json::to_string(&field_value.value()).unwrap(), + serde_json::to_string(&tantivy::schema::OwnedValue::from(value)).unwrap(), r#"{"response_date2":"2021-12-19T16:39:57Z"}"# ); } else if field_name == FIELD_PRESENCE_FIELD_NAME { - let field_presence_u64 = field_value.value().as_u64().unwrap(); + let field_presence_u64 = value.as_u64().unwrap(); field_presences.insert(field_presence_u64); } else { - let value = serde_json::to_string(field_value.value()).unwrap(); + let value = + serde_json::to_string(&tantivy::schema::OwnedValue::from(value)).unwrap(); let is_value_in_expected_values = expected_json_paths_and_values .get(field_name) .unwrap() @@ -1270,12 +1354,15 @@ mod tests { let builder = serde_json::from_str::(doc_mapper).unwrap(); let doc_mapper = builder.try_build().unwrap(); let schema = doc_mapper.schema(); - let json_doc_value: JsonValue = serde_json::json!({ + let json_doc_value = serde_json_borrow::OwnedValue::from_str( + r#"{ "city": "tokio", "image": "YWJj" - }); + }"#, + ) + .unwrap(); let (_, document) = doc_mapper - .doc_from_json_obj(json_doc_value.as_object().unwrap().clone(), 0) + .doc_from_json_obj(json_doc_value.clone(), 0) .unwrap(); // 2 properties, + 1 value for "_source" + 2 for field presence. 
@@ -1288,18 +1375,21 @@ mod tests { ) .unwrap(); let mut field_presences: HashSet = HashSet::default(); - document.field_values().iter().for_each(|field_value| { - let field_name = schema.get_field_name(field_value.field()); + document.field_values().for_each(|(field, value)| { + let field_name = schema.get_field_name(field); if field_name == SOURCE_FIELD_NAME { assert_eq!( - tantivy::schema::OwnedValue::from(field_value.value().as_value()), - tantivy::schema::OwnedValue::from(json_doc_value.as_object().unwrap().clone()) + tantivy::schema::OwnedValue::from(value), + tantivy::schema::OwnedValue::from(serde_json::Value::from( + json_doc_value.get_value() + )) ); } else if field_name == FIELD_PRESENCE_FIELD_NAME { - let field_value_hash = field_value.value().as_u64().unwrap(); + let field_value_hash = value.as_u64().unwrap(); field_presences.insert(field_value_hash); } else { - let value = serde_json::to_string(field_value.value()).unwrap(); + let value = + serde_json::to_string(&tantivy::schema::OwnedValue::from(value)).unwrap(); let is_value_in_expected_values = expected_json_paths_and_values .get(field_name) .unwrap() @@ -1605,10 +1695,10 @@ mod tests { let schema = default_doc_mapper.schema(); let field = schema.get_field(field).unwrap(); let (_, doc) = default_doc_mapper.doc_from_json_str(document_json).unwrap(); - let vals: Vec<&TantivyValue> = doc.get_all(field).collect(); + let vals: Vec<_> = doc.get_all(field).collect(); assert_eq!(vals.len(), expected.len()); for (val, exp) in vals.into_iter().zip(expected.iter()) { - assert_eq!(val, exp); + assert_eq!(&TantivyValue::from(val), exp); } } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 8836bfba4e7..f868f2c74f9 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -294,15 +294,15 @@ pub(crate) struct 
MappingLeaf { impl MappingLeaf { pub fn doc_from_json( &self, - json_val: JsonValue, + json_val: &serde_json_borrow::Value, document: &mut Document, - path: &mut [String], + path: &mut [&str], ) -> Result<(), DocParsingError> { if json_val.is_null() { // We just ignore `null`. return Ok(()); } - if let JsonValue::Array(els) = json_val { + if let serde_json_borrow::Value::Array(els) = json_val { if self.cardinality == Cardinality::SingleValue { return Err(DocParsingError::MultiValuesNotSupported(path.join("."))); } @@ -314,7 +314,7 @@ impl MappingLeaf { if !self.concatenate.is_empty() { let concat_values = self .typ - .tantivy_string_value_from_json(el_json_val.clone()) + .tantivy_string_value_from_json(el_json_val.clone().into()) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; for concat_value in concat_values { for field in &self.concatenate { @@ -324,7 +324,7 @@ impl MappingLeaf { } let value = self .typ - .value_from_json(el_json_val) + .value_from_json(el_json_val.clone().into()) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; document.add_field_value(self.field, &value); } @@ -334,7 +334,7 @@ impl MappingLeaf { if !self.concatenate.is_empty() { let concat_values = self .typ - .tantivy_string_value_from_json(json_val.clone()) + .tantivy_string_value_from_json(json_val.clone().into()) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; for concat_value in concat_values { for field in &self.concatenate { @@ -344,7 +344,7 @@ impl MappingLeaf { } let value = self .typ - .value_from_json(json_val) + .value_from_json(json_val.clone().into()) .map_err(|err_msg| DocParsingError::ValueError(path.join("."), err_msg))?; document.add_field_value(self.field, &value); Ok(()) @@ -542,19 +542,20 @@ pub(crate) struct MappingNode { branches_order: Vec, } -fn get_or_insert_path<'a>( - path: &[String], - mut dynamic_json_obj: &'a mut serde_json::Map, -) -> &'a mut serde_json::Map { +fn 
get_or_insert_path<'a, 'b>( + path: &[&'a str], + mut dynamic_json_obj: &'b mut serde_json_borrow::Map<'a>, +) -> &'b mut serde_json_borrow::Map<'a> { for field_name in path { - let child_json_val = dynamic_json_obj - .entry(field_name.clone()) - .or_insert_with(|| JsonValue::Object(Default::default())); - dynamic_json_obj = if let JsonValue::Object(child_map) = child_json_val { - child_map - } else { - panic!("Expected Json object."); - }; + let child_json_val = dynamic_json_obj.insert_or_get_mut( + field_name, + serde_json_borrow::Value::Object(Default::default()), + ); + if let serde_json_borrow::Value::Object(ref mut child_map) = child_json_val { + dynamic_json_obj = child_map; + continue; + } + panic!("Expected Json object."); } dynamic_json_obj } @@ -631,20 +632,20 @@ impl MappingNode { field_mapping_entries } - pub fn doc_from_json( + pub fn doc_from_json<'a: 'b, 'b>( &self, - json_obj: JsonValue, + json_obj: &'a serde_json_borrow::Value<'a>, mode: ModeType, document: &mut Document, - path: &mut Vec, - dynamic_json_obj: &mut serde_json::Map, + path: &mut Vec<&'a str>, + dynamic_json_obj: &mut serde_json_borrow::Map<'b>, ) -> Result<(), DocParsingError> { let json_obj = match json_obj { - JsonValue::Object(json_obj) => json_obj, + serde_json_borrow::Value::Object(json_obj) => json_obj, _ => panic!("internal error: expected json object"), }; - for (field_name, val) in json_obj { - if let Some(child_tree) = self.branches.get(&field_name) { + for (field_name, val) in json_obj.iter() { + if let Some(child_tree) = self.branches.get(field_name) { path.push(field_name); child_tree.doc_from_json(val, mode, document, path, dynamic_json_obj)?; path.pop(); @@ -654,9 +655,9 @@ impl MappingNode { // In lenient mode we simply ignore these unmapped fields. 
} ModeType::Dynamic => { - let dynamic_json_obj_after_path = - get_or_insert_path(path, dynamic_json_obj); - dynamic_json_obj_after_path.insert(field_name, val); + let dynamic_json_obj_at_path = get_or_insert_path(path, dynamic_json_obj); + // TODO: Remove the clone + dynamic_json_obj_at_path.insert(field_name, val.clone()); } ModeType::Strict => { path.push(field_name); @@ -723,27 +724,21 @@ pub(crate) enum MappingTree { } impl MappingTree { - fn doc_from_json( + fn doc_from_json<'a: 'b, 'b>( &self, - json_value: JsonValue, + json_value: &'a serde_json_borrow::Value<'a>, mode: ModeType, document: &mut Document, - path: &mut Vec, - dynamic_json_obj: &mut serde_json::Map, + path: &mut Vec<&'a str>, + dynamic_json_obj: &mut serde_json_borrow::Map<'b>, ) -> Result<(), DocParsingError> { match self { MappingTree::Leaf(mapping_leaf) => { mapping_leaf.doc_from_json(json_value, document, path) } MappingTree::Node(mapping_node) => { - if let JsonValue::Object(json_obj) = json_value { - mapping_node.doc_from_json( - JsonValue::Object(json_obj), - mode, - document, - path, - dynamic_json_obj, - ) + if let serde_json_borrow::Value::Object(_json_obj) = json_value { + mapping_node.doc_from_json(json_value, mode, document, path, dynamic_json_obj) } else { Err(DocParsingError::ValueError( path.join("."), @@ -1105,7 +1100,7 @@ fn build_mapping_from_field_type<'a>( mod tests { use std::net::IpAddr; - use serde_json::{json, Value as JsonValue}; + use serde_json::json; use tantivy::schema::{Field, IntoIpv6Addr, OwnedValue as TantivyValue, Value}; use tantivy::{DateTime, TantivyDocument as Document}; use time::macros::datetime; @@ -1142,8 +1137,7 @@ mod tests { #[test] fn test_get_or_insert_path() { let mut map = Default::default(); - super::get_or_insert_path(&["a".to_string(), "b".to_string()], &mut map) - .insert("c".to_string(), JsonValue::from(3u64)); + super::get_or_insert_path(&["a", "b"], &mut map).insert("c", (3u64).into()); assert_eq!( &serde_json::to_value(&map).unwrap(), 
&serde_json::json!({ @@ -1154,8 +1148,7 @@ mod tests { } }) ); - super::get_or_insert_path(&["a".to_string(), "b".to_string()], &mut map) - .insert("d".to_string(), JsonValue::from(2u64)); + super::get_or_insert_path(&["a", "b"], &mut map).insert("d", (2u64).into()); assert_eq!( &serde_json::to_value(&map).unwrap(), &serde_json::json!({ @@ -1167,8 +1160,7 @@ mod tests { } }) ); - super::get_or_insert_path(&["e".to_string()], &mut map) - .insert("f".to_string(), JsonValue::from(5u64)); + super::get_or_insert_path(&["e"], &mut map).insert("f", (5u64).into()); assert_eq!( &serde_json::to_value(&map).unwrap(), &serde_json::json!({ @@ -1181,7 +1173,7 @@ mod tests { "e": { "f": 5u64 } }) ); - super::get_or_insert_path(&[], &mut map).insert("g".to_string(), JsonValue::from(6u64)); + super::get_or_insert_path(&[], &mut map).insert("g", (6u64).into()); assert_eq!( &serde_json::to_value(&map).unwrap(), &serde_json::json!({ @@ -1298,12 +1290,12 @@ mod tests { let mut document = Document::default(); let mut path = Vec::new(); leaf_entry - .doc_from_json(json!([true, false, true]), &mut document, &mut path) + .doc_from_json(&(vec![true, false, true]).into(), &mut document, &mut path) .unwrap(); assert_eq!(document.len(), 3); let values: Vec = document .get_all(field) - .flat_map(|val| (&val).as_bool()) + .flat_map(|val| val.as_bool()) .collect(); assert_eq!(&values, &[true, false, true]) } @@ -1350,12 +1342,12 @@ mod tests { let mut document = Document::default(); let mut path = Vec::new(); leaf_entry - .doc_from_json(serde_json::json!([10u64, 20u64]), &mut document, &mut path) + .doc_from_json(&(vec![10u64, 20u64]).into(), &mut document, &mut path) .unwrap(); assert_eq!(document.len(), 2); let values: Vec = document .get_all(field) - .flat_map(|val| (&val).as_i64()) + .flat_map(|val| val.as_i64()) .collect(); assert_eq!(&values, &[10i64, 20i64]); } @@ -1373,7 +1365,7 @@ mod tests { let mut document = Document::default(); let mut path = Vec::new(); leaf_entry - 
.doc_from_json(serde_json::json!(null), &mut document, &mut path) + .doc_from_json(&serde_json_borrow::Value::Null, &mut document, &mut path) .unwrap(); assert_eq!(document.len(), 0); } @@ -1391,7 +1383,7 @@ mod tests { let mut document = Document::default(); let mut path = Vec::new(); leaf_entry - .doc_from_json(serde_json::json!(10u64), &mut document, &mut path) + .doc_from_json(&(10u64).into(), &mut document, &mut path) .unwrap(); assert_eq!(document.len(), 1); assert_eq!(document.get_first(field).unwrap().as_i64().unwrap(), 10i64); @@ -1408,10 +1400,14 @@ mod tests { concatenate: Vec::new(), }; let mut document = Document::default(); - let mut path = vec!["root".to_string(), "my_field".to_string()]; + let mut path = vec!["root", "my_field"]; let parse_err = leaf_entry .doc_from_json( - serde_json::json!([10u64, [1u64, 2u64]]), + &(vec![ + serde_json_borrow::Value::from(10u64), + (vec![1u64, 2u64]).into(), + ]) + .into(), &mut document, &mut path, ) @@ -1554,13 +1550,14 @@ mod tests { concatenate: Vec::new(), }; let mut document = Document::default(); - let mut path = vec!["root".to_string(), "my_field".to_string()]; + let mut path = vec!["root", "my_field"]; leaf_entry .doc_from_json( - serde_json::json!([ + &(vec![ + "dGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHN0cmluZw==", "dGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHN0cmluZw==", - "dGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHN0cmluZw==" - ]), + ]) + .into(), &mut document, &mut path, ) @@ -1568,7 +1565,7 @@ mod tests { assert_eq!(document.len(), 2); let bytes_vec: Vec<&[u8]> = document .get_all(field) - .flat_map(|val| (&val).as_bytes()) + .flat_map(|val| val.as_bytes()) .collect(); assert_eq!( &bytes_vec[..], diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs index 04860c47c97..e590fee081d 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs @@ -34,7 +34,7 @@ use tantivy::{TantivyDocument as Document, Term}; pub type 
Partition = u64; /// An alias for serde_json's object type. -pub type JsonObject = serde_json::Map; +pub type JsonObject = serde_json_borrow::OwnedValue; use crate::{DocParsingError, QueryParserError}; @@ -60,21 +60,33 @@ pub trait DocMapper: Send + Sync + Debug + DynClone + 'static { &self, json_doc: &[u8], ) -> Result<(Partition, Document), DocParsingError> { - let json_obj: JsonObject = serde_json::from_slice(json_doc).map_err(|_| { - let json_doc_sample: String = std::str::from_utf8(json_doc) - .map(|doc_str| doc_str.chars().take(20).chain("...".chars()).collect()) - .unwrap_or_else(|_| "document contains some invalid UTF-8 characters".to_string()); - DocParsingError::NotJsonObject(json_doc_sample) + let json_str = String::from_utf8(json_doc.to_vec()).map_err(|_| { + DocParsingError::NotJsonObject( + "document contains some invalid UTF-8 characters".to_string(), + ) })?; + let json_obj: JsonObject = + serde_json_borrow::OwnedValue::parse_from(json_str).map_err(|_| { + let json_doc_sample: String = std::str::from_utf8(json_doc) + .map(|doc_str| doc_str.chars().take(20).chain("...".chars()).collect()) + .unwrap_or_else(|_| { + "document contains some invalid UTF-8 characters".to_string() + }); + DocParsingError::NotJsonObject(json_doc_sample) + })?; + self.doc_from_json_obj(json_obj, json_doc.len() as u64) } /// Parses a JSON string into a tantivy [`Document`]. 
fn doc_from_json_str(&self, json_doc: &str) -> Result<(Partition, Document), DocParsingError> { - let json_obj: JsonObject = serde_json::from_str(json_doc).map_err(|_| { - let json_doc_sample: String = json_doc.chars().take(20).chain("...".chars()).collect(); - DocParsingError::NotJsonObject(json_doc_sample) - })?; + let json_obj: JsonObject = serde_json_borrow::OwnedValue::parse_from(json_doc.to_string()) + .map_err(|_| { + let json_doc_sample: String = + json_doc.chars().take(20).chain("...".chars()).collect(); + DocParsingError::NotJsonObject(json_doc_sample) + })?; + self.doc_from_json_obj(json_obj, json_doc.len() as u64) } diff --git a/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs b/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs index 374281e85f9..90f46dbcdf0 100644 --- a/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs +++ b/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs @@ -23,7 +23,7 @@ use std::hash::{Hash, Hasher}; use std::str::FromStr; use std::sync::Arc; -use serde_json::Value as JsonValue; +use serde_json_borrow::Value as JsonValue; use siphasher::sip::SipHasher; pub trait RoutingExprContext { @@ -46,7 +46,7 @@ fn hash_json_val(json_val: &JsonValue, hasher: &mut H) { hasher.write_u8(2u8); num.hash(hasher); } - JsonValue::String(s) => { + JsonValue::Str(s) => { hasher.write_u8(3u8); hasher.write_u64(s.len() as u64); hasher.write(s.as_bytes()); @@ -70,7 +70,7 @@ fn hash_json_val(json_val: &JsonValue, hasher: &mut H) { } } -fn find_value<'a>(mut root: &'a JsonValue, keys: &[String]) -> Option<&'a JsonValue> { +fn find_value<'a>(mut root: &'a JsonValue<'a>, keys: &[String]) -> Option<&'a JsonValue<'a>> { for key in keys { match root { JsonValue::Object(obj) => { @@ -82,20 +82,19 @@ fn find_value<'a>(mut root: &'a JsonValue, keys: &[String]) -> Option<&'a JsonVa Some(root) } -fn find_value_in_map<'a>( - obj: &'a serde_json::Map, - keys: &[String], -) -> Option<&'a JsonValue> { +fn find_value_in_map<'a>(obj: 
&'a JsonValue<'a>, keys: &[String]) -> Option<&'a JsonValue<'a>> { // we can't have an empty path and this is used only for the root map, so there is no risk of // out of bound - if let Some(value) = obj.get(&keys[0]) { - find_value(value, &keys[1..]) - } else { - None - } + obj.as_object().and_then(|obj| { + if let Some(value) = obj.get(&keys[0]) { + find_value(value, &keys[1..]) + } else { + None + } + }) } -impl RoutingExprContext for serde_json::Map { +impl<'a> RoutingExprContext for JsonValue<'a> { fn hash_attribute(&self, attr_name: &[String], hasher: &mut H) { if let Some(json_val) = find_value_in_map(self, attr_name) { hasher.write_u8(1u8); @@ -514,7 +513,7 @@ mod tests { #[test] fn test_routing_expr_empty_hashes_to_0() { let expr = RoutingExpr::new("").unwrap(); - let ctx: serde_json::Map = Default::default(); + let ctx: JsonValue = Default::default(); assert_eq!(expr.eval_hash(&ctx), 0u64); } @@ -638,7 +637,7 @@ mod tests { .collect(); assert_eq!(keys, vec![String::from("tenant.id")]); let value = find_value(&ctx, &keys).unwrap(); - assert_eq!(value, &JsonValue::String(String::from("happy"))); + assert_eq!(value, &JsonValue::Str("happy".into())); } #[test] @@ -654,7 +653,7 @@ mod tests { .collect(); assert_eq!(keys, vec!["app", "id"]); let value = find_value(&ctx, &keys).unwrap(); - assert_eq!(value, &JsonValue::String(String::from("123"))); + assert_eq!(value, &JsonValue::Str(("123").into())); } // This unit test is here to ensure that the routing expr hash depends on // the expression itself as well as the expression value. 
@@ -662,10 +661,9 @@ mod tests { fn test_routing_expr_depends_on_both_expr_and_value() { let routing_expr = RoutingExpr::new("tenant_id").unwrap(); let routing_expr2 = RoutingExpr::new("app").unwrap(); - let ctx: serde_json::Map = + let ctx: JsonValue = serde_json::from_str(r#"{"tenant_id": "happy", "app": "happy"}"#).unwrap(); - let ctx2: serde_json::Map = - serde_json::from_str(r#"{"tenant_id": "happy2"}"#).unwrap(); + let ctx2: JsonValue = serde_json::from_str(r#"{"tenant_id": "happy2"}"#).unwrap(); // This assert is important. assert_ne!(routing_expr.eval_hash(&ctx), routing_expr2.eval_hash(&ctx),); assert_ne!(routing_expr.eval_hash(&ctx), routing_expr.eval_hash(&ctx2),); @@ -676,7 +674,7 @@ mod tests { #[test] fn test_routing_expr_change_detection() { let routing_expr = RoutingExpr::new("tenant_id").unwrap(); - let ctx: serde_json::Map = + let ctx: JsonValue = serde_json::from_str(r#"{"tenant_id": "happy-tenant", "app": "happy"}"#).unwrap(); assert_eq!(routing_expr.eval_hash(&ctx), 13914409176935416182); } @@ -684,7 +682,7 @@ mod tests { #[test] fn test_routing_expr_missing_value_does_not_panic() { let routing_expr = RoutingExpr::new("tenant_id").unwrap(); - let ctx: serde_json::Map = Default::default(); + let ctx: JsonValue = Default::default(); assert_eq!(routing_expr.eval_hash(&ctx), 12482849403534986143); } @@ -694,8 +692,8 @@ mod tests { let routing_expr = RoutingExpr::new("hash_mod(tenant_id, 10)").unwrap(); for i in 0..1000 { - let ctx: serde_json::Map = - serde_json::from_str(&format!(r#"{{"tenant_id": "happy{i}"}}"#)).unwrap(); + let json = format!(r#"{{"tenant_id": "happy{i}"}}"#); + let ctx: JsonValue = serde_json::from_str(&json).unwrap(); seen.insert(routing_expr.eval_hash(&ctx)); } diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index eccfb4f1aa5..63c8185cab8 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -37,6 +37,7 @@ quickwit-query = { workspace = 
true } rdkafka = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } +serde_json_borrow = { workspace = true } tantivy = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 88fa6322209..7d0abd92a42 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -36,7 +36,7 @@ use quickwit_opentelemetry::otlp::{ use quickwit_proto::types::{IndexId, SourceId}; use serde::Serialize; use serde_json::Value as JsonValue; -use tantivy::schema::Field; +use tantivy::schema::{Field, Value}; use tantivy::{DateTime, TantivyDocument}; use thiserror::Error; use tokio::runtime::Handle; @@ -68,7 +68,14 @@ impl JsonDoc { num_bytes: usize, ) -> Result { match json_value { - JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)), + JsonValue::Object(json_obj) => { + // TODO: replace with deserializing directly from serde_json + let json_obj = serde_json_borrow::OwnedValue::parse_from( + serde_json::to_string(&json_obj).unwrap(), + ) + .unwrap(); + Ok(Self::new(json_obj, num_bytes)) + } _ => Err(DocProcessorError::JsonParsing( "document is not an object".to_string(), )), @@ -160,7 +167,19 @@ fn try_into_json_docs( ) -> JsonDocIterator { match input_format { SourceInputFormat::Json => { - let json_doc_result = serde_json::from_slice::(&raw_doc) + let json_doc_result = String::from_utf8(raw_doc.to_vec()) + .map_err(|_| { + DocParsingError::NotJsonObject( + "document contains some invalid UTF-8 characters".to_string(), + ) + }) + .and_then(|json_str| { + serde_json_borrow::OwnedValue::parse_from(json_str).map_err(|_| { + DocParsingError::NotJsonObject( + "document is not a valid JSON object".to_string(), + ) + }) + }) .map(|json_obj| JsonDoc::new(json_obj, num_bytes)); 
JsonDocIterator::from(json_doc_result) } @@ -185,6 +204,12 @@ fn try_into_json_docs( let mut json_obj = serde_json::Map::with_capacity(1); let key = PLAIN_TEXT.to_string(); json_obj.insert(key, JsonValue::String(value)); + // TODO: replace with deserializing directly from serde_json + let json_obj = serde_json_borrow::OwnedValue::parse_from( + serde_json::to_string(&json_obj).unwrap(), + ) + .unwrap(); + JsonDoc::new(json_obj, num_bytes) }); JsonDocIterator::from(json_doc_result) @@ -422,7 +447,7 @@ impl DocProcessor { }; let timestamp = doc .get_first(timestamp_field) - .and_then(|val| val.as_datetime()) + .and_then(|val| val.as_value().as_datetime()) .ok_or(DocProcessorError::from(DocParsingError::RequiredField( "timestamp field is required".to_string(), )))?; diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 928020c84bb..758e6334021 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1337,7 +1337,7 @@ mod tests { fn doc_to_json( &self, _named_doc: std::collections::BTreeMap>, - ) -> anyhow::Result { + ) -> anyhow::Result> { unimplemented!() } fn schema(&self) -> tantivy::schema::Schema { diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 11a481faf22..2873a5d2c41 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -185,10 +185,10 @@ async fn fetch_docs_in_split( .context("open-index-for-split")?; // we add an executor here, we could add it in open_index_with_caches, though we should verify // the side-effect before - //let tantivy_executor = crate::search_thread_pool() - //.get_underlying_rayon_thread_pool() - //.into(); - //index.set_executor(tantivy_executor); + let tantivy_executor = crate::search_thread_pool() + .get_underlying_rayon_thread_pool() + .into(); + index.set_executor(tantivy_executor); let index_reader = index .reader_builder() 
// the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine