Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow warming partially an sstable for an automaton #2559

Merged
merged 15 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"

[target.'cfg(windows)'.dependencies]
Expand Down Expand Up @@ -120,7 +121,7 @@ zstd-compression = ["zstd"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.

quickwit = ["sstable", "futures-util"]
quickwit = ["sstable", "futures-util", "futures-channel"]

# Compares only the hash of a string when indexing data.
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
Expand Down
104 changes: 98 additions & 6 deletions src/index/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use common::BinarySerializable;
use fnv::FnvHashSet;
#[cfg(feature = "quickwit")]
use futures_util::{FutureExt, StreamExt, TryStreamExt};
#[cfg(feature = "quickwit")]
use itertools::Itertools;
#[cfg(feature = "quickwit")]
use tantivy_fst::automaton::{AlwaysMatch, Automaton};

use crate::directory::FileSlice;
use crate::positions::PositionReader;
Expand Down Expand Up @@ -219,13 +225,18 @@ impl InvertedIndexReader {
self.termdict.get_async(term.serialized_value_bytes()).await
}

async fn get_term_range_async(
&self,
async fn get_term_range_async<'a, A: Automaton + 'a>(
&'a self,
terms: impl std::ops::RangeBounds<Term>,
automaton: A,
limit: Option<u64>,
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
merge_holes_under_bytes: usize,
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
where
A::State: Clone,
{
use std::ops::Bound;
let range_builder = self.termdict.range();
let range_builder = self.termdict.search(automaton);
let range_builder = match terms.start_bound() {
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
Expand All @@ -242,7 +253,9 @@ impl InvertedIndexReader {
range_builder
};

let mut stream = range_builder.into_stream_async().await?;
let mut stream = range_builder
.into_stream_async_merging_holes(merge_holes_under_bytes)
.await?;

let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));

Expand Down Expand Up @@ -288,7 +301,9 @@ impl InvertedIndexReader {
limit: Option<u64>,
with_positions: bool,
) -> io::Result<bool> {
let mut term_info = self.get_term_range_async(terms, limit).await?;
let mut term_info = self
.get_term_range_async(terms, AlwaysMatch, limit, 0)
.await?;

let Some(first_terminfo) = term_info.next() else {
// no key matches, nothing more to load
Expand All @@ -315,6 +330,83 @@ impl InvertedIndexReader {
Ok(true)
}

/// Warmup a block postings given a range of `Term`s.
/// This method is for an advanced usage only.
///
/// returns a boolean, whether a term matching the range was found in the dictionary
pub async fn warm_postings_automaton<
A: Automaton + Clone + Send + 'static,
E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
F: std::future::Future<Output = io::Result<()>>,
>(
&self,
automaton: A,
// with_positions: bool, at the moment we have no use for it, and supporting it would add
// complexity to the coalesce
executor: E,
) -> io::Result<bool>
where
A::State: Clone,
{
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
// S3 (~80MiB/s, and 50ms latency)
const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
// we build a first iterator to download everything. Simply calling the function already
// download everything we need from the sstable, but doesn't start iterating over it.
let _term_info_iter = self
.get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
.await?;

let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
let termdict = self.termdict.clone();
let cpu_bound_task = move || {
// then we build a 2nd iterator, this one with no holes, so we don't go through blocks
// we can't match.
// This makes the assumption there is a caching layer below us, which gives sync read
// for free after the initial async access. This might not always be true, but is in
// Quickwit.
// We build things from this closure otherwise we get into lifetime issues that can only
// be solved with self referential strucs. Returning an io::Result from here is a bit
// more leaky abstraction-wise, but a lot better than the alternative
let mut stream = termdict.search(automaton).into_stream()?;

// we could do without an iterator, but this allows us access to coalesce which simplify
// things
let posting_ranges_iter =
std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we break it into several statement.

For instance:

  • Create the iterator of postings ranges.
  • doe the coalesce thing after.

.coalesce(|range1, range2| {
if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
}
});

for posting_range in posting_ranges_iter {
if let Err(_) = sender.unbounded_send(posting_range) {
// this should happen only when search is cancelled
return Err(io::Error::other("failed to send posting range back"));
}
}
Ok(())
};
let task_handle = executor(Box::new(cpu_bound_task));

let posting_downloader = posting_ranges_to_load_stream
.map(|posting_slice| {
self.postings_file_slice
.read_bytes_slice_async(posting_slice)
.map(|result| result.map(|_slice| ()))
})
.buffer_unordered(5)
.try_collect::<Vec<()>>();

let (_, slices_downloaded) =
futures_util::future::try_join(task_handle, posting_downloader).await?;

Ok(!slices_downloaded.is_empty())
}

/// Warmup the block postings for all terms.
/// This method is for an advanced usage only.
///
Expand Down
1 change: 1 addition & 0 deletions src/termdict/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;

// TODO in the future this should become an enum of supported dictionaries
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
#[cfg_attr(feature = "quickwit", derive(Clone))]
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
pub struct TermDictionary(InnerTermDict);

impl TermDictionary {
Expand Down
1 change: 1 addition & 0 deletions src/termdict/sstable_termdict/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;

/// SSTable used to store TermInfo objects.
#[derive(Clone)]
pub struct TermSSTable;

pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
Expand Down
2 changes: 2 additions & 0 deletions sstable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ description = "sstables for tantivy"

[dependencies]
common = {version= "0.7", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
itertools = "0.13.0"
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
Expand Down
Loading
Loading