From a5dacd09f4738c6be6e63382bc8c7691c636d5f8 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 27 Oct 2023 14:19:04 +0800 Subject: [PATCH] collect json paths in indexing --- common/src/lib.rs | 1 + src/core/json_utils.rs | 93 ++++++++++++++++++++-------- src/indexer/mod.rs | 1 + src/indexer/path_to_unordered_id.rs | 69 +++++++++++++++++++++ src/indexer/segment_writer.rs | 48 +++++++++++++- src/postings/indexing_context.rs | 4 ++ src/postings/json_postings_writer.rs | 16 +++-- src/postings/postings_writer.rs | 37 +++++++++-- src/schema/field_type.rs | 2 + src/schema/term.rs | 14 ++++- stacker/src/arena_hashmap.rs | 25 +++++++- stacker/src/memory_arena.rs | 9 +++ 12 files changed, 279 insertions(+), 40 deletions(-) create mode 100644 src/indexer/path_to_unordered_id.rs diff --git a/common/src/lib.rs b/common/src/lib.rs index 9dcdc5a46e..054378ee55 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -118,6 +118,7 @@ pub fn u64_to_f64(val: u64) -> f64 { /// /// This function assumes that the needle is rarely contained in the bytes string /// and offers a fast path if the needle is not present. 
+#[inline] pub fn replace_in_place(needle: u8, replacement: u8, bytes: &mut [u8]) { if !bytes.contains(&needle) { return; diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index ae8db931ab..108e53fdde 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -1,6 +1,5 @@ use columnar::MonotonicallyMappableToU64; use common::{replace_in_place, JsonPathWriter}; -use murmurhash32::murmurhash2; use rustc_hash::FxHashMap; use crate::fastfield::FastValue; @@ -58,13 +57,12 @@ struct IndexingPositionsPerPath { } impl IndexingPositionsPerPath { - fn get_position(&mut self, term: &Term) -> &mut IndexingPosition { - self.positions_per_path - .entry(murmurhash2(term.serialized_term())) - .or_default() + fn get_position_from_id(&mut self, id: u32) -> &mut IndexingPosition { + self.positions_per_path.entry(id).or_default() } } +#[allow(clippy::too_many_arguments)] pub(crate) fn index_json_values<'a, V: Value<'a>>( doc: DocId, json_visitors: impl Iterator>, @@ -72,9 +70,11 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>( expand_dots_enabled: bool, term_buffer: &mut Term, postings_writer: &mut dyn PostingsWriter, + json_path_writer: &mut JsonPathWriter, ctx: &mut IndexingContext, ) -> crate::Result<()> { - let mut json_term_writer = JsonTermWriter::wrap(term_buffer, expand_dots_enabled); + json_path_writer.clear(); + json_path_writer.set_expand_dots(expand_dots_enabled); let mut positions_per_path: IndexingPositionsPerPath = Default::default(); for json_visitor_res in json_visitors { let json_visitor = json_visitor_res?; @@ -82,7 +82,8 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>( doc, json_visitor, text_analyzer, - &mut json_term_writer, + term_buffer, + json_path_writer, postings_writer, ctx, &mut positions_per_path, @@ -91,75 +92,110 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>( Ok(()) } +#[allow(clippy::too_many_arguments)] fn index_json_object<'a, V: Value<'a>>( doc: DocId, json_visitor: V::ObjectIter, text_analyzer: &mut 
TextAnalyzer, - json_term_writer: &mut JsonTermWriter, + term_buffer: &mut Term, + json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, positions_per_path: &mut IndexingPositionsPerPath, ) { for (json_path_segment, json_value_visitor) in json_visitor { - json_term_writer.push_path_segment(json_path_segment); + json_path_writer.push(json_path_segment); index_json_value( doc, json_value_visitor, text_analyzer, - json_term_writer, + term_buffer, + json_path_writer, postings_writer, ctx, positions_per_path, ); - json_term_writer.pop_path_segment(); + json_path_writer.pop(); } } +#[allow(clippy::too_many_arguments)] fn index_json_value<'a, V: Value<'a>>( doc: DocId, json_value: V, text_analyzer: &mut TextAnalyzer, - json_term_writer: &mut JsonTermWriter, + term_buffer: &mut Term, + json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, positions_per_path: &mut IndexingPositionsPerPath, ) { + let set_path_id = |term_buffer: &mut Term, unordered_id: u32| { + term_buffer.truncate_value_bytes(0); + term_buffer.append_bytes(&unordered_id.to_be_bytes()); + }; + let set_type = |term_buffer: &mut Term, typ: Type| { + term_buffer.append_bytes(&[typ.to_code()]); + }; + match json_value.as_value() { ReferenceValue::Leaf(leaf) => match leaf { ReferenceValueLeaf::Null => {} ReferenceValueLeaf::Str(val) => { let mut token_stream = text_analyzer.token_stream(val); + let unordered_id = ctx.path_to_unordered_id.get_id(json_path_writer.as_str()); // TODO: make sure the chain position works out. 
- json_term_writer.close_path_and_set_type(Type::Str); - let indexing_position = positions_per_path.get_position(json_term_writer.term()); + set_path_id(term_buffer, unordered_id); + set_type(term_buffer, Type::Str); + let indexing_position = positions_per_path.get_position_from_id(unordered_id); postings_writer.index_text( doc, &mut *token_stream, - json_term_writer.term_buffer, + term_buffer, ctx, indexing_position, ); } ReferenceValueLeaf::U64(val) => { - json_term_writer.set_fast_value(val); - postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx); + set_path_id( + term_buffer, + ctx.path_to_unordered_id.get_id(json_path_writer.as_str()), + ); + term_buffer.append_type_and_fast_value(val); + postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::I64(val) => { - json_term_writer.set_fast_value(val); - postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx); + set_path_id( + term_buffer, + ctx.path_to_unordered_id.get_id(json_path_writer.as_str()), + ); + term_buffer.append_type_and_fast_value(val); + postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::F64(val) => { - json_term_writer.set_fast_value(val); - postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx); + set_path_id( + term_buffer, + ctx.path_to_unordered_id.get_id(json_path_writer.as_str()), + ); + term_buffer.append_type_and_fast_value(val); + postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::Bool(val) => { - json_term_writer.set_fast_value(val); - postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx); + set_path_id( + term_buffer, + ctx.path_to_unordered_id.get_id(json_path_writer.as_str()), + ); + term_buffer.append_type_and_fast_value(val); + postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::Date(val) => { - json_term_writer.set_fast_value(val); - postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx); + set_path_id( + term_buffer, + 
ctx.path_to_unordered_id.get_id(json_path_writer.as_str()), + ); + term_buffer.append_type_and_fast_value(val); + postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::PreTokStr(_) => { unimplemented!( @@ -182,7 +218,8 @@ fn index_json_value<'a, V: Value<'a>>( doc, val, text_analyzer, - json_term_writer, + term_buffer, + json_path_writer, postings_writer, ctx, positions_per_path, @@ -194,7 +231,8 @@ fn index_json_value<'a, V: Value<'a>>( doc, object, text_analyzer, - json_term_writer, + term_buffer, + json_path_writer, postings_writer, ctx, positions_per_path, @@ -361,6 +399,7 @@ impl<'a> JsonTermWriter<'a> { self.term_buffer.append_bytes(&[typ.to_code()]); } + // TODO: Remove this function and use JsonPathWriter instead. pub fn push_path_segment(&mut self, segment: &str) { // the path stack should never be empty. self.trim_to_end_of_path(); diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 6669093915..13731444ae 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -5,6 +5,7 @@ //! [`Index::writer`](crate::Index::writer). pub(crate) mod delete_queue; +pub(crate) mod path_to_unordered_id; pub(crate) mod doc_id_mapping; mod doc_opstamp_mapping; diff --git a/src/indexer/path_to_unordered_id.rs b/src/indexer/path_to_unordered_id.rs new file mode 100644 index 0000000000..40d8cbaf1f --- /dev/null +++ b/src/indexer/path_to_unordered_id.rs @@ -0,0 +1,69 @@ +use fnv::FnvHashMap; + +#[derive(Default)] +pub(crate) struct PathToUnorderedId { + map: FnvHashMap, +} + +impl PathToUnorderedId { + #[inline] + pub(crate) fn get_id(&mut self, path: &str) -> u32 { + if let Some(id) = self.map.get(path) { + return *id; + } + self.insert_new_path(path) + } + #[cold] + fn insert_new_path(&mut self, path: &str) -> u32 { + let next_id = self.map.len() as u32; + self.map.insert(path.to_string(), next_id); + next_id + } + + /// Returns ids which reflect the lexical order of the paths. 
+ /// + /// The returned vec can be indexed with the unordered id to get the ordered id. + pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec { + let mut sorted_ids: Vec<(&String, &u32)> = self.map.iter().collect(); + sorted_ids.sort_unstable_by_key(|(path, _)| *path); + let mut result = vec![sorted_ids.len() as u32; sorted_ids.len()]; + for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() { + result[**unordered as usize] = ordered as u32; + } + result + } + + /// Returns the paths so they can be queried by the ordered id (which is the index). + pub(crate) fn ordered_id_to_path(&self) -> Vec<&String> { + let mut paths = self.map.keys().collect::>(); + paths.sort_unstable(); + paths + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn path_to_unordered_test() { + let mut path_to_id = PathToUnorderedId::default(); + let terms = vec!["b", "a", "b", "c"]; + let ids = terms + .iter() + .map(|term| path_to_id.get_id(term)) + .collect::>(); + assert_eq!(ids, vec![0, 1, 0, 2]); + let ordered_ids = ids + .iter() + .map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize]) + .collect::>(); + assert_eq!(ordered_ids, vec![1, 0, 1, 2]); + // Fetch terms + let terms_fetched = ordered_ids + .iter() + .map(|id| path_to_id.ordered_id_to_path()[*id as usize]) + .collect::>(); + assert_eq!(terms_fetched, terms); + } +} diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 1369f1ad32..180d5a66d8 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,4 +1,5 @@ use columnar::MonotonicallyMappableToU64; +use common::JsonPathWriter; use itertools::Itertools; use tokenizer_api::BoxTokenStream; @@ -66,6 +67,7 @@ pub struct SegmentWriter { pub(crate) segment_serializer: SegmentSerializer, pub(crate) fast_field_writers: FastFieldsWriter, pub(crate) fieldnorms_writer: FieldNormsWriter, + pub(crate) json_path_writer: JsonPathWriter, pub(crate) doc_opstamps: Vec, per_field_text_analyzers: Vec, 
term_buffer: Term, @@ -116,6 +118,7 @@ impl SegmentWriter { ctx: IndexingContext::new(table_size), per_field_postings_writers, fieldnorms_writer: FieldNormsWriter::for_schema(&schema), + json_path_writer: JsonPathWriter::default(), segment_serializer, fast_field_writers: FastFieldsWriter::from_schema_and_tokenizer_manager( &schema, @@ -144,6 +147,7 @@ impl SegmentWriter { .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self)) .transpose()?; remap_and_write( + self.schema, &self.per_field_postings_writers, self.ctx, self.fast_field_writers, @@ -355,6 +359,7 @@ impl SegmentWriter { json_options.is_expand_dots_enabled(), term_buffer, postings_writer, + &mut self.json_path_writer, ctx, )?; } @@ -422,6 +427,7 @@ impl SegmentWriter { /// /// `doc_id_map` is used to map to the new doc_id order. fn remap_and_write( + schema: Schema, per_field_postings_writers: &PerFieldPostingsWriter, ctx: IndexingContext, fast_field_writers: FastFieldsWriter, @@ -439,6 +445,7 @@ fn remap_and_write( let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; serialize_postings( ctx, + schema, per_field_postings_writers, fieldnorm_readers, doc_id_map, @@ -489,11 +496,11 @@ mod tests { use tempfile::TempDir; use super::compute_initial_table_size; - use crate::collector::Count; + use crate::collector::{Count, TopDocs}; use crate::core::json_utils::JsonTermWriter; use crate::directory::RamDirectory; use crate::postings::TermInfo; - use crate::query::PhraseQuery; + use crate::query::{PhraseQuery, QueryParser}; use crate::schema::document::Value; use crate::schema::{ Document, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, Type, STORED, STRING, @@ -552,6 +559,43 @@ mod tests { assert_eq!(doc.field_values()[0].value().as_str(), Some("A")); assert_eq!(doc.field_values()[1].value().as_str(), Some("title")); } + #[test] + fn test_simple_json_indexing() { + let mut schema_builder = Schema::builder(); + let json_field = schema_builder.add_json_field("json", STORED | 
STRING); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer = index.writer_for_tests().unwrap(); + writer + .add_document(doc!(json_field=>json!({"my_field": "b"}))) + .unwrap(); + writer + .add_document(doc!(json_field=>json!({"my_field": "a"}))) + .unwrap(); + writer + .add_document(doc!(json_field=>json!({"my_field": "b"}))) + .unwrap(); + writer.commit().unwrap(); + + let query_parser = QueryParser::for_index(&index, vec![json_field]); + let text_query = query_parser.parse_query("my_field:a").unwrap(); + let score_docs: Vec<(_, DocAddress)> = index + .reader() + .unwrap() + .searcher() + .search(&text_query, &TopDocs::with_limit(4)) + .unwrap(); + assert_eq!(score_docs.len(), 1); + + let text_query = query_parser.parse_query("my_field:b").unwrap(); + let score_docs: Vec<(_, DocAddress)> = index + .reader() + .unwrap() + .searcher() + .search(&text_query, &TopDocs::with_limit(4)) + .unwrap(); + assert_eq!(score_docs.len(), 2); + } #[test] fn test_json_indexing() { diff --git a/src/postings/indexing_context.rs b/src/postings/indexing_context.rs index 975de71dd0..d8916727d3 100644 --- a/src/postings/indexing_context.rs +++ b/src/postings/indexing_context.rs @@ -1,5 +1,7 @@ use stacker::{ArenaHashMap, MemoryArena}; +use crate::indexer::path_to_unordered_id::PathToUnorderedId; + /// IndexingContext contains all of the transient memory arenas /// required for building the inverted index. pub(crate) struct IndexingContext { @@ -8,6 +10,7 @@ pub(crate) struct IndexingContext { pub term_index: ArenaHashMap, /// Arena is a memory arena that stores posting lists / term frequencies / positions. 
pub arena: MemoryArena, + pub(crate) path_to_unordered_id: PathToUnorderedId, } impl IndexingContext { @@ -17,6 +20,7 @@ impl IndexingContext { IndexingContext { arena: MemoryArena::default(), term_index, + path_to_unordered_id: PathToUnorderedId::default(), } } diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index 0f875768ca..9669d72f36 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -6,7 +6,7 @@ use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::postings_writer::SpecializedPostingsWriter; use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder}; use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter}; -use crate::schema::Type; +use crate::schema::{Field, Type, JSON_END_OF_PATH}; use crate::tokenizer::TokenStream; use crate::{DocId, Term}; @@ -55,17 +55,25 @@ impl PostingsWriter for JsonPostingsWriter { fn serialize( &self, term_addrs: &[(Term<&[u8]>, Addr)], + ordered_id_to_path: &[&String], doc_id_map: Option<&DocIdMapping>, ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()> { + let mut term_buffer = Term::with_capacity(48); let mut buffer_lender = BufferLender::default(); for (term, addr) in term_addrs { - if let Some(json_value) = term.value().as_json_value_bytes() { + let ordered_id_bytes: [u8; 4] = term.serialized_value_bytes()[..4].try_into().unwrap(); + let ordered_id = u32::from_be_bytes(ordered_id_bytes); + term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0)); + term_buffer.append_bytes(ordered_id_to_path[ordered_id as usize].as_bytes()); + term_buffer.append_bytes(&[JSON_END_OF_PATH]); + term_buffer.append_bytes(&term.serialized_value_bytes()[4..]); + if let Some(json_value) = term_buffer.value().as_json_value_bytes() { let typ = json_value.typ(); if typ == Type::Str { SpecializedPostingsWriter::::serialize_one_term( - term, + 
term_buffer.serialized_value_bytes(), *addr, doc_id_map, &mut buffer_lender, @@ -74,7 +82,7 @@ impl PostingsWriter for JsonPostingsWriter { )?; } else { SpecializedPostingsWriter::::serialize_one_term( - term, + term_buffer.serialized_value_bytes(), *addr, doc_id_map, &mut buffer_lender, diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 96952e2a79..d603484959 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -10,7 +10,7 @@ use crate::postings::recorder::{BufferLender, Recorder}; use crate::postings::{ FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter, }; -use crate::schema::{Field, Term}; +use crate::schema::{Field, Schema, Term, Type}; use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN}; use crate::DocId; @@ -43,12 +43,27 @@ fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Ran /// It pushes all term, one field at a time, towards the /// postings serializer. 
pub(crate) fn serialize_postings( - ctx: IndexingContext, + mut ctx: IndexingContext, + schema: Schema, per_field_postings_writers: &PerFieldPostingsWriter, fieldnorm_readers: FieldNormReaders, doc_id_map: Option<&DocIdMapping>, serializer: &mut InvertedIndexSerializer, ) -> crate::Result<()> { + // Replace unordered ids by ordered ids to be able to sort + let unordered_id_to_ordered_id = ctx.path_to_unordered_id.unordered_id_to_ordered_id(); + unsafe { + ctx.term_index.iter_mut_keys(|key| { + let field = Term::wrap(&key).field(); + if schema.get_field_entry(field).field_type().value_type() == Type::Json { + let byte_range = 5..5 + 4; + let unordered_id = u32::from_be_bytes(key[byte_range.clone()].try_into().unwrap()); + let ordered_id = unordered_id_to_ordered_id[unordered_id as usize]; + key[byte_range].copy_from_slice(&ordered_id.to_be_bytes()); + } + }); + } + let mut term_offsets: Vec<(Term<&[u8]>, Addr)> = Vec::with_capacity(ctx.term_index.len()); term_offsets.extend( ctx.term_index @@ -57,6 +72,8 @@ pub(crate) fn serialize_postings( ); term_offsets.sort_unstable_by_key(|(k, _)| k.clone()); + let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path(); + let field_offsets = make_field_partition(&term_offsets); for (field, byte_offsets) in field_offsets { let postings_writer = per_field_postings_writers.get_for_field(field); @@ -65,6 +82,7 @@ pub(crate) fn serialize_postings( serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?; postings_writer.serialize( &term_offsets[byte_offsets], + &ordered_id_to_path, doc_id_map, &ctx, &mut field_serializer, @@ -99,6 +117,7 @@ pub(crate) trait PostingsWriter: Send + Sync { fn serialize( &self, term_addrs: &[(Term<&[u8]>, Addr)], + ordered_id_to_path: &[&String], doc_id_map: Option<&DocIdMapping>, ctx: &IndexingContext, serializer: &mut FieldSerializer, @@ -162,7 +181,7 @@ impl From> for Box SpecializedPostingsWriter { #[inline] pub(crate) fn serialize_one_term( - term: 
&Term<&[u8]>, + term: &[u8], addr: Addr, doc_id_map: Option<&DocIdMapping>, buffer_lender: &mut BufferLender, @@ -171,7 +190,7 @@ impl SpecializedPostingsWriter { ) -> io::Result<()> { let recorder: Rec = ctx.term_index.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); - serializer.new_term(term.serialized_value_bytes(), term_doc_freq)?; + serializer.new_term(term, term_doc_freq)?; recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender); serializer.close_term()?; Ok(()) @@ -205,13 +224,21 @@ impl PostingsWriter for SpecializedPostingsWriter { fn serialize( &self, term_addrs: &[(Term<&[u8]>, Addr)], + _ordered_id_to_path: &[&String], doc_id_map: Option<&DocIdMapping>, ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()> { let mut buffer_lender = BufferLender::default(); for (term, addr) in term_addrs { - Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?; + Self::serialize_one_term( + term.serialized_value_bytes(), + *addr, + doc_id_map, + &mut buffer_lender, + ctx, + serializer, + )?; } Ok(()) } diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index 166977b38f..04e71394bc 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -93,6 +93,7 @@ impl Type { } /// Returns a 1 byte code used to identify the type. + #[inline] pub fn to_code(&self) -> u8 { *self as u8 } @@ -115,6 +116,7 @@ impl Type { /// Interprets a 1byte code as a type. /// Returns `None` if the code is invalid. 
+ #[inline] pub fn from_code(code: u8) -> Option { match code { b's' => Some(Type::Str), diff --git a/src/schema/term.rs b/src/schema/term.rs index 995137b533..db707e2948 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -3,7 +3,7 @@ use std::hash::{Hash, Hasher}; use std::net::Ipv6Addr; use std::{fmt, str}; -use columnar::MonotonicallyMappableToU128; +use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use super::date_time_options::DATE_TIME_PRECISION_INDEXED; use super::Field; @@ -170,6 +170,18 @@ impl Term { self.set_bytes(val.to_u64().to_be_bytes().as_ref()); } + pub(crate) fn append_type_and_fast_value(&mut self, val: T) { + self.0.push(T::to_type().to_code()); + let value = if T::to_type() == Type::Date { + DateTime::from_u64(val.to_u64()) + .truncate(DATE_TIME_PRECISION_INDEXED) + .to_u64() + } else { + val.to_u64() + }; + self.0.extend(value.to_be_bytes().as_ref()); + } + /// Sets a `Ipv6Addr` value in the term. pub fn set_ip_addr(&mut self, val: Ipv6Addr) { self.set_bytes(val.to_u128().to_be_bytes().as_ref()); diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs index d855e70431..df329c8ee8 100644 --- a/stacker/src/arena_hashmap.rs +++ b/stacker/src/arena_hashmap.rs @@ -23,7 +23,7 @@ type HashType = u64; /// The `value_addr` also points to an address in the memory arena. #[derive(Copy, Clone)] struct KeyValue { - key_value_addr: Addr, + pub(crate) key_value_addr: Addr, hash: HashType, } @@ -235,6 +235,29 @@ impl ArenaHashMap { } } + #[inline] + /// This will invalidate the hashmaps get and insert methods. + /// Only iter() is still valid afterwards + /// + /// # Safety + /// Any call to get or mutate_or_create after this call is undefined behavior. 
+ pub unsafe fn iter_mut_keys(&mut self, cb: F) { + for kv in self + .table + .iter() + .cloned() + .filter(KeyValue::is_not_empty_ref) + { + let data = self.memory_arena.slice_from_mut(kv.key_value_addr); + let key_bytes_len_bytes = unsafe { data.get_unchecked(..2) }; + let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap()); + let key_bytes: &mut [u8] = + unsafe { data.get_unchecked_mut(2..2 + key_bytes_len as usize) }; + + cb(key_bytes); + } + } + fn resize(&mut self) { let new_len = (self.table.len() * 2).max(1 << 13); let mask = new_len - 1; diff --git a/stacker/src/memory_arena.rs b/stacker/src/memory_arena.rs index f3ed5d4bb9..0d5de72f06 100644 --- a/stacker/src/memory_arena.rs +++ b/stacker/src/memory_arena.rs @@ -148,6 +148,11 @@ impl MemoryArena { self.get_page(addr.page_id()) .slice_from(addr.page_local_addr()) } + #[inline] + pub fn slice_from_mut(&mut self, addr: Addr) -> &mut [u8] { + self.get_page_mut(addr.page_id()) + .slice_from_mut(addr.page_local_addr()) + } #[inline] pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] { @@ -206,6 +211,10 @@ impl Page { fn slice_from(&self, local_addr: usize) -> &[u8] { &self.data[local_addr..] } + #[inline] + fn slice_from_mut(&mut self, local_addr: usize) -> &mut [u8] { + &mut self.data[local_addr..] + } #[inline] fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {