Skip to content

Commit

Permalink
fix: cr comments and supports like predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Jan 4, 2024
1 parent a3496c6 commit b8f5a73
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ testing = []
[dependencies]
api.workspace = true
arc-swap = "1.0"
arrow.workspace = true
arrow-schema.workspace = true
async-stream.workspace = true
async-trait = "0.1"
Expand Down
39 changes: 19 additions & 20 deletions src/catalog/src/information_schema/key_column_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;

const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";

/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
schema: SchemaRef,
Expand All @@ -61,24 +68,16 @@ impl InformationSchemaKeyColumnUsage {
false,
),
ColumnSchema::new(
"constraint_schema",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"constraint_name",
CONSTRAINT_SCHEMA,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("column_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ordinal_position",
ConcreteDataType::uint32_datatype(),
false,
),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(
"position_in_unique_constraint",
ConcreteDataType::uint32_datatype(),
Expand Down Expand Up @@ -280,12 +279,12 @@ impl InformationSchemaKeyColumnUsageBuilder {
ordinal_position: u32,
) {
let row = [
("constraint_schema", &Value::from(constraint_schema)),
("constraint_name", &Value::from(constraint_name)),
("table_schema", &Value::from(table_schema)),
("table_name", &Value::from(table_name)),
("column_name", &Value::from(column_name)),
("ordinal_position", &Value::from(ordinal_position)),
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
(CONSTRAINT_NAME, &Value::from(constraint_name)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_name)),
(ORDINAL_POSITION, &Value::from(ordinal_position)),
];

if !predicates.eval(&row) {
Expand Down
158 changes: 152 additions & 6 deletions src/catalog/src/information_schema/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::array::StringArray;
use arrow::compute::kernels::comparison;
use common_query::logical_plan::DfExpr;
use datafusion::common::ScalarValue;
use datafusion::logical_expr::expr::Like;
use datafusion::logical_expr::Operator;
use datatypes::value::Value;
use store_api::storage::ScanRequest;
Expand All @@ -24,6 +28,7 @@ type ColumnName = String;
#[derive(Clone, PartialEq, Eq, Debug)]
enum Predicate {
Eq(ColumnName, Value),
Like(ColumnName, String, bool),
NotEq(ColumnName, Value),
InList(ColumnName, Vec<Value>),
And(Box<Predicate>, Box<Predicate>),
Expand All @@ -46,6 +51,19 @@ impl Predicate {
return Some(v == *value);
}
}
Predicate::Like(c, pattern, case_insenstive) => {

Check warning on line 54 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
for (column, value) in row {
if c != column {
continue;
}

let Value::String(bs) = value else {
continue;
};

return like_utf8(bs.as_utf8(), pattern, case_insenstive);

Check warning on line 64 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
}
}
Predicate::NotEq(c, v) => {
for (column, value) in row {
if c != column {
Expand All @@ -63,17 +81,29 @@ impl Predicate {
}
}
Predicate::And(left, right) => {
return match (left.eval(row), right.eval(row)) {
let left = left.eval(row);

// short-circuit
if matches!(left, Some(false)) {
return Some(false);
}

return match (left, right.eval(row)) {
(Some(left), Some(right)) => Some(left && right),
(Some(false), None) => Some(false),
(None, Some(false)) => Some(false),
_ => None,
};
}
Predicate::Or(left, right) => {
return match (left.eval(row), right.eval(row)) {
let left = left.eval(row);

// short-circuit
if matches!(left, Some(true)) {
return Some(true);
}

return match (left, right.eval(row)) {
(Some(left), Some(right)) => Some(left || right),
(Some(true), None) => Some(true),
(None, Some(true)) => Some(true),
_ => None,
};
Expand Down Expand Up @@ -102,6 +132,30 @@ impl Predicate {

Some(Predicate::Not(Box::new(p)))
}
// expr LIKE pattern
DfExpr::Like(Like {
negated,
expr,
pattern,
case_insensitive,
..
}) if is_column(&expr) && is_string_literal(&pattern) => {
// Safety: ensured by gurad
let DfExpr::Column(c) = *expr else {
unreachable!();
};
let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else {
unreachable!();
};

let p = Predicate::Like(c.name, pattern, case_insensitive);

if negated {
Some(Predicate::Not(Box::new(p)))
} else {
Some(p)
}
}
// left OP right
DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) {
// left == right
Expand Down Expand Up @@ -183,6 +237,34 @@ impl Predicate {
}
}

/// Perform SQL left LIKE right, return `None` if fail to evaluate.
/// - `s` the target string
/// - `pattern` the pattern just like '%abc'
/// - `case_insenstive` whether to perform case-insensitive like or not.

Check warning on line 243 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
fn like_utf8(s: &str, pattern: &str, case_insenstive: &bool) -> Option<bool> {

Check warning on line 244 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
let array = StringArray::from(vec![s]);
let patterns = StringArray::new_scalar(pattern);

let Ok(booleans) = (if *case_insenstive {

Check warning on line 248 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
comparison::ilike(&array, &patterns)
} else {
comparison::like(&array, &patterns)
}) else {
return None;
};

// Safty: at least one value in result

Check warning on line 256 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Safty" should be "Safety".
Some(booleans.value(0))
}

fn is_string_literal(expr: &DfExpr) -> bool {
matches!(expr, DfExpr::Literal(ScalarValue::Utf8(Some(_))))
}

fn is_column(expr: &DfExpr) -> bool {
matches!(expr, DfExpr::Column(_))
}

/// A list of predicate
pub struct Predicates {
predicates: Vec<Predicate>,
Expand Down Expand Up @@ -324,6 +406,70 @@ mod tests {
.is_none());
}

#[test]
fn test_predicate_like() {
// case insenstive

Check warning on line 411 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"insenstive" should be "insensitive".
let expr = DfExpr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});

let p = Predicate::from_expr(expr).unwrap();
assert!(
matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
c == "a"
&& pattern == "%abc"
&& *case_insensitive)
);

let match_row = [
("a", &Value::from("hello AbC")),
("b", &Value::from("b value")),
];
let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))];

assert!(p.eval(&match_row).unwrap());
assert!(!p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());

// case senstive

Check warning on line 438 in src/catalog/src/information_schema/predicate.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"senstive" should be "sensitive".
let expr = DfExpr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: false,
escape_char: None,
});

let p = Predicate::from_expr(expr).unwrap();
assert!(
matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
c == "a"
&& pattern == "%abc"
&& !*case_insensitive)
);
assert!(!p.eval(&match_row).unwrap());
assert!(!p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());

// not like
let expr = DfExpr::Like(Like {
negated: true,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});

let p = Predicate::from_expr(expr).unwrap();
assert!(!p.eval(&match_row).unwrap());
assert!(p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());
}

fn column(name: &str) -> DfExpr {
DfExpr::Column(Column {
relation: None,
Expand Down Expand Up @@ -435,11 +581,11 @@ mod tests {
assert_eq!(2, predicates.predicates.len());
assert!(
matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a"
&& match_string_value(v, "a_value"))
&& match_string_value(v, "a_value"))
);
assert!(
matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b"
&& match_string_value(v, "b_value"))
&& match_string_value(v, "b_value"))
);
}

Expand Down
21 changes: 13 additions & 8 deletions src/catalog/src/information_schema/schemata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;

const CATALOG_NAME: &str = "catalog_name";
const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";

/// The `information_schema.schemata` table implementation.
pub(super) struct InformationSchemaSchemata {
schema: SchemaRef,
Expand All @@ -55,15 +60,15 @@ impl InformationSchemaSchemata {

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"default_character_set_name",
DEFAULT_CHARACTER_SET_NAME,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"default_collation_name",
DEFAULT_COLLATION_NAME,
ConcreteDataType::string_datatype(),
false,
),
Expand Down Expand Up @@ -172,10 +177,10 @@ impl InformationSchemaSchemataBuilder {

fn add_schema(&mut self, predicates: &Predicates, catalog_name: &str, schema_name: &str) {
let row = [
("catalog_name", &Value::from(catalog_name)),
("schema_name", &Value::from(schema_name)),
("default_character_set_name", &Value::from("utf8")),
("default_collation_name", &Value::from("utf8_bin")),
(CATALOG_NAME, &Value::from(catalog_name)),
(SCHEMA_NAME, &Value::from(schema_name)),
(DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")),
(DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")),
];

if !predicates.eval(&row) {
Expand Down
27 changes: 17 additions & 10 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const TABLE_TYPE: &str = "table_type";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";

pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
Expand All @@ -55,12 +62,12 @@ impl InformationSchemaTables {

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_id", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("engine", ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
]))
}

Expand Down Expand Up @@ -204,10 +211,10 @@ impl InformationSchemaTablesBuilder {
};

let row = [
("table_catalog", &Value::from(catalog_name)),
("table_schema", &Value::from(schema_name)),
("table_name", &Value::from(table_name)),
("table_type", &Value::from(table_type)),
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type)),
];

if !predicates.eval(&row) {
Expand Down
Loading

0 comments on commit b8f5a73

Please sign in to comment.