Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collect json paths in indexing #2231

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub fn u64_to_f64(val: u64) -> f64 {
///
/// This function assumes that the needle is rarely contained in the bytes string
/// and offers a fast path if the needle is not present.
#[inline]
pub fn replace_in_place(needle: u8, replacement: u8, bytes: &mut [u8]) {
if !bytes.contains(&needle) {
return;
Expand Down
100 changes: 73 additions & 27 deletions src/core/json_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use columnar::MonotonicallyMappableToU64;
use common::{replace_in_place, JsonPathWriter};
use murmurhash32::murmurhash2;
use rustc_hash::FxHashMap;

use crate::fastfield::FastValue;
Expand Down Expand Up @@ -58,31 +57,33 @@ struct IndexingPositionsPerPath {
}

impl IndexingPositionsPerPath {
fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
self.positions_per_path
.entry(murmurhash2(term.serialized_term()))
.or_default()
fn get_position_from_id(&mut self, id: u32) -> &mut IndexingPosition {
self.positions_per_path.entry(id).or_default()
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn index_json_values<'a, V: Value<'a>>(
doc: DocId,
json_visitors: impl Iterator<Item = crate::Result<V::ObjectIter>>,
text_analyzer: &mut TextAnalyzer,
expand_dots_enabled: bool,
term_buffer: &mut Term,
postings_writer: &mut dyn PostingsWriter,
json_path_writer: &mut JsonPathWriter,
ctx: &mut IndexingContext,
) -> crate::Result<()> {
let mut json_term_writer = JsonTermWriter::wrap(term_buffer, expand_dots_enabled);
json_path_writer.clear();
json_path_writer.set_expand_dots(expand_dots_enabled);
let mut positions_per_path: IndexingPositionsPerPath = Default::default();
for json_visitor_res in json_visitors {
let json_visitor = json_visitor_res?;
index_json_object::<V>(
doc,
json_visitor,
text_analyzer,
&mut json_term_writer,
term_buffer,
json_path_writer,
postings_writer,
ctx,
&mut positions_per_path,
Expand All @@ -91,75 +92,117 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn index_json_object<'a, V: Value<'a>>(
doc: DocId,
json_visitor: V::ObjectIter,
text_analyzer: &mut TextAnalyzer,
json_term_writer: &mut JsonTermWriter,
term_buffer: &mut Term,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
for (json_path_segment, json_value_visitor) in json_visitor {
json_term_writer.push_path_segment(json_path_segment);
json_path_writer.push(json_path_segment);
index_json_value(
doc,
json_value_visitor,
text_analyzer,
json_term_writer,
term_buffer,
json_path_writer,
postings_writer,
ctx,
positions_per_path,
);
json_term_writer.pop_path_segment();
json_path_writer.pop();
}
}

#[allow(clippy::too_many_arguments)]
fn index_json_value<'a, V: Value<'a>>(
doc: DocId,
json_value: V,
text_analyzer: &mut TextAnalyzer,
json_term_writer: &mut JsonTermWriter,
term_buffer: &mut Term,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
let set_path_id = |term_buffer: &mut Term, unordered_id: u32| {
term_buffer.truncate_value_bytes(0);
term_buffer.append_bytes(&unordered_id.to_be_bytes());
};
let set_type = |term_buffer: &mut Term, typ: Type| {
term_buffer.append_bytes(&[typ.to_code()]);
};

match json_value.as_value() {
ReferenceValue::Leaf(leaf) => match leaf {
ReferenceValueLeaf::Null => {}
ReferenceValueLeaf::Str(val) => {
let mut token_stream = text_analyzer.token_stream(val);
let unordered_id = ctx
.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str());

// TODO: make sure the chain position works out.
json_term_writer.close_path_and_set_type(Type::Str);
let indexing_position = positions_per_path.get_position(json_term_writer.term());
set_path_id(term_buffer, unordered_id);
set_type(term_buffer, Type::Str);
let indexing_position = positions_per_path.get_position_from_id(unordered_id);
postings_writer.index_text(
doc,
&mut *token_stream,
json_term_writer.term_buffer,
term_buffer,
ctx,
indexing_position,
);
}
ReferenceValueLeaf::U64(val) => {
json_term_writer.set_fast_value(val);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::I64(val) => {
json_term_writer.set_fast_value(val);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::F64(val) => {
json_term_writer.set_fast_value(val);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::Bool(val) => {
json_term_writer.set_fast_value(val);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::Date(val) => {
json_term_writer.set_fast_value(val);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::PreTokStr(_) => {
unimplemented!(
Expand All @@ -182,7 +225,8 @@ fn index_json_value<'a, V: Value<'a>>(
doc,
val,
text_analyzer,
json_term_writer,
term_buffer,
json_path_writer,
postings_writer,
ctx,
positions_per_path,
Expand All @@ -194,7 +238,8 @@ fn index_json_value<'a, V: Value<'a>>(
doc,
object,
text_analyzer,
json_term_writer,
term_buffer,
json_path_writer,
postings_writer,
ctx,
positions_per_path,
Expand Down Expand Up @@ -361,6 +406,7 @@ impl<'a> JsonTermWriter<'a> {
self.term_buffer.append_bytes(&[typ.to_code()]);
}

// TODO: Remove this function and use JsonPathWriter instead.
pub fn push_path_segment(&mut self, segment: &str) {
// the path stack should never be empty.
self.trim_to_end_of_path();
Expand Down
1 change: 1 addition & 0 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! [`Index::writer`](crate::Index::writer).

pub(crate) mod delete_queue;
pub(crate) mod path_to_unordered_id;

pub(crate) mod doc_id_mapping;
mod doc_opstamp_mapping;
Expand Down
92 changes: 92 additions & 0 deletions src/indexer/path_to_unordered_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use fnv::FnvHashMap;

/// `Field` is represented by an unsigned 32-bit integer type.
/// The schema holds the mapping between field names and `Field` objects.
#[derive(Copy, Default, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct OrderedPathId(u32);

impl OrderedPathId {
/// Create a new field object for the given PathId.
pub const fn from_ordered_id(field_id: u32) -> OrderedPathId {
OrderedPathId(field_id)
}

/// Returns a u32 identifying uniquely a path within a schema.
pub const fn path_id(self) -> u32 {
self.0
}
}
impl From<u32> for OrderedPathId {
fn from(id: u32) -> Self {
Self(id)
}
}

#[derive(Default)]
pub(crate) struct PathToUnorderedId {
map: FnvHashMap<String, u32>,
}

impl PathToUnorderedId {
#[inline]
pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str) -> u32 {
if let Some(id) = self.map.get(path) {
return *id;
}
self.insert_new_path(path)
}
#[cold]
fn insert_new_path(&mut self, path: &str) -> u32 {
let next_id = self.map.len() as u32;
self.map.insert(path.to_string(), next_id);
next_id
}

/// Retuns ids which reflect the lexical order of the paths.
///
/// The returned vec can be indexed with the unordered id to get the ordered id.
pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<OrderedPathId> {
let mut sorted_ids: Vec<(&str, &u32)> =
self.map.iter().map(|(k, v)| (k.as_str(), v)).collect();
sorted_ids.sort_unstable_by_key(|(path, _)| *path);
let mut result = vec![OrderedPathId::default(); sorted_ids.len()];
for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() {
result[**unordered as usize] = OrderedPathId::from_ordered_id(ordered as u32);
}
result
}

/// Retuns the paths so they can be queried by the ordered id (which is the index).
pub(crate) fn ordered_id_to_path(&self) -> Vec<&str> {
let mut paths = self.map.keys().map(String::as_str).collect::<Vec<_>>();
paths.sort_unstable();
paths
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn path_to_unordered_test() {
let mut path_to_id = PathToUnorderedId::default();
let terms = vec!["b", "a", "b", "c"];
let ids = terms
.iter()
.map(|term| path_to_id.get_or_allocate_unordered_id(term))
.collect::<Vec<u32>>();
assert_eq!(ids, vec![0, 1, 0, 2]);
let ordered_ids = ids
.iter()
.map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize])
.collect::<Vec<OrderedPathId>>();
assert_eq!(ordered_ids, vec![1.into(), 0.into(), 1.into(), 2.into()]);
// Fetch terms
let terms_fetched = ordered_ids
.iter()
.map(|id| path_to_id.ordered_id_to_path()[id.path_id() as usize])
.collect::<Vec<&str>>();
assert_eq!(terms_fetched, terms);
}
}
Loading
Loading