Skip to content

Commit

Permalink
Support json FAST and object fields
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Dec 17, 2024
1 parent 28e7ac6 commit 7be4105
Show file tree
Hide file tree
Showing 17 changed files with 376 additions and 201 deletions.
148 changes: 79 additions & 69 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "2f2db16", default-features = false, features = [
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1a25b6125", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
Expand Down
46 changes: 35 additions & 11 deletions quickwit/quickwit-doc-mapper/src/doc_mapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,24 @@ pub struct TermRange {
pub limit: Option<u64>,
}

/// Description of how a fast field should be warmed up
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FastFieldWarmupInfo {
/// Name of the fast field
pub name: String,
/// Whether subfields should also be loaded for warmup
pub with_subfields: bool,
}

/// Information about what a DocMapper think should be warmed up before
/// running the query.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct WarmupInfo {
/// Name of fields from the term dictionary and posting list which needs to
/// be entirely loaded
pub term_dict_fields: HashSet<Field>,
/// Name of fast fields which needs to be loaded
pub fast_field_names: HashSet<String>,
/// Fast fields which needs to be loaded
pub fast_fields: HashSet<FastFieldWarmupInfo>,
/// Whether to warmup field norms. Used mostly for scoring.
pub field_norms: bool,
/// Terms to warmup, and whether their position is needed too.
Expand All @@ -106,9 +115,18 @@ impl WarmupInfo {
/// Merge other WarmupInfo into self.
pub fn merge(&mut self, other: WarmupInfo) {
self.term_dict_fields.extend(other.term_dict_fields);
self.fast_field_names.extend(other.fast_field_names);
self.field_norms |= other.field_norms;

for fast_field_warmup_info in other.fast_fields.into_iter() {
// avoid overwriting with a less demanding warmup
if !self.fast_fields.contains(&FastFieldWarmupInfo {
name: fast_field_warmup_info.name.clone(),
with_subfields: true,
}) {
self.fast_fields.insert(fast_field_warmup_info);
}
}

for (field, term_and_pos) in other.terms_grouped_by_field.into_iter() {
let sub_map = self.terms_grouped_by_field.entry(field).or_default();

Expand Down Expand Up @@ -571,8 +589,14 @@ mod tests {
}
}

fn hashset(elements: &[&str]) -> HashSet<String> {
elements.iter().map(|elem| elem.to_string()).collect()
fn hashset_fast(elements: &[&str]) -> HashSet<FastFieldWarmupInfo> {
elements
.iter()
.map(|elem| FastFieldWarmupInfo {
name: elem.to_string(),
with_subfields: false,
})
.collect()
}

fn hashset_field(elements: &[u32]) -> HashSet<Field> {
Expand Down Expand Up @@ -617,7 +641,7 @@ mod tests {
fn test_warmup_info_merge() {
let wi_base = WarmupInfo {
term_dict_fields: hashset_field(&[1, 2]),
fast_field_names: hashset(&["fast1", "fast2"]),
fast_fields: hashset_fast(&["fast1", "fast2"]),
field_norms: false,
terms_grouped_by_field: hashmap(&[(1, "term1", false), (1, "term2", false)]),
term_ranges_grouped_by_field: hashmap_ranges(&[
Expand All @@ -634,7 +658,7 @@ mod tests {
let mut wi_base = wi_base;
let wi_2 = WarmupInfo {
term_dict_fields: hashset_field(&[2, 3]),
fast_field_names: hashset(&["fast2", "fast3"]),
fast_fields: hashset_fast(&["fast2", "fast3"]),
field_norms: true,
terms_grouped_by_field: hashmap(&[(2, "term1", false), (1, "term2", true)]),
term_ranges_grouped_by_field: hashmap_ranges(&[
Expand All @@ -646,8 +670,8 @@ mod tests {

assert_eq!(wi_base.term_dict_fields, hashset_field(&[1, 2, 3]));
assert_eq!(
wi_base.fast_field_names,
hashset(&["fast1", "fast2", "fast3"])
wi_base.fast_fields,
hashset_fast(&["fast1", "fast2", "fast3"])
);
assert!(wi_base.field_norms);

Expand Down Expand Up @@ -698,7 +722,7 @@ mod tests {
fn test_warmup_info_simplify() {
let mut warmup_info = WarmupInfo {
term_dict_fields: hashset_field(&[1]),
fast_field_names: hashset(&["fast1", "fast2"]),
fast_fields: hashset_fast(&["fast1", "fast2"]),
field_norms: false,
terms_grouped_by_field: hashmap(&[
(1, "term1", false),
Expand All @@ -713,7 +737,7 @@ mod tests {
};
let expected = WarmupInfo {
term_dict_fields: hashset_field(&[1]),
fast_field_names: hashset(&["fast1", "fast2"]),
fast_fields: hashset_fast(&["fast1", "fast2"]),
field_norms: false,
terms_grouped_by_field: hashmap(&[(1, "term2", true), (2, "term3", false)]),
term_ranges_grouped_by_field: hashmap_ranges(&[
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-doc-mapper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ mod routing_expression;
pub mod tag_pruning;

pub use doc_mapper::{
analyze_text, BinaryFormat, DocMapper, DocMapperBuilder, FieldMappingEntry, FieldMappingType,
JsonObject, NamedField, QuickwitBytesOptions, QuickwitJsonOptions, TermRange, TokenizerConfig,
TokenizerEntry, WarmupInfo,
analyze_text, BinaryFormat, DocMapper, DocMapperBuilder, FastFieldWarmupInfo,
FieldMappingEntry, FieldMappingType, JsonObject, NamedField, QuickwitBytesOptions,
QuickwitJsonOptions, TermRange, TokenizerConfig, TokenizerEntry, WarmupInfo,
};
use doc_mapper::{
FastFieldOptions, FieldMappingEntryForSerialization, IndexRecordOptionSchema,
Expand Down
100 changes: 69 additions & 31 deletions quickwit/quickwit-doc-mapper/src/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use quickwit_query::{find_field_or_hit_dynamic, InvalidQuery};
use tantivy::query::Query;
use tantivy::schema::{Field, Schema};
use tantivy::Term;
use tracing::error;

use crate::doc_mapper::FastFieldWarmupInfo;
use crate::{QueryParserError, TermRange, WarmupInfo};

#[derive(Default)]
Expand All @@ -48,22 +50,37 @@ impl<'a> QueryAstVisitor<'a> for RangeQueryFields {
}
}

#[derive(Default)]
struct ExistsQueryFields {
exists_query_field_names: HashSet<String>,
struct ExistsQueryFastFields {
fields: HashSet<FastFieldWarmupInfo>,
schema: Schema,
}

impl<'a> QueryAstVisitor<'a> for ExistsQueryFields {
impl<'a> QueryAstVisitor<'a> for ExistsQueryFastFields {
type Err = Infallible;

fn visit_exists(&mut self, exists_query: &'a FieldPresenceQuery) -> Result<(), Infallible> {
// If the field is a fast field, we will rely on the `ColumnIndex`.
// If the field is not a fast, we will rely on the field presence field.
//
// After all field names are collected they are checked against schema and
// non-fast fields are removed from warmup operation.
self.exists_query_field_names
.insert(exists_query.field.to_string());
let fields = exists_query.find_field_and_subfields(&self.schema);
for (_, field_entry, path) in fields {
if field_entry.is_fast() {
if field_entry.field_type().is_json() {
let full_path = format!("{}.{}", field_entry.name(), path);
self.fields.insert(FastFieldWarmupInfo {
name: full_path,
with_subfields: true,
});
} else if path.is_empty() {
self.fields.insert(FastFieldWarmupInfo {
name: field_entry.name().to_string(),
with_subfields: false,
});
} else {
error!(
field_entry = field_entry.name(),
path, "only JSON type supports subfields"
);
}
}
}
Ok(())
}
}
Expand All @@ -80,18 +97,24 @@ pub(crate) fn build_query(
// This cannot fail. The error type is Infallible.
let _: Result<(), Infallible> = range_query_fields.visit(query_ast);

let mut exists_query_fields = ExistsQueryFields::default();
let mut exists_query_fields = ExistsQueryFastFields {
fields: HashSet::new(),
schema: schema.clone(),
};
// This cannot fail. The error type is Infallible.
let _: Result<(), Infallible> = exists_query_fields.visit(query_ast);

let mut fast_field_names = HashSet::new();
fast_field_names.extend(range_query_fields.range_query_field_names);
fast_field_names.extend(
exists_query_fields
.exists_query_field_names
let range_query_fast_fields =
range_query_fields
.range_query_field_names
.into_iter()
.filter(|field| is_fast_field(&schema, field)),
);
.map(|name| FastFieldWarmupInfo {
name,
with_subfields: false,
});
fast_field_names.extend(range_query_fast_fields);
fast_field_names.extend(exists_query_fields.fields.into_iter());

let query = query_ast.build_tantivy_query(
&schema,
Expand All @@ -118,20 +141,13 @@ pub(crate) fn build_query(
term_dict_fields: term_set_query_fields,
terms_grouped_by_field,
term_ranges_grouped_by_field,
fast_field_names,
fast_fields: fast_field_names,
..WarmupInfo::default()
};

Ok((query, warmup_info))
}

fn is_fast_field(schema: &Schema, field_name: &str) -> bool {
if let Ok((_field, field_entry, _path)) = find_field_or_hit_dynamic(field_name, schema) {
return field_entry.is_fast();
}
false
}

struct ExtractTermSetFields<'a> {
term_dict_fields_to_warm_up: HashSet<Field>,
schema: &'a Schema,
Expand All @@ -151,7 +167,8 @@ impl<'a> QueryAstVisitor<'a> for ExtractTermSetFields<'_> {

fn visit_term_set(&mut self, term_set_query: &'a TermSetQuery) -> anyhow::Result<()> {
for field in term_set_query.terms_per_field.keys() {
if let Ok((field, _field_entry, _path)) = find_field_or_hit_dynamic(field, self.schema)
if let Some((field, _field_entry, _path)) =
find_field_or_hit_dynamic(field, self.schema)
{
self.term_dict_fields_to_warm_up.insert(field);
} else {
Expand Down Expand Up @@ -574,20 +591,41 @@ mod test {
check_build_query_static_mode(
"ip:*",
Vec::new(),
TestExpectation::Ok("ExistsQuery { field_name: \"ip\" }"),
TestExpectation::Ok("ExistsQuery { field_name: \"ip\", json_subpaths: true }"),
);

check_build_query_static_mode(
"json_text:*",
Vec::new(),
TestExpectation::Ok("TermQuery(Term(field=0, type=U64"),
);

check_build_query_static_mode(
"json_fast:*",
Vec::new(),
TestExpectation::Ok("ExistsQuery { field_name: \"json_fast\" }"),
TestExpectation::Ok("ExistsQuery { field_name: \"json_fast\", json_subpaths: true }"),
);
check_build_query_static_mode(
"foo:*",
Vec::new(),
TestExpectation::Err("invalid query: field does not exist: `foo`"),
);
check_build_query_static_mode(
"server:*",
Vec::new(),
TestExpectation::Ok("?????????????????"),
);

// V currently working V
// check_build_query_static_mode(
// "server:*",
// Vec::new(),
// TestExpectation::Err("invalid query: field does not exist: `server`"),
// );
// check_build_query_dynamic_mode(
// "server:*",
// Vec::new(),
// // dynamic_mapping is set to TEXT here, this would be an ExistsQuery if it was FAST
// TestExpectation::Ok("TermQuery(Term(field=0, type=U64"),
// );
}

#[test]
Expand Down
Loading

0 comments on commit 7be4105

Please sign in to comment.