Skip to content

Commit

Permalink
add more tests and fix bloom filter logic
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Dec 25, 2024
1 parent 2fcbbe0 commit 7a2be74
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 14 deletions.
26 changes: 22 additions & 4 deletions src/index/src/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ impl BloomFilterApplier {
Ok(Self { reader, meta })
}

/// Searches for matching row groups using bloom filters.
///
/// This method applies bloom filter index to eliminate row groups that definitely
/// don't contain the searched values. It works by:
///
/// 1. Computing prefix sums for row counts
/// 2. Calculating bloom filter segment locations for each row group
///    - A row group may span multiple bloom filter segments
/// 3. Probing bloom filter segments
/// 4. Removing non-matching row groups from the basement
///    - If a row group doesn't match any bloom filter segment with any probe, it is removed
///
/// # Note
/// The method modifies the `basement` map in-place by removing row groups that
/// don't match the bloom filter criteria.
pub async fn search(
&mut self,
probes: &HashSet<Bytes>,
Expand Down Expand Up @@ -89,16 +104,19 @@ impl BloomFilterApplier {
};
let bloom = self.reader.bloom_filter(&loc).await?;

// Check if all probes exist in bloom filter
let mut matches = true;
// Check if any probe exists in bloom filter
let mut matches = false;
for probe in probes {
if !bloom.contains(probe) {
matches = false;
if bloom.contains(probe) {
matches = true;
break;
}
}

is_any_range_hit |= matches;
if matches {
break;
}
}
if !is_any_range_hit {
row_groups_to_remove.insert(row_group_idx);
Expand Down
11 changes: 1 addition & 10 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,6 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build bloom filter index applier"))]
BuildBloomFilterIndexApplier {
source: index::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
source: datatypes::error::Error,
Expand Down Expand Up @@ -1036,9 +1029,7 @@ impl ErrorExt for Error {
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
BuildBloomFilterIndexApplier { source, .. } | ApplyBloomFilterIndex { source, .. } => {
source.status_code()
}
ApplyBloomFilterIndex { source, .. } => source.status_code(),
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyInvertedIndex { source, .. }
Expand Down
178 changes: 178 additions & 0 deletions src/mito2/src/sst/index/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,182 @@ mod tests {
}
}
}

/// Wraps `i` in a non-null `Int64` scalar and returns it as a literal expression.
fn int64_lit(i: i64) -> Expr {
    let scalar = ScalarValue::Int64(Some(i));
    Expr::Literal(scalar)
}

/// Builds an applier from a single non-negated `IN` list on `column2` and checks
/// that the filter entry under key `2` holds exactly one `InList` predicate with
/// all three listed values.
#[test]
fn test_build_with_in_list() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
    let metadata = test_region_metadata();
    let builder = BloomFilterIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        factory,
    );

    // column2 IN (1, 2, 3)
    let in_list = Expr::InList(InList {
        expr: Box::new(column("column2")),
        list: vec![1, 2, 3].into_iter().map(int64_lit).collect(),
        negated: false,
    });
    let exprs = vec![in_list];

    let applier = builder.build(&exprs).unwrap();
    assert!(applier.is_some());

    let filters = applier.unwrap().filters;
    let predicates = filters.get(&2).unwrap();
    assert_eq!(predicates.len(), 1);

    // `InList` is the only pattern the original match needed, so a plain `let`
    // destructuring is equivalent.
    let Predicate::InList(in_list_predicate) = &predicates[0];
    assert_eq!(in_list_predicate.list.len(), 3);
}

/// Builds an applier from two equality comparisons joined by `AND` and checks
/// that the resulting filters contain entries under both key `1` and key `2`.
#[test]
fn test_build_with_and_expressions() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
    let metadata = test_region_metadata();
    let builder = BloomFilterIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        factory,
    );

    // column1 = "value1"
    let column1_eq = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column1")),
        op: Operator::Eq,
        right: Box::new(string_lit("value1")),
    });
    // column2 = 42
    let column2_eq = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column2")),
        op: Operator::Eq,
        right: Box::new(int64_lit(42)),
    });
    // (column1 = "value1") AND (column2 = 42)
    let conjunction = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column1_eq),
        op: Operator::And,
        right: Box::new(column2_eq),
    });
    let exprs = vec![conjunction];

    let applier = builder.build(&exprs).unwrap();
    assert!(applier.is_some());

    let filters = applier.unwrap().filters;
    assert_eq!(filters.len(), 2);
    assert!(filters.contains_key(&1));
    assert!(filters.contains_key(&2));
}

/// Feeds the builder an equality against a null string literal and an `IN` list
/// containing a null entry, then checks that key `1` is absent from the filters
/// and that the `InList` predicate under key `2` keeps only two values.
#[test]
fn test_build_with_null_values() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
    let metadata = test_region_metadata();
    let builder = BloomFilterIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        factory,
    );

    // column1 = NULL
    let null_equality = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column1")),
        op: Operator::Eq,
        right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
    });
    // column2 IN (1, NULL, 3)
    let in_list_with_null = Expr::InList(InList {
        expr: Box::new(column("column2")),
        list: vec![
            int64_lit(1),
            Expr::Literal(ScalarValue::Int64(None)),
            int64_lit(3),
        ],
        negated: false,
    });
    let exprs = vec![null_equality, in_list_with_null];

    let applier = builder.build(&exprs).unwrap();
    assert!(applier.is_some());

    let filters = applier.unwrap().filters;
    // Null equality should be ignored
    assert!(!filters.contains_key(&1));

    let column2_predicates = filters.get(&2).unwrap();
    let Predicate::InList(in_list_predicate) = &column2_predicates[0];
    // Only non-null values should be included
    assert_eq!(in_list_predicate.list.len(), 2);
}

/// Passes three expressions to the builder (a `>` comparison, an equality on a
/// column named `non_existent`, and a negated `IN` list) and checks that `build`
/// succeeds but returns `None`.
#[test]
fn test_build_with_invalid_expressions() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
    let metadata = test_region_metadata();
    let builder = BloomFilterIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        factory,
    );

    // Non-equality operator
    let greater_than = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column1")),
        op: Operator::Gt,
        right: Box::new(string_lit("value1")),
    });
    // Non-existent column
    let unknown_column = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("non_existent")),
        op: Operator::Eq,
        right: Box::new(string_lit("value")),
    });
    // Negated IN list
    let negated_in_list = Expr::InList(InList {
        expr: Box::new(column("column2")),
        list: vec![int64_lit(1), int64_lit(2)],
        negated: true,
    });
    let exprs = vec![greater_than, unknown_column, negated_in_list];

    let applier = builder.build(&exprs).unwrap();
    assert!(applier.is_none());
}

/// Supplies an equality and an `IN` list that both target `column1` and checks
/// that the filter entry under key `1` ends up with two predicates.
#[test]
fn test_build_with_multiple_predicates_same_column() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
    let metadata = test_region_metadata();
    let builder = BloomFilterIndexApplierBuilder::new(
        "test".to_string(),
        test_object_store(),
        &metadata,
        factory,
    );

    // column1 = "value1"
    let equality = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(column("column1")),
        op: Operator::Eq,
        right: Box::new(string_lit("value1")),
    });
    // column1 IN ("value2", "value3")
    let in_list = Expr::InList(InList {
        expr: Box::new(column("column1")),
        list: vec![string_lit("value2"), string_lit("value3")],
        negated: false,
    });
    let exprs = vec![equality, in_list];

    let applier = builder.build(&exprs).unwrap();
    assert!(applier.is_some());

    let filters = applier.unwrap().filters;
    let predicates = filters.get(&1).unwrap();
    assert_eq!(predicates.len(), 2);
}
}

0 comments on commit 7a2be74

Please sign in to comment.