Skip to content

Commit

Permalink
store DateTime as nanoseconds in doc store (#2486)
Browse files Browse the repository at this point in the history
* store DateTime as nanoseconds in doc store

The doc store DateTime was truncated to microseconds previously. This
removes this truncation, while still keeping backwards compatibility.

This is done by adding the trait `ConfigurableBinarySerializable`, which
works like `BinarySerializable`, but takes a config that currently allows
values to be (de)serialized with different date time precisions.

Bump the index format version to 7.
Add a compat test to check the date time truncation.

* remove configurable binary serialize, add enum for doc store version

* test doc store version ord
  • Loading branch information
PSeitz authored Oct 18, 2024
1 parent d152e29 commit 2f2db16
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 89 deletions.
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use datetime::{DateTime, DateTimePrecision};
pub use group_by::GroupByIteratorExtended;
pub use json_path_writer::JsonPathWriter;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use serialize::*;
pub use vint::{
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, VIntU128,
};
Expand Down
20 changes: 10 additions & 10 deletions common/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ impl FixedSize for () {

impl<T: BinarySerializable> BinarySerializable for Vec<T> {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
VInt(self.len() as u64).serialize(writer)?;
BinarySerializable::serialize(&VInt(self.len() as u64), writer)?;
for it in self {
it.serialize(writer)?;
}
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Vec<T>> {
let num_items = VInt::deserialize(reader)?.val();
let num_items = <VInt as BinarySerializable>::deserialize(reader)?.val();
let mut items: Vec<T> = Vec::with_capacity(num_items as usize);
for _ in 0..num_items {
let item = T::deserialize(reader)?;
Expand Down Expand Up @@ -236,12 +236,12 @@ impl FixedSize for bool {
impl BinarySerializable for String {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let data: &[u8] = self.as_bytes();
VInt(data.len() as u64).serialize(writer)?;
BinarySerializable::serialize(&VInt(data.len() as u64), writer)?;
writer.write_all(data)
}

fn deserialize<R: Read>(reader: &mut R) -> io::Result<String> {
let string_length = VInt::deserialize(reader)?.val() as usize;
let string_length = <VInt as BinarySerializable>::deserialize(reader)?.val() as usize;
let mut result = String::with_capacity(string_length);
reader
.take(string_length as u64)
Expand All @@ -253,12 +253,12 @@ impl BinarySerializable for String {
impl<'a> BinarySerializable for Cow<'a, str> {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let data: &[u8] = self.as_bytes();
VInt(data.len() as u64).serialize(writer)?;
BinarySerializable::serialize(&VInt(data.len() as u64), writer)?;
writer.write_all(data)
}

fn deserialize<R: Read>(reader: &mut R) -> io::Result<Cow<'a, str>> {
let string_length = VInt::deserialize(reader)?.val() as usize;
let string_length = <VInt as BinarySerializable>::deserialize(reader)?.val() as usize;
let mut result = String::with_capacity(string_length);
reader
.take(string_length as u64)
Expand All @@ -269,18 +269,18 @@ impl<'a> BinarySerializable for Cow<'a, str> {

impl<'a> BinarySerializable for Cow<'a, [u8]> {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
VInt(self.len() as u64).serialize(writer)?;
BinarySerializable::serialize(&VInt(self.len() as u64), writer)?;
for it in self.iter() {
it.serialize(writer)?;
BinarySerializable::serialize(it, writer)?;
}
Ok(())
}

fn deserialize<R: Read>(reader: &mut R) -> io::Result<Cow<'a, [u8]>> {
let num_items = VInt::deserialize(reader)?.val();
let num_items = <VInt as BinarySerializable>::deserialize(reader)?.val();
let mut items: Vec<u8> = Vec::with_capacity(num_items as usize);
for _ in 0..num_items {
let item = u8::deserialize(reader)?;
let item = <u8 as BinarySerializable>::deserialize(reader)?;
items.push(item);
}
Ok(Cow::Owned(items))
Expand Down
15 changes: 13 additions & 2 deletions src/compat_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,19 @@ fn test_format_6() {
assert_date_time_precision(&index, DateTimePrecision::Microseconds);
}

/// Opens the index stored at `path_for_version("7")` and checks that dates read
/// back from its doc store keep nanosecond precision.
///
/// Compiled out under the `quickwit` feature flag, which uses a different
/// dictionary type.
#[test]
#[cfg(not(feature = "quickwit"))]
fn test_format_7() {
    let index = Index::open_in_dir(path_for_version("7")).expect("Failed to open index");

    // In v7 the doc store no longer truncates dates.
    assert_date_time_precision(&index, DateTimePrecision::Nanoseconds);
}

#[cfg(not(feature = "quickwit"))]
fn assert_date_time_precision(index: &Index, precision: DateTimePrecision) {
fn assert_date_time_precision(index: &Index, doc_store_precision: DateTimePrecision) {
use collector::TopDocs;
let reader = index.reader().expect("Failed to create reader");
let searcher = reader.searcher();
Expand Down Expand Up @@ -75,6 +86,6 @@ fn assert_date_time_precision(index: &Index, precision: DateTimePrecision) {
.as_datetime()
.unwrap();

let expected = DateTime::from_timestamp_nanos(123456).truncate(precision);
let expected = DateTime::from_timestamp_nanos(123456).truncate(doc_store_precision);
assert_eq!(date_value, expected,);
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
pub use crate::schema::{Document, TantivyDocument, Term};

/// Index format version.
pub const INDEX_FORMAT_VERSION: u32 = 6;
pub const INDEX_FORMAT_VERSION: u32 = 7;
/// Oldest index format version this tantivy version can read.
pub const INDEX_FORMAT_OLDEST_SUPPORTED_VERSION: u32 = 4;

Expand Down
88 changes: 74 additions & 14 deletions src/schema/document/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::se::BinaryObjectSerializer;
use super::{OwnedValue, Value};
use crate::schema::document::type_codes;
use crate::schema::{Facet, Field};
use crate::store::DocStoreVersion;
use crate::tokenizer::PreTokenizedString;

#[derive(Debug, thiserror::Error, Clone)]
Expand All @@ -45,6 +46,9 @@ pub enum DeserializeError {
#[error("{0}")]
/// A custom error message.
Custom(String),
#[error("Version {0}, Max version supported: {1}")]
/// Unsupported version error.
UnsupportedVersion(u32, u32),
}

impl DeserializeError {
Expand Down Expand Up @@ -291,19 +295,24 @@ pub trait ObjectAccess<'de> {
pub struct BinaryDocumentDeserializer<'de, R> {
length: usize,
position: usize,
doc_store_version: DocStoreVersion,
reader: &'de mut R,
}

impl<'de, R> BinaryDocumentDeserializer<'de, R>
where R: Read
{
/// Attempts to create a new document deserializer from a given reader.
pub(crate) fn from_reader(reader: &'de mut R) -> Result<Self, DeserializeError> {
pub(crate) fn from_reader(
reader: &'de mut R,
doc_store_version: DocStoreVersion,
) -> Result<Self, DeserializeError> {
let length = VInt::deserialize(reader)?;

Ok(Self {
length: length.val() as usize,
position: 0,
doc_store_version,
reader,
})
}
Expand All @@ -329,8 +338,8 @@ where R: Read
}

let field = Field::deserialize(self.reader).map_err(DeserializeError::from)?;

let deserializer = BinaryValueDeserializer::from_reader(self.reader)?;
let deserializer =
BinaryValueDeserializer::from_reader(self.reader, self.doc_store_version)?;
let value = V::deserialize(deserializer)?;

self.position += 1;
Expand All @@ -344,13 +353,17 @@ where R: Read
pub struct BinaryValueDeserializer<'de, R> {
value_type: ValueType,
reader: &'de mut R,
doc_store_version: DocStoreVersion,
}

impl<'de, R> BinaryValueDeserializer<'de, R>
where R: Read
{
/// Attempts to create a new value deserializer from a given reader.
fn from_reader(reader: &'de mut R) -> Result<Self, DeserializeError> {
fn from_reader(
reader: &'de mut R,
doc_store_version: DocStoreVersion,
) -> Result<Self, DeserializeError> {
let type_code = <u8 as BinarySerializable>::deserialize(reader)?;

let value_type = match type_code {
Expand Down Expand Up @@ -391,7 +404,11 @@ where R: Read
}
};

Ok(Self { value_type, reader })
Ok(Self {
value_type,
reader,
doc_store_version,
})
}

fn validate_type(&self, expected_type: ValueType) -> Result<(), DeserializeError> {
Expand Down Expand Up @@ -438,7 +455,16 @@ where R: Read

fn deserialize_datetime(self) -> Result<DateTime, DeserializeError> {
self.validate_type(ValueType::DateTime)?;
<DateTime as BinarySerializable>::deserialize(self.reader).map_err(DeserializeError::from)
match self.doc_store_version {
DocStoreVersion::V1 => {
let timestamp_micros = <i64 as BinarySerializable>::deserialize(self.reader)?;
Ok(DateTime::from_timestamp_micros(timestamp_micros))
}
DocStoreVersion::V2 => {
let timestamp_nanos = <i64 as BinarySerializable>::deserialize(self.reader)?;
Ok(DateTime::from_timestamp_nanos(timestamp_nanos))
}
}
}

fn deserialize_facet(self) -> Result<Facet, DeserializeError> {
Expand Down Expand Up @@ -514,11 +540,13 @@ where R: Read
visitor.visit_pre_tokenized_string(val)
}
ValueType::Array => {
let access = BinaryArrayDeserializer::from_reader(self.reader)?;
let access =
BinaryArrayDeserializer::from_reader(self.reader, self.doc_store_version)?;
visitor.visit_array(access)
}
ValueType::Object => {
let access = BinaryObjectDeserializer::from_reader(self.reader)?;
let access =
BinaryObjectDeserializer::from_reader(self.reader, self.doc_store_version)?;
visitor.visit_object(access)
}
#[allow(deprecated)]
Expand All @@ -537,7 +565,8 @@ where R: Read

let out_rc = std::rc::Rc::new(out);
let mut slice: &[u8] = &out_rc;
let access = BinaryObjectDeserializer::from_reader(&mut slice)?;
let access =
BinaryObjectDeserializer::from_reader(&mut slice, self.doc_store_version)?;

visitor.visit_object(access)
}
Expand All @@ -551,19 +580,24 @@ pub struct BinaryArrayDeserializer<'de, R> {
length: usize,
position: usize,
reader: &'de mut R,
doc_store_version: DocStoreVersion,
}

impl<'de, R> BinaryArrayDeserializer<'de, R>
where R: Read
{
/// Attempts to create a new array deserializer from a given reader.
fn from_reader(reader: &'de mut R) -> Result<Self, DeserializeError> {
fn from_reader(
reader: &'de mut R,
doc_store_version: DocStoreVersion,
) -> Result<Self, DeserializeError> {
let length = <VInt as BinarySerializable>::deserialize(reader)?;

Ok(Self {
length: length.val() as usize,
position: 0,
reader,
doc_store_version,
})
}

Expand All @@ -587,7 +621,8 @@ where R: Read
return Ok(None);
}

let deserializer = BinaryValueDeserializer::from_reader(self.reader)?;
let deserializer =
BinaryValueDeserializer::from_reader(self.reader, self.doc_store_version)?;
let value = V::deserialize(deserializer)?;

// Advance the position cursor.
Expand All @@ -610,8 +645,11 @@ impl<'de, R> BinaryObjectDeserializer<'de, R>
where R: Read
{
/// Attempts to create a new object deserializer from a given reader.
fn from_reader(reader: &'de mut R) -> Result<Self, DeserializeError> {
let inner = BinaryArrayDeserializer::from_reader(reader)?;
fn from_reader(
reader: &'de mut R,
doc_store_version: DocStoreVersion,
) -> Result<Self, DeserializeError> {
let inner = BinaryArrayDeserializer::from_reader(reader, doc_store_version)?;
Ok(Self { inner })
}
}
Expand Down Expand Up @@ -819,6 +857,7 @@ mod tests {
use crate::schema::document::existing_type_impls::JsonObjectIter;
use crate::schema::document::se::BinaryValueSerializer;
use crate::schema::document::{ReferenceValue, ReferenceValueLeaf};
use crate::store::DOC_STORE_VERSION;

fn serialize_value<'a>(value: ReferenceValue<'a, &'a serde_json::Value>) -> Vec<u8> {
let mut writer = Vec::new();
Expand All @@ -829,9 +868,19 @@ mod tests {
writer
}

/// Test helper: serializes `value` with a [`BinaryValueSerializer`] into a fresh
/// byte buffer and returns that buffer.
///
/// Panics with "Serialize value" if serialization fails.
fn serialize_owned_value<'a>(value: ReferenceValue<'a, &'a OwnedValue>) -> Vec<u8> {
    let mut buffer: Vec<u8> = Vec::new();
    // The serializer only lives for this statement, so its mutable borrow of
    // `buffer` has ended by the time the buffer is returned.
    BinaryValueSerializer::new(&mut buffer)
        .serialize_value(value)
        .expect("Serialize value");
    buffer
}

fn deserialize_value(buffer: Vec<u8>) -> crate::schema::OwnedValue {
let mut cursor = Cursor::new(buffer);
let deserializer = BinaryValueDeserializer::from_reader(&mut cursor).unwrap();
let deserializer =
BinaryValueDeserializer::from_reader(&mut cursor, DOC_STORE_VERSION).unwrap();
crate::schema::OwnedValue::deserialize(deserializer).expect("Deserialize value")
}

Expand Down Expand Up @@ -1010,6 +1059,17 @@ mod tests {
assert_eq!(value, expected_val);
}

/// Round-trips an object containing a date through the binary value
/// serializer and deserializer and expects the value to come back unchanged.
///
/// The timestamp (323456 ns) is not a whole number of microseconds, so any
/// truncation to microseconds along the way would make the comparison fail.
#[test]
fn test_nested_date_precision() {
    let date = OwnedValue::Date(DateTime::from_timestamp_nanos(323456));
    let object = OwnedValue::Object(vec![("my-date".into(), date)]);

    let bytes = serialize_owned_value((&object).as_value());
    let round_tripped = deserialize_value(bytes);

    assert_eq!(round_tripped, object);
}

#[test]
fn test_nested_serialize() {
let mut object = serde_json::Map::new();
Expand Down
Loading

0 comments on commit 2f2db16

Please sign in to comment.