diff --git a/Cargo.toml b/Cargo.toml
index 41058dcfb5..2c7cdfa9d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
 sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
 hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
 futures-util = { version = "0.3.28", optional = true }
+futures-channel = { version = "0.3.28", optional = true }
 fnv = "1.0.7"
 
 [target.'cfg(windows)'.dependencies]
@@ -120,7 +121,7 @@ zstd-compression = ["zstd"]
 failpoints = ["fail", "fail/failpoints"]
 unstable = [] # useful for benches.
 
-quickwit = ["sstable", "futures-util"]
+quickwit = ["sstable", "futures-util", "futures-channel"]
 
 # Compares only the hash of a string when indexing data.
 # Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 30685eaa44..4be9d9c51f 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -3,6 +3,12 @@ use std::io;
 use common::json_path_writer::JSON_END_OF_PATH;
 use common::BinarySerializable;
 use fnv::FnvHashSet;
+#[cfg(feature = "quickwit")]
+use futures_util::{FutureExt, StreamExt, TryStreamExt};
+#[cfg(feature = "quickwit")]
+use itertools::Itertools;
+#[cfg(feature = "quickwit")]
+use tantivy_fst::automaton::{AlwaysMatch, Automaton};
 
 use crate::directory::FileSlice;
 use crate::positions::PositionReader;
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
         self.termdict.get_async(term.serialized_value_bytes()).await
     }
 
-    async fn get_term_range_async(
-        &self,
+    async fn get_term_range_async<'a, A: Automaton + 'a>(
+        &'a self,
         terms: impl std::ops::RangeBounds<Term>,
+        automaton: A,
         limit: Option<u64>,
-    ) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
+        merge_holes_under_bytes: usize,
+    ) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
+    where
+        A::State: Clone,
+    {
         use std::ops::Bound;
-        let range_builder = self.termdict.range();
+        let range_builder = self.termdict.search(automaton);
         let range_builder = match terms.start_bound() {
             Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
             Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
@@ -242,7 +253,9 @@ impl InvertedIndexReader {
             range_builder
         };
 
-        let mut stream = range_builder.into_stream_async().await?;
+        let mut stream = range_builder
+            .into_stream_async_merging_holes(merge_holes_under_bytes)
+            .await?;
 
         let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
 
@@ -288,7 +301,9 @@ impl InvertedIndexReader {
         limit: Option<u64>,
         with_positions: bool,
     ) -> io::Result<bool> {
-        let mut term_info = self.get_term_range_async(terms, limit).await?;
+        let mut term_info = self
+            .get_term_range_async(terms, AlwaysMatch, limit, 0)
+            .await?;
 
         let Some(first_terminfo) = term_info.next() else {
             // no key matches, nothing more to load
@@ -315,6 +330,84 @@ impl InvertedIndexReader {
         Ok(true)
     }
 
+    /// Warmup the block postings for all terms matching a given automaton.
+    /// This method is for an advanced usage only.
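+    /// Posting lists are downloaded concurrently, and nearby byte ranges are merged to limit
+    /// the number of IO requests issued to the underlying storage.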
+    ///
+    /// Returns a boolean, whether a term matching the automaton was found in the dictionary.
+    pub async fn warm_postings_automaton<
+        A: Automaton + Clone + Send + 'static,
+        E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
+        F: std::future::Future<Output = io::Result<()>>,
+    >(
+        &self,
+        automaton: A,
+        // with_positions: bool, at the moment we have no use for it, and supporting it would add
+        // complexity to the coalesce
+        executor: E,
+    ) -> io::Result<bool>
+    where
+        A::State: Clone,
+    {
+        // merge holes under 4MiB, that's how many bytes we can hope to receive during the TTFB
+        // of a request to S3 (~80MiB/s bandwidth, 50ms latency)
+        const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
+        // we build a first iterator to download everything. Simply calling the function already
+        // downloads everything we need from the sstable, but doesn't start iterating over it.
+        let _term_info_iter = self
+            .get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
+            .await?;
+
+        let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
+        let termdict = self.termdict.clone();
+        let cpu_bound_task = move || {
+            // then we build a second iterator, this one with no holes, so we don't go through
+            // blocks we can't match.
+            // This assumes there is a caching layer below us, which makes sync reads free after
+            // the initial async access. This might not always be true, but it is in Quickwit.
+            // We build things inside this closure, otherwise we get into lifetime issues that
+            // can only be solved with self-referential structs. Returning an io::Result from
+            // here is a bit leaky abstraction-wise, but a lot better than the alternative.
+            let mut stream = termdict.search(automaton).into_stream()?;
+
+            // we could do without an iterator, but this gives us access to coalesce, which
+            // simplifies things
+            let posting_ranges_iter =
+                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
+
+            let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
+                if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
+                    Ok(range1.start..range2.end)
+                } else {
+                    Err((range1, range2))
+                }
+            });
+
+            for posting_range in merged_posting_ranges_iter {
+                if sender.unbounded_send(posting_range).is_err() {
+                    // this should happen only when search is cancelled
+                    return Err(io::Error::other("failed to send posting range back"));
+                }
+            }
+            Ok(())
+        };
+        let task_handle = executor(Box::new(cpu_bound_task));
+
+        let posting_downloader = posting_ranges_to_load_stream
+            .map(|posting_slice| {
+                self.postings_file_slice
+                    .read_bytes_slice_async(posting_slice)
+                    .map(|result| result.map(|_slice| ()))
+            })
+            .buffer_unordered(5)
+            .try_collect::<Vec<()>>();
+
+        let (_, slices_downloaded) =
+            futures_util::future::try_join(task_handle, posting_downloader).await?;
+
+        Ok(!slices_downloaded.is_empty())
+    }
+
     /// Warmup the block postings for all terms.
     /// This method is for an advanced usage only.
     ///
diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs
index 0ad3a9d35a..1768c94118 100644
--- a/src/termdict/fst_termdict/term_info_store.rs
+++ b/src/termdict/fst_termdict/term_info_store.rs
@@ -93,6 +93,7 @@ impl TermInfoBlockMeta {
     }
 }
 
+#[derive(Clone)]
 pub struct TermInfoStore {
     num_terms: usize,
     block_meta_bytes: OwnedBytes,
diff --git a/src/termdict/fst_termdict/termdict.rs b/src/termdict/fst_termdict/termdict.rs
index 23ed3606d4..fb3352d044 100644
--- a/src/termdict/fst_termdict/termdict.rs
+++ b/src/termdict/fst_termdict/termdict.rs
@@ -1,4 +1,5 @@
 use std::io::{self, Write};
+use std::sync::Arc;
 
 use common::{BinarySerializable, CountingWriter};
 use once_cell::sync::Lazy;
@@ -113,8 +114,9 @@ static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
 /// The `Fst` crate is used to associate terms to their
 /// respective `TermOrdinal`. The `TermInfoStore` then makes it
 /// possible to fetch the associated `TermInfo`.
+#[derive(Clone)]
 pub struct TermDictionary {
-    fst_index: tantivy_fst::Map<OwnedBytes>,
+    fst_index: Arc<tantivy_fst::Map<OwnedBytes>>,
     term_info_store: TermInfoStore,
 }
 
@@ -136,7 +138,7 @@ impl TermDictionary {
         let fst_index = open_fst_index(fst_file_slice)?;
         let term_info_store = TermInfoStore::open(values_file_slice)?;
         Ok(TermDictionary {
-            fst_index,
+            fst_index: Arc::new(fst_index),
             term_info_store,
         })
     }
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 01c9591ee3..7153881251 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
 
 // TODO in the future this should become an enum of supported dictionaries
 /// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
+#[derive(Clone)]
 pub struct TermDictionary(InnerTermDict);
 
 impl TermDictionary {
diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs
index 621b85b35d..cc82eba3cd 100644
--- a/src/termdict/sstable_termdict/mod.rs
+++ b/src/termdict/sstable_termdict/mod.rs
@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
 pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
 
 /// SSTable used to store TermInfo objects.
+#[derive(Clone)]
 pub struct TermSSTable;
 
 pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml
index 91d629229b..7a27024868 100644
--- a/sstable/Cargo.toml
+++ b/sstable/Cargo.toml
@@ -11,6 +11,8 @@ description = "sstables for tantivy"
 
 [dependencies]
 common = {version= "0.7", path="../common", package="tantivy-common"}
+futures-util = "0.3.30"
+itertools = "0.13.0"
 tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
 tantivy-fst = "0.5"
 # experimental gives us access to Decompressor::upper_bound
diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs
new file mode 100644
index 0000000000..0c84a05e0f
--- /dev/null
+++ b/sstable/src/block_match_automaton.rs
@@ -0,0 +1,271 @@
+use tantivy_fst::Automaton;
+
+/// Returns whether a block can match an automaton based on its bounds.
+///
+/// The start key is exclusive, and optional to account for the first block. The end key is
+/// inclusive and mandatory.
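+/// For instance, given blocks covering `(.., "cat"]`, `("cat", "fox"]` and `("fox", ..]`,
+/// an automaton matching only the key "dog" can match the `("cat", "fox"]` block alone.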
+pub(crate) fn can_block_match_automaton(
+    start_key_opt: Option<&[u8]>,
+    end_key: &[u8],
+    automaton: &impl Automaton,
+) -> bool {
+    let start_key = if let Some(start_key) = start_key_opt {
+        start_key
+    } else {
+        // if start_key_opt is None, we would allow an automaton matching the empty string to
+        // match
+        if automaton.is_match(&automaton.start()) {
+            return true;
+        }
+        &[]
+    };
+    can_block_match_automaton_with_start(start_key, end_key, automaton)
+}
+
+// similar to can_block_match_automaton, ignoring the edge case of the initial block
+fn can_block_match_automaton_with_start(
+    start_key: &[u8],
+    end_key: &[u8],
+    automaton: &impl Automaton,
+) -> bool {
+    // notation: in loops, we use `kb` to denote a key byte (a byte taken from the start/end
+    // key), and `rb` a range byte (usually all values higher than a `kb` when comparing with
+    // start_key, or all values lower than a `kb` when comparing with end_key)
+
+    if start_key >= end_key {
+        return false;
+    }
+
+    let common_prefix_len = crate::common_prefix_len(start_key, end_key);
+
+    let mut base_state = automaton.start();
+    for kb in &start_key[0..common_prefix_len] {
+        base_state = automaton.accept(&base_state, *kb);
+    }
+
+    // this is not required for correctness, but allows dodging more expensive checks
+    if !automaton.can_match(&base_state) {
+        return false;
+    }
+
+    // we have 3 distinct cases:
+    // - keys are `abc` and `abcd` => we test for abc[\0-d].*
+    // - keys are `abcd` and `abce` => we test for abc[d-e].*
+    // - keys are `abcd` and `abc` => contradiction with start_key < end_key.
+    //
+    // ideally for (abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
+    // but let's start simple (and correct), and tighten our bounds later
+    //
+    // and for (abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
+    // abc (
+    //     d(e.+|[f-\xff].*) |
+    //     e.* |
+    //     f([\0-f].*|g)?
+    // )
+    //
+    // these are all written as regexes, but can be converted to operations we can do:
+    // - [x-y] is a for c in x..=y
+    // - .* is a can_match()
+    // - .+ is a for c in 0..=255 { accept(c).can_match() }
+    // - ? is the thing before plus can_match(), or the current state's is_match()
+    // - | means test both sides
+
+    // we have two cases, either start_key is a prefix of end_key (e.g. (abc, abcjp]),
+    // or it is not (e.g. (abcdg, abcjp]). It is not possible however that end_key be a prefix of
+    // start_key (or that both are equal) because we already handled start_key >= end_key.
+    //
+    // if we are in the first case, we want to visit the following states:
+    // abc (
+    //     [\0-i].* |
+    //     j (
+    //         [\0-o].* |
+    //         p
+    //     )?
+    // )
+    // Everything after `abc` is handled by `match_range_end`
+    //
+    // if we are in the 2nd case, we want to visit the following states:
+    // abc (
+    //     d(g.+|[h-\xff].*) |  // this is handled by match_range_start
+    //
+    //     [e-i].* |            // this is handled here
+    //
+    //     j (                  // this is handled by match_range_end (but contrary to the other
+    //         [\0-o].* |       // case, j is already consumed, so we don't check [\0-i].*)
+    //         p
+    //     )?
+    // )
+
+    let Some(start_range) = start_key.get(common_prefix_len) else {
+        return match_range_end(&end_key[common_prefix_len..], &automaton, base_state);
+    };
+
+    let end_range = end_key[common_prefix_len];
+
+    // things starting with start_range are handled in match_range_start,
+    // those starting with end_range are handled below.
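+    // e.g. for (abcdg, abcjp], this loop covers the `[e-i].*` branch of the example above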
+    // this can run for 0 iterations in cases such as (abc, abd]
+    for rb in (start_range + 1)..end_range {
+        let new_state = automaton.accept(&base_state, rb);
+        if automaton.can_match(&new_state) {
+            return true;
+        }
+    }
+
+    let state_for_start = automaton.accept(&base_state, *start_range);
+    if match_range_start(
+        &start_key[common_prefix_len + 1..],
+        &automaton,
+        state_for_start,
+    ) {
+        return true;
+    }
+
+    let state_for_end = automaton.accept(&base_state, end_range);
+    if automaton.is_match(&state_for_end) {
+        return true;
+    }
+    match_range_end(&end_key[common_prefix_len + 1..], &automaton, state_for_end)
+}
+
+fn match_range_start<S, A: Automaton<State = S>>(
+    start_key: &[u8],
+    automaton: &A,
+    mut state: S,
+) -> bool {
+    // case (abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
+    // - [h-\xff].*
+    // - g[k-\xff].*
+    // - gj.+ == gj[\0-\xff].*
+
+    for kb in start_key {
+        // this is an optimisation, and is not needed for correctness
+        if !automaton.can_match(&state) {
+            return false;
+        }
+
+        // does the [h-\xff].* part. we skip if kb == 255, as [\x{100}-\xff] is an empty range,
+        // and computing kb + 1 would overflow in our u8 world
+        if *kb < u8::MAX {
+            for rb in (kb + 1)..=u8::MAX {
+                let temp_state = automaton.accept(&state, rb);
+                if automaton.can_match(&temp_state) {
+                    return true;
+                }
+            }
+        }
+        // push g
+        state = automaton.accept(&state, *kb);
+    }
+
+    // this isn't required for correctness, but can save us from looping 256 times below
+    if !automaton.can_match(&state) {
+        return false;
+    }
+
+    // does the final `.+`, which is the same as `[\0-\xff].*`
+    for rb in 0..=u8::MAX {
+        let temp_state = automaton.accept(&state, rb);
+        if automaton.can_match(&temp_state) {
+            return true;
+        }
+    }
+    false
+}
+
+fn match_range_end<S, A: Automaton<State = S>>(
+    end_key: &[u8],
+    automaton: &A,
+    mut state: S,
+) -> bool {
+    // for (abcdef, abcmps], the prefix `abcm` has already been consumed; `[d-l].*` was handled
+    // elsewhere, we just need to handle:
+    // - [\0-o].*
+    // - p
+    // - p[\0-r].*
+    // - ps
+    for kb in end_key {
+        // this is an optimisation, and is not needed for correctness
+        if !automaton.can_match(&state) {
+            return false;
+        }
+
+        // does the `[\0-o].*`
+        for rb in 0..*kb {
+            let temp_state = automaton.accept(&state, rb);
+            if automaton.can_match(&temp_state) {
+                return true;
+            }
+        }
+
+        // push p
+        state = automaton.accept(&state, *kb);
+        // verify the `p` case
+        if automaton.is_match(&state) {
+            return true;
+        }
+    }
+    false
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use proptest::prelude::*;
+    use tantivy_fst::Automaton;
+
+    use super::*;
+
+    pub(crate) struct EqBuffer(pub Vec<u8>);
+
+    impl Automaton for EqBuffer {
+        type State = Option<usize>;
+
+        fn start(&self) -> Self::State {
+            Some(0)
+        }
+
+        fn is_match(&self, state: &Self::State) -> bool {
+            *state == Some(self.0.len())
+        }
+
+        fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
+            state
+                .filter(|pos| self.0.get(*pos) == Some(&byte))
+                .map(|pos| pos + 1)
+        }
+
+        fn can_match(&self, state: &Self::State) -> bool {
+            state.is_some()
+        }
+
+        fn will_always_match(&self, _state: &Self::State) -> bool {
+            false
+        }
+    }
+
+    fn gen_key_strategy() -> impl Strategy<Value = Vec<u8>> {
+        // we only generate bytes in [0, 1, 2, 254, 255] to reduce the search space without
+        // ignoring edge cases that might occur with integer over/underflow
+        proptest::collection::vec(prop_oneof![0u8..=2, 254u8..=255], 0..5)
+    }
+
+    proptest! {
+        #![proptest_config(ProptestConfig {
+            cases: 10000, .. ProptestConfig::default()
+        })]
+
+        #[test]
+        fn test_proptest_automaton_match_block(start in gen_key_strategy(), end in gen_key_strategy(), key in gen_key_strategy()) {
+            let expected = start < key && end >= key;
+            let automaton = EqBuffer(key);
+
+            assert_eq!(can_block_match_automaton(Some(&start), &end, &automaton), expected);
+        }
+
+        #[test]
+        fn test_proptest_automaton_match_first_block(end in gen_key_strategy(), key in gen_key_strategy()) {
+            let expected = end >= key;
+            let automaton = EqBuffer(key);
+            assert_eq!(can_block_match_automaton(None, &end, &automaton), expected);
+        }
+    }
+}
diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs
index ee01ed5705..3299c3b4ce 100644
--- a/sstable/src/block_reader.rs
+++ b/sstable/src/block_reader.rs
@@ -7,6 +7,7 @@ use zstd::bulk::Decompressor;
 
 pub struct BlockReader {
     buffer: Vec<u8>,
     reader: OwnedBytes,
+    next_readers: std::vec::IntoIter<OwnedBytes>,
     offset: usize,
 }
 
@@ -15,6 +16,18 @@ impl BlockReader {
         BlockReader {
             buffer: Vec::new(),
             reader,
+            next_readers: Vec::new().into_iter(),
             offset: 0,
         }
     }
+
+    pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> BlockReader {
+        let mut next_readers = readers.into_iter();
+        let reader = next_readers.next().unwrap_or_else(OwnedBytes::empty);
+        BlockReader {
+            buffer: Vec::new(),
+            reader,
+            next_readers,
+            offset: 0,
+        }
+    }
@@ -34,42 +47,52 @@ impl BlockReader {
         self.offset = 0;
         self.buffer.clear();
 
-        let block_len = match self.reader.len() {
-            0 => return Ok(false),
-            1..=3 => {
+        loop {
+            let block_len = match self.reader.len() {
+                0 => {
+                    // we are out of data for this block. Check if we have another block after
+                    if let Some(new_reader) = self.next_readers.next() {
+                        self.reader = new_reader;
+                        continue;
+                    } else {
+                        return Ok(false);
+                    }
+                }
+                1..=3 => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "failed to read block_len",
+                    ))
+                }
+                _ => self.reader.read_u32() as usize,
+            };
+            if block_len <= 1 {
+                return Ok(false);
+            }
+            let compress = self.reader.read_u8();
+            let block_len = block_len - 1;
+
+            if self.reader.len() < block_len {
                 return Err(io::Error::new(
                     io::ErrorKind::UnexpectedEof,
-                    "failed to read block_len",
-                ))
+                    "failed to read block content",
+                ));
+            }
+            if compress == 1 {
+                let required_capacity =
+                    Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
+                self.buffer.reserve(required_capacity);
+                Decompressor::new()?
+                    .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
+
+                self.reader.advance(block_len);
+            } else {
+                self.buffer.resize(block_len, 0u8);
+                self.reader.read_exact(&mut self.buffer[..])?;
             }
-            _ => self.reader.read_u32() as usize,
-        };
-        if block_len <= 1 {
-            return Ok(false);
-        }
-        let compress = self.reader.read_u8();
-        let block_len = block_len - 1;
-
-        if self.reader.len() < block_len {
-            return Err(io::Error::new(
-                io::ErrorKind::UnexpectedEof,
-                "failed to read block content",
-            ));
-        }
-        if compress == 1 {
-            let required_capacity =
-                Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
-            self.buffer.reserve(required_capacity);
-            Decompressor::new()?
-                .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
-
-            self.reader.advance(block_len);
-        } else {
-            self.buffer.resize(block_len, 0u8);
-            self.reader.read_exact(&mut self.buffer[..])?;
-        }
-        Ok(true)
+            return Ok(true);
+        }
     }
 
     #[inline(always)]
diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs
index 2b8a332d1e..e627f987eb 100644
--- a/sstable/src/delta.rs
+++ b/sstable/src/delta.rs
@@ -143,6 +143,16 @@ where TValueReader: value::ValueReader
         }
     }
 
+    pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> Self {
+        DeltaReader {
+            idx: 0,
+            common_prefix_len: 0,
+            suffix_range: 0..0,
+            value_reader: TValueReader::default(),
+            block_reader: BlockReader::from_multiple_blocks(readers),
+        }
+    }
+
     pub fn empty() -> Self {
         DeltaReader::new(OwnedBytes::empty())
     }
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index 7c61df09a8..68708f202a 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -7,6 +7,8 @@ use std::sync::Arc;
 use common::bounds::{transform_bound_inner_res, TransformBound};
 use common::file_slice::FileSlice;
 use common::{BinarySerializable, OwnedBytes};
+use futures_util::{stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
 use tantivy_fst::automaton::AlwaysMatch;
 use tantivy_fst::Automaton;
 
@@ -98,20 +100,52 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         &self,
         key_range: impl RangeBounds<[u8]>,
         limit: Option<u64>,
+        automaton: &impl Automaton,
+        merge_holes_under_bytes: usize,
     ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
-        let slice = self.file_slice_for_range(key_range, limit);
-        let data = slice.read_bytes_async().await?;
-        Ok(TSSTable::delta_reader(data))
+        let match_all = automaton.will_always_match(&automaton.start());
+        if match_all {
+            let slice = self.file_slice_for_range(key_range, limit);
+            let data = slice.read_bytes_async().await?;
+            Ok(TSSTable::delta_reader(data))
+        } else {
+            let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
+                key_range,
+                automaton,
+                merge_holes_under_bytes,
+            ));
+            let data = blocks
+                .map(|block_addr| {
+                    self.sstable_slice
+                        .read_bytes_slice_async(block_addr.byte_range)
+                })
+                .buffered(5)
+                .try_collect::<Vec<_>>()
+                .await?;
+            Ok(DeltaReader::from_multiple_blocks(data))
+        }
     }
 
     pub(crate) fn sstable_delta_reader_for_key_range(
         &self,
         key_range: impl RangeBounds<[u8]>,
        limit: Option<u64>,
+        automaton: &impl Automaton,
    ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
-        let slice = self.file_slice_for_range(key_range, limit);
-        let data = slice.read_bytes()?;
-        Ok(TSSTable::delta_reader(data))
+        let match_all = automaton.will_always_match(&automaton.start());
+        if match_all {
+            let slice = self.file_slice_for_range(key_range, limit);
+            let data = slice.read_bytes()?;
+            Ok(TSSTable::delta_reader(data))
+        } else {
+            // if operations are sync, we assume latency is almost null, and there is no point
+            // in merging across holes
+            let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
+            let data = blocks
+                .map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
+                .collect::<Result<Vec<_>, _>>()?;
+            Ok(DeltaReader::from_multiple_blocks(data))
+        }
     }
 
     pub(crate) fn sstable_delta_reader_block(
@@ -204,6 +238,42 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         self.sstable_slice.slice((start_bound, end_bound))
     }
 
+    fn get_block_iterator_for_range_and_automaton<'a>(
+        &'a self,
+        key_range: impl RangeBounds<[u8]>,
+        automaton: &'a impl Automaton,
+        merge_holes_under_bytes: usize,
+    ) -> impl Iterator<Item = BlockAddr> + 'a {
+        let lower_bound = match key_range.start_bound() {
+            Bound::Included(key) | Bound::Excluded(key) => {
+                self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
+            }
+            Bound::Unbounded => 0,
+        };
+
+        let upper_bound = match key_range.end_bound() {
+            Bound::Included(key) | Bound::Excluded(key) => {
+                self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
+            }
+            Bound::Unbounded => u64::MAX,
+        };
+        let block_range = lower_bound..=upper_bound;
+        self.sstable_index
+            .get_block_for_automaton(automaton)
+            .filter(move |(block_id, _)| block_range.contains(block_id))
+            .map(|(_, block_addr)| block_addr)
+            .coalesce(move |first, second| {
+                if first.byte_range.end + merge_holes_under_bytes >= second.byte_range.start {
+                    Ok(BlockAddr {
+                        first_ordinal: first.first_ordinal,
+                        byte_range: first.byte_range.start..second.byte_range.end,
+                    })
+                } else {
+                    Err((first, second))
+                }
+            })
+    }
+
     /// Opens a `TermDictionary`.
     pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
         let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index adcff4939b..f872d2e19c 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -3,6 +3,7 @@ use std::ops::Range;
 
 use merge::ValueMerger;
 
+mod block_match_automaton;
 mod delta;
 mod dictionary;
 pub mod merge;
diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs
index d7c97c13a7..f0aa83ab0b 100644
--- a/sstable/src/sstable_index_v2.rs
+++ b/sstable/src/sstable_index_v2.rs
@@ -1,10 +1,12 @@
 use common::OwnedBytes;
+use tantivy_fst::Automaton;
 
+use crate::block_match_automaton::can_block_match_automaton;
 use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
 
 #[derive(Default, Debug, Clone)]
 pub struct SSTableIndex {
-    blocks: Vec<BlockMeta>,
+    pub(crate) blocks: Vec<BlockMeta>,
 }
 
 impl SSTableIndex {
@@ -74,6 +76,31 @@ impl SSTableIndex {
         // locate_with_ord always returns an index within range
         self.get_block(self.locate_with_ord(ord)).unwrap()
     }
+
+    pub(crate) fn get_block_for_automaton<'a>(
+        &'a self,
+        automaton: &'a impl Automaton,
+    ) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
+        std::iter::once((None, &self.blocks[0]))
+            .chain(self.blocks.windows(2).map(|window| {
+                let [prev, curr] = window else {
+                    unreachable!();
+                };
+                (Some(&*prev.last_key_or_greater), curr)
+            }))
+            .enumerate()
+            .filter_map(move |(pos, (prev_key, current_block))| {
+                if can_block_match_automaton(
+                    prev_key,
+                    &current_block.last_key_or_greater,
+                    automaton,
+                ) {
+                    Some((pos as u64, current_block.block_addr.clone()))
+                } else {
+                    None
+                }
+            })
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -99,3 +126,106 @@ impl SSTable for IndexSSTable {
 
     type ValueWriter = crate::value::index::IndexValueWriter;
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::block_match_automaton::tests::EqBuffer;
+
+    #[test]
+    fn test_get_block_for_automaton() {
+        let sstable = SSTableIndex {
+            blocks: vec![
+                BlockMeta {
+                    last_key_or_greater: vec![0, 1, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 0,
+                        byte_range: 0..10,
+                    },
+                },
+                BlockMeta {
+                    last_key_or_greater: vec![0, 2, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 5,
+                        byte_range: 10..20,
+                    },
+                },
+                BlockMeta {
+                    last_key_or_greater: vec![0, 3, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 10,
+                        byte_range: 20..30,
+                    },
+                },
+            ],
+        };
+
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                0,
+                BlockAddr {
+                    first_ordinal: 0,
+                    byte_range: 0..10
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                1,
+                BlockAddr {
+                    first_ordinal: 5,
+                    byte_range: 10..20
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                2,
+                BlockAddr {
+                    first_ordinal: 10,
+                    byte_range: 20..30
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
+            .collect::<Vec<_>>();
+        assert!(res.is_empty());
+
+        let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
+        let res = sstable
+            .get_block_for_automaton(&complex_automaton)
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![
+                (
+                    0,
+                    BlockAddr {
+                        first_ordinal: 0,
+                        byte_range: 0..10
+                    }
+                ),
+                (
+                    2,
+                    BlockAddr {
+                        first_ordinal: 10,
+                        byte_range: 20..30
+                    }
+                )
+            ]
+        );
+    }
+}
diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs
index 8206ab242a..1c99c36e5f 100644
--- a/sstable/src/sstable_index_v3.rs
+++ b/sstable/src/sstable_index_v3.rs
@@ -5,8 +5,9 @@ use std::sync::Arc;
 use common::{BinarySerializable, FixedSize, OwnedBytes};
 use tantivy_bitpacker::{compute_num_bits, BitPacker};
 use tantivy_fst::raw::Fst;
-use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
+use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
 
+use crate::block_match_automaton::can_block_match_automaton;
 use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
 
 #[derive(Debug, Clone)]
@@ -64,6 +65,41 @@ impl SSTableIndex {
             SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
         }
     }
+
+    pub fn get_block_for_automaton<'a>(
+        &'a self,
+        automaton: &'a impl Automaton,
+    ) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
+        match self {
+            SSTableIndex::V2(v2_index) => {
+                BlockIter::V2(v2_index.get_block_for_automaton(automaton))
+            }
+            SSTableIndex::V3(v3_index) => {
+                BlockIter::V3(v3_index.get_block_for_automaton(automaton))
+            }
+            SSTableIndex::V3Empty(v3_empty) => {
+                BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
+            }
+        }
+    }
+}
+
+enum BlockIter<V2, V3, T> {
+    V2(V2),
+    V3(V3),
+    V3Empty(std::iter::Once<T>),
+}
+
+impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<T> {
+        match self {
+            BlockIter::V2(v2) => v2.next(),
+            BlockIter::V3(v3) => v3.next(),
+            BlockIter::V3Empty(once) => once.next(),
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -123,6 +159,59 @@ impl SSTableIndexV3 {
     pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
         self.block_addr_store.binary_search_ord(ord).1
     }
+
+    pub(crate) fn get_block_for_automaton<'a>(
+        &'a self,
+        automaton: &'a impl Automaton,
+    ) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
+        // this is more complicated than the other index formats: we don't have a ready-made
+        // list of blocks, and instead need to stream-decode the sstable.
+
+        GetBlockForAutomaton {
+            streamer: self.fst_index.stream(),
+            block_addr_store: &self.block_addr_store,
+            prev_key: None,
+            automaton,
+        }
+    }
+}
+
+// TODO we iterate over the entire Map to find matching blocks,
+// we could manually iterate on the underlying Fst and skip whole branches if our Automaton says
+// they cannot match. This isn't as bad as it sounds given the fst is a lot smaller than the rest
+// of the sstable.
+// To do that, we can't use tantivy_fst's Stream with an automaton, as we need to know 2
+// consecutive fst keys to form a proper opinion on whether this is a match, which we can't
+// translate into a single automaton
+struct GetBlockForAutomaton<'a, A: Automaton> {
+    streamer: tantivy_fst::map::Stream<'a>,
+    block_addr_store: &'a BlockAddrStore,
+    prev_key: Option<Vec<u8>>,
+    automaton: &'a A,
+}
+
+impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
+    type Item = (u64, BlockAddr);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some((new_key, block_id)) = self.streamer.next() {
+            if let Some(prev_key) = self.prev_key.as_mut() {
+                if can_block_match_automaton(Some(prev_key), new_key, self.automaton) {
+                    prev_key.clear();
+                    prev_key.extend_from_slice(new_key);
+                    return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
+                }
+                prev_key.clear();
+                prev_key.extend_from_slice(new_key);
+            } else {
+                self.prev_key = Some(new_key.to_owned());
+                if can_block_match_automaton(None, new_key, self.automaton) {
+                    return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
+                }
+            }
+        }
+        None
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -734,7 +823,8 @@ fn find_best_slope(elements: impl Iterator + Clone) -> (u32
 mod tests {
     use common::OwnedBytes;
 
-    use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3};
+    use super::*;
+    use crate::block_match_automaton::tests::EqBuffer;
     use crate::SSTableDataCorruption;
 
     #[test]
@@ -823,4 +913,108 @@ mod tests {
             (12345, 1)
         );
     }
+
+    #[test]
+    fn test_get_block_for_automaton() {
+        let sstable_index_builder = SSTableIndexBuilder {
+            blocks: vec![
+                BlockMeta {
+                    last_key_or_greater: vec![0, 1, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 0,
+                        byte_range: 0..10,
+                    },
+                },
+                BlockMeta {
+                    last_key_or_greater: vec![0, 2, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 5,
+                        byte_range: 10..20,
+                    },
+                },
+                BlockMeta {
+                    last_key_or_greater: vec![0, 3, 2],
+                    block_addr: BlockAddr {
+                        first_ordinal: 10,
+                        byte_range: 20..30,
+                    },
+                },
+            ],
+        };
+
+        let mut sstable_index_bytes = Vec::new();
+        let fst_len = sstable_index_builder
+            .serialize(&mut sstable_index_bytes)
+            .unwrap();
+
+        let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap();
+
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                0,
+                BlockAddr {
+                    first_ordinal: 0,
+                    byte_range: 0..10
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                1,
+                BlockAddr {
+                    first_ordinal: 5,
+                    byte_range: 10..20
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![(
+                2,
+                BlockAddr {
+                    first_ordinal: 10,
+                    byte_range: 20..30
+                }
+            )]
+        );
+        let res = sstable
+            .get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
+            .collect::<Vec<_>>();
+        assert!(res.is_empty());
+
+        let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
+        let res = sstable
+            .get_block_for_automaton(&complex_automaton)
+            .collect::<Vec<_>>();
+        assert_eq!(
+            res,
+            vec![
+                (
+                    0,
+                    BlockAddr {
+                        first_ordinal: 0,
+                        byte_range: 0..10
+                    }
+                ),
+                (
+                    2,
+                    BlockAddr {
+                        first_ordinal: 10,
+                        byte_range: 20..30
+                    }
+                )
+            ]
+        );
+    }
 }
diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs
index 7ae4bab14e..f0f052c33a 100644
--- a/sstable/src/streamer.rs
+++ b/sstable/src/streamer.rs
@@ -86,16 +86,24 @@ where
             bound_as_byte_slice(&self.upper),
         );
         self.term_dict
-            .sstable_delta_reader_for_key_range(key_range, self.limit)
+            .sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
     }
 
-    async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
+    async fn delta_reader_async(
+        &self,
+        merge_holes_under_bytes: usize,
+    ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let key_range = (
             bound_as_byte_slice(&self.lower),
             bound_as_byte_slice(&self.upper),
         );
         self.term_dict
-            .sstable_delta_reader_for_key_range_async(key_range, self.limit)
+            .sstable_delta_reader_for_key_range_async(
+                key_range,
+                self.limit,
+                &self.automaton,
+                merge_holes_under_bytes,
+            )
             .await
     }
 
@@ -130,7 +138,16 @@ where
 
     /// See `into_stream(..)`
     pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
-        let delta_reader = self.delta_reader_async().await?;
+        self.into_stream_async_merging_holes(0).await
+    }
+
+    /// Same as `into_stream_async`, but tries to issue a single IO operation when requesting
+    /// blocks that are not consecutive but are less than `merge_holes_under_bytes` bytes apart.
+    pub async fn into_stream_async_merging_holes(
+        self,
+        merge_holes_under_bytes: usize,
+    ) -> io::Result<Streamer<'a, TSSTable, A>> {
+        let delta_reader = self.delta_reader_async(merge_holes_under_bytes).await?;
         self.into_stream_given_delta_reader(delta_reader)
     }
 
@@ -327,4 +344,7 @@ mod tests {
         assert!(!term_streamer.advance());
         Ok(())
     }
+
+    // TODO add a test for sparse search with a poison block (one that starts with 0xffffffff)
+    // => such a block instantly causes an unexpected EOF error
 }