From b9302e4f0d431be5c50a1e8c53b5aaf8bfe84db4 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 3 Jan 2024 13:14:40 +0800 Subject: [PATCH] feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 2) (#3068) * feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1) Signed-off-by: Zhenchi * feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 2) Signed-off-by: Zhenchi * test: add comparison unit tests Signed-off-by: Zhenchi * test: add eq_list unit tests Signed-off-by: Zhenchi * test: add in_list unit tests Signed-off-by: Zhenchi * test: add and unit tests Signed-off-by: Zhenchi * test: strip tests Signed-off-by: Zhenchi * fix: address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/mito2/src/sst/index/applier/builder.rs | 133 ++++++-- .../sst/index/applier/builder/comparison.rs | 280 ++++++++++++++++ .../src/sst/index/applier/builder/eq_list.rs | 302 ++++++++++++++++++ .../src/sst/index/applier/builder/in_list.rs | 152 +++++++++ .../sst/index/applier/builder/regex_match.rs | 111 +++++++ 5 files changed, 947 insertions(+), 31 deletions(-) create mode 100644 src/mito2/src/sst/index/applier/builder/comparison.rs create mode 100644 src/mito2/src/sst/index/applier/builder/eq_list.rs create mode 100644 src/mito2/src/sst/index/applier/builder/in_list.rs create mode 100644 src/mito2/src/sst/index/applier/builder/regex_match.rs diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 52af22effb18..240846a044c2 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -13,13 +13,10 @@ // limitations under the License. mod between; - -// TODO(zhongzc): This PR is too large. The following modules are coming soon. 
- -// mod comparison; -// mod eq_list; -// mod in_list; -// mod regex_match; +mod comparison; +mod eq_list; +mod in_list; +mod regex_match; use std::collections::HashMap; @@ -27,7 +24,7 @@ use api::v1::SemanticType; use common_query::logical_plan::Expr; use common_telemetry::warn; use datafusion_common::ScalarValue; -use datafusion_expr::Expr as DfExpr; +use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; @@ -99,23 +96,21 @@ impl<'a> SstIndexApplierBuilder<'a> { let res = match expr { DfExpr::Between(between) => self.collect_between(between), - // TODO(zhongzc): This PR is too large. The following arms are coming soon. - - // DfExpr::InList(in_list) => self.collect_inlist(in_list), - // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { - // Operator::And => { - // self.traverse_and_collect(left); - // self.traverse_and_collect(right); - // Ok(()) - // } - // Operator::Or => self.collect_or_eq_list(left, right), - // Operator::Eq => self.collect_eq(left, right), - // Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { - // self.collect_comparison_expr(left, op, right) - // } - // Operator::RegexMatch => self.collect_regex_match(left, right), - // _ => Ok(()), - // }, + DfExpr::InList(in_list) => self.collect_inlist(in_list), + DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::And => { + self.traverse_and_collect(left); + self.traverse_and_collect(right); + Ok(()) + } + Operator::Or => self.collect_or_eq_list(left, right), + Operator::Eq => self.collect_eq(left, right), + Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + self.collect_comparison_expr(left, op, right) + } + Operator::RegexMatch => self.collect_regex_match(left, right), + _ => Ok(()), + }, // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ... 
_ => Ok(()), @@ -180,8 +175,12 @@ impl<'a> SstIndexApplierBuilder<'a> { mod tests { use api::v1::SemanticType; use datafusion_common::Column; + use datafusion_expr::Between; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; + use index::inverted_index::search::predicate::{ + Bound, Range, RangePredicate, RegexMatchPredicate, + }; use object_store::services::Memory; use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; @@ -198,20 +197,25 @@ mod tests { column_id: 1, }) .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), - semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false), + semantic_type: SemanticType::Tag, column_id: 2, }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 3, + }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "c", + "d", ConcreteDataType::timestamp_millisecond_datatype(), false, ), semantic_type: SemanticType::Timestamp, - column_id: 3, + column_id: 4, }) - .primary_key(vec![1]); + .primary_key(vec![1, 2]); builder.build().unwrap() } @@ -226,13 +230,20 @@ mod tests { }) } - pub(crate) fn field_column() -> DfExpr { + pub(crate) fn tag_column2() -> DfExpr { DfExpr::Column(Column { relation: None, name: "b".to_string(), }) } + pub(crate) fn field_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "c".to_string(), + }) + } + pub(crate) fn nonexistent_column() -> DfExpr { DfExpr::Column(Column { relation: None, @@ -258,4 +269,64 @@ mod tests { .unwrap(); bytes } + + pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> { + let mut bytes = vec![]; + IndexValueCodec::encode_value( + Value::from(s.into()).as_value_ref(), + 
&SortField::new(ConcreteDataType::int64_datatype()), + &mut bytes, + ) + .unwrap(); + bytes + } + + #[test] + fn test_collect_and_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::RegexMatch, + right: Box::new(string_lit("bar")), + })), + op: Operator::And, + right: Box::new(DfExpr::Between(Between { + expr: Box::new(tag_column2()), + negated: false, + low: Box::new(int64_lit(123)), + high: Box::new(int64_lit(456)), + })), + }); + + builder.traverse_and_collect(&expr); + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::RegexMatch(RegexMatchPredicate { + pattern: "bar".to_string() + }) + ); + let predicates = builder.output.get("b").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: encoded_int64(123), + }), + upper: Some(Bound { + inclusive: true, + value: encoded_int64(456), + }), + } + }) + ); + } } diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs new file mode 100644 index 000000000000..e132c1c9281e --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -0,0 +1,280 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datafusion_expr::{Expr as DfExpr, Operator}; +use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; +use index::inverted_index::Bytes; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects a comparison expression in the form of + /// `column < lit`, `column > lit`, `column <= lit`, `column >= lit`, + /// `lit < column`, `lit > column`, `lit <= column`, `lit >= column`. + pub(crate) fn collect_comparison_expr( + &mut self, + left: &DfExpr, + op: &Operator, + right: &DfExpr, + ) -> Result<()> { + match op { + Operator::Lt => { + if matches!(right, DfExpr::Column(_)) { + self.collect_column_gt_lit(right, left) + } else { + self.collect_column_lt_lit(left, right) + } + } + Operator::LtEq => { + if matches!(right, DfExpr::Column(_)) { + self.collect_column_ge_lit(right, left) + } else { + self.collect_column_le_lit(left, right) + } + } + Operator::Gt => { + if matches!(right, DfExpr::Column(_)) { + self.collect_column_lt_lit(right, left) + } else { + self.collect_column_gt_lit(left, right) + } + } + Operator::GtEq => { + if matches!(right, DfExpr::Column(_)) { + self.collect_column_le_lit(right, left) + } else { + self.collect_column_ge_lit(left, right) + } + } + _ => Ok(()), + } + } + + fn collect_column_lt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { + self.collect_column_cmp_lit(left, right, |value| Range { + lower: None, + upper: Some(Bound { + inclusive: false, + value, + }), + }) + } + + fn 
collect_column_gt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { + self.collect_column_cmp_lit(left, right, |value| Range { + lower: Some(Bound { + inclusive: false, + value, + }), + upper: None, + }) + } + + fn collect_column_le_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { + self.collect_column_cmp_lit(left, right, |value| Range { + lower: None, + upper: Some(Bound { + inclusive: true, + value, + }), + }) + } + + fn collect_column_ge_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { + self.collect_column_cmp_lit(left, right, |value| Range { + lower: Some(Bound { + inclusive: true, + value, + }), + upper: None, + }) + } + + fn collect_column_cmp_lit( + &mut self, + column: &DfExpr, + literal: &DfExpr, + range_builder: impl FnOnce(Bytes) -> Range, + ) -> Result<()> { + let Some(column_name) = Self::column_name(column) else { + return Ok(()); + }; + let Some(lit) = Self::nonnull_lit(literal) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? 
else { + return Ok(()); + }; + + let predicate = Predicate::Range(RangePredicate { + range: range_builder(Self::encode_lit(lit, data_type)?), + }); + + self.add_predicate(column_name, predicate); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_comparison_basic() { + let cases = [ + ( + (&tag_column(), &Operator::Lt, &string_lit("123")), + Range { + lower: None, + upper: Some(Bound { + inclusive: false, + value: encoded_string("123"), + }), + }, + ), + ( + (&string_lit("123"), &Operator::Lt, &tag_column()), + Range { + lower: Some(Bound { + inclusive: false, + value: encoded_string("123"), + }), + upper: None, + }, + ), + ( + (&tag_column(), &Operator::LtEq, &string_lit("123")), + Range { + lower: None, + upper: Some(Bound { + inclusive: true, + value: encoded_string("123"), + }), + }, + ), + ( + (&string_lit("123"), &Operator::LtEq, &tag_column()), + Range { + lower: Some(Bound { + inclusive: true, + value: encoded_string("123"), + }), + upper: None, + }, + ), + ( + (&tag_column(), &Operator::Gt, &string_lit("123")), + Range { + lower: Some(Bound { + inclusive: false, + value: encoded_string("123"), + }), + upper: None, + }, + ), + ( + (&string_lit("123"), &Operator::Gt, &tag_column()), + Range { + lower: None, + upper: Some(Bound { + inclusive: false, + value: encoded_string("123"), + }), + }, + ), + ( + (&tag_column(), &Operator::GtEq, &string_lit("123")), + Range { + lower: Some(Bound { + inclusive: true, + value: encoded_string("123"), + }), + upper: None, + }, + ), + ( + (&string_lit("123"), &Operator::GtEq, &tag_column()), + Range { + lower: None, + upper: Some(Bound { + inclusive: true, + value: encoded_string("123"), + }), + }, + ), + ]; + + let metadata = test_region_metadata(); + let mut builder = + 
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + for ((left, op, right), _) in &cases { + builder.collect_comparison_expr(left, op, right).unwrap(); + } + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), cases.len()); + for ((_, expected), actual) in cases.into_iter().zip(predicates) { + assert_eq!( + actual, + &Predicate::Range(RangePredicate { range: expected }) + ); + } + } + + #[test] + fn test_collect_comparison_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_comparison_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc")) + .unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_comparison_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let res = builder.collect_comparison_expr( + &nonexistent_column(), + &Operator::Lt, + &string_lit("abc"), + ); + assert!(matches!(res, Err(Error::ColumnNotFound { .. 
}))); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs new file mode 100644 index 000000000000..07e74e012db0 --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -0,0 +1,302 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; +use datatypes::data_type::ConcreteDataType; +use index::inverted_index::search::predicate::{InListPredicate, Predicate}; +use index::inverted_index::Bytes; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects an eq expression in the form of `column = lit`. + pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { + let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else { + return Ok(()); + }; + let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? 
else { + return Ok(()); + }; + + let predicate = Predicate::InList(InListPredicate { + list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]), + }); + self.add_predicate(column_name, predicate); + Ok(()) + } + + /// Collects eq list in the form of `column = lit OR column = lit OR ...`. + pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> { + let DfExpr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = eq_expr + else { + return Ok(()); + }; + + let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else { + return Ok(()); + }; + let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? else { + return Ok(()); + }; + + let bytes = Self::encode_lit(lit, data_type.clone())?; + let mut inlist = HashSet::from_iter([bytes]); + + if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? { + let predicate = Predicate::InList(InListPredicate { list: inlist }); + self.add_predicate(column_name, predicate); + } + + Ok(()) + } + + /// Recursively collects eq list. + /// + /// Returns `false` if the expression doesn't match the form, in which case + /// the caller can safely ignore the expression. + fn collect_eq_list_inner( + column_name: &str, + data_type: &ConcreteDataType, + expr: &DfExpr, + inlist: &mut HashSet<Bytes>, + ) -> Result<bool> { + let DfExpr::BinaryExpr(BinaryExpr { + left, + op: op @ (Operator::Eq | Operator::Or), + right, + }) = expr + else { + return Ok(false); + }; + + if op == &Operator::Or { + let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)? + .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist)) + .transpose()? 
+ .unwrap_or(false); + return Ok(r); + } + + if op == &Operator::Eq { + let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else { + return Ok(false); + }; + if column_name != name { + return Ok(false); + } + let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else { + return Ok(false); + }; + + inlist.insert(Self::encode_lit(lit, data_type.clone())?); + return Ok(true); + } + + Ok(false) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + tag_column2, test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_eq_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_eq(&tag_column(), &string_lit("foo")) + .unwrap(); + builder + .collect_eq(&string_lit("bar"), &tag_column()) + .unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 2); + assert_eq!( + predicates[0], + Predicate::InList(InListPredicate { + list: HashSet::from_iter([encoded_string("foo")]) + }) + ); + assert_eq!( + predicates[1], + Predicate::InList(InListPredicate { + list: HashSet::from_iter([encoded_string("bar")]) + }) + ); + } + + #[test] + fn test_collect_eq_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_eq(&field_column(), &string_lit("abc")) + .unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_eq_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); 
+ assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_eq_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let res = builder.collect_eq(&tag_column(), &int64_lit(1)); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_or_eq_list_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let eq_expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("abc")), + }); + let or_eq_list = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("foo")), + })), + op: Operator::Or, + right: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("bar")), + })), + op: Operator::Or, + right: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("baz")), + })), + })), + }); + + builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::InList(InListPredicate { + list: HashSet::from_iter([ + encoded_string("abc"), + encoded_string("foo"), + encoded_string("bar"), + encoded_string("baz") + ]) + }) + ); + } + + #[test] + fn test_collect_or_eq_list_invalid_op() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let eq_expr = 
DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("abc")), + }); + let or_eq_list = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("foo")), + })), + op: Operator::Or, + right: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Gt, // invalid op + right: Box::new(string_lit("foo")), + })), + }); + + builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_or_eq_list_multiple_columns() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let eq_expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("abc")), + }); + let or_eq_list = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column()), + op: Operator::Eq, + right: Box::new(string_lit("foo")), + })), + op: Operator::Or, + right: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(tag_column2()), // different column + op: Operator::Eq, + right: Box::new(string_lit("bar")), + })), + }); + + builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap(); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs new file mode 100644 index 000000000000..cfb2b8738f62 --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use datafusion_expr::expr::InList; +use index::inverted_index::search::predicate::{InListPredicate, Predicate}; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects an in list expression in the form of `column IN (lit, lit, ...)`. + pub(crate) fn collect_inlist(&mut self, inlist: &InList) -> Result<()> { + if inlist.negated { + return Ok(()); + } + let Some(column_name) = Self::column_name(&inlist.expr) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? 
else { + return Ok(()); + }; + + let mut predicate = InListPredicate { + list: HashSet::with_capacity(inlist.list.len()), + }; + for lit in &inlist.list { + let Some(lit) = Self::nonnull_lit(lit) else { + return Ok(()); + }; + + predicate + .list + .insert(Self::encode_lit(lit, data_type.clone())?); + } + + self.add_predicate(column_name, Predicate::InList(predicate)); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_in_list_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let in_list = InList { + expr: Box::new(tag_column()), + list: vec![string_lit("foo"), string_lit("bar")], + negated: false, + }; + + builder.collect_inlist(&in_list).unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::InList(InListPredicate { + list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")]) + }) + ); + } + + #[test] + fn test_collect_in_list_negated() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let in_list = InList { + expr: Box::new(tag_column()), + list: vec![string_lit("foo"), string_lit("bar")], + negated: true, + }; + + builder.collect_inlist(&in_list).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_in_list_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let in_list = InList { + expr: Box::new(field_column()), + list: vec![string_lit("foo"), string_lit("bar")], + 
negated: false, + }; + + builder.collect_inlist(&in_list).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_in_list_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let in_list = InList { + expr: Box::new(tag_column()), + list: vec![int64_lit(123), int64_lit(456)], + negated: false, + }; + + let res = builder.collect_inlist(&in_list); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_in_list_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let in_list = InList { + expr: Box::new(nonexistent_column()), + list: vec![string_lit("foo"), string_lit("bar")], + negated: false, + }; + + let res = builder.collect_inlist(&in_list); + assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs new file mode 100644 index 000000000000..1aa1cd9d95c1 --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -0,0 +1,111 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use datafusion_common::ScalarValue; +use datafusion_expr::Expr as DfExpr; +use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate}; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects a regex match expression in the form of `column ~ pattern`. + pub(crate) fn collect_regex_match(&mut self, column: &DfExpr, pattern: &DfExpr) -> Result<()> { + let Some(column_name) = Self::column_name(column) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? else { + return Ok(()); + }; + if !data_type.is_string() { + return Ok(()); + } + let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = pattern else { + return Ok(()); + }; + + let predicate = Predicate::RegexMatch(RegexMatchPredicate { + pattern: pattern.clone(), + }); + self.add_predicate(column_name, predicate); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store, + test_region_metadata, + }; + + #[test] + fn test_regex_match_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_regex_match(&tag_column(), &string_lit("abc")) + .unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::RegexMatch(RegexMatchPredicate { + pattern: "abc".to_string() + }) + ); + } + + #[test] + fn test_regex_match_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_regex_match(&field_column(), &string_lit("abc")) + .unwrap(); + + assert!(builder.output.is_empty()); + } + + 
#[test] + fn test_regex_match_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + builder + .collect_regex_match(&tag_column(), &int64_lit(123)) + .unwrap(); + + assert!(builder.output.is_empty()); + } + + #[test] + fn test_regex_match_type_nonexist_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); + assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } +}