diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index d4f7b293bc32..2750cbb92b6b 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -48,6 +48,21 @@ impl BloomFilterApplier { Ok(Self { reader, meta }) } + /// Searches for matching row groups using bloom filters. + /// + /// This method applies bloom filter index to eliminate row groups that definitely + /// don't contain the searched values. It works by: + /// + /// 1. Computing prefix sums for row counts + /// 2. Calculating bloom filter segment locations for each row group + /// 1. A row group may span multiple bloom filter segments + /// 3. Probing bloom filter segments + /// 4. Removing non-matching row groups from the basement + /// 1. If a row group doesn't match any bloom filter segment with any probe, it is removed + /// + /// # Note + /// The method modifies the `basement` map in-place by removing row groups that + /// don't match the bloom filter criteria. pub async fn search( &mut self, probes: &HashSet, @@ -89,16 +104,19 @@ impl BloomFilterApplier { }; let bloom = self.reader.bloom_filter(&loc).await?; - // Check if all probes exist in bloom filter - let mut matches = true; + // Check if any probe exists in bloom filter + let mut matches = false; for probe in probes { - if !bloom.contains(probe) { - matches = false; + if bloom.contains(probe) { + matches = true; break; } } is_any_range_hit |= matches; + if matches { + break; + } } if !is_any_range_hit { row_groups_to_remove.insert(row_group_idx); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1557b764e053..0820d99337ec 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -562,13 +562,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to build bloom filter index applier"))] - BuildBloomFilterIndexApplier { - source: index::bloom_filter::error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to convert value"))] ConvertValue { source: datatypes::error::Error, @@ -1036,9 +1029,7 @@ impl ErrorExt for Error { EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), - BuildBloomFilterIndexApplier { source, .. } | ApplyBloomFilterIndex { source, .. } => { - source.status_code() - } + ApplyBloomFilterIndex { source, .. } => source.status_code(), BuildIndexApplier { source, .. } | PushIndexValue { source, .. } | ApplyInvertedIndex { source, .. } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 8fbbbb1eb350..3476ec097243 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -541,4 +541,182 @@ mod tests { } } } + + fn int64_lit(i: i64) -> Expr { + Expr::Literal(ScalarValue::Int64(Some(i))) + } + + #[test] + fn test_build_with_in_list() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2), int64_lit(3)], + negated: false, + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&2).unwrap(); + assert_eq!(column_predicates.len(), 1); + + match &column_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 3); + } + } + } + + #[test] + fn test_build_with_and_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + })), + op: Operator::And, + right: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column2")), + op: Operator::Eq, + right: Box::new(int64_lit(42)), + })), + })]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert_eq!(filters.len(), 2); + assert!(filters.contains_key(&1)); + assert!(filters.contains_key(&2)); + } + + #[test] + fn test_build_with_null_values() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(None))), + }), + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![ + int64_lit(1), + Expr::Literal(ScalarValue::Int64(None)), + int64_lit(3), + ], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + assert!(!filters.contains_key(&1)); // Null equality should be ignored + let column2_predicates = filters.get(&2).unwrap(); + match &column2_predicates[0] { + Predicate::InList(p) => { + assert_eq!(p.list.len(), 2); // Only non-null values should be included + } + } + } + + #[test] + fn test_build_with_invalid_expressions() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + // Non-equality operator + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Gt, + right: Box::new(string_lit("value1")), + }), + // Non-existent column + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("non_existent")), + op: Operator::Eq, + right: Box::new(string_lit("value")), + }), + // Negated IN list + Expr::InList(InList { + expr: Box::new(column("column2")), + list: vec![int64_lit(1), int64_lit(2)], + negated: true, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_build_with_multiple_predicates_same_column() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_"); + let metadata = test_region_metadata(); + let builder = BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory, + ); + + let exprs = vec![ + Expr::BinaryExpr(BinaryExpr { + left: Box::new(column("column1")), + op: Operator::Eq, + right: Box::new(string_lit("value1")), + }), + Expr::InList(InList { + expr: Box::new(column("column1")), + list: vec![string_lit("value2"), string_lit("value3")], + negated: false, + }), + ]; + + let result = builder.build(&exprs).unwrap(); + assert!(result.is_some()); + + let filters = result.unwrap().filters; + let column_predicates = filters.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 2); + } }