Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow warming partially an sstable for an automaton #2559

Merged
merged 15 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion columnar/src/columnar/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ColumnarReader {
) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self
.stream_for_column_range(column_name)
.into_stream_async()
.into_stream_async(0)
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
Expand Down
74 changes: 68 additions & 6 deletions src/index/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use common::BinarySerializable;
use fnv::FnvHashSet;
#[cfg(feature = "quickwit")]
use futures_util::{FutureExt, StreamExt, TryStreamExt};
#[cfg(feature = "quickwit")]
use itertools::Itertools;
#[cfg(feature = "quickwit")]
use tantivy_fst::automaton::{AlwaysMatch, Automaton};

use crate::directory::FileSlice;
use crate::positions::PositionReader;
Expand Down Expand Up @@ -219,13 +225,18 @@ impl InvertedIndexReader {
self.termdict.get_async(term.serialized_value_bytes()).await
}

async fn get_term_range_async(
&self,
async fn get_term_range_async<'a, A: Automaton + 'a>(
&'a self,
terms: impl std::ops::RangeBounds<Term>,
automaton: A,
limit: Option<u64>,
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
merge_holes_under: usize,
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
where
A::State: Clone,
{
use std::ops::Bound;
let range_builder = self.termdict.range();
let range_builder = self.termdict.search(automaton);
let range_builder = match terms.start_bound() {
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
Expand All @@ -242,7 +253,7 @@ impl InvertedIndexReader {
range_builder
};

let mut stream = range_builder.into_stream_async().await?;
let mut stream = range_builder.into_stream_async(merge_holes_under).await?;

let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));

Expand Down Expand Up @@ -288,7 +299,9 @@ impl InvertedIndexReader {
limit: Option<u64>,
with_positions: bool,
) -> io::Result<bool> {
let mut term_info = self.get_term_range_async(terms, limit).await?;
let mut term_info = self
.get_term_range_async(terms, AlwaysMatch, limit, 0)
.await?;

let Some(first_terminfo) = term_info.next() else {
// no key matches, nothing more to load
Expand All @@ -315,6 +328,55 @@ impl InvertedIndexReader {
Ok(true)
}

/// Warmup a block postings given a range of `Term`s.
/// This method is for an advanced usage only.
///
/// returns a boolean, whether a term matching the range was found in the dictionary
pub async fn warm_postings_automaton<A: Automaton + Clone>(
&self,
automaton: A,
// with_positions: bool, at the moment we have no use for it, and supporting it would add
// complexity to the coalesce
) -> io::Result<bool>
where
A::State: Clone,
{
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
// S3 (~80MiB/s, and 50ms latency)
let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
// we build a first iterator to download everything. Simply calling the function already
// loads everything, but doesn't start iterating over the sstable.
let mut _term_info = self
.get_term_range_async(.., automaton.clone(), None, merge_holes_under)
.await?;
// we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
// match, and just download them to reduce our query count. This makes the assumption
// there is a caching layer below, which might not always be true, but is in Quickwit.
let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved

let range_to_load = term_info
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it is difficult for readers to know what is the type of range_to_load.

.map(|term_info| term_info.postings_range)
.coalesce(|range1, range2| {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
if range1.end + merge_holes_under >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
}
});

let slices_downloaded = futures_util::stream::iter(range_to_load)
.map(|posting_slice| {
self.postings_file_slice
.read_bytes_slice_async(posting_slice)
.map(|result| result.map(|_slice| ()))
})
.buffer_unordered(5)
.try_collect::<Vec<()>>()
.await?;

Ok(!slices_downloaded.is_empty())
}

/// Warmup the block postings for all terms.
/// This method is for an advanced usage only.
///
Expand Down
2 changes: 2 additions & 0 deletions sstable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ description = "sstables for tantivy"

[dependencies]
common = {version= "0.7", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
itertools = "0.13.0"
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
Expand Down
208 changes: 208 additions & 0 deletions sstable/src/block_match_automaton.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use tantivy_fst::Automaton;

/// Returns whether a block whose starting key (exclusive) and final key (inclusive) can match the
/// provided automaton, without actually looking at the block content.
pub(crate) fn block_match_automaton(
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
start_key: Option<&[u8]>,
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
end_key: &[u8],
automaton: &impl Automaton,
) -> bool {
let initial_block = start_key.is_none();
let start_key = start_key.unwrap_or(&[]);

debug_assert!(start_key <= end_key);

let prefix_len = start_key
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.zip(end_key)
.take_while(|(c1, c2)| c1 == c2)
.count();

let mut base_state = automaton.start();
for c in &start_key[0..prefix_len] {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
base_state = automaton.accept(&base_state, *c);
}
if !automaton.can_match(&base_state) {
return false;
}

if initial_block && automaton.is_match(&base_state) {
// for other blocks, the start_key is exclusive, and a prefix of it would be in a
// previous block
return true;
}

// we have 3 distinct case:
// - keys are `abc` and `abcd` => we test for abc[\0-d].*
// - keys are `abcd` and `abce` => we test for abc[d-e].*
// - keys are `abcd` and `abc` => contradiction with start_key < end_key.
//
// ideally for [abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
// but let's start simple (and correct), and tighten our bounds latter
//
// and for [abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
// abc (
// d(e.+|[f-\xff].*) |
// e.* |
// f([\0-f].*|g)?
// )
//
// these are all written as regex, but can be converted to operations we can do:
// - [x-y] is a for c in x..y
// - .* is a can_match()
// - .+ is a for c in 0..=255 { accept(c).can_match() }
// - ? is a the thing before can_match(), or current state.is_match()
// - | means test both side

let mut start_range = *start_key.get(prefix_len).unwrap_or(&0);
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
let end_range = end_key[prefix_len];

if start_key.len() > prefix_len {
start_range += 1;
}
for c in start_range..end_range {
let new_state = automaton.accept(&base_state, c);
if automaton.can_match(&new_state) {
return true;
}
}
if start_key.len() > prefix_len {
if start_key.len() <= prefix_len {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
// case [abcd, abcde], we need to handle \0 which wasn't processed
let new_state = automaton.accept(&base_state, start_range);
if automaton.can_match(&new_state) {
eprintln!("ho");
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
} else if match_range_start(&start_key[prefix_len..], &automaton, &base_state) {
return true;
}
}
match_range_end(&end_key[prefix_len..], &automaton, &base_state)
}

/// Tells whether `automaton` can accept a key strictly greater than `start_key` which shares
/// its first byte with `start_key`.
///
/// `base_state` is the automaton state reached before the first byte of `start_key` was fed.
/// `start_key` must not be empty.
fn match_range_start<S, A: Automaton<State = S>>(
    start_key: &[u8],
    automaton: &A,
    base_state: &S,
) -> bool {
    // with bounds [abcdef, abcghi] and `abc` already consumed, the candidates are
    // - abcd[f-\xff].*
    // - abcde[g-\xff].*
    // - abcdef.+ == abcdef[\0-\xff].*
    let mut current = automaton.accept(base_state, start_key[0]);
    for &key_byte in &start_key[1..] {
        if !automaton.can_match(&current) {
            return false;
        }
        // nothing is above \xff: `checked_add` gives `None` and the range is skipped
        if let Some(first_above) = key_byte.checked_add(1) {
            let diverges_above = (first_above..=u8::MAX)
                .any(|byte| automaton.can_match(&automaton.accept(&current, byte)));
            if diverges_above {
                return true;
            }
        }
        current = automaton.accept(&current, key_byte);
    }

    // the whole start key was consumed: any strict extension of it is in range
    automaton.can_match(&current)
        && (0..=u8::MAX).any(|byte| automaton.can_match(&automaton.accept(&current, byte)))
}

/// Tells whether `automaton` can accept a key lower than or equal to `end_key` which shares
/// its first byte with `end_key`.
///
/// `base_state` is the automaton state reached before the first byte of `end_key` was fed.
/// `end_key` must not be empty.
fn match_range_end<S, A: Automaton<State = S>>(
    end_key: &[u8],
    automaton: &A,
    base_state: &S,
) -> bool {
    // f([\0-f].*|g)?
    // with bounds [abcdef, abcghi] and `abc` already consumed, the candidates are
    // - abcg[\0-g].*
    // - abcgh[\0-h].*
    // - abcghi
    let mut current = automaton.accept(base_state, end_key[0]);
    for &key_byte in &end_key[1..] {
        if !automaton.can_match(&current) {
            return false;
        }
        // a strict prefix of the end key sorts before it
        if automaton.is_match(&current) {
            return true;
        }
        let diverges_below =
            (0..key_byte).any(|byte| automaton.can_match(&automaton.accept(&current, byte)));
        if diverges_below {
            return true;
        }
        current = automaton.accept(&current, key_byte);
    }

    // only the end key itself is left
    automaton.is_match(&current)
}

#[cfg(test)]
pub(crate) mod tests {
    use proptest::prelude::*;
    use tantivy_fst::Automaton;

    use super::*;

    /// Test automaton accepting a single key: the wrapped buffer.
    pub(crate) struct EqBuffer(pub Vec<u8>);

    impl Automaton for EqBuffer {
        /// `Some(n)` while the `n` bytes fed so far equal the first `n` bytes of the buffer,
        /// `None` once a byte differed.
        type State = Option<usize>;

        fn start(&self) -> Self::State {
            Some(0)
        }

        fn is_match(&self, state: &Self::State) -> bool {
            matches!(*state, Some(consumed) if consumed == self.0.len())
        }

        fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
            match *state {
                Some(consumed) if self.0.get(consumed) == Some(&byte) => Some(consumed + 1),
                _ => None,
            }
        }

        fn can_match(&self, state: &Self::State) -> bool {
            state.is_some()
        }

        fn will_always_match(&self, _state: &Self::State) -> bool {
            false
        }
    }

    proptest! {
        #[test]
        fn test_proptest_automaton_match_block(start in any::<Vec<u8>>(), end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
            // inverted keys are *not* supported and can return bogus results
            if !end.is_empty() && start < end {
                // the block holds the keys of (start, end]
                let key_in_block = key > start && key <= end;
                let single_key = EqBuffer(key);

                assert_eq!(block_match_automaton(Some(&start), &end, &single_key), key_in_block);
            }
        }

        #[test]
        fn test_proptest_automaton_match_first_block(end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
            if !end.is_empty() {
                // the first block has no lower bound
                let key_in_block = key <= end;
                let single_key = EqBuffer(key);
                assert_eq!(block_match_automaton(None, &end, &single_key), key_in_block);
            }
        }
    }
}
Loading
Loading