From 3c30a41c1455cdbd67e415f7c00351bb87652e02 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 13 Jul 2024 16:08:30 +0200 Subject: [PATCH 01/15] add helper to figure if block can match automaton --- sstable/src/block_match_automaton.rs | 209 +++++++++++++++++++++++++++ sstable/src/lib.rs | 1 + 2 files changed, 210 insertions(+) create mode 100644 sstable/src/block_match_automaton.rs diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs new file mode 100644 index 0000000000..9a481a04ed --- /dev/null +++ b/sstable/src/block_match_automaton.rs @@ -0,0 +1,209 @@ +use tantivy_fst::Automaton; + +/// Returns whether a block whose starting key (exclusive) and final key (inclusive) can match the +/// provided automaton, without actually looking at the block content. +pub(crate) fn block_match_automaton( + start_key: Option<&[u8]>, + end_key: &[u8], + automaton: &impl Automaton, +) -> bool { + let initial_block = start_key.is_none(); + let start_key = start_key.unwrap_or(&[]); + + debug_assert!(start_key <= end_key); + + let prefix_len = start_key + .iter() + .zip(end_key) + .take_while(|(c1, c2)| c1 == c2) + .count(); + + let mut base_state = automaton.start(); + for c in &start_key[0..prefix_len] { + base_state = automaton.accept(&base_state, *c); + } + if !automaton.can_match(&base_state) { + return false; + } + + if initial_block && automaton.is_match(&base_state) { + // for other blocks, the start_key is exclusive, and a prefix of it would be in a + // previous block + return true; + } + + // we have 3 distinct case: + // - keys are `abc` and `abcd` => we test for abc[\0-d].* + // - keys are `abcd` and `abce` => we test for abc[d-e].* + // - keys are `abcd` and `abc` => contradiction with start_key < end_key. + // + // ideally for [abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?) + // but let's start simple (and correct), and tighten our bounds latter + // + // and for [abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?) + // abc ( + // d(e.+|[f-\xff].*) | + // e.* | + // f([\0-f].*|g)? + // ) + // + // these are all written as regex, but can be converted to operations we can do: + // - [x-y] is a for c in x..y + // - .* is a can_match() + // - .+ is a for c in 0..=255 { accept(c).can_match() } + // - ? is a the thing before can_match(), or current state.is_match() + // - | means test both side + + let mut start_range = *start_key.get(prefix_len).unwrap_or(&0); + let end_range = end_key[prefix_len]; + + if start_key.len() > prefix_len { + start_range += 1; + } + for c in start_range..end_range { + let new_state = automaton.accept(&base_state, c); + if automaton.can_match(&new_state) { + return true; + } + } + if start_key.len() > prefix_len { + if start_key.len() <= prefix_len { + // case [abcd, abcde], we need to handle \0 which wasn't processed + let new_state = automaton.accept(&base_state, start_range); + if automaton.can_match(&new_state) { + eprintln!("ho"); + return true; + } + } else if match_range_start(&start_key[prefix_len..], &automaton, &base_state) { + return true; + } + } + match_range_end(&end_key[prefix_len..], &automaton, &base_state) +} + +fn match_range_start>( + start_key: &[u8], + automaton: &A, + base_state: &S, +) -> bool { + // case [abcdef, abcghi], we need to handle + // - abcd[f-\xff].* + // - abcde[g-\xff].* + // - abcdef.+ == abcdef[\0-\xff].* + let mut state = automaton.accept(base_state, start_key[0]); + for start_point in &start_key[1..] 
{
+        if !automaton.can_match(&state) {
+            return false;
+        }
+        // handle case where start_point is \xff
+        if *start_point < u8::MAX {
+            for to_name in (start_point + 1)..=u8::MAX {
+                let temp_state = automaton.accept(&state, to_name);
+                if automaton.can_match(&temp_state) {
+                    return true;
+                }
+            }
+        }
+        state = automaton.accept(&state, *start_point);
+    }
+
+    if !automaton.can_match(&state) {
+        return false;
+    }
+    for to_name in 0..=u8::MAX {
+        let temp_state = automaton.accept(&state, to_name);
+        if automaton.can_match(&temp_state) {
+            return true;
+        }
+    }
+    false
+}
+
+fn match_range_end<S, A: Automaton<State = S>>(
+    end_key: &[u8],
+    automaton: &A,
+    base_state: &S,
+) -> bool {
+    // f([\0-f].*|g)?
+    // case [abcdef, abcghi], we need to handle
+    // - abcg[\0-g].*
+    // - abcgh[\0-h].*
+    // - abcghi
+    let mut state = automaton.accept(base_state, end_key[0]);
+    for end_point in &end_key[1..] {
+        if !automaton.can_match(&state) {
+            return false;
+        }
+        if automaton.is_match(&state) {
+            return true;
+        }
+        for to_name in 0..*end_point {
+            let temp_state = automaton.accept(&state, to_name);
+            if automaton.can_match(&temp_state) {
+                return true;
+            }
+        }
+        state = automaton.accept(&state, *end_point);
+    }
+
+    automaton.is_match(&state)
+}
+
+#[cfg(test)]
+mod tests {
+    use proptest::prelude::*;
+    use tantivy_fst::Automaton;
+
+    use super::*;
+
+    struct EqBuffer(Vec<u8>);
+
+    impl Automaton for EqBuffer {
+        type State = Option<usize>;
+
+        fn start(&self) -> Self::State {
+            Some(0)
+        }
+
+        fn is_match(&self, state: &Self::State) -> bool {
+            *state == Some(self.0.len())
+        }
+
+        fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
+            state
+                .filter(|pos| self.0.get(*pos) == Some(&byte))
+                .map(|pos| pos + 1)
+        }
+
+        fn can_match(&self, state: &Self::State) -> bool {
+            state.is_some()
+        }
+
+        fn will_always_match(&self, _state: &Self::State) -> bool {
+            false
+        }
+    }
+
+    proptest! {
+        #![proptest_config(ProptestConfig::with_cases(1_000_000_000))]
+        #[test]
+        fn test_proptest_automaton_match_block(start in any::<Vec<u8>>(), end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
+            // inverted keys are *not* supported and can return bogus results
+            if start < end && !end.is_empty() {
+                let expected = start < key && end >= key;
+                let automaton = EqBuffer(key);
+
+                assert_eq!(block_match_automaton(Some(&start), &end, &automaton), expected);
+            }
+        }
+
+        #[test]
+        fn test_proptest_automaton_match_first_block(end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
+            if !end.is_empty() {
+                let expected = end >= key;
+                let automaton = EqBuffer(key);
+                assert_eq!(block_match_automaton(None, &end, &automaton), expected);
+            }
+        }
+    }
+}
diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index adcff4939b..f872d2e19c 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -3,6 +3,7 @@ use std::ops::Range;
 
 use merge::ValueMerger;
 
+mod block_match_automaton;
 mod delta;
 mod dictionary;
 pub mod merge;

From 7e901f523b3d7e6ec8a82caef5cd1228e5083111 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Sat, 13 Jul 2024 18:39:15 +0200
Subject: [PATCH 02/15] get iter for blocks of sstable matching automaton

---
 sstable/src/block_match_automaton.rs |   5 +-
 sstable/src/sstable_index_v2.rs      | 128 ++++++++++++++++-
 sstable/src/sstable_index_v3.rs      | 200 ++++++++++++++++++++++++++-
 3 files changed, 327 insertions(+), 6 deletions(-)

diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs
index 9a481a04ed..3bb75b883b 100644
--- a/sstable/src/block_match_automaton.rs
+++ b/sstable/src/block_match_automaton.rs
@@ -150,13 +150,13 @@ fn match_range_end<S, A: Automaton<State = S>>(
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use proptest::prelude::*;
     use tantivy_fst::Automaton;
 
     use super::*;
 
-    struct EqBuffer(Vec<u8>);
+    pub(crate) struct EqBuffer(pub Vec<u8>);
 
     impl Automaton for EqBuffer {
         type State = Option<usize>;
@@ -185,7 +185,6 @@ pub(crate) mod tests {
     }
 
     proptest!
{ - #![proptest_config(ProptestConfig::with_cases(1_000_000_000))] #[test] fn test_proptest_automaton_match_block(start in any::>(), end in any::>(), key in any::>()) { // inverted keys are *not* supported and can return bogus results diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs index d7c97c13a7..a10731fc08 100644 --- a/sstable/src/sstable_index_v2.rs +++ b/sstable/src/sstable_index_v2.rs @@ -1,10 +1,12 @@ use common::OwnedBytes; +use tantivy_fst::Automaton; +use crate::block_match_automaton::block_match_automaton; use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal}; #[derive(Default, Debug, Clone)] pub struct SSTableIndex { - blocks: Vec, + pub(crate) blocks: Vec, } impl SSTableIndex { @@ -74,6 +76,27 @@ impl SSTableIndex { // locate_with_ord always returns an index within range self.get_block(self.locate_with_ord(ord)).unwrap() } + + pub(crate) fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + std::iter::once((None, &self.blocks[0])) + .chain(self.blocks.windows(2).map(|window| { + let [prev, curr] = window else { + unreachable!(); + }; + (Some(&*prev.last_key_or_greater), curr) + })) + .enumerate() + .filter_map(move |(pos, (prev_key, current_block))| { + if block_match_automaton(prev_key, ¤t_block.last_key_or_greater, automaton) { + Some((pos, current_block.block_addr.clone())) + } else { + None + } + }) + } } #[derive(Debug, Clone)] @@ -99,3 +122,106 @@ impl SSTable for IndexSSTable { type ValueWriter = crate::value::index::IndexValueWriter; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::block_match_automaton::tests::EqBuffer; + + #[test] + fn test_get_block_for_automaton() { + let sstable = SSTableIndex { + blocks: vec![ + BlockMeta { + last_key_or_greater: vec![0, 1, 2], + block_addr: BlockAddr { + first_ordinal: 0, + byte_range: 0..10, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 2, 2], + block_addr: BlockAddr { + first_ordinal: 5, + byte_range: 10..20, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 3, 2], + block_addr: BlockAddr { + first_ordinal: 10, + byte_range: 20..30, + }, + }, + ], + }; + + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 1, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 2, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 1, + BlockAddr { + first_ordinal: 5, + byte_range: 10..20 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 3, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 4, 1])) + .collect::>(); + assert!(res.is_empty()); + + let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1])); + let res = sstable + .get_block_for_automaton(&complex_automaton) + .collect::>(); + assert_eq!( + res, + vec![ + ( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + ), + ( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + ) + ] + ); + } +} diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs index 8206ab242a..a8f728bd55 100644 --- a/sstable/src/sstable_index_v3.rs +++ b/sstable/src/sstable_index_v3.rs @@ -5,8 +5,9 @@ use std::sync::Arc; use common::{BinarySerializable, FixedSize, OwnedBytes}; use 
tantivy_bitpacker::{compute_num_bits, BitPacker}; use tantivy_fst::raw::Fst; -use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer}; +use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer}; +use crate::block_match_automaton::block_match_automaton; use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; #[derive(Debug, Clone)] @@ -64,6 +65,41 @@ impl SSTableIndex { SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord), } } + + pub fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + match self { + SSTableIndex::V2(v2_index) => { + BlockIter::V2(v2_index.get_block_for_automaton(automaton)) + } + SSTableIndex::V3(v3_index) => { + BlockIter::V3(v3_index.get_block_for_automaton(automaton)) + } + SSTableIndex::V3Empty(v3_empty) => { + BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone()))) + } + } + } +} + +enum BlockIter { + V2(V2), + V3(V3), + V3Empty(std::iter::Once), +} + +impl, V3: Iterator, T> Iterator for BlockIter { + type Item = T; + + fn next(&mut self) -> Option { + match self { + BlockIter::V2(v2) => v2.next(), + BlockIter::V3(v3) => v3.next(), + BlockIter::V3Empty(once) => once.next(), + } + } } #[derive(Debug, Clone)] @@ -123,6 +159,61 @@ impl SSTableIndexV3 { pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { self.block_addr_store.binary_search_ord(ord).1 } + + pub(crate) fn get_block_for_automaton<'a>( + &'a self, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + // this is more complicated than other index formats: we don't have a ready made list of + // blocks, and instead need to stream-decode the sstable. + + GetBlockForAutomaton { + streamer: self.fst_index.stream(), + block_addr_store: &self.block_addr_store, + prev_key: None, + automaton, + } + } +} + +struct GetBlockForAutomaton<'a, A: Automaton> { + streamer: tantivy_fst::map::Stream<'a>, + // TODO we could be more efficient by streaming the store + block_addr_store: &'a BlockAddrStore, + prev_key: Option>, + automaton: &'a A, +} + +impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> { + type Item = (usize, BlockAddr); + + fn next(&mut self) -> Option { + while let Some((new_key, block_id)) = self.streamer.next() { + if let Some(prev_key) = self.prev_key.as_mut() { + if block_match_automaton(Some(prev_key), new_key, self.automaton) { + prev_key.clear(); + prev_key.extend_from_slice(new_key); + return Some(( + block_id as usize, + self.block_addr_store.get(block_id).unwrap(), + )); + } + // actually we could not write here, and it would still be correct, but it might + // lead to checking more keys than necessary which in itself can be a slowdown. 
+ prev_key.clear(); + prev_key.extend_from_slice(new_key); + } else { + self.prev_key = Some(new_key.to_owned()); + if block_match_automaton(None, new_key, self.automaton) { + return Some(( + block_id as usize, + self.block_addr_store.get(block_id).unwrap(), + )); + } + } + } + None + } } #[derive(Debug, Clone)] @@ -734,7 +825,8 @@ fn find_best_slope(elements: impl Iterator + Clone) -> (u32 mod tests { use common::OwnedBytes; - use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3}; + use super::*; + use crate::block_match_automaton::tests::EqBuffer; use crate::SSTableDataCorruption; #[test] @@ -823,4 +915,108 @@ mod tests { (12345, 1) ); } + + #[test] + fn test_get_block_for_automaton() { + let sstable_index_builder = SSTableIndexBuilder { + blocks: vec![ + BlockMeta { + last_key_or_greater: vec![0, 1, 2], + block_addr: BlockAddr { + first_ordinal: 0, + byte_range: 0..10, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 2, 2], + block_addr: BlockAddr { + first_ordinal: 5, + byte_range: 10..20, + }, + }, + BlockMeta { + last_key_or_greater: vec![0, 3, 2], + block_addr: BlockAddr { + first_ordinal: 10, + byte_range: 20..30, + }, + }, + ], + }; + + let mut sstable_index_bytes = Vec::new(); + let fst_len = sstable_index_builder + .serialize(&mut sstable_index_bytes) + .unwrap(); + + let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap(); + + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 1, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 2, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 1, + BlockAddr { + first_ordinal: 5, + byte_range: 10..20 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 3, 1])) + .collect::>(); + assert_eq!( + res, + vec![( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + )] + ); + let res = sstable + .get_block_for_automaton(&EqBuffer(vec![0, 4, 1])) + .collect::>(); + assert!(res.is_empty()); + + let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1])); + let res = sstable + .get_block_for_automaton(&complex_automaton) + .collect::>(); + assert_eq!( + res, + vec![ + ( + 0, + BlockAddr { + first_ordinal: 0, + byte_range: 0..10 + } + ), + ( + 2, + BlockAddr { + first_ordinal: 10, + byte_range: 20..30 + } + ) + ] + ); + } } From 1f6a8e74bbe37d7aade74aa2391e8b81f5c31c52 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 13 Jul 2024 20:04:05 +0200 Subject: [PATCH 03/15] support iterating over partially loaded sstable --- sstable/Cargo.toml | 1 + sstable/src/block_reader.rs | 87 +++++++++++++++++++++------------ sstable/src/delta.rs | 10 ++++ sstable/src/dictionary.rs | 64 +++++++++++++++++++++--- sstable/src/sstable_index_v2.rs | 4 +- sstable/src/sstable_index_v3.rs | 16 ++---- sstable/src/streamer.rs | 7 ++- 7 files changed, 136 insertions(+), 53 deletions(-) diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 91d629229b..cf840ec80d 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -11,6 +11,7 @@ description = "sstables for tantivy" [dependencies] common = {version= "0.7", path="../common", package="tantivy-common"} +futures-util = "0.3.30" tantivy-bitpacker = { version= "0.6", path="../bitpacker" } tantivy-fst = "0.5" # experimental gives us access to Decompressor::upper_bound diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs index 
ee01ed5705..4fac90412c 100644 --- a/sstable/src/block_reader.rs +++ b/sstable/src/block_reader.rs @@ -7,6 +7,7 @@ use zstd::bulk::Decompressor; pub struct BlockReader { buffer: Vec, reader: OwnedBytes, + next_readers: std::vec::IntoIter, offset: usize, } @@ -15,6 +16,18 @@ impl BlockReader { BlockReader { buffer: Vec::new(), reader, + next_readers: Vec::new().into_iter(), + offset: 0, + } + } + + pub fn from_multiple_blocks(readers: Vec) -> BlockReader { + let mut next_readers = readers.into_iter(); + let reader = next_readers.next().unwrap_or_else(|| OwnedBytes::empty()); + BlockReader { + buffer: Vec::new(), + reader, + next_readers, offset: 0, } } @@ -34,42 +47,52 @@ impl BlockReader { self.offset = 0; self.buffer.clear(); - let block_len = match self.reader.len() { - 0 => return Ok(false), - 1..=3 => { + loop { + let block_len = match self.reader.len() { + 0 => { + // we are out of data for this block. Check if we have another block after + if let Some(new_reader) = self.next_readers.next() { + self.reader = new_reader; + continue; + } else { + return Ok(false); + } + } + 1..=3 => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to read block_len", + )) + } + _ => self.reader.read_u32() as usize, + }; + if block_len <= 1 { + return Ok(false); + } + let compress = self.reader.read_u8(); + let block_len = block_len - 1; + + if self.reader.len() < block_len { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, - "failed to read block_len", - )) + "failed to read block content", + )); + } + if compress == 1 { + let required_capacity = + Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024); + self.buffer.reserve(required_capacity); + Decompressor::new()? + .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?; + + self.reader.advance(block_len); + } else { + self.buffer.resize(block_len, 0u8); + self.reader.read_exact(&mut self.buffer[..])?; } - _ => self.reader.read_u32() as usize, - }; - if block_len <= 1 { - return Ok(false); - } - let compress = self.reader.read_u8(); - let block_len = block_len - 1; - - if self.reader.len() < block_len { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to read block content", - )); - } - if compress == 1 { - let required_capacity = - Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024); - self.buffer.reserve(required_capacity); - Decompressor::new()? 
- .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?; - - self.reader.advance(block_len); - } else { - self.buffer.resize(block_len, 0u8); - self.reader.read_exact(&mut self.buffer[..])?; - } - Ok(true) + return Ok(true); + } } #[inline(always)] diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs index 2b8a332d1e..e627f987eb 100644 --- a/sstable/src/delta.rs +++ b/sstable/src/delta.rs @@ -143,6 +143,16 @@ where TValueReader: value::ValueReader } } + pub fn from_multiple_blocks(reader: Vec) -> Self { + DeltaReader { + idx: 0, + common_prefix_len: 0, + suffix_range: 0..0, + value_reader: TValueReader::default(), + block_reader: BlockReader::from_multiple_blocks(reader), + } + } + pub fn empty() -> Self { DeltaReader::new(OwnedBytes::empty()) } diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 7c61df09a8..e57748aa1d 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use common::bounds::{transform_bound_inner_res, TransformBound}; use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; +use futures_util::{stream, StreamExt, TryStreamExt}; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; @@ -98,20 +99,46 @@ impl Dictionary { &self, key_range: impl RangeBounds<[u8]>, limit: Option, + automaton: &impl Automaton, ) -> io::Result> { - let slice = self.file_slice_for_range(key_range, limit); - let data = slice.read_bytes_async().await?; - Ok(TSSTable::delta_reader(data)) + let match_all = automaton.will_always_match(&automaton.start()); + if match_all { + let slice = self.file_slice_for_range(key_range, limit); + let data = slice.read_bytes_async().await?; + Ok(TSSTable::delta_reader(data)) + } else { + let blocks = + stream::iter(self.get_block_iterator_for_range_and_automaton(key_range, automaton)); + let data = blocks + .map(|block_addr| { + self.sstable_slice + .read_bytes_slice_async(block_addr.byte_range) + }) + .buffered(5) + .try_collect::>() + .await?; + Ok(DeltaReader::from_multiple_blocks(data)) + } } pub(crate) fn sstable_delta_reader_for_key_range( &self, key_range: impl RangeBounds<[u8]>, limit: Option, + automaton: &impl Automaton, ) -> io::Result> { - let slice = self.file_slice_for_range(key_range, limit); - let data = slice.read_bytes()?; - Ok(TSSTable::delta_reader(data)) + let match_all = automaton.will_always_match(&automaton.start()); + if match_all { + let slice = self.file_slice_for_range(key_range, limit); + let data = slice.read_bytes()?; + Ok(TSSTable::delta_reader(data)) + } else { + let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton); + let data = blocks + .map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range)) + .collect::, _>>()?; + Ok(DeltaReader::from_multiple_blocks(data)) + } } pub(crate) fn sstable_delta_reader_block( @@ -204,6 +231,31 @@ impl Dictionary { self.sstable_slice.slice((start_bound, end_bound)) } + fn get_block_iterator_for_range_and_automaton<'a>( + &'a self, + key_range: impl RangeBounds<[u8]>, + automaton: &'a impl Automaton, + ) -> impl Iterator + 'a { + let lower_bound = match key_range.start_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX) + } + Bound::Unbounded => 0, + }; + + let upper_bound = match key_range.end_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX) + } + Bound::Unbounded => u64::MAX, + }; + 
let block_range = lower_bound..=upper_bound; + self.sstable_index + .get_block_for_automaton(automaton) + .filter(move |(block_id, _)| block_range.contains(block_id)) + .map(|(_, block_addr)| block_addr) + } + /// Opens a `TermDictionary`. pub fn open(term_dictionary_file: FileSlice) -> io::Result { let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20); diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs index a10731fc08..f00e004a3c 100644 --- a/sstable/src/sstable_index_v2.rs +++ b/sstable/src/sstable_index_v2.rs @@ -80,7 +80,7 @@ impl SSTableIndex { pub(crate) fn get_block_for_automaton<'a>( &'a self, automaton: &'a impl Automaton, - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { std::iter::once((None, &self.blocks[0])) .chain(self.blocks.windows(2).map(|window| { let [prev, curr] = window else { @@ -91,7 +91,7 @@ impl SSTableIndex { .enumerate() .filter_map(move |(pos, (prev_key, current_block))| { if block_match_automaton(prev_key, ¤t_block.last_key_or_greater, automaton) { - Some((pos, current_block.block_addr.clone())) + Some((pos as u64, current_block.block_addr.clone())) } else { None } diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs index a8f728bd55..abc36fe063 100644 --- a/sstable/src/sstable_index_v3.rs +++ b/sstable/src/sstable_index_v3.rs @@ -69,7 +69,7 @@ impl SSTableIndex { pub fn get_block_for_automaton<'a>( &'a self, automaton: &'a impl Automaton, - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { match self { SSTableIndex::V2(v2_index) => { BlockIter::V2(v2_index.get_block_for_automaton(automaton)) @@ -163,7 +163,7 @@ impl SSTableIndexV3 { pub(crate) fn get_block_for_automaton<'a>( &'a self, automaton: &'a impl Automaton, - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { // this is more complicated than other index formats: we don't have a ready made list of // blocks, and instead need to stream-decode the sstable. @@ -185,7 +185,7 @@ struct GetBlockForAutomaton<'a, A: Automaton> { } impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> { - type Item = (usize, BlockAddr); + type Item = (u64, BlockAddr); fn next(&mut self) -> Option { while let Some((new_key, block_id)) = self.streamer.next() { @@ -193,10 +193,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> { if block_match_automaton(Some(prev_key), new_key, self.automaton) { prev_key.clear(); prev_key.extend_from_slice(new_key); - return Some(( - block_id as usize, - self.block_addr_store.get(block_id).unwrap(), - )); + return Some((block_id, self.block_addr_store.get(block_id).unwrap())); } // actually we could not write here, and it would still be correct, but it might // lead to checking more keys than necessary which in itself can be a slowdown. 
@@ -205,10 +202,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> { } else { self.prev_key = Some(new_key.to_owned()); if block_match_automaton(None, new_key, self.automaton) { - return Some(( - block_id as usize, - self.block_addr_store.get(block_id).unwrap(), - )); + return Some((block_id, self.block_addr_store.get(block_id).unwrap())); } } } diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs index 7ae4bab14e..ca5206fb96 100644 --- a/sstable/src/streamer.rs +++ b/sstable/src/streamer.rs @@ -86,7 +86,7 @@ where bound_as_byte_slice(&self.upper), ); self.term_dict - .sstable_delta_reader_for_key_range(key_range, self.limit) + .sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton) } async fn delta_reader_async(&self) -> io::Result> { @@ -95,7 +95,7 @@ where bound_as_byte_slice(&self.upper), ); self.term_dict - .sstable_delta_reader_for_key_range_async(key_range, self.limit) + .sstable_delta_reader_for_key_range_async(key_range, self.limit, &self.automaton) .await } @@ -327,4 +327,7 @@ mod tests { assert!(!term_streamer.advance()); Ok(()) } + + // TODO add test for sparse search with a block of poison (starts with 0xffffffff) => such a + // block instantly causes an unexpected EOF error } From 9e2ddec4b31a57b32e4033b63a0f1827a000a777 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 13 Jul 2024 20:28:12 +0200 Subject: [PATCH 04/15] merge adjacent block when building delta for automaton --- sstable/Cargo.toml | 1 + sstable/src/dictionary.rs | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index cf840ec80d..7a27024868 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -12,6 +12,7 @@ description = "sstables for tantivy" [dependencies] common = {version= "0.7", path="../common", package="tantivy-common"} futures-util = "0.3.30" +itertools = "0.13.0" tantivy-bitpacker = { version= "0.6", path="../bitpacker" } tantivy-fst = "0.5" # experimental gives us access to Decompressor::upper_bound diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index e57748aa1d..b98513788a 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -8,6 +8,7 @@ use common::bounds::{transform_bound_inner_res, TransformBound}; use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use futures_util::{stream, StreamExt, TryStreamExt}; +use itertools::Itertools; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; @@ -254,6 +255,16 @@ impl Dictionary { .get_block_for_automaton(automaton) .filter(move |(block_id, _)| block_range.contains(block_id)) .map(|(_, block_addr)| block_addr) + .coalesce(|first, second| { + if first.byte_range.end == second.byte_range.start { + Ok(BlockAddr { + first_ordinal: first.first_ordinal, + byte_range: first.byte_range.start..second.byte_range.end, + }) + } else { + Err((first, second)) + } + }) } /// Opens a `TermDictionary`. 
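The `coalesce` adapter used in patch 04 folds an iterator pairwise: the closure receives two consecutive items and either returns `Ok(merged)` to fuse them into a single item (which is then compared against the next one), or `Err((first, second))` to emit `first` unchanged and continue folding from `second`. Below is a minimal, self-contained sketch of that merging behaviour, using plain `Range<u32>` values in place of `BlockAddr` (the ranges are made up for illustration; it only assumes the `itertools` dependency added in the Cargo.toml hunk above):

    use itertools::Itertools;

    fn main() {
        // hypothetical byte ranges of matching blocks: the first two and the
        // last two are contiguous, with a hole between offsets 20 and 25
        let ranges = vec![0u32..10, 10..20, 25..30, 30..35];
        let merged: Vec<_> = ranges
            .into_iter()
            .coalesce(|first, second| {
                if first.end == second.start {
                    // contiguous: fuse both into one larger range
                    Ok(first.start..second.end)
                } else {
                    // hole: emit `first` as-is and restart from `second`
                    Err((first, second))
                }
            })
            .collect();
        assert_eq!(merged, vec![0..20, 25..35]);
    }

Patch 05 below generalizes the same predicate to also bridge holes smaller than `merge_holes_under` bytes, so that nearby non-adjacent blocks can be fetched in a single read.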
From 24c5dc2398024de68c09a54e21f37ab2f844b30b Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 14 Sep 2024 13:58:34 +0200 Subject: [PATCH 05/15] allow warming up automaton --- columnar/src/columnar/reader/mod.rs | 2 +- src/index/inverted_index_reader.rs | 74 ++++++++++++++++++++++++++--- sstable/src/dictionary.rs | 17 +++++-- sstable/src/streamer.rs | 19 ++++++-- 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index f850e4f6f6..832fdb7da7 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -170,7 +170,7 @@ impl ColumnarReader { ) -> io::Result> { let stream = self .stream_for_column_range(column_name) - .into_stream_async() + .into_stream_async(0) .await?; read_all_columns_in_stream(stream, &self.column_data, self.format_version) } diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 30685eaa44..5781f9df3f 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -3,6 +3,12 @@ use std::io; use common::json_path_writer::JSON_END_OF_PATH; use common::BinarySerializable; use fnv::FnvHashSet; +#[cfg(feature = "quickwit")] +use futures_util::{FutureExt, StreamExt, TryStreamExt}; +#[cfg(feature = "quickwit")] +use itertools::Itertools; +#[cfg(feature = "quickwit")] +use tantivy_fst::automaton::{AlwaysMatch, Automaton}; use crate::directory::FileSlice; use crate::positions::PositionReader; @@ -219,13 +225,18 @@ impl InvertedIndexReader { self.termdict.get_async(term.serialized_value_bytes()).await } - async fn get_term_range_async( - &self, + async fn get_term_range_async<'a, A: Automaton + 'a>( + &'a self, terms: impl std::ops::RangeBounds, + automaton: A, limit: Option, - ) -> io::Result + '_> { + merge_holes_under: usize, + ) -> io::Result + 'a> + where + A::State: Clone, + { use std::ops::Bound; - let range_builder = self.termdict.range(); + let range_builder = self.termdict.search(automaton); let range_builder = match terms.start_bound() { Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()), Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()), @@ -242,7 +253,7 @@ impl InvertedIndexReader { range_builder }; - let mut stream = range_builder.into_stream_async().await?; + let mut stream = range_builder.into_stream_async(merge_holes_under).await?; let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone())); @@ -288,7 +299,9 @@ impl InvertedIndexReader { limit: Option, with_positions: bool, ) -> io::Result { - let mut term_info = self.get_term_range_async(terms, limit).await?; + let mut term_info = self + .get_term_range_async(terms, AlwaysMatch, limit, 0) + .await?; let Some(first_terminfo) = term_info.next() else { // no key matches, nothing more to load @@ -315,6 +328,55 @@ impl InvertedIndexReader { Ok(true) } + /// Warmup a block postings given a range of `Term`s. + /// This method is for an advanced usage only. 
+    ///
+    /// returns a boolean, whether a term matching the automaton was found in the dictionary
+    pub async fn warm_postings_automaton<A: Automaton + Clone>(
+        &self,
+        automaton: A,
+        // with_positions: bool, at the moment we have no use for it, and supporting it would add
+        // complexity to the coalesce
+    ) -> io::Result<bool>
+    where
+        A::State: Clone,
+    {
+        // merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
+        // S3 (~80MiB/s, and 50ms latency)
+        let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
+        // we build a first iterator to download everything. Simply calling the function already
+        // loads everything, but doesn't start iterating over the sstable.
+        let mut _term_info = self
+            .get_term_range_async(.., automaton.clone(), None, merge_holes_under)
+            .await?;
+        // we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
+        // match, and just download them to reduce our query count. This makes the assumption
+        // there is a caching layer below, which might not always be true, but is in Quickwit.
+        let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
+
+        let range_to_load = term_info
+            .map(|term_info| term_info.postings_range)
+            .coalesce(|range1, range2| {
+                if range1.end + merge_holes_under >= range2.start {
+                    Ok(range1.start..range2.end)
+                } else {
+                    Err((range1, range2))
+                }
+            });
+
+        let slices_downloaded = futures_util::stream::iter(range_to_load)
+            .map(|posting_slice| {
+                self.postings_file_slice
+                    .read_bytes_slice_async(posting_slice)
+                    .map(|result| result.map(|_slice| ()))
+            })
+            .buffer_unordered(5)
+            .try_collect::<Vec<()>>()
+            .await?;
+
+        Ok(!slices_downloaded.is_empty())
+    }
+
     /// Warmup the block postings for all terms.
     /// This method is for an advanced usage only.
     ///
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index b98513788a..b351d64af8 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -101,6 +101,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         key_range: impl RangeBounds<[u8]>,
         limit: Option<u64>,
         automaton: &impl Automaton,
+        merge_holes_under: usize,
     ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let match_all = automaton.will_always_match(&automaton.start());
         if match_all {
@@ -108,8 +109,11 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             let data = slice.read_bytes_async().await?;
             Ok(TSSTable::delta_reader(data))
         } else {
-            let blocks =
-                stream::iter(self.get_block_iterator_for_range_and_automaton(key_range, automaton));
+            let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
+                key_range,
+                automaton,
+                merge_holes_under,
+            ));
             let data = blocks
                 .map(|block_addr| {
                     self.sstable_slice
                         .read_bytes_slice_async(block_addr.byte_range)
@@ -134,7 +138,9 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             let data = slice.read_bytes()?;
             Ok(TSSTable::delta_reader(data))
         } else {
-            let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton);
+            // if operations are sync, we assume latency is almost null, and there is no point in
+            // merging across holes
+            let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
             let data = blocks
                 .map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
                 .collect::<Result<Vec<_>, _>>()?;
@@ -236,6 +242,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         &'a self,
         key_range: impl RangeBounds<[u8]>,
         automaton: &'a impl Automaton,
+        merge_holes_under: usize,
     ) -> impl Iterator<Item = BlockAddr> + 'a {
         let lower_bound = match key_range.start_bound() {
             Bound::Included(key) | Bound::Excluded(key) => {
@@ -255,8 +262,8 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             .get_block_for_automaton(automaton)
             .filter(move |(block_id, _)| block_range.contains(block_id))
             .map(|(_, block_addr)|
block_addr) - .coalesce(|first, second| { - if first.byte_range.end == second.byte_range.start { + .coalesce(move |first, second| { + if first.byte_range.end + merge_holes_under >= second.byte_range.start { Ok(BlockAddr { first_ordinal: first.first_ordinal, byte_range: first.byte_range.start..second.byte_range.end, diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs index ca5206fb96..e1c34a538f 100644 --- a/sstable/src/streamer.rs +++ b/sstable/src/streamer.rs @@ -89,13 +89,21 @@ where .sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton) } - async fn delta_reader_async(&self) -> io::Result> { + async fn delta_reader_async( + &self, + merge_holes_under: usize, + ) -> io::Result> { let key_range = ( bound_as_byte_slice(&self.lower), bound_as_byte_slice(&self.upper), ); self.term_dict - .sstable_delta_reader_for_key_range_async(key_range, self.limit, &self.automaton) + .sstable_delta_reader_for_key_range_async( + key_range, + self.limit, + &self.automaton, + merge_holes_under, + ) .await } @@ -129,8 +137,11 @@ where } /// See `into_stream(..)` - pub async fn into_stream_async(self) -> io::Result> { - let delta_reader = self.delta_reader_async().await?; + pub async fn into_stream_async( + self, + merge_holes_undex: usize, + ) -> io::Result> { + let delta_reader = self.delta_reader_async(merge_holes_undex).await?; self.into_stream_given_delta_reader(delta_reader) } From a1447cc9c2ecb2831c37df1255e162aaca4b2722 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Thu, 19 Dec 2024 17:30:05 +0100 Subject: [PATCH 06/15] remove breaking change in sstable public api --- columnar/src/columnar/reader/mod.rs | 2 +- src/index/inverted_index_reader.rs | 4 +++- sstable/src/streamer.rs | 12 +++++++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index 832fdb7da7..f850e4f6f6 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -170,7 +170,7 @@ impl ColumnarReader { ) -> io::Result> { let stream = self .stream_for_column_range(column_name) - .into_stream_async(0) + .into_stream_async() .await?; read_all_columns_in_stream(stream, &self.column_data, self.format_version) } diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 5781f9df3f..d3f8f46402 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -253,7 +253,9 @@ impl InvertedIndexReader { range_builder }; - let mut stream = range_builder.into_stream_async(merge_holes_under).await?; + let mut stream = range_builder + .into_stream_async_merging_holes(merge_holes_under) + .await?; let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone())); diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs index e1c34a538f..de727cec46 100644 --- a/sstable/src/streamer.rs +++ b/sstable/src/streamer.rs @@ -137,11 +137,17 @@ where } /// See `into_stream(..)` - pub async fn into_stream_async( + pub async fn into_stream_async(self) -> io::Result> { + self.into_stream_async_merging_holes(0).await + } + + /// Same as `into_stream_async`, but tries to issue a single io operation when requesting + /// blocks that are not consecutive, but also less than `merge_holes_under` bytes appart. 
+ pub async fn into_stream_async_merging_holes( self, - merge_holes_undex: usize, + merge_holes_under: usize, ) -> io::Result> { - let delta_reader = self.delta_reader_async(merge_holes_undex).await?; + let delta_reader = self.delta_reader_async(merge_holes_under).await?; self.into_stream_given_delta_reader(delta_reader) } From 192395c311b147b6c42091f7753daa23546745fa Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 20 Dec 2024 10:25:38 +0100 Subject: [PATCH 07/15] attempt at simplifying can_block_match_automaton --- sstable/src/block_match_automaton.rs | 233 +++++++++++++++++---------- sstable/src/sstable_index_v2.rs | 8 +- sstable/src/sstable_index_v3.rs | 6 +- 3 files changed, 157 insertions(+), 90 deletions(-) diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs index 3bb75b883b..e3be4b56f7 100644 --- a/sstable/src/block_match_automaton.rs +++ b/sstable/src/block_match_automaton.rs @@ -1,46 +1,61 @@ use tantivy_fst::Automaton; -/// Returns whether a block whose starting key (exclusive) and final key (inclusive) can match the -/// provided automaton, without actually looking at the block content. -pub(crate) fn block_match_automaton( - start_key: Option<&[u8]>, +/// Returns whether a block can match an automaton based on its bounds. +/// +/// start key is exclusive, and optional to account for the first block. end key is inclusive and +/// mandatory. +pub(crate) fn can_block_match_automaton( + start_key_opt: Option<&[u8]>, end_key: &[u8], automaton: &impl Automaton, ) -> bool { - let initial_block = start_key.is_none(); - let start_key = start_key.unwrap_or(&[]); + let start_key = if let Some(start_key) = start_key_opt { + start_key + } else { + // if start_key_opt is None, we would allow an automaton matching the empty string to match + if automaton.is_match(&automaton.start()) { + return true; + } + &[] + }; + can_block_match_automaton_with_start(start_key, end_key, automaton) +} + +// similar to can_block_match_automaton, ignoring the edge case of the initial block +fn can_block_match_automaton_with_start( + start_key: &[u8], + end_key: &[u8], + automaton: &impl Automaton, +) -> bool { + // notation: in loops, we use `kb` to denotate a key byte (a byte taken from the start/end key), + // and `rb`, a range byte (usually all values higher than a `kb` when comparing with + // start_key, or all values lower than a `kb` when comparing with end_key) - debug_assert!(start_key <= end_key); + if start_key >= end_key { + return false; + } - let prefix_len = start_key - .iter() - .zip(end_key) - .take_while(|(c1, c2)| c1 == c2) - .count(); + let common_prefix_len = crate::common_prefix_len(start_key, end_key); let mut base_state = automaton.start(); - for c in &start_key[0..prefix_len] { - base_state = automaton.accept(&base_state, *c); + for kb in &start_key[0..common_prefix_len] { + base_state = automaton.accept(&base_state, *kb); } + + // this is not required for correctness, but allows dodging more expensive checks if !automaton.can_match(&base_state) { return false; } - if initial_block && automaton.is_match(&base_state) { - // for other blocks, the start_key is exclusive, and a prefix of it would be in a - // previous block - return true; - } - // we have 3 distinct case: // - keys are `abc` and `abcd` => we test for abc[\0-d].* // - keys are `abcd` and `abce` => we test for abc[d-e].* // - keys are `abcd` and `abc` => contradiction with start_key < end_key. // - // ideally for [abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?) 
+ // ideally for ]abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?) // but let's start simple (and correct), and tighten our bounds latter // - // and for [abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?) + // and for ]abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?) // abc ( // d(e.+|[f-\xff].*) | // e.* | @@ -48,70 +63,108 @@ pub(crate) fn block_match_automaton( // ) // // these are all written as regex, but can be converted to operations we can do: - // - [x-y] is a for c in x..y + // - [x-y] is a for c in x..=y // - .* is a can_match() // - .+ is a for c in 0..=255 { accept(c).can_match() } // - ? is a the thing before can_match(), or current state.is_match() // - | means test both side - let mut start_range = *start_key.get(prefix_len).unwrap_or(&0); - let end_range = end_key[prefix_len]; + // we have two cases, either start_key is a prefix of end_key (e.g. ]abc, abcjp]), + // or it is not (e.g. ]abcdg, abcjp]). It is not possible however that end_key be a prefix of + // start_key (or that both are equal) because we already handled start_key >= end_key. + // + // if we are in the first case, we want to visit the following states: + // abc ( + // [\0-i].* | + // j ( + // [\0-o].* | + // p + // )? + // ) + // Everything after `abc` is handled by `match_range_end` + // + // if we are in the 2nd case, we want to visit the following states: + // abc ( + // d(g.+|[h-\xff].*) | // this is handled by match_range_start + // + // [e-i].* | // this is handled here + // + // j ( // this is handled by match_range_end (but countrary to the other + // [\0-o].* | // case, j is already consumed so to not check [\0-i].* ) + // p + // )? + // ) - if start_key.len() > prefix_len { - start_range += 1; - } - for c in start_range..end_range { - let new_state = automaton.accept(&base_state, c); + let Some(start_range) = start_key.get(common_prefix_len) else { + return match_range_end(&end_key[common_prefix_len..], &automaton, base_state); + }; + + let end_range = end_key[common_prefix_len]; + + // things starting with start_range were handled in match_range_start + // this starting with end_range are handled bellow. 
+    // this can run for 0 iterations in cases such as ]abc, abd]
+    for rb in (start_range + 1)..end_range {
+        let new_state = automaton.accept(&base_state, rb);
         if automaton.can_match(&new_state) {
             return true;
         }
     }
-    if start_key.len() > prefix_len {
-        if start_key.len() <= prefix_len {
-            // case [abcd, abcde], we need to handle \0 which wasn't processed
-            let new_state = automaton.accept(&base_state, start_range);
-            if automaton.can_match(&new_state) {
-                eprintln!("ho");
-                return true;
-            }
-        } else if match_range_start(&start_key[prefix_len..], &automaton, &base_state) {
-            return true;
-        }
+
+    let state_for_start = automaton.accept(&base_state, *start_range);
+    if match_range_start(
+        &start_key[common_prefix_len + 1..],
+        &automaton,
+        state_for_start,
+    ) {
+        return true;
+    }
+
+    let state_for_end = automaton.accept(&base_state, end_range);
+    if automaton.is_match(&state_for_end) {
+        return true;
     }
-    match_range_end(&end_key[prefix_len..], &automaton, &base_state)
+    match_range_end(&end_key[common_prefix_len + 1..], &automaton, state_for_end)
 }
 
 fn match_range_start<S, A: Automaton<State = S>>(
     start_key: &[u8],
     automaton: &A,
-    base_state: &S,
+    mut state: S,
 ) -> bool {
-    // case [abcdef, abcghi], we need to handle
-    // - abcd[f-\xff].*
-    // - abcde[g-\xff].*
-    // - abcdef.+ == abcdef[\0-\xff].*
-    let mut state = automaton.accept(base_state, start_key[0]);
-    for start_point in &start_key[1..] {
+    // case [abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
+    // - [h-\xff].*
+    // - g[k-\xff].*
+    // - gj.+ == gj[\0-\xff].*
+
+    for kb in start_key {
+        // this is an optimisation, and is not needed for correctness
         if !automaton.can_match(&state) {
             return false;
         }
-        // handle case where start_point is \xff
-        if *start_point < u8::MAX {
-            for to_name in (start_point + 1)..=u8::MAX {
-                let temp_state = automaton.accept(&state, to_name);
+
+        // does the [h-\xff].* part. we skip if kb==255 as [\{0100}-\xff] is an empty range, and
+        // this would overflow in our u8 world
+        if *kb < u8::MAX {
+            for rb in (kb + 1)..=u8::MAX {
+                let temp_state = automaton.accept(&state, rb);
                 if automaton.can_match(&temp_state) {
                     return true;
                 }
             }
         }
-        state = automaton.accept(&state, *start_point);
+        // push g
+        state = automaton.accept(&state, *kb);
     }
 
+    // this isn't required for correctness, but can save us from looping 256 times below
     if !automaton.can_match(&state) {
         return false;
     }
-    for to_name in 0..=u8::MAX {
-        let temp_state = automaton.accept(&state, to_name);
+
+    // does the final `.+`, which is the same as `[\0-\xff].*`
+    for rb in 0..=u8::MAX {
+        let temp_state = automaton.accept(&state, rb);
         if automaton.can_match(&temp_state) {
             return true;
         }
     }
     false
 }
 
 fn match_range_end<S, A: Automaton<State = S>>(
     end_key: &[u8],
     automaton: &A,
-    base_state: &S,
+    mut state: S,
 ) -> bool {
-    // f([\0-f].*|g)?
-    // case [abcdef, abcghi], we need to handle
-    // - abcg[\0-g].*
-    // - abcgh[\0-h].*
-    // - abcghi
-    let mut state = automaton.accept(base_state, end_key[0]);
-    for end_point in &end_key[1..] {
+    // for ]abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
+    // we just need to handle
+    // - [\0-o].*
+    // - p
+    // - p[\0-r].*
+    // - ps
+    for kb in end_key {
+        // this is an optimisation, and is not needed for correctness
         if !automaton.can_match(&state) {
             return false;
         }
-        if automaton.is_match(&state) {
-            return true;
-        }
-        for to_name in 0..*end_point {
-            let temp_state = automaton.accept(&state, to_name);
+
+        // does the `[\0-o].*`
+        for rb in 0..*kb {
+            let temp_state = automaton.accept(&state, rb);
             if automaton.can_match(&temp_state) {
                 return true;
             }
         }
-        state = automaton.accept(&state, *end_point);
-    }
+        // push p
+        state = automaton.accept(&state, *kb);
 
-    automaton.is_match(&state)
+        // verify the `p` case
+        if automaton.is_match(&state) {
+            return true;
+        }
+    }
+    false
 }
 
 #[cfg(test)]
 pub(crate) mod tests {
     use proptest::prelude::*;
     use tantivy_fst::Automaton;
 
     use super::*;
 
+    fn gen_key_strategy() -> impl Strategy<Value = Vec<u8>> {
+        // we only generate bytes in [0, 1, 2, 254, 255] to reduce the search space without
+        // ignoring edge cases that might occur with integer over/underflow
+        proptest::collection::vec(prop_oneof![0u8..=2, 254u8..=255], 0..5)
+    }
+
     proptest! {
+        #![proptest_config(ProptestConfig {
+            cases: 10000, .. ProptestConfig::default()
+        })]
+
         #[test]
-        fn test_proptest_automaton_match_block(start in any::<Vec<u8>>(), end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
-            // inverted keys are *not* supported and can return bogus results
-            if start < end && !end.is_empty() {
-                let expected = start < key && end >= key;
-                let automaton = EqBuffer(key);
-
-                assert_eq!(block_match_automaton(Some(&start), &end, &automaton), expected);
-            }
+        fn test_proptest_automaton_match_block(start in gen_key_strategy(), end in gen_key_strategy(), key in gen_key_strategy()) {
+            let expected = start < key && end >= key;
+            let automaton = EqBuffer(key);
+
+            assert_eq!(can_block_match_automaton(Some(&start), &end, &automaton), expected);
         }
 
         #[test]
-        fn test_proptest_automaton_match_first_block(end in any::<Vec<u8>>(), key in any::<Vec<u8>>()) {
-            if !end.is_empty() {
-                let expected = end >= key;
-                let automaton = EqBuffer(key);
-                assert_eq!(block_match_automaton(None, &end, &automaton), expected);
-            }
+        fn test_proptest_automaton_match_first_block(end in gen_key_strategy(), key in gen_key_strategy()) {
+            let expected = end >= key;
+            let automaton = EqBuffer(key);
+            assert_eq!(can_block_match_automaton(None, &end, &automaton), expected);
         }
     }
 }
diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs
index f00e004a3c..f0aa83ab0b 100644
--- a/sstable/src/sstable_index_v2.rs
+++ b/sstable/src/sstable_index_v2.rs
@@ -1,7 +1,7 @@
 use common::OwnedBytes;
 use tantivy_fst::Automaton;
 
-use crate::block_match_automaton::block_match_automaton;
+use crate::block_match_automaton::can_block_match_automaton;
 use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
 
 #[derive(Default, Debug, Clone)]
@@ -90,7 +90,11 @@ impl SSTableIndex {
         }))
         .enumerate()
         .filter_map(move |(pos, (prev_key, current_block))| {
-            if block_match_automaton(prev_key, &current_block.last_key_or_greater, automaton) {
+            if can_block_match_automaton(
+                prev_key,
+                &current_block.last_key_or_greater,
+                automaton,
+            ) {
                 Some((pos as u64, current_block.block_addr.clone()))
             } else {
                 None
diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs
index abc36fe063..4308743bce 100644
--- a/sstable/src/sstable_index_v3.rs
+++ b/sstable/src/sstable_index_v3.rs
@@ -7,7 +7,7 @@
 use tantivy_bitpacker::{compute_num_bits, BitPacker};
 use tantivy_fst::raw::Fst;
 use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
-use crate::block_match_automaton::block_match_automaton;
+use crate::block_match_automaton::can_block_match_automaton;
 use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
 
 #[derive(Debug, Clone)]
@@ -190,7 +190,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
     fn next(&mut self) -> Option<Self::Item> {
         while let Some((new_key, block_id)) = self.streamer.next() {
             if let Some(prev_key) = self.prev_key.as_mut() {
-                if block_match_automaton(Some(prev_key), new_key, self.automaton) {
+                if can_block_match_automaton(Some(prev_key), new_key, self.automaton) {
                     prev_key.clear();
                     prev_key.extend_from_slice(new_key);
                     return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
                 }
@@ -201,7 +201,7 @@ impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
                 prev_key.extend_from_slice(new_key);
             } else {
                 self.prev_key = Some(new_key.to_owned());
-                if block_match_automaton(None, new_key, self.automaton) {
+                if can_block_match_automaton(None, new_key, self.automaton) {
                     return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
                 }
             }

From 42efc7f7c84c0c1a3f8b8dee1ff88745bc924ceb Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 20 Dec 2024 11:00:11 +0100
Subject: [PATCH 08/15] clippy

---
 sstable/src/block_reader.rs     |  2 +-
 sstable/src/sstable_index_v3.rs | 12 ++++++++----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs
index 4fac90412c..3299c3b4ce 100644
--- a/sstable/src/block_reader.rs
+++ b/sstable/src/block_reader.rs
@@ -23,7 +23,7 @@ impl BlockReader {
     pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> BlockReader {
         let mut next_readers = readers.into_iter();
-        let reader = next_readers.next().unwrap_or_else(|| OwnedBytes::empty());
+        let reader = next_readers.next().unwrap_or_else(OwnedBytes::empty);
diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs
index 4308743bce..1c99c36e5f 100644
--- a/sstable/src/sstable_index_v3.rs
+++ b/sstable/src/sstable_index_v3.rs
@@ -176,15 +176,21 @@ impl SSTableIndexV3 {
     }
 }
 
+// TODO we iterate over the entire Map to find matching blocks,
+// we could manually iterate on the underlying Fst and skip whole branches if our Automaton says it
+// cannot match. this isn't as bad as it sounds given the fst is a lot smaller than the rest of the
+// sstable.
+// To do that, we can't use tantivy_fst's Stream with an automaton, as we need to know 2 consecutive
+// fst keys to form a proper opinion on whether this is a match, which we can't translate into a
+// single automaton
 struct GetBlockForAutomaton<'a, A: Automaton> {
     streamer: tantivy_fst::map::Stream<'a>,
-    // TODO we could be more efficient by streaming the store
     block_addr_store: &'a BlockAddrStore,
     prev_key: Option<Vec<u8>>,
     automaton: &'a A,
 }
 
-impl<'a, A: Automaton> Iterator for GetBlockForAutomaton<'a, A> {
+impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
     type Item = (u64, BlockAddr);
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -195,8 +201,6 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
             prev_key.extend_from_slice(new_key);
             return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
         }
-        // actually we could not write here, and it would still be correct, but it might
-        // lead to checking more keys than necessary which in itself can be a slowdown.
         prev_key.clear();
         prev_key.extend_from_slice(new_key);
     } else {
         self.prev_key = Some(new_key.to_owned());

From ebf4d84553a63e1ab7710f7f635c04d6452460d6 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 20 Dec 2024 12:20:35 +0100
Subject: [PATCH 09/15] add comment about cpu-intensive operation in async
 context

---
 src/index/inverted_index_reader.rs   | 5 +++++
 sstable/src/block_match_automaton.rs | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index d3f8f46402..7a6655ca93 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -351,11 +351,16 @@ impl InvertedIndexReader {
         let mut _term_info = self
             .get_term_range_async(.., automaton.clone(), None, merge_holes_under)
             .await?;
+
         // we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
         // match, and just download them to reduce our query count. This makes the assumption
         // there is a caching layer below, which might not always be true, but is in Quickwit.
         let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
 
+        // TODO this operation is often cheap for "friendly" automatons, but can be very costly for
+        // "unfriendly" ones such as ".*a{50}" (very few terms if any match this pattern, but we
+        // can't know early). In this case, we decompress and iterate over the entire sstable, while
+        // still being in async context. Ideally we should spawn this on a threadpool.
         let range_to_load = term_info
             .map(|term_info| term_info.postings_range)
             .coalesce(|range1, range2| {
diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs
index e3be4b56f7..bb516ea2e2 100644
--- a/sstable/src/block_match_automaton.rs
+++ b/sstable/src/block_match_automaton.rs
@@ -132,7 +132,7 @@ fn match_range_start<S, A: Automaton<State = S>>(
     automaton: &A,
     mut state: S,
 ) -> bool {
-    // case [abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
+    // case ]abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
     // - [h-\xff].*
     // - g[k-\xff].*
     // - gj.+ == gj[\0-\xff].*

From dfff5f3bcb17c2447272e07ee5fcbcbd9e5c49a7 Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Mon, 23 Dec 2024 16:17:44 +0100
Subject: [PATCH 10/15] rename merge_holes_under => merge_holes_under_bytes

---
 src/index/inverted_index_reader.rs | 10 +++++-----
 sstable/src/dictionary.rs          |  8 ++++----
 sstable/src/streamer.rs            | 10 +++++-----
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 7a6655ca93..064d10954b 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -230,7 +230,7 @@ impl InvertedIndexReader {
         terms: impl std::ops::RangeBounds<Term>,
         automaton: A,
         limit: Option<u64>,
-        merge_holes_under: usize,
+        merge_holes_under_bytes: usize,
     ) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
     where
         A::State: Clone,
@@ -254,7 +254,7 @@ impl InvertedIndexReader {
         };
 
         let mut stream = range_builder
-            .into_stream_async_merging_holes(merge_holes_under)
+            .into_stream_async_merging_holes(merge_holes_under_bytes)
             .await?;
 
         let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
@@ -345,11 +345,11 @@ impl InvertedIndexReader {
     {
         // merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
         // S3 (~80MiB/s, and 50ms latency)
-        let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
+        let merge_holes_under_bytes = (80 * 1024 * 1024 * 50) / 1000;
         // we build a first iterator to download everything.
         // loads everything, but doesn't start iterating over the sstable.
         let mut _term_info = self
-            .get_term_range_async(.., automaton.clone(), None, merge_holes_under)
+            .get_term_range_async(.., automaton.clone(), None, merge_holes_under_bytes)
             .await?;
 
         // we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
@@ -364,7 +364,7 @@ impl InvertedIndexReader {
         let range_to_load = term_info
             .map(|term_info| term_info.postings_range)
             .coalesce(|range1, range2| {
-                if range1.end + merge_holes_under >= range2.start {
+                if range1.end + merge_holes_under_bytes >= range2.start {
                     Ok(range1.start..range2.end)
                 } else {
                     Err((range1, range2))
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index b351d64af8..68708f202a 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -101,7 +101,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         key_range: impl RangeBounds<[u8]>,
         limit: Option<u64>,
         automaton: &impl Automaton,
-        merge_holes_under: usize,
+        merge_holes_under_bytes: usize,
     ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let match_all = automaton.will_always_match(&automaton.start());
         if match_all {
@@ -112,7 +112,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
             key_range,
             automaton,
-            merge_holes_under,
+            merge_holes_under_bytes,
         ));
         let data = blocks
             .map(|block_addr| {
@@ -242,7 +242,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         &'a self,
         key_range: impl RangeBounds<[u8]>,
         automaton: &'a impl Automaton,
-        merge_holes_under: usize,
+        merge_holes_under_bytes: usize,
     ) -> impl Iterator<Item = BlockAddr> + 'a {
         let lower_bound = match key_range.start_bound() {
             Bound::Included(key) | Bound::Excluded(key) => {
@@ -263,7 +263,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             .filter(move |(block_id, _)| block_range.contains(block_id))
             .map(|(_, block_addr)| block_addr)
             .coalesce(move |first, second| {
-                if first.byte_range.end + merge_holes_under >= second.byte_range.start {
+                if first.byte_range.end + merge_holes_under_bytes >= second.byte_range.start {
                     Ok(BlockAddr {
                         first_ordinal: first.first_ordinal,
                         byte_range: first.byte_range.start..second.byte_range.end,
diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs
index de727cec46..f0f052c33a 100644
--- a/sstable/src/streamer.rs
+++ b/sstable/src/streamer.rs
@@ -91,7 +91,7 @@ where
 
     async fn delta_reader_async(
         &self,
-        merge_holes_under: usize,
+        merge_holes_under_bytes: usize,
     ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let key_range = (
             bound_as_byte_slice(&self.lower),
@@ -102,7 +102,7 @@ where
             key_range,
             self.limit,
             &self.automaton,
-            merge_holes_under,
+            merge_holes_under_bytes,
         )
         .await
     }
@@ -142,12 +142,12 @@ where
     }
 
     /// Same as `into_stream_async`, but tries to issue a single io operation when requesting
-    /// blocks that are not consecutive, but also less than `merge_holes_under` bytes appart.
+    /// blocks that are not consecutive, but also less than `merge_holes_under_bytes` bytes apart.
     pub async fn into_stream_async_merging_holes(
         self,
-        merge_holes_under: usize,
+        merge_holes_under_bytes: usize,
     ) -> io::Result<Streamer<'a, TSSTable, A>> {
-        let delta_reader = self.delta_reader_async(merge_holes_under).await?;
+        let delta_reader = self.delta_reader_async(merge_holes_under_bytes).await?;
         self.into_stream_given_delta_reader(delta_reader)
     }
 
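Patch 10's rename is a good place to restate what the knob does. The sketch below is illustrative only, not part of the series: it rewrites the itertools `coalesce` logic with plain iteration, assumes the input ranges are sorted by start (block addresses in an sstable are), and `merge_close_ranges` is a made-up name.

use std::ops::Range;

/// Merge byte ranges separated by less than `merge_holes_under_bytes`,
/// trading a bounded over-read for fewer io requests.
fn merge_close_ranges(
    sorted_ranges: impl IntoIterator<Item = Range<usize>>,
    merge_holes_under_bytes: usize,
) -> Vec<Range<usize>> {
    let mut merged: Vec<Range<usize>> = Vec::new();
    for range in sorted_ranges {
        if let Some(prev) = merged.last_mut() {
            // the hole to the previous range is small: widen it instead of
            // paying for a second fetch
            if prev.end + merge_holes_under_bytes >= range.start {
                prev.end = prev.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}

// merge_close_ranges([0..10, 12..20, 500..600], 4) yields [0..20, 500..600]:
// the 2-byte hole is absorbed, the 480-byte one is kept.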
From fe0c7c54082c077f4cb12f61cc435aa2b02248ba Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Thu, 2 Jan 2025 11:56:05 +0100
Subject: [PATCH 11/15] change rangebound style

---
 src/index/inverted_index_reader.rs   |  8 ++++----
 sstable/src/block_match_automaton.rs | 14 +++++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 064d10954b..142d629012 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -348,20 +348,20 @@ impl InvertedIndexReader {
         let merge_holes_under_bytes = (80 * 1024 * 1024 * 50) / 1000;
         // we build a first iterator to download everything. Simply calling the function already
         // loads everything, but doesn't start iterating over the sstable.
-        let mut _term_info = self
+        let mut _term_infos = self
             .get_term_range_async(.., automaton.clone(), None, merge_holes_under_bytes)
             .await?;
 
         // we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
         // match, and just download them to reduce our query count. This makes the assumption
         // there is a caching layer below, which might not always be true, but is in Quickwit.
-        let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
+        let term_infos = self.get_term_range_async(.., automaton, None, 0).await?;
 
         // TODO this operation is often cheap for "friendly" automatons, but can be very costly for
         // "unfriendly" ones such as ".*a{50}" (very few terms if any match this pattern, but we
         // can't know early). In this case, we decompress and iterate over the entire sstable, while
         // still being in async context. Ideally we should spawn this on a threadpool.
-        let range_to_load = term_info
+        let posting_ranges_to_load = term_infos
             .map(|term_info| term_info.postings_range)
             .coalesce(|range1, range2| {
                 if range1.end + merge_holes_under_bytes >= range2.start {
                     Ok(range1.start..range2.end)
                 } else {
                     Err((range1, range2))
                 }
             });
 
-        let slices_downloaded = futures_util::stream::iter(range_to_load)
+        let slices_downloaded = futures_util::stream::iter(posting_ranges_to_load)
             .map(|posting_slice| {
                 self.postings_file_slice
                     .read_bytes_slice_async(posting_slice)
diff --git a/sstable/src/block_match_automaton.rs b/sstable/src/block_match_automaton.rs
index bb516ea2e2..0c84a05e0f 100644
--- a/sstable/src/block_match_automaton.rs
+++ b/sstable/src/block_match_automaton.rs
@@ -52,10 +52,10 @@ fn can_block_match_automaton_with_start(
     // - keys are `abcd` and `abce` => we test for abc[d-e].*
     // - keys are `abcd` and `abc` => contradiction with start_key < end_key.
     //
-    // ideally for ]abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
+    // ideally for (abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
     // but let's start simple (and correct), and tighten our bounds latter
     //
-    // and for ]abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
+    // and for (abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
     // abc (
     //   d(e.+|[f-\xff].*) |
     //   e.* |
     //   f([\0-f].*|g)?
@@ -69,8 +69,8 @@ fn can_block_match_automaton_with_start(
     // - ? is a the thing before can_match(), or current state.is_match()
     // - | means test both side
 
-    // we have two cases, either start_key is a prefix of end_key (e.g. ]abc, abcjp]),
-    // or it is not (e.g. ]abcdg, abcjp]). It is not possible however that end_key be a prefix of
+    // we have two cases, either start_key is a prefix of end_key (e.g. (abc, abcjp]),
+    // or it is not (e.g. (abcdg, abcjp]). It is not possible however that end_key be a prefix of
     // start_key (or that both are equal) because we already handled start_key >= end_key.
     //
     // if we are in the first case, we want to visit the following states:
@@ -103,7 +103,7 @@ fn can_block_match_automaton_with_start(
     // things starting with start_range were handled in match_range_start
     // this starting with end_range are handled bellow.
 
-    // this can run for 0 iteration in cases such as ]abc, abd]
+    // this can run for 0 iteration in cases such as (abc, abd]
     for rb in (start_range + 1)..end_range {
         let new_state = automaton.accept(&base_state, rb);
         if automaton.can_match(&new_state) {
@@ -132,7 +132,7 @@ fn match_range_start<S, A: Automaton<State = S>>(
     automaton: &A,
     mut state: S,
 ) -> bool {
-    // case ]abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
+    // case (abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
     // - [h-\xff].*
     // - g[k-\xff].*
     // - gj.+ == gj[\0-\xff].*
@@ -177,7 +177,7 @@ fn match_range_end<S, A: Automaton<State = S>>(
     automaton: &A,
     mut state: S,
 ) -> bool {
-    // for ]abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
+    // for (abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
     // we just need to handle
     // - [\0-o].*
     // - p

From 175a529c416114c445f3d404b2c6d91275e726a7 Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Fri, 3 Jan 2025 18:55:54 +0100
Subject: [PATCH 12/15] use executor for cpu-heavy sstable decompression for automaton

---
 Cargo.toml                           |  3 +-
 src/index/inverted_index_reader.rs   | 71 +++++++++++++++++++---------
 src/termdict/mod.rs                  |  1 +
 src/termdict/sstable_termdict/mod.rs |  1 +
 4 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 41058dcfb5..2c7cdfa9d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
 sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
 hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
 futures-util = { version = "0.3.28", optional = true }
+futures-channel = { version = "0.3.28", optional = true }
 fnv = "1.0.7"
 
 [target.'cfg(windows)'.dependencies]
@@ -120,7 +121,7 @@ zstd-compression = ["zstd"]
 failpoints = ["fail", "fail/failpoints"]
 unstable = [] # useful for benches.
 
-quickwit = ["sstable", "futures-util"]
+quickwit = ["sstable", "futures-util", "futures-channel"]
 
 # Compares only the hash of a string when indexing data.
 # Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
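The hunk that follows makes the caller provide an executor for the CPU-bound part. As a hedged illustration of what could satisfy the new bound (not part of the patch), the sketch below runs the boxed task inline and wraps the result in an already-resolved future; fine for a test, but a real caller would hand the closure to a blocking threadpool instead.

use std::io;

// Inline "executor": type-checks against the E and F bounds introduced below,
// but the cpu-heavy work still runs on the calling async thread.
fn inline_executor(
    task: Box<dyn FnOnce() -> io::Result<()> + Send>,
) -> impl std::future::Future<Output = io::Result<()>> {
    std::future::ready(task())
}

// hypothetical call site:
// reader.warm_postings_automaton(automaton, inline_executor).await?;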
diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 142d629012..4b9165ec49 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -334,44 +334,65 @@ impl InvertedIndexReader {
     /// This method is for an advanced usage only.
     ///
     /// returns a boolean, whether a term matching the range was found in the dictionary
-    pub async fn warm_postings_automaton(
+    pub async fn warm_postings_automaton<
+        A: Automaton + Clone + Send + 'static,
+        E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
+        F: std::future::Future<Output = io::Result<()>>,
+    >(
         &self,
         automaton: A,
         // with_positions: bool, at the moment we have no use for it, and supporting it would add
         // complexity to the coalesce
+        executor: E,
     ) -> io::Result<bool>
     where
         A::State: Clone,
     {
         // merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
         // S3 (~80MiB/s, and 50ms latency)
-        let merge_holes_under_bytes = (80 * 1024 * 1024 * 50) / 1000;
+        const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
         // we build a first iterator to download everything. Simply calling the function already
-        // loads everything, but doesn't start iterating over the sstable.
-        let mut _term_infos = self
-            .get_term_range_async(.., automaton.clone(), None, merge_holes_under_bytes)
+        // downloads everything we need from the sstable, but doesn't start iterating over it.
+        let _term_info_iter = self
+            .get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
             .await?;
 
-        // we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
-        // match, and just download them to reduce our query count. This makes the assumption
-        // there is a caching layer below, which might not always be true, but is in Quickwit.
-        let term_infos = self.get_term_range_async(.., automaton, None, 0).await?;
-
-        // TODO this operation is often cheap for "friendly" automatons, but can be very costly for
-        // "unfriendly" ones such as ".*a{50}" (very few terms if any match this pattern, but we
-        // can't know early). In this case, we decompress and iterate over the entire sstable, while
-        // still being in async context. Ideally we should spawn this on a threadpool.
-        let posting_ranges_to_load = term_infos
-            .map(|term_info| term_info.postings_range)
-            .coalesce(|range1, range2| {
-                if range1.end + merge_holes_under_bytes >= range2.start {
-                    Ok(range1.start..range2.end)
-                } else {
-                    Err((range1, range2))
+        let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
+        let termdict = self.termdict.clone();
+        let cpu_bound_task = move || {
+            // then we build a 2nd iterator, this one with no holes, so we don't go through blocks
+            // we can't match.
+            // This makes the assumption there is a caching layer below us, which gives sync read
+            // for free after the initial async access. This might not always be true, but is in
+            // Quickwit.
+            // We build things from this closure otherwise we get into lifetime issues that can only
+            // be solved with self referential structs. Returning an io::Result from here is a bit
+            // more leaky abstraction-wise, but a lot better than the alternative
+            let mut stream = termdict.search(automaton).into_stream()?;
+
+            // we could do without an iterator, but this allows us access to coalesce which simplifies
+            // things
+            let posting_ranges_iter =
+                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()))
+                    .coalesce(|range1, range2| {
+                        if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
+                            Ok(range1.start..range2.end)
+                        } else {
+                            Err((range1, range2))
+                        }
+                    });
+
+            for posting_range in posting_ranges_iter {
+                if let Err(_) = sender.unbounded_send(posting_range) {
+                    // this should happen only when search is cancelled
+                    return Err(io::Error::other("failed to send posting range back"));
                 }
-            });
+            }
+            Ok(())
+        };
+        let task_handle = executor(Box::new(cpu_bound_task));
 
-        let slices_downloaded = futures_util::stream::iter(posting_ranges_to_load)
+        let slices_downloaded = posting_ranges_to_load_stream
             .map(|posting_slice| {
                 self.postings_file_slice
                     .read_bytes_slice_async(posting_slice)
@@ -381,6 +402,10 @@ impl InvertedIndexReader {
             .try_collect::<Vec<()>>()
             .await?;
 
+        // we don't need to pull that sooner, its future is only used to make sure we didn't miss
+        // an error, but the channel gets filled even before this is polled.
+        task_handle.await?;
+
         Ok(!slices_downloaded.is_empty())
     }
 
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 01c9591ee3..3051492547 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
 
 // TODO in the future this should become an enum of supported dictionaries
 /// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
+#[cfg_attr(feature = "quickwit", derive(Clone))]
 pub struct TermDictionary(InnerTermDict);
 
 impl TermDictionary {
diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs
index 621b85b35d..cc82eba3cd 100644
--- a/src/termdict/sstable_termdict/mod.rs
+++ b/src/termdict/sstable_termdict/mod.rs
@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermSSTable>;
 pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
 
 /// SSTable used to store TermInfo objects.
+#[derive(Clone)]
 pub struct TermSSTable;
 
 pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;

From 037d12c9c9b8c96c09288297cacc7e20d88ea842 Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Mon, 6 Jan 2025 11:58:58 +0100
Subject: [PATCH 13/15] fix deadlocking on automaton warmup

---
 src/index/inverted_index_reader.rs | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 4b9165ec49..3a960e795b 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -392,19 +392,17 @@ impl InvertedIndexReader {
         };
         let task_handle = executor(Box::new(cpu_bound_task));
 
-        let slices_downloaded = posting_ranges_to_load_stream
+        let posting_downloader = posting_ranges_to_load_stream
             .map(|posting_slice| {
                 self.postings_file_slice
                     .read_bytes_slice_async(posting_slice)
                     .map(|result| result.map(|_slice| ()))
             })
             .buffer_unordered(5)
-            .try_collect::<Vec<()>>()
-            .await?;
+            .try_collect::<Vec<()>>();
 
-        // we don't need to pull that sooner, its future is only used to make sure we didn't miss
-        // an error, but the channel gets filled even before this is polled.
-        task_handle.await?;
+        let (_, slices_downloaded) =
+            futures_util::future::try_join(task_handle, posting_downloader).await?;
 
         Ok(!slices_downloaded.is_empty())
     }
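The hang patch 13 fixes can be reproduced in miniature. Illustrative sketch only: the producer future is lazy and the receiver stream ends only when the sender is dropped, so awaiting them one after the other can block forever, while polling them jointly completes. It assumes the futures-executor crate for block_on, which the series itself does not use.

use futures_channel::mpsc;
use futures_util::{future, StreamExt};

fn join_instead_of_sequential_await() {
    let (sender, receiver) = mpsc::unbounded::<u32>();
    // nothing runs until this future is polled, like a task whose handle has
    // not been driven yet
    let producer = async move {
        for i in 0..3 {
            let _ = sender.unbounded_send(i);
        }
        // `sender` drops here, which is what terminates the stream below
    };
    let consumer = receiver.collect::<Vec<_>>();
    // collecting first and awaiting the producer afterwards would never
    // return; polling both at once is what the try_join in the fix does
    let (_, received) = futures_executor::block_on(future::join(producer, consumer));
    assert_eq!(received, vec![0, 1, 2]);
}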
From 6ca84a61fad2126f64c93a910de0855c13db8040 Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Wed, 8 Jan 2025 16:19:54 +0100
Subject: [PATCH 14/15] make termdict always clone

---
 src/termdict/fst_termdict/term_info_store.rs | 1 +
 src/termdict/fst_termdict/termdict.rs        | 6 ++++--
 src/termdict/mod.rs                          | 2 +-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs
index 0ad3a9d35a..1768c94118 100644
--- a/src/termdict/fst_termdict/term_info_store.rs
+++ b/src/termdict/fst_termdict/term_info_store.rs
@@ -93,6 +93,7 @@ impl TermInfoBlockMeta {
     }
 }
 
+#[derive(Clone)]
 pub struct TermInfoStore {
     num_terms: usize,
     block_meta_bytes: OwnedBytes,
diff --git a/src/termdict/fst_termdict/termdict.rs b/src/termdict/fst_termdict/termdict.rs
index 23ed3606d4..fb3352d044 100644
--- a/src/termdict/fst_termdict/termdict.rs
+++ b/src/termdict/fst_termdict/termdict.rs
@@ -1,4 +1,5 @@
 use std::io::{self, Write};
+use std::sync::Arc;
 
 use common::{BinarySerializable, CountingWriter};
 use once_cell::sync::Lazy;
@@ -113,8 +114,9 @@ static EMPTY_TERM_DICT_FILE: Lazy<Vec<u8>> = Lazy::new(|| {
 /// The `Fst` crate is used to associate terms to their
 /// respective `TermOrdinal`. The `TermInfoStore` then makes it
 /// possible to fetch the associated `TermInfo`.
+#[derive(Clone)]
 pub struct TermDictionary {
-    fst_index: tantivy_fst::Map<OwnedBytes>,
+    fst_index: Arc<tantivy_fst::Map<OwnedBytes>>,
     term_info_store: TermInfoStore,
 }
 
@@ -136,7 +138,7 @@ impl TermDictionary {
         let fst_index = open_fst_index(fst_file_slice)?;
         let term_info_store = TermInfoStore::open(values_file_slice)?;
         Ok(TermDictionary {
-            fst_index,
+            fst_index: Arc::new(fst_index),
             term_info_store,
         })
     }
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 3051492547..7153881251 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -74,7 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
 
 // TODO in the future this should become an enum of supported dictionaries
 /// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
-#[cfg_attr(feature = "quickwit", derive(Clone))]
+#[derive(Clone)]
 pub struct TermDictionary(InnerTermDict);
 
 impl TermDictionary {
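Patch 14's trick generalizes: wrap the large immutable piece in an Arc and #[derive(Clone)] becomes a reference-count bump rather than a deep copy. A minimal illustrative sketch with stand-in names:

use std::sync::Arc;

#[derive(Clone)]
struct CheaplyCloneable {
    // plays the role of tantivy_fst::Map<OwnedBytes>: big, immutable, shared
    index: Arc<Vec<u8>>,
}

#[test]
fn clone_shares_the_allocation() {
    let original = CheaplyCloneable {
        index: Arc::new(vec![0u8; 1 << 20]),
    };
    let copy = original.clone();
    // both handles point at the same megabyte; no bytes were copied
    assert!(Arc::ptr_eq(&original.index, &copy.index));
}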
From be17daf65813344f57908c78117622c66c5396d6 Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Wed, 8 Jan 2025 16:24:34 +0100
Subject: [PATCH 15/15] split iterator

---
 src/index/inverted_index_reader.rs | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 3a960e795b..4be9d9c51f 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -373,16 +373,17 @@ impl InvertedIndexReader {
             // we could do without an iterator, but this allows us access to coalesce which simplifies
             // things
             let posting_ranges_iter =
-                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()))
-                    .coalesce(|range1, range2| {
-                        if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
-                            Ok(range1.start..range2.end)
-                        } else {
-                            Err((range1, range2))
-                        }
-                    });
-
-            for posting_range in posting_ranges_iter {
+                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
+
+            let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
+                if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
+                    Ok(range1.start..range2.end)
+                } else {
+                    Err((range1, range2))
+                }
+            });
+
+            for posting_range in merged_posting_ranges_iter {
                 if let Err(_) = sender.unbounded_send(posting_range) {
                     // this should happen only when search is cancelled
                     return Err(io::Error::other("failed to send posting range back"));
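A final note on the send-error branch kept by patch 15: it doubles as cancellation detection. Illustrative sketch using the same futures-channel API as the series:

use futures_channel::mpsc;

#[test]
fn send_fails_once_the_consumer_is_gone() {
    let (sender, receiver) = mpsc::unbounded::<u32>();
    // simulate a cancelled search: the downloading side goes away
    drop(receiver);
    // the cpu-bound producer notices on its next send and can stop early
    // instead of decompressing the rest of the sstable for nobody
    assert!(sender.unbounded_send(1).is_err());
}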