From b0db9a9dc329bc569f4b0875cc312c990d007729 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 10:51:18 +0800 Subject: [PATCH] feat(bloom-filter): bloom filter applier (#5220) * wip Signed-off-by: Ruihang Xia * draft search logic Signed-off-by: Ruihang Xia * use defined BloomFilterReader Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * round the range end Signed-off-by: Ruihang Xia * finish index applier Signed-off-by: Ruihang Xia * integrate applier into mito2 with cache layer Signed-off-by: Ruihang Xia * fix cache key and add unit test Signed-off-by: Ruihang Xia * provide bloom filter index size hint Signed-off-by: Ruihang Xia * revert BloomFilterReaderImpl::read_vec Signed-off-by: Ruihang Xia * remove dead code Signed-off-by: Ruihang Xia * ignore null on eq Signed-off-by: Ruihang Xia * add more tests and fix bloom filter logic Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/index/Cargo.toml | 1 + src/index/src/bloom_filter.rs | 5 +- src/index/src/bloom_filter/applier.rs | 133 ++++ src/index/src/bloom_filter/reader.rs | 10 +- src/mito2/src/cache.rs | 14 + src/mito2/src/cache/index.rs | 1 + .../src/cache/index/bloom_filter_index.rs | 167 ++++ src/mito2/src/engine.rs | 1 + src/mito2/src/error.rs | 8 + src/mito2/src/read/scan_region.rs | 70 ++ src/mito2/src/sst/file.rs | 20 +- src/mito2/src/sst/index.rs | 6 +- src/mito2/src/sst/index/bloom_filter.rs | 1 + .../src/sst/index/bloom_filter/applier.rs | 722 ++++++++++++++++++ .../src/sst/index/bloom_filter/creator.rs | 4 +- src/mito2/src/sst/parquet/reader.rs | 63 ++ src/mito2/src/test_util.rs | 1 - 18 files changed, 1214 insertions(+), 14 deletions(-) create mode 100644 src/index/src/bloom_filter/applier.rs create mode 100644 src/mito2/src/cache/index/bloom_filter_index.rs create mode 100644 src/mito2/src/sst/index/bloom_filter/applier.rs diff --git a/Cargo.lock b/Cargo.lock index fb778e623bc9..f17bb4112e25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5288,6 +5288,7 @@ dependencies = [ "futures", "greptime-proto", "mockall", + "parquet", "pin-project", "prost 0.12.6", "rand", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index f46c64a17606..898be43b7d56 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -22,6 +22,7 @@ fst.workspace = true futures.workspace = true greptime-proto.workspace = true mockall.workspace = true +parquet.workspace = true pin-project.workspace = true prost.workspace = true regex.workspace = true diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index 00e80f7490ee..600f6e80e84d 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; +pub mod applier; pub mod creator; pub mod error; pub mod reader; @@ -25,7 +26,7 @@ pub type BytesRef<'a> = &'a [u8]; pub const SEED: u128 = 42; /// The Meta information of the bloom filter stored in the file. -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct BloomFilterMeta { /// The number of rows per segment. pub rows_per_segment: usize, @@ -44,7 +45,7 @@ pub struct BloomFilterMeta { } /// The location of the bloom filter segment in the file. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] pub struct BloomFilterSegmentLocation { /// The offset of the bloom filter segment in the file. 
     pub offset: u64,
diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs
new file mode 100644
index 000000000000..2750cbb92b6b
--- /dev/null
+++ b/src/index/src/bloom_filter/applier.rs
@@ -0,0 +1,133 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeMap, HashSet};
+
+use parquet::arrow::arrow_reader::RowSelection;
+use parquet::file::metadata::RowGroupMetaData;
+
+use crate::bloom_filter::error::Result;
+use crate::bloom_filter::reader::BloomFilterReader;
+use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
+
+/// Enumerates types of predicates for value filtering.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Predicate {
+    /// Predicate for matching values in a list.
+    InList(InListPredicate),
+}
+
+/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
+/// one of the elements (logical OR semantics) for the predicate to be satisfied.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct InListPredicate {
+    /// List of acceptable values.
+    pub list: HashSet<Bytes>,
+}
+
+pub struct BloomFilterApplier {
+    reader: Box<dyn BloomFilterReader + Send>,
+    meta: BloomFilterMeta,
+}
+
+impl BloomFilterApplier {
+    pub async fn new(mut reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
+        let meta = reader.metadata().await?;
+
+        Ok(Self { reader, meta })
+    }
+
+    /// Searches for matching row groups using bloom filters.
+    ///
+    /// This method applies the bloom filter index to eliminate row groups that definitely
+    /// don't contain the searched values. It works by:
+    ///
+    /// 1. Computing prefix sums for row counts
+    /// 2. Calculating bloom filter segment locations for each row group
+    ///     1. A row group may span multiple bloom filter segments
+    /// 3. Probing bloom filter segments
+    /// 4. Removing non-matching row groups from the basement
+    ///     1. If a row group doesn't match any bloom filter segment with any probe, it is removed
+    ///
+    /// # Note
+    /// The method modifies the `basement` map in-place by removing row groups that
+    /// don't match the bloom filter criteria.
+    pub async fn search(
+        &mut self,
+        probes: &HashSet<Bytes>,
+        row_group_metas: &[RowGroupMetaData],
+        basement: &mut BTreeMap<usize, Option<RowSelection>>,
+    ) -> Result<()> {
+        // 0. Fast path - return early if the basement is empty
+        if basement.is_empty() {
+            return Ok(());
+        }
+
+        // 1. Compute prefix sum for row counts
+        let mut sum = 0usize;
+        let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1);
+        prefix_sum.push(0usize);
+        for meta in row_group_metas {
+            sum += meta.num_rows() as usize;
+            prefix_sum.push(sum);
+        }
+
+        // 2. Calculate bloom filter segment locations
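+        // For example, with `rows_per_segment = 100` and row groups of 120, 80
+        // and 100 rows, `prefix_sum` is `[0, 120, 200, 300]`: row group 0 maps
+        // to segments `0..2` (floor(0 / 100) to ceil(120 / 100)), row group 1
+        // maps to `1..2`, and row group 2 maps to `2..3`.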
+        let mut row_groups_to_remove = HashSet::new();
+        for &row_group_idx in basement.keys() {
+            // TODO(ruihang): support further filter over row selection
+
+            // todo: dedup & overlap
+            let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment;
+            let rows_range_end = (prefix_sum[row_group_idx + 1] as f64
+                / self.meta.rows_per_segment as f64)
+                .ceil() as usize;
+
+            let mut is_any_range_hit = false;
+            for i in rows_range_start..rows_range_end {
+                // 3. Probe each bloom filter segment
+                let loc = BloomFilterSegmentLocation {
+                    offset: self.meta.bloom_filter_segments[i].offset,
+                    size: self.meta.bloom_filter_segments[i].size,
+                    elem_count: self.meta.bloom_filter_segments[i].elem_count,
+                };
+                let bloom = self.reader.bloom_filter(&loc).await?;
+
+                // Check if any probe exists in the bloom filter
+                let mut matches = false;
+                for probe in probes {
+                    if bloom.contains(probe) {
+                        matches = true;
+                        break;
+                    }
+                }
+
+                is_any_range_hit |= matches;
+                if matches {
+                    break;
+                }
+            }
+            if !is_any_range_hit {
+                row_groups_to_remove.insert(row_group_idx);
+            }
+        }
+
+        // 4. Remove row groups that do not match any bloom filter segment
+        for row_group_idx in row_groups_to_remove {
+            basement.remove(&row_group_idx);
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs
index 6dc592100fcf..02085fa671f7 100644
--- a/src/index/src/bloom_filter/reader.rs
+++ b/src/index/src/bloom_filter/reader.rs
@@ -38,7 +38,15 @@ pub trait BloomFilterReader {
     async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes>;
 
     /// Reads a bunch of ranges from the file.
-    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;
+    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        let mut results = Vec::with_capacity(ranges.len());
+        for range in ranges {
+            let size = (range.end - range.start) as u32;
+            let data = self.range_read(range.start, size).await?;
+            results.push(data);
+        }
+        Ok(results)
+    }
 
     /// Reads the meta information of the bloom filter.
     async fn metadata(&mut self) -> Result<BloomFilterMeta>;
diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index a1864c55179e..77577926a2a3 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -28,6 +28,7 @@ use std::sync::Arc;
 use bytes::Bytes;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
+use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
 use moka::notification::RemovalCause;
 use moka::sync::Cache;
 use parquet::column::page::Page;
@@ -69,6 +70,8 @@ pub struct CacheManager {
     write_cache: Option<WriteCacheRef>,
     /// Cache for inverted index.
     index_cache: Option<InvertedIndexCacheRef>,
+    /// Cache for bloom filter index.
+    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
     /// Puffin metadata cache.
     puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
     /// Cache for time series selectors.
@@ -221,6 +224,10 @@ impl CacheManager {
         self.index_cache.as_ref()
     }
 
+    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
+        self.bloom_filter_index_cache.as_ref()
+    }
+
     pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
         self.puffin_metadata_cache.as_ref()
     }
@@ -364,6 +371,12 @@ impl CacheManagerBuilder {
             self.index_content_size,
             self.index_content_page_size,
         );
+        // TODO(ruihang): check if it's ok to reuse the same params as the inverted index
+        let bloom_filter_index_cache = BloomFilterIndexCache::new(
+            self.index_metadata_size,
+            self.index_content_size,
+            self.index_content_page_size,
+        );
         let puffin_metadata_cache =
             PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
         let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
@@ -387,6 +400,7 @@ impl CacheManagerBuilder {
             page_cache,
             write_cache: self.write_cache,
             index_cache: Some(Arc::new(inverted_index_cache)),
+            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
             puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
             selector_result_cache,
         }
diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index c8bd7a8f329b..137dc3d87454 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod bloom_filter_index;
 pub mod inverted_index;
 
 use std::future::Future;
diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs
new file mode 100644
index 000000000000..f6d19687ceb9
--- /dev/null
+++ b/src/mito2/src/cache/index/bloom_filter_index.rs
@@ -0,0 +1,167 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use index::bloom_filter::error::Result;
+use index::bloom_filter::reader::BloomFilterReader;
+use index::bloom_filter::BloomFilterMeta;
+use store_api::storage::ColumnId;
+
+use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
+use crate::metrics::{CACHE_HIT, CACHE_MISS};
+use crate::sst::file::FileId;
+
+const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
+
+/// Cache for bloom filter index.
+pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId), BloomFilterMeta>;
+pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
+
+impl BloomFilterIndexCache {
+    /// Creates a new bloom filter index cache.
+    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
+        Self::new_with_weighter(
+            index_metadata_cap,
+            index_content_cap,
+            page_size,
+            INDEX_TYPE_BLOOM_FILTER_INDEX,
+            bloom_filter_index_metadata_weight,
+            bloom_filter_index_content_weight,
+        )
+    }
+}
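+
+// Note: the cache key pairs the file id with a column id because each indexed
+// column stores its own bloom filter blob in the SST's puffin file, so entries
+// for the same file have to be tracked per column.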
+
+/// Calculates weight for bloom filter index metadata.
+fn bloom_filter_index_metadata_weight(k: &(FileId, ColumnId), _: &Arc<BloomFilterMeta>) -> u32 {
+    (k.0.as_bytes().len()
+        + std::mem::size_of::<ColumnId>()
+        + std::mem::size_of::<BloomFilterMeta>()) as u32
+}
+
+/// Calculates weight for bloom filter index content.
+fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: &Bytes) -> u32 {
+    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
+}
+
+/// Bloom filter index blob reader with cache.
+pub struct CachedBloomFilterIndexBlobReader<R> {
+    file_id: FileId,
+    column_id: ColumnId,
+    file_size: u64,
+    inner: R,
+    cache: BloomFilterIndexCacheRef,
+}
+
+impl<R> CachedBloomFilterIndexBlobReader<R> {
+    /// Creates a new bloom filter index blob reader with cache.
+    pub fn new(
+        file_id: FileId,
+        column_id: ColumnId,
+        file_size: u64,
+        inner: R,
+        cache: BloomFilterIndexCacheRef,
+    ) -> Self {
+        Self {
+            file_id,
+            column_id,
+            file_size,
+            inner,
+            cache,
+        }
+    }
+}
+
+#[async_trait]
+impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
+    async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
+        let inner = &mut self.inner;
+        self.cache
+            .get_or_load(
+                (self.file_id, self.column_id),
+                self.file_size,
+                offset,
+                size,
+                move |ranges| async move { inner.read_vec(&ranges).await },
+            )
+            .await
+            .map(|b| b.into())
+    }
+
+    /// Reads the meta information of the bloom filter.
+    async fn metadata(&mut self) -> Result<BloomFilterMeta> {
+        if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {
+            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
+            Ok((*cached).clone())
+        } else {
+            let meta = self.inner.metadata().await?;
+            self.cache
+                .put_metadata((self.file_id, self.column_id), Arc::new(meta.clone()));
+            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
+            Ok(meta)
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use rand::{Rng, RngCore};
+
+    use super::*;
+
+    const FUZZ_REPEAT_TIMES: usize = 100;
+
+    #[test]
+    fn fuzz_index_calculation() {
+        let mut rng = rand::thread_rng();
+        let mut data = vec![0u8; 1024 * 1024];
+        rng.fill_bytes(&mut data);
+
+        for _ in 0..FUZZ_REPEAT_TIMES {
+            let offset = rng.gen_range(0..data.len() as u64);
+            let size = rng.gen_range(0..data.len() as u32 - offset as u32);
+            let page_size: usize = rng.gen_range(1..1024);
+
+            let indexes =
+                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
+            let page_num = indexes.len();
+            let mut read = Vec::with_capacity(size as usize);
+            for key in indexes.into_iter() {
+                let start = key.page_id as usize * page_size;
+                let page = if start + page_size < data.len() {
+                    &data[start..start + page_size]
+                } else {
+                    &data[start..]
+                };
+                read.extend_from_slice(page);
+            }
+            let expected_range = offset as usize..(offset + size as u64) as usize;
+            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
+            assert_eq!(
+                read,
+                data.get(expected_range).unwrap(),
+                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
+                offset,
+                size,
+                page_size,
+                read.len(),
+                size as usize,
+                PageKey::calculate_range(offset, size, page_size as u64),
+                page_num
+            );
+        }
+    }
+}
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 9b912318e16b..98160eadc46a 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -433,6 +433,7 @@ impl EngineInner {
             .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
             .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
             .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
+            // .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237
             .with_start_time(query_start);
 
         Ok(scan_region)
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 3a6f368bacd8..0820d99337ec 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -576,6 +576,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to apply bloom filter index"))]
+    ApplyBloomFilterIndex {
+        source: index::bloom_filter::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to push index value"))]
     PushIndexValue {
         source: index::inverted_index::error::Error,
@@ -1022,6 +1029,7 @@ impl ErrorExt for Error {
             EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
             ArrowReader { .. } => StatusCode::StorageUnavailable,
             ConvertValue { source, .. } => source.status_code(),
+            ApplyBloomFilterIndex { source, .. } => source.status_code(),
             BuildIndexApplier { source, .. }
             | PushIndexValue { source, .. }
             | ApplyInvertedIndex { source, .. }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 2dfa22f9f1c9..2ce3367b409b 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -47,6 +47,9 @@ use crate::read::{Batch, Source};
 use crate::region::options::MergeMode;
 use crate::region::version::VersionRef;
 use crate::sst::file::FileHandle;
+use crate::sst::index::bloom_filter::applier::{
+    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
+};
 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -175,6 +178,8 @@ pub(crate) struct ScanRegion {
     ignore_inverted_index: bool,
     /// Whether to ignore fulltext index.
     ignore_fulltext_index: bool,
+    /// Whether to ignore bloom filter.
+    ignore_bloom_filter: bool,
     /// Start time of the scan task.
     start_time: Option<Instant>,
 }
@@ -195,6 +200,7 @@ impl ScanRegion {
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             ignore_inverted_index: false,
             ignore_fulltext_index: false,
+            ignore_bloom_filter: false,
             start_time: None,
         }
     }
@@ -223,6 +229,14 @@ impl ScanRegion {
         self
     }
 
+    /// Sets whether to ignore bloom filter.
+    #[must_use]
+    #[allow(dead_code)] // TODO(ruihang): waiting for #5237
+    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
+        self.ignore_bloom_filter = ignore;
+        self
+    }
+
     #[must_use]
     pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
         self.start_time = Some(now);
@@ -322,6 +336,7 @@ impl ScanRegion {
         self.maybe_remove_field_filters();
 
         let inverted_index_applier = self.build_invereted_index_applier();
+        let bloom_filter_applier = self.build_bloom_filter_applier();
         let fulltext_index_applier = self.build_fulltext_index_applier();
         let predicate = Predicate::new(self.request.filters.clone());
         // The mapper always computes projected column ids as the schema of SSTs may change.
@@ -345,6 +360,7 @@ impl ScanRegion {
             .with_files(files)
             .with_cache(self.cache_manager)
             .with_inverted_index_applier(inverted_index_applier)
+            .with_bloom_filter_index_applier(bloom_filter_applier)
             .with_fulltext_index_applier(fulltext_index_applier)
             .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
             .with_start_time(self.start_time)
@@ -448,6 +464,47 @@ impl ScanRegion {
             .map(Arc::new)
     }
 
+    /// Use the latest schema to build the bloom filter index applier.
+    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
+        if self.ignore_bloom_filter {
+            return None;
+        }
+
+        let file_cache = || -> Option<FileCacheRef> {
+            let cache_manager = self.cache_manager.as_ref()?;
+            let write_cache = cache_manager.write_cache()?;
+            let file_cache = write_cache.file_cache();
+            Some(file_cache)
+        }();
+
+        let index_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|c| c.bloom_filter_index_cache())
+            .cloned();
+
+        let puffin_metadata_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|c| c.puffin_metadata_cache())
+            .cloned();
+
+        BloomFilterIndexApplierBuilder::new(
+            self.access_layer.region_dir().to_string(),
+            self.access_layer.object_store().clone(),
+            self.version.metadata.as_ref(),
+            self.access_layer.puffin_manager_factory().clone(),
+        )
+        .with_file_cache(file_cache)
+        .with_bloom_filter_index_cache(index_cache)
+        .with_puffin_metadata_cache(puffin_metadata_cache)
+        .build(&self.request.filters)
+        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
+        .ok()
+        .flatten()
+        .map(Arc::new)
+    }
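+
+    // Note: when the scan carries no `=` / `IN` predicate on an indexed column,
+    // the builder returns `Ok(None)`, so `bloom_filter_applier` stays `None`
+    // and bloom filter pruning is skipped for the whole scan.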
+
     /// Use the latest schema to build the fulltext index applier.
     fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
         if self.ignore_fulltext_index {
@@ -501,6 +558,7 @@ pub(crate) struct ScanInput {
     pub(crate) parallel_scan_channel_size: usize,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
+    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
     fulltext_index_applier: Option<FulltextIndexApplierRef>,
     /// Start time of the query.
     pub(crate) query_start: Option<Instant>,
@@ -529,6 +587,7 @@ impl ScanInput {
             ignore_file_not_found: false,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             inverted_index_applier: None,
+            bloom_filter_index_applier: None,
             fulltext_index_applier: None,
             query_start: None,
             append_mode: false,
@@ -600,6 +659,16 @@ impl ScanInput {
         self
     }
 
+    /// Sets bloom filter applier.
+    #[must_use]
+    pub(crate) fn with_bloom_filter_index_applier(
+        mut self,
+        applier: Option<BloomFilterIndexApplierRef>,
+    ) -> Self {
+        self.bloom_filter_index_applier = applier;
+        self
+    }
+
     /// Sets fulltext index applier.
     #[must_use]
     pub(crate) fn with_fulltext_index_applier(
@@ -694,6 +763,7 @@ impl ScanInput {
             .projection(Some(self.mapper.column_ids().to_vec()))
             .cache(self.cache_manager.clone())
             .inverted_index_applier(self.inverted_index_applier.clone())
+            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
             .fulltext_index_applier(self.fulltext_index_applier.clone())
             .expected_metadata(Some(self.mapper.metadata().clone()))
             .build_reader_input(reader_metrics)
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 844d3e5d08f8..33166d99cd86 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -143,8 +143,8 @@ pub enum IndexType {
     InvertedIndex,
     /// Full-text index.
     FulltextIndex,
-    /// Bloom filter.
-    BloomFilter,
+    /// Bloom filter index.
+    BloomFilterIndex,
 }
 
 impl FileMeta {
@@ -158,9 +158,10 @@ impl FileMeta {
         self.available_indexes.contains(&IndexType::FulltextIndex)
     }
 
-    /// Returns true if the file has a bloom filter
-    pub fn bloom_filter_available(&self) -> bool {
-        self.available_indexes.contains(&IndexType::BloomFilter)
+    /// Returns true if the file has a bloom filter index.
+    pub fn bloom_filter_index_available(&self) -> bool {
+        self.available_indexes
+            .contains(&IndexType::BloomFilterIndex)
     }
 
     /// Returns the size of the inverted index file
@@ -180,6 +181,15 @@ impl FileMeta {
             None
         }
     }
+
+    /// Returns the size of the bloom filter index file
+    pub fn bloom_filter_index_size(&self) -> Option<u64> {
+        if self.available_indexes.len() == 1 && self.bloom_filter_index_available() {
+            Some(self.index_file_size)
+        } else {
+            None
+        }
+    }
 }
 
 /// Handle to a SST file.
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index b6eac91e56d9..0b2822c04a7c 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -44,7 +44,7 @@ use crate::sst::index::inverted_index::creator::InvertedIndexer;
 
 pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
 pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
-pub(crate) const TYPE_BLOOM_FILTER: &str = "bloom_filter";
+pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
 
 /// Output of the index creation.
 #[derive(Debug, Clone, Default)]
@@ -69,7 +69,7 @@ impl IndexOutput {
             indexes.push(IndexType::FulltextIndex);
         }
         if self.bloom_filter.is_available() {
-            indexes.push(IndexType::BloomFilter);
+            indexes.push(IndexType::BloomFilterIndex);
         }
         indexes
     }
@@ -162,7 +162,7 @@ impl Indexer {
             .as_ref()
             .map_or(0, |creator| creator.memory_usage());
         INDEX_CREATE_MEMORY_USAGE
-            .with_label_values(&[TYPE_BLOOM_FILTER])
+            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
             .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
         self.last_mem_bloom_filter = bloom_filter_mem;
     }
diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs
index 347195a3b16b..7f454937f0e8 100644
--- a/src/mito2/src/sst/index/bloom_filter.rs
+++ b/src/mito2/src/sst/index/bloom_filter.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub(crate) mod applier;
 pub(crate) mod creator;
 
 const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1";
diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs
new file mode 100644
index 000000000000..3476ec097243
--- /dev/null
+++ b/src/mito2/src/sst/index/bloom_filter/applier.rs
@@ -0,0 +1,722 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::sync::Arc;
+
+use common_base::range_read::RangeReader;
+use common_telemetry::warn;
+use datafusion_common::ScalarValue;
+use datafusion_expr::expr::InList;
+use datafusion_expr::{BinaryExpr, Expr, Operator};
+use datatypes::data_type::ConcreteDataType;
+use datatypes::value::Value;
+use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate};
+use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
+use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::RowSelection;
+use parquet::file::metadata::RowGroupMetaData;
+use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
+use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::RegionMetadata;
+use store_api::storage::{ColumnId, RegionId};
+
+use super::INDEX_BLOB_TYPE;
+use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
+use crate::cache::index::bloom_filter_index::{
+    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
+};
+use crate::error::{
+    ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu,
+    PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
+};
+use crate::metrics::INDEX_APPLY_ELAPSED;
+use crate::row_converter::SortField;
+use crate::sst::file::FileId;
+use crate::sst::index::codec::IndexValueCodec;
+use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
+use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
+use crate::sst::location;
+
+pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
+
+pub struct BloomFilterIndexApplier {
+    region_dir: String,
+    region_id: RegionId,
+    object_store: ObjectStore,
+    file_cache: Option<FileCacheRef>,
+    puffin_manager_factory: PuffinManagerFactory,
+    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
+    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
+    filters: HashMap<ColumnId, Vec<Predicate>>,
+}
+
+impl BloomFilterIndexApplier {
+    pub fn new(
+        region_dir: String,
+        region_id: RegionId,
+        object_store: ObjectStore,
+        puffin_manager_factory: PuffinManagerFactory,
+        filters: HashMap<ColumnId, Vec<Predicate>>,
+    ) -> Self {
+        Self {
+            region_dir,
+            region_id,
+            object_store,
+            file_cache: None,
+            puffin_manager_factory,
+            puffin_metadata_cache: None,
+            bloom_filter_index_cache: None,
+            filters,
+        }
+    }
+
+    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
+        self.file_cache = file_cache;
+        self
+    }
+
+    pub fn with_puffin_metadata_cache(
+        mut self,
+        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
+    ) -> Self {
+        self.puffin_metadata_cache = puffin_metadata_cache;
+        self
+    }
+
+    pub fn with_bloom_filter_cache(
+        mut self,
+        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
+    ) -> Self {
+        self.bloom_filter_index_cache = bloom_filter_index_cache;
+        self
+    }
+
+    /// Applies bloom filter predicates to the provided SST file, removing row groups
+    /// from `basement` in place when they cannot contain matching rows
+    pub async fn apply(
+        &self,
+        file_id: FileId,
+        file_size_hint: Option<u64>,
+        row_group_metas: &[RowGroupMetaData],
+        basement: &mut BTreeMap<usize, Option<RowSelection>>,
+    ) -> Result<()> {
+        let _timer = INDEX_APPLY_ELAPSED
+            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
+            .start_timer();
+
+        for (column_id, predicates) in &self.filters {
+            let mut blob = match self.cached_blob_reader(file_id, *column_id).await {
+                Ok(Some(puffin_reader)) => puffin_reader,
+                other => {
+                    if let Err(err) = other {
+                        warn!(err; "An unexpected error occurred while reading the cached index file. Fall back to the remote index file.")
+                    }
+                    self.remote_blob_reader(file_id, *column_id, file_size_hint)
+                        .await?
+                }
+            };
+
+            // Create the appropriate reader based on whether caching is enabled
+            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
+                let file_size = if let Some(file_size) = file_size_hint {
+                    file_size
+                } else {
+                    blob.metadata().await.context(MetadataSnafu)?.content_length
+                };
+                let reader = CachedBloomFilterIndexBlobReader::new(
+                    file_id,
+                    *column_id,
+                    file_size,
+                    BloomFilterReaderImpl::new(blob),
+                    bloom_filter_cache.clone(),
+                );
+                self.apply_filters(reader, predicates, row_group_metas, basement)
+                    .await
+                    .context(ApplyBloomFilterIndexSnafu)?;
+            } else {
+                let reader = BloomFilterReaderImpl::new(blob);
+                self.apply_filters(reader, predicates, row_group_metas, basement)
+                    .await
+                    .context(ApplyBloomFilterIndexSnafu)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Creates a blob reader from the cached index file
+    async fn cached_blob_reader(
+        &self,
+        file_id: FileId,
+        column_id: ColumnId,
+    ) -> Result<Option<BlobReader>> {
+        let Some(file_cache) = &self.file_cache else {
+            return Ok(None);
+        };
+
+        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
+        if file_cache.get(index_key).await.is_none() {
+            return Ok(None);
+        };
+
+        let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
+        let puffin_file_name = file_cache.cache_file_path(index_key);
+
+        let reader = puffin_manager
+            .reader(&puffin_file_name)
+            .await
+            .context(PuffinBuildReaderSnafu)?
+            .blob(&Self::column_blob_name(column_id))
+            .await
+            .context(PuffinReadBlobSnafu)?
+            .reader()
+            .await
+            .context(PuffinBuildReaderSnafu)?;
+        Ok(Some(reader))
+    }
+
+    // TODO(ruihang): use the same util with the code in creator
+    fn column_blob_name(column_id: ColumnId) -> String {
+        format!("{INDEX_BLOB_TYPE}-{column_id}")
+    }
+
+    /// Creates a blob reader from the remote index file
+    async fn remote_blob_reader(
+        &self,
+        file_id: FileId,
+        column_id: ColumnId,
+        file_size_hint: Option<u64>,
+    ) -> Result<BlobReader> {
+        let puffin_manager = self
+            .puffin_manager_factory
+            .build(self.object_store.clone())
+            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
+
+        let file_path = location::index_file_path(&self.region_dir, file_id);
+        puffin_manager
+            .reader(&file_path)
+            .await
+            .context(PuffinBuildReaderSnafu)?
+            .with_file_size_hint(file_size_hint)
+            .blob(&Self::column_blob_name(column_id))
+            .await
+            .context(PuffinReadBlobSnafu)?
+            .reader()
+            .await
+            .context(PuffinBuildReaderSnafu)
+    }
+
+    async fn apply_filters<R: BloomFilterReader + Send>(
+        &self,
+        reader: R,
+        predicates: &[Predicate],
+        row_group_metas: &[RowGroupMetaData],
+        basement: &mut BTreeMap<usize, Option<RowSelection>>,
+    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
+        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
+
+        for predicate in predicates {
+            match predicate {
+                Predicate::InList(in_list) => {
+                    applier
+                        .search(&in_list.list, row_group_metas, basement)
+                        .await?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub struct BloomFilterIndexApplierBuilder<'a> {
+    region_dir: String,
+    object_store: ObjectStore,
+    metadata: &'a RegionMetadata,
+    puffin_manager_factory: PuffinManagerFactory,
+    file_cache: Option<FileCacheRef>,
+    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
+    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
+    output: HashMap<ColumnId, Vec<Predicate>>,
+}
+
+impl<'a> BloomFilterIndexApplierBuilder<'a> {
+    pub fn new(
+        region_dir: String,
+        object_store: ObjectStore,
+        metadata: &'a RegionMetadata,
+        puffin_manager_factory: PuffinManagerFactory,
+    ) -> Self {
+        Self {
+            region_dir,
+            object_store,
+            metadata,
+            puffin_manager_factory,
+            file_cache: None,
+            puffin_metadata_cache: None,
+            bloom_filter_index_cache: None,
+            output: HashMap::default(),
+        }
+    }
+
+    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
+        self.file_cache = file_cache;
+        self
+    }
+
+    pub fn with_puffin_metadata_cache(
+        mut self,
+        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
+    ) -> Self {
+        self.puffin_metadata_cache = puffin_metadata_cache;
+        self
+    }
+
+    pub fn with_bloom_filter_index_cache(
+        mut self,
+        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
+    ) -> Self {
+        self.bloom_filter_index_cache = bloom_filter_index_cache;
+        self
+    }
+
+    /// Builds the applier with the given filter expressions
+    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
+        for expr in exprs {
+            self.traverse_and_collect(expr);
+        }
+
+        if self.output.is_empty() {
+            return Ok(None);
+        }
+
+        let applier = BloomFilterIndexApplier::new(
+            self.region_dir,
+            self.metadata.region_id,
+            self.object_store,
+            self.puffin_manager_factory,
+            self.output,
+        )
+        .with_file_cache(self.file_cache)
+        .with_puffin_metadata_cache(self.puffin_metadata_cache)
+        .with_bloom_filter_cache(self.bloom_filter_index_cache);
+
+        Ok(Some(applier))
+    }
+
+    /// Recursively traverses expressions to collect bloom filter predicates
+    fn traverse_and_collect(&mut self, expr: &Expr) {
+        let res = match expr {
+            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
+                Operator::And => {
+                    self.traverse_and_collect(left);
+                    self.traverse_and_collect(right);
+                    Ok(())
+                }
+                Operator::Eq => self.collect_eq(left, right),
+                _ => Ok(()),
+            },
+            Expr::InList(in_list) => self.collect_in_list(in_list),
+            _ => Ok(()),
+        };
+
+        if let Err(err) = res {
+            warn!(err; "Failed to collect bloom filter predicates, ignore it.
expr: {expr}");
+        }
+    }
+
+    /// Helper function to get the column id and type
+    fn column_id_and_type(
+        &self,
+        column_name: &str,
+    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
+        let column = self
+            .metadata
+            .column_by_name(column_name)
+            .context(ColumnNotFoundSnafu {
+                column: column_name,
+            })?;
+
+        Ok(Some((
+            column.column_id,
+            column.column_schema.data_type.clone(),
+        )))
+    }
+
+    /// Collects an equality expression (column = value)
+    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
+        let (col, lit) = match (left, right) {
+            (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
+            (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
+            _ => return Ok(()),
+        };
+        if lit.is_null() {
+            return Ok(());
+        }
+        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
+            return Ok(());
+        };
+        let value = encode_lit(lit, data_type)?;
+
+        // Create bloom filter predicate
+        let mut set = HashSet::new();
+        set.insert(value);
+        let predicate = Predicate::InList(InListPredicate { list: set });
+
+        // Add to output predicates
+        self.output.entry(column_id).or_default().push(predicate);
+
+        Ok(())
+    }
+
+    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
+    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
+        // Only collect InList predicates if they reference a column
+        let Expr::Column(column) = &in_list.expr.as_ref() else {
+            return Ok(());
+        };
+        if in_list.list.is_empty() || in_list.negated {
+            return Ok(());
+        }
+
+        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
+            return Ok(());
+        };
+
+        // Convert all non-null literals to predicates
+        let predicates = in_list
+            .list
+            .iter()
+            .filter_map(Self::nonnull_lit)
+            .map(|lit| encode_lit(lit, data_type.clone()));
+
+        // Collect successful conversions
+        let mut valid_predicates = HashSet::new();
+        for predicate in predicates {
+            match predicate {
+                Ok(p) => {
+                    valid_predicates.insert(p);
+                }
+                Err(e) => warn!(e; "Failed to convert value in InList"),
+            }
+        }
+
+        if !valid_predicates.is_empty() {
+            self.output
+                .entry(column_id)
+                .or_default()
+                .push(Predicate::InList(InListPredicate {
+                    list: valid_predicates,
+                }));
+        }
+
+        Ok(())
+    }
+
+    /// Helper function to get non-null literal value
+    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
+        match expr {
+            Expr::Literal(lit) if !lit.is_null() => Some(lit),
+            _ => None,
+        }
+    }
+}
+
+// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
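+// Note: probes have to be encoded exactly the way the index creator encodes the
+// indexed values (`IndexValueCodec::encode_nonnull_value` over a `SortField`);
+// bytes produced by any other encoding would never match the keys stored in the
+// bloom filter.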
+/// Helper function to encode a literal into bytes.
+fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
+    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
+    let mut bytes = vec![];
+    let field = SortField::new(data_type);
+    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
+    Ok(bytes)
+}
+
+#[cfg(test)]
+mod tests {
+    use api::v1::SemanticType;
+    use datafusion_common::Column;
+    use datatypes::schema::ColumnSchema;
+    use object_store::services::Memory;
+    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+
+    use super::*;
+
+    fn test_region_metadata() -> RegionMetadata {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "column1",
+                    ConcreteDataType::string_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "column2",
+                    ConcreteDataType::int64_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Field,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "column3",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 3,
+            })
+            .primary_key(vec![1]);
+        builder.build().unwrap()
+    }
+
+    fn test_object_store() -> ObjectStore {
+        ObjectStore::new(Memory::default()).unwrap().finish()
+    }
+
+    fn column(name: &str) -> Expr {
+        Expr::Column(Column {
+            relation: None,
+            name: name.to_string(),
+        })
+    }
+
+    fn string_lit(s: impl Into<String>) -> Expr {
+        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
+    }
+
+    #[test]
+    fn test_build_with_exprs() {
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
+        let metadata = test_region_metadata();
+        let builder = BloomFilterIndexApplierBuilder::new(
+            "test".to_string(),
+            test_object_store(),
+            &metadata,
+            factory,
+        );
+
+        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
+            left: Box::new(column("column1")),
+            op: Operator::Eq,
+            right: Box::new(string_lit("value1")),
+        })];
+
+        let result = builder.build(&exprs).unwrap();
+        assert!(result.is_some());
+
+        let filters = result.unwrap().filters;
+        assert_eq!(filters.len(), 1);
+
+        let column_predicates = filters.get(&1).unwrap();
+        assert_eq!(column_predicates.len(), 1);
+
+        let expected = encode_lit(
+            &ScalarValue::Utf8(Some("value1".to_string())),
+            ConcreteDataType::string_datatype(),
+        )
+        .unwrap();
+        match &column_predicates[0] {
+            Predicate::InList(p) => {
+                assert_eq!(p.list.iter().next().unwrap(), &expected);
+            }
+        }
+    }
+
+    fn int64_lit(i: i64) -> Expr {
+        Expr::Literal(ScalarValue::Int64(Some(i)))
+    }
+
+    #[test]
+    fn test_build_with_in_list() {
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
+        let metadata = test_region_metadata();
+        let builder = BloomFilterIndexApplierBuilder::new(
+            "test".to_string(),
+            test_object_store(),
+            &metadata,
+            factory,
+        );
+
+        let exprs = vec![Expr::InList(InList {
+            expr: Box::new(column("column2")),
+            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
+            negated: false,
+        })];
+
+        let result = builder.build(&exprs).unwrap();
+        assert!(result.is_some());
+
+        let filters = result.unwrap().filters;
+        let column_predicates = filters.get(&2).unwrap();
+        assert_eq!(column_predicates.len(), 1);
+
+        match &column_predicates[0] {
+            Predicate::InList(p) => {
+                assert_eq!(p.list.len(), 3);
+            }
+        }
+    }
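+
+    // Note: values inside a single `InListPredicate` combine with OR (any probe
+    // hit keeps the row group alive in `BloomFilterApplier::search`), while the
+    // predicates collected for one column are applied sequentially in
+    // `apply_filters`, i.e. they combine with AND.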
+ #[test] + fn test_build_with_and_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })), + op: Operator::And, + right: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column2")), + op: Operator::Eq, + right: Box::new(int64_lit(42)), + })), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 2); + assert!(filters.contains_key(&1)); + assert!(filters.contains_key(&2)); + } + + #[test] + fn test_build_with_null_values() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(None))), + }), + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![ + int64_lit(1), + Expr::Literal(ScalarValue::Int64(None)), + int64_lit(3), + ], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert!(!filters.contains_key(&1)); // Null equality should be ignored + let column2_predicates = filters.get(&2).unwrap(); + match &column2_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 2); // Only non-null values should be included + } + } + } + + #[test] + fn test_build_with_invalid_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + // Non-equality operator + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Gt, + right: Box::new(string_lit("value1")), + }), + // Non-existent column + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("non_existent")), + op: Operator::Eq, + right: Box::new(string_lit("value")), + }), + // Negated IN list + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2)], + negated: true, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_build_with_multiple_predicates_same_column() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + }), + Expr::InList(InList { + expr: Box::new(column("column1")), + list: vec![string_lit("value2"), string_lit("value3")], + negated: false, + }), + ]; + + let result = 
builder.build(&exprs).unwrap();
+        assert!(result.is_some());
+
+        let filters = result.unwrap().filters;
+        let column_predicates = filters.get(&1).unwrap();
+        assert_eq!(column_predicates.len(), 2);
+    }
+}
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index 8c56800f47e7..d1b73a0bde25 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -39,7 +39,7 @@ use crate::sst::index::intermediate::{
 };
 use crate::sst::index::puffin_manager::SstPuffinWriter;
 use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
-use crate::sst::index::TYPE_BLOOM_FILTER;
+use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
 
 /// The buffer size for the pipe used to send index data to the puffin blob.
 const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
@@ -114,7 +114,7 @@ impl BloomFilterIndexer {
             temp_file_provider,
             codec,
             aborted: false,
-            stats: Statistics::new(TYPE_BLOOM_FILTER),
+            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
             global_memory_usage,
         };
         Ok(Some(indexer))
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 5527752a8885..5931658879ad 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -51,6 +51,7 @@ use crate::read::prune::{PruneReader, Source};
 use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, SortField};
 use crate::sst::file::FileHandle;
+use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
 use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
@@ -80,6 +81,7 @@ pub struct ParquetReaderBuilder {
     cache_manager: Option<CacheManagerRef>,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
+    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
     fulltext_index_applier: Option<FulltextIndexApplierRef>,
     /// Expected metadata of the region while reading the SST.
     /// This is usually the latest metadata of the region. The reader use
@@ -102,6 +104,7 @@ impl ParquetReaderBuilder {
             projection: None,
             cache_manager: None,
             inverted_index_applier: None,
+            bloom_filter_index_applier: None,
             fulltext_index_applier: None,
             expected_metadata: None,
         }
@@ -140,6 +143,16 @@ impl ParquetReaderBuilder {
         self
     }
 
+    /// Attaches the bloom filter index applier to the builder.
+    #[must_use]
+    pub(crate) fn bloom_filter_index_applier(
+        mut self,
+        index_applier: Option<BloomFilterIndexApplierRef>,
+    ) -> Self {
+        self.bloom_filter_index_applier = index_applier;
+        self
+    }
+
     /// Attaches the fulltext index applier to the builder.
     #[must_use]
     pub(crate) fn fulltext_index_applier(
@@ -359,6 +372,9 @@ impl ParquetReaderBuilder {
             self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
         }
 
+        self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
+            .await;
+
         output
     }
 
@@ -607,6 +623,53 @@ impl ParquetReaderBuilder {
         *output = res;
     }
 
+    async fn prune_row_groups_by_bloom_filter(
+        &self,
+        parquet_meta: &ParquetMetaData,
+        output: &mut BTreeMap<usize, Option<RowSelection>>,
+        _metrics: &mut ReaderFilterMetrics,
+    ) -> bool {
+        let Some(index_applier) = &self.bloom_filter_index_applier else {
+            return false;
+        };
+
+        if !self.file_handle.meta_ref().bloom_filter_index_available() {
+            return false;
+        }
+
+        let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
+        match index_applier
+            .apply(
+                self.file_handle.file_id(),
+                file_size_hint,
+                parquet_meta.row_groups(),
+                output,
+            )
+            .await
+        {
+            Ok(output) => output,
+            Err(err) => {
+                if cfg!(any(test, feature = "test")) {
+                    panic!(
+                        "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}",
+                        self.file_handle.region_id(),
+                        self.file_handle.file_id(),
+                        err
+                    );
+                } else {
+                    warn!(
+                        err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
+                        self.file_handle.region_id(), self.file_handle.file_id()
+                    );
+                }
+
+                return false;
+            }
+        };
+
+        true
+    }
+
     /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to
     /// a list of row ranges to keep.
     fn prune_row_groups_by_ranges(
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 14b4bb4a9109..9eb77d706df3 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -643,7 +643,6 @@ impl TestEnv {
             .await
             .unwrap();
 
-        let object_store_manager = self.get_object_store_manager().unwrap();
         let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
             .await
             .unwrap();
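
End to end, the pieces above compose as follows: `ScanRegion::build_bloom_filter_applier` collects `=` and `IN` predicates from the scan filters into a `BloomFilterIndexApplier`, and `ParquetReaderBuilder::prune_row_groups_by_bloom_filter` hands that applier the row groups that survived the earlier pruning stages. A rough sketch of the same flow in isolation, reusing the unit-test helpers from the applier module (`test_region_metadata`, `test_object_store`, `column`, `string_lit`); the `file_id` and `row_group_metas` values here are placeholders for a real SST:

    use std::collections::BTreeMap;

    let (_dir, factory) = PuffinManagerFactory::new_for_test_block("bloom_filter_sketch_");
    let metadata = test_region_metadata();
    let exprs = vec![Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column1")),
        op: Operator::Eq,
        right: Box::new(string_lit("value1")),
    })];

    // `build` returns Ok(None) when no predicate can be collected.
    let applier = BloomFilterIndexApplierBuilder::new(
        "region_dir".to_string(),
        test_object_store(),
        &metadata,
        factory,
    )
    .build(&exprs)
    .unwrap()
    .expect("a predicate is collected for column1");

    // Start from all row groups (no row selection yet) and let the applier drop
    // those whose bloom filters cannot contain "value1".
    let mut basement: BTreeMap<usize, Option<RowSelection>> =
        (0..row_group_metas.len()).map(|i| (i, None)).collect();
    applier
        .apply(file_id, None, &row_group_metas, &mut basement)
        .await?;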