Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 2) #3068

133 changes: 102 additions & 31 deletions src/mito2/src/sst/index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,18 @@
// limitations under the License.

mod between;

// TODO(zhongzc): This PR is too large. The following modules are coming soon.

// mod comparison;
// mod eq_list;
// mod in_list;
// mod regex_match;
mod comparison;
mod eq_list;
mod in_list;
mod regex_match;

use std::collections::HashMap;

use api::v1::SemanticType;
use common_query::logical_plan::Expr;
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr as DfExpr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
Expand Down Expand Up @@ -99,23 +96,21 @@ impl<'a> SstIndexApplierBuilder<'a> {
let res = match expr {
DfExpr::Between(between) => self.collect_between(between),

// TODO(zhongzc): This PR is too large. The following arms are coming soon.

// DfExpr::InList(in_list) => self.collect_inlist(in_list),
// DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
// Operator::And => {
// self.traverse_and_collect(left);
// self.traverse_and_collect(right);
// Ok(())
// }
// Operator::Or => self.collect_or_eq_list(left, right),
// Operator::Eq => self.collect_eq(left, right),
// Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
// self.collect_comparison_expr(left, op, right)
// }
// Operator::RegexMatch => self.collect_regex_match(left, right),
// _ => Ok(()),
// },
DfExpr::InList(in_list) => self.collect_inlist(in_list),
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::And => {
self.traverse_and_collect(left);
self.traverse_and_collect(right);
Ok(())
}
Operator::Or => self.collect_or_eq_list(left, right),
Operator::Eq => self.collect_eq(left, right),
Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
self.collect_comparison_expr(left, op, right)
}
Operator::RegexMatch => self.collect_regex_match(left, right),
_ => Ok(()),
},

// TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
_ => Ok(()),
Expand Down Expand Up @@ -180,8 +175,12 @@ impl<'a> SstIndexApplierBuilder<'a> {
mod tests {
use api::v1::SemanticType;
use datafusion_common::Column;
use datafusion_expr::Between;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use index::inverted_index::search::predicate::{
Bound, Range, RangePredicate, RegexMatchPredicate,
};
use object_store::services::Memory;
use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
Expand All @@ -198,20 +197,25 @@ mod tests {
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
"d",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
column_id: 4,
})
.primary_key(vec![1]);
.primary_key(vec![1, 2]);
builder.build().unwrap()
}

Expand All @@ -226,13 +230,20 @@ mod tests {
})
}

pub(crate) fn field_column() -> DfExpr {
pub(crate) fn tag_column2() -> DfExpr {
DfExpr::Column(Column {
relation: None,
name: "b".to_string(),
})
}

pub(crate) fn field_column() -> DfExpr {
DfExpr::Column(Column {
relation: None,
name: "c".to_string(),
})
}

pub(crate) fn nonexistent_column() -> DfExpr {
DfExpr::Column(Column {
relation: None,
Expand All @@ -258,4 +269,64 @@ mod tests {
.unwrap();
bytes
}

pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
let mut bytes = vec![];
IndexValueCodec::encode_value(
Value::from(s.into()).as_value_ref(),
&SortField::new(ConcreteDataType::int64_datatype()),
&mut bytes,
)
.unwrap();
bytes
}

#[test]
fn test_collect_and_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);

let expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::RegexMatch,
right: Box::new(string_lit("bar")),
})),
op: Operator::And,
right: Box::new(DfExpr::Between(Between {
expr: Box::new(tag_column2()),
negated: false,
low: Box::new(int64_lit(123)),
high: Box::new(int64_lit(456)),
})),
});

builder.traverse_and_collect(&expr);
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::RegexMatch(RegexMatchPredicate {
pattern: "bar".to_string()
})
);
let predicates = builder.output.get("b").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::Range(RangePredicate {
range: Range {
lower: Some(Bound {
inclusive: true,
value: encoded_int64(123),
}),
upper: Some(Bound {
inclusive: true,
value: encoded_int64(456),
}),
}
})
);
}
}
Loading
Loading