From f982a77c278ea03b37c2775440086352a2e4e149 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 16:31:45 +0800 Subject: [PATCH 01/14] feat: pushdown scan request to information_schema tables stream --- src/catalog/src/information_schema.rs | 6 +++--- src/catalog/src/information_schema/columns.rs | 4 ++-- src/catalog/src/information_schema/key_column_usage.rs | 4 ++-- src/catalog/src/information_schema/memory_table.rs | 4 ++-- src/catalog/src/information_schema/schemata.rs | 4 ++-- src/catalog/src/information_schema/tables.rs | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 7dbc71f6f465..1b4519715ca2 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -238,7 +238,7 @@ trait InformationTable { fn schema(&self) -> SchemaRef; - fn to_stream(&self) -> Result; + fn to_stream(&self, requst: ScanRequest) -> Result; fn table_type(&self) -> TableType { TableType::Temporary @@ -272,7 +272,7 @@ impl DataSource for InformationTableDataSource { &self, request: ScanRequest, ) -> std::result::Result { - let projection = request.projection; + let projection = request.projection.clone(); let projected_schema = match &projection { Some(projection) => self.try_project(projection)?, None => self.table.schema(), @@ -280,7 +280,7 @@ impl DataSource for InformationTableDataSource { let stream = self .table - .to_stream() + .to_stream(request) .map_err(BoxedError::new) .context(TablesRecordBatchSnafu) .map_err(BoxedError::new)? 
diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 27d4921928e8..1534d4f56808 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -31,7 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::{InformationTable, COLUMNS}; use crate::error::{ @@ -102,7 +102,7 @@ impl InformationTable for InformationSchemaColumns { self.schema.clone() } - fn to_stream(&self) -> Result { + fn to_stream(&self, _request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index 7c8a4995acfd..75a46d4513ac 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -27,7 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::KEY_COLUMN_USAGE; use crate::error::{ @@ -123,7 +123,7 @@ impl InformationTable for InformationSchemaKeyColumnUsage { self.schema.clone() } - fn to_stream(&self) -> Result { + fn to_stream(&self, _requets: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( diff --git 
a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/information_schema/memory_table.rs index cce53c88a73c..a39fe3c022b4 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/information_schema/memory_table.rs @@ -26,7 +26,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use snafu::ResultExt; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; pub use tables::get_schema_columns; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; @@ -74,7 +74,7 @@ impl InformationTable for MemoryTable { self.schema.clone() } - fn to_stream(&self) -> Result { + fn to_stream(&self, _requets: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index 8f523f7a9383..46aa6e388d61 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/information_schema/schemata.rs @@ -27,7 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::StringVectorBuilder; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::SCHEMATA; use crate::error::{ @@ -92,7 +92,7 @@ impl InformationTable for InformationSchemaSchemata { self.schema.clone() } - fn to_stream(&self) -> Result { + fn to_stream(&self, _request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 
d258dd490130..e243b685be62 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -27,7 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use table::metadata::TableType; use super::TABLES; @@ -85,7 +85,7 @@ impl InformationTable for InformationSchemaTables { self.schema.clone() } - fn to_stream(&self) -> Result { + fn to_stream(&self, _request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( From 2bb2e8866e670ab8de03804b52e40b9cfd17f7a8 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 18:56:53 +0800 Subject: [PATCH 02/14] feat: supports filter pushdown for columns --- src/catalog/src/information_schema.rs | 4 +- src/catalog/src/information_schema/columns.rs | 35 ++- .../src/information_schema/memory_table.rs | 4 +- .../src/information_schema/predicate.rs | 253 ++++++++++++++++++ 4 files changed, 282 insertions(+), 14 deletions(-) create mode 100644 src/catalog/src/information_schema/predicate.rs diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 1b4519715ca2..f8e379efc630 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -15,6 +15,7 @@ mod columns; mod key_column_usage; mod memory_table; +pub(crate) mod predicate; mod schemata; mod table_names; mod tables; @@ -29,6 +30,7 @@ use datatypes::schema::SchemaRef; use futures_util::StreamExt; use lazy_static::lazy_static; use paste::paste; +pub(crate) use predicate::Predicates; use snafu::ResultExt; use store_api::data_source::DataSource; use 
store_api::storage::{ScanRequest, TableId}; @@ -159,7 +161,7 @@ impl InformationSchemaProvider { fn build_table(&self, name: &str) -> Option { self.information_table(name).map(|table| { let table_info = Self::table_info(self.catalog_name.clone(), &table); - let filter_pushdown = FilterPushDownType::Unsupported; + let filter_pushdown = FilterPushDownType::Inexact; let thin_table = ThinTable::new(table_info, filter_pushdown); let data_source = Arc::new(InformationTableDataSource::new(table)); diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 1534d4f56808..3d5858798a5d 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -37,6 +38,7 @@ use super::{InformationTable, COLUMNS}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; +use crate::information_schema::Predicates; use crate::CatalogManager; pub(super) struct InformationSchemaColumns { @@ -102,14 +104,14 @@ impl InformationTable for InformationSchemaColumns { self.schema.clone() } - fn to_stream(&self, _request: ScanRequest) -> Result { + fn to_stream(&self, request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_columns() + .make_columns(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -165,13 +167,15 @@ impl 
InformationSchemaColumnsBuilder { } /// Construct the `information_schema.columns` virtual table - async fn make_columns(&mut self) -> Result { + async fn make_columns(&mut self, request: Option) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); + for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager .schema_exists(&catalog_name, &schema_name) @@ -200,13 +204,22 @@ impl InformationSchemaColumnsBuilder { SEMANTIC_TYPE_FIELD }; - self.add_column( - &catalog_name, - &schema_name, - &table_name, - semantic_type, - column, - ); + let row = [ + ("catalog_name", &Value::from(catalog_name.as_str())), + ("schema_name", &Value::from(schema_name.as_str())), + ("table_name", &Value::from(table_name.as_str())), + ("semantic_type", &Value::from(semantic_type)), + ]; + + if predicates.eval(&row) { + self.add_column( + &catalog_name, + &schema_name, + &table_name, + semantic_type, + column, + ); + } } } else { unreachable!(); @@ -279,7 +292,7 @@ impl DfPartitionStream for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_columns() + .make_columns(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/information_schema/memory_table.rs index a39fe3c022b4..555850d44b8e 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/information_schema/memory_table.rs @@ -169,7 +169,7 @@ mod tests { assert_eq!("test", table.table_name()); assert_eq!(schema, InformationTable::schema(&table)); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -198,7 +198,7 @@ mod tests { assert_eq!("test", 
table.table_name()); assert_eq!(schema, InformationTable::schema(&table)); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs new file mode 100644 index 000000000000..1be50a56a842 --- /dev/null +++ b/src/catalog/src/information_schema/predicate.rs @@ -0,0 +1,253 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::logical_plan::DfExpr; +use datafusion::logical_expr::Operator; +use datatypes::value::Value; +use store_api::storage::ScanRequest; + +type ColumnName = String; +/// Predicate to filter information_schema tables stream, +/// we only support these simple predicates currently. +/// TODO(dennis): supports more predicate types. +#[derive(Clone, PartialEq, Eq, Debug)] +enum Predicate { + Eq(ColumnName, Value), + NotEq(ColumnName, Value), + InList(ColumnName, Vec), + And(Box, Box), + Or(Box, Box), + Not(Box), +} + +impl Predicate { + /// Evaluate the predicate with value, returns: + /// - `None` when the predicate can't run on value, + /// - `Some(true)` when the predicate is satisfied,. + /// - `Some(false)` when the predicate is not satisfied. 
+ fn eval(&self, column: &str, value: &Value) -> Option { + match self { + Predicate::Eq(c, v) => { + if c != column { + return None; + } + Some(v == value) + } + Predicate::NotEq(c, v) => { + if c != column { + return None; + } + Some(v != value) + } + Predicate::InList(c, values) => { + if c != column { + return None; + } + Some(values.iter().any(|v| v == value)) + } + Predicate::And(left, right) => { + let Some(left) = left.eval(column, value) else { + return None; + }; + let Some(right) = right.eval(column, value) else { + return None; + }; + + Some(left && right) + } + Predicate::Or(left, right) => { + let Some(left) = left.eval(column, value) else { + return None; + }; + let Some(right) = right.eval(column, value) else { + return None; + }; + + Some(left || right) + } + Predicate::Not(p) => { + let Some(p) = p.eval(column, value) else { + return None; + }; + + Some(!p) + } + } + } + + // Try to create a predicate from datafusion `Expr`, return None if fails. + fn from_expr(expr: DfExpr) -> Option { + match expr { + // NOT expr + DfExpr::Not(expr) => { + let Some(p) = Self::from_expr(*expr) else { + return None; + }; + + Some(Predicate::Not(Box::new(p))) + } + // left OP right + DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) { + // left == right + (DfExpr::Literal(scalar), Operator::Eq, DfExpr::Column(c)) + | (DfExpr::Column(c), Operator::Eq, DfExpr::Literal(scalar)) => { + let Ok(v) = Value::try_from(scalar) else { + return None; + }; + + Some(Predicate::Eq(c.name, v)) + } + // left != right + (DfExpr::Literal(scalar), Operator::NotEq, DfExpr::Column(c)) + | (DfExpr::Column(c), Operator::NotEq, DfExpr::Literal(scalar)) => { + let Ok(v) = Value::try_from(scalar) else { + return None; + }; + + Some(Predicate::NotEq(c.name, v)) + } + // left AND right + (left, Operator::And, right) => { + let Some(left) = Self::from_expr(left) else { + return None; + }; + + let Some(right) = Self::from_expr(right) else { + return None; + }; + + 
Some(Predicate::And(Box::new(left), Box::new(right))) + } + // left OR right + (left, Operator::Or, right) => { + let Some(left) = Self::from_expr(left) else { + return None; + }; + + let Some(right) = Self::from_expr(right) else { + return None; + }; + + Some(Predicate::Or(Box::new(left), Box::new(right))) + } + _ => None, + }, + // [NOT] IN (LIST) + DfExpr::InList(list) => { + match (*list.expr, list.list, list.negated) { + // column [NOT] IN (v1, v2, v3, ...) + (DfExpr::Column(c), list, negated) if is_all_scalars(&list) => { + let mut values = Vec::with_capacity(list.len()); + for scalar in list { + // Safety: checked by `is_all_scalars` + let DfExpr::Literal(scalar) = scalar else { + unreachable!(); + }; + + let Ok(value) = Value::try_from(scalar) else { + return None; + }; + + values.push(value); + } + + let predicate = Predicate::InList(c.name, values); + + if negated { + Some(Predicate::Not(Box::new(predicate))) + } else { + Some(predicate) + } + } + _ => None, + } + } + _ => None, + } + } +} + +/// A list of predicate +pub struct Predicates { + predicates: Vec, +} + +impl Predicates { + /// Try its best to create predicates from `ScanRequest`. + pub fn from_scan_request(request: &Option) -> Predicates { + if let Some(request) = request { + let mut predicates = Vec::with_capacity(request.filters.len()); + + for filter in &request.filters { + if let Some(predicate) = Predicate::from_expr(filter.df_expr().clone()) { + predicates.push(predicate); + } + } + + Self { predicates } + } else { + Self { + predicates: Vec::new(), + } + } + } + + /// Evaluate the predicates with the column value, + /// returns true when all the predicates are satisfied or can't be evaluated. 
+ fn eval_column(&self, column: &str, value: &Value) -> bool { + let mut result = true; + for predicate in &self.predicates { + match predicate.eval(column, value) { + Some(b) => { + result = result && b; + } + None => { + // Can't eval this predicate, continue + continue; + } + } + + if !result { + break; + } + } + + result + } + + /// Evaluate the predicates with the columns and values, + /// returns true when all the predicates are satisfied or can't be evaluated. + pub fn eval(&self, row: &[(&str, &Value)]) -> bool { + // fast path + if self.predicates.is_empty() { + return true; + } + + let mut result = true; + for (column, value) in row { + result = result && self.eval_column(column, value); + + if !result { + break; + } + } + + result + } +} + +/// Returns true when the values are all `ScalarValue`. +fn is_all_scalars(list: &[DfExpr]) -> bool { + list.iter().all(|v| matches!(v, DfExpr::Literal(_))) +} From 8abcb534169f1294e811282d90fa56f86931f215 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 19:22:57 +0800 Subject: [PATCH 03/14] feat: supports filter pushdown for some information_schema tables --- src/catalog/src/information_schema/columns.rs | 38 +++++++------ .../information_schema/key_column_usage.rs | 55 +++++++++++-------- .../src/information_schema/schemata.rs | 27 ++++++--- src/catalog/src/information_schema/tables.rs | 38 +++++++++---- 4 files changed, 101 insertions(+), 57 deletions(-) diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 3d5858798a5d..d9e4e8105afb 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -173,7 +173,6 @@ impl InformationSchemaColumnsBuilder { .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; - let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? 
{ @@ -204,22 +203,14 @@ impl InformationSchemaColumnsBuilder { SEMANTIC_TYPE_FIELD }; - let row = [ - ("catalog_name", &Value::from(catalog_name.as_str())), - ("schema_name", &Value::from(schema_name.as_str())), - ("table_name", &Value::from(table_name.as_str())), - ("semantic_type", &Value::from(semantic_type)), - ]; - - if predicates.eval(&row) { - self.add_column( - &catalog_name, - &schema_name, - &table_name, - semantic_type, - column, - ); - } + self.add_column( + &predicates, + &catalog_name, + &schema_name, + &table_name, + semantic_type, + column, + ); } } else { unreachable!(); @@ -232,6 +223,7 @@ impl InformationSchemaColumnsBuilder { fn add_column( &mut self, + predicates: &Predicates, catalog_name: &str, schema_name: &str, table_name: &str, @@ -240,6 +232,18 @@ impl InformationSchemaColumnsBuilder { ) { let data_type = &column_schema.data_type.name(); + let row = [ + ("catalog_name", &Value::from(catalog_name)), + ("schema_name", &Value::from(schema_name)), + ("table_name", &Value::from(table_name)), + ("semantic_type", &Value::from(semantic_type)), + ("data_type", &Value::from(data_type.as_str())), + ]; + + if !predicates.eval(&row) { + return; + } + self.catalog_names.push(Some(catalog_name)); self.schema_names.push(Some(schema_name)); self.table_names.push(Some(table_name)); diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index 75a46d4513ac..7952a8e7f288 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -25,6 +25,7 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, 
UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -33,7 +34,7 @@ use super::KEY_COLUMN_USAGE; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. @@ -123,14 +124,14 @@ impl InformationTable for InformationSchemaKeyColumnUsage { self.schema.clone() } - fn to_stream(&self, _requets: ScanRequest) -> Result { + fn to_stream(&self, request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_key_column_usage() + .make_key_column_usage(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -192,14 +193,14 @@ impl InformationSchemaKeyColumnUsageBuilder { } /// Construct the `information_schema.KEY_COLUMN_USAGE` virtual table - async fn make_key_column_usage(&mut self) -> Result { + async fn make_key_column_usage(&mut self, request: Option) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); - let mut time_index_constraints = vec![]; let mut primary_constraints = vec![]; for schema_name in catalog_manager.schema_names(&catalog_name).await? 
{ @@ -223,11 +224,15 @@ impl InformationSchemaKeyColumnUsageBuilder { for (idx, column) in schema.column_schemas().iter().enumerate() { if column.is_time_index() { - time_index_constraints.push(( - schema_name.clone(), - table_name.clone(), - column.name.clone(), - )); + self.add_key_column_usage( + &predicates, + &schema_name, + "TIME INDEX", + &schema_name, + &table_name, + &column.name, + 1, //always 1 for time index + ); } if keys.contains(&idx) { primary_constraints.push(( @@ -244,22 +249,11 @@ impl InformationSchemaKeyColumnUsageBuilder { } } - for (i, (schema_name, table_name, column_name)) in - time_index_constraints.into_iter().enumerate() - { - self.add_key_column_usage( - &schema_name, - "TIME INDEX", - &schema_name, - &table_name, - &column_name, - i as u32 + 1, - ); - } for (i, (schema_name, table_name, column_name)) in primary_constraints.into_iter().enumerate() { self.add_key_column_usage( + &predicates, &schema_name, "PRIMARY", &schema_name, @@ -274,8 +268,10 @@ impl InformationSchemaKeyColumnUsageBuilder { // TODO(dimbtp): Foreign key constraint has not `None` value for last 4 // fields, but it is not supported yet. 
+ #[allow(clippy::too_many_arguments)] fn add_key_column_usage( &mut self, + predicates: &Predicates, constraint_schema: &str, constraint_name: &str, table_schema: &str, @@ -283,6 +279,19 @@ impl InformationSchemaKeyColumnUsageBuilder { column_name: &str, ordinal_position: u32, ) { + let row = [ + ("constraint_schema", &Value::from(constraint_schema)), + ("constraint_name", &Value::from(constraint_name)), + ("table_schema", &Value::from(table_schema)), + ("table_name", &Value::from(table_name)), + ("column_name", &Value::from(column_name)), + ("ordinal_position", &Value::from(ordinal_position)), + ]; + + if !predicates.eval(&row) { + return; + } + self.constraint_catalog.push(Some("def")); self.constraint_schema.push(Some(constraint_schema)); self.constraint_name.push(Some(constraint_name)); @@ -328,7 +337,7 @@ impl DfPartitionStream for InformationSchemaKeyColumnUsage { schema, futures::stream::once(async move { builder - .make_key_column_usage() + .make_key_column_usage(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index 46aa6e388d61..40d401de3337 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/information_schema/schemata.rs @@ -25,6 +25,7 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::StringVectorBuilder; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -33,7 +34,7 @@ use super::SCHEMATA; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use 
crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; /// The `information_schema.schemata` table implementation. @@ -92,14 +93,14 @@ impl InformationTable for InformationSchemaSchemata { self.schema.clone() } - fn to_stream(&self, _request: ScanRequest) -> Result { + fn to_stream(&self, request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_schemata() + .make_schemata(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -147,12 +148,13 @@ impl InformationSchemaSchemataBuilder { } /// Construct the `information_schema.schemata` virtual table - async fn make_schemata(&mut self) -> Result { + async fn make_schemata(&mut self, request: Option) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? 
{ if !catalog_manager @@ -162,13 +164,24 @@ impl InformationSchemaSchemataBuilder { continue; } - self.add_schema(&catalog_name, &schema_name); + self.add_schema(&predicates, &catalog_name, &schema_name); } self.finish() } - fn add_schema(&mut self, catalog_name: &str, schema_name: &str) { + fn add_schema(&mut self, predicates: &Predicates, catalog_name: &str, schema_name: &str) { + let row = [ + ("catalog_name", &Value::from(catalog_name)), + ("schema_name", &Value::from(schema_name)), + ("charset_name", &Value::from("utf8")), + ("collation_name", &Value::from("utf8_bin")), + ]; + + if !predicates.eval(&row) { + return; + } + self.catalog_names.push(Some(catalog_name)); self.schema_names.push(Some(schema_name)); self.charset_names.push(Some("utf8")); @@ -200,7 +213,7 @@ impl DfPartitionStream for InformationSchemaSchemata { schema, futures::stream::once(async move { builder - .make_schemata() + .make_schemata(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index e243b685be62..271cc1a88881 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -25,6 +25,7 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -34,7 +35,7 @@ use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use crate::information_schema::{InformationTable, Predicates}; 
use crate::CatalogManager; pub(super) struct InformationSchemaTables { @@ -85,14 +86,14 @@ impl InformationTable for InformationSchemaTables { self.schema.clone() } - fn to_stream(&self, _request: ScanRequest) -> Result { + fn to_stream(&self, request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_tables() + .make_tables(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -142,12 +143,13 @@ impl InformationSchemaTablesBuilder { } /// Construct the `information_schema.tables` virtual table - async fn make_tables(&mut self) -> Result { + async fn make_tables(&mut self, request: Option) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? 
{ if !catalog_manager @@ -167,6 +169,7 @@ impl InformationSchemaTablesBuilder { { let table_info = table.table_info(); self.add_table( + &predicates, &catalog_name, &schema_name, &table_name, @@ -183,8 +186,10 @@ impl InformationSchemaTablesBuilder { self.finish() } + #[allow(clippy::too_many_arguments)] fn add_table( &mut self, + predicates: &Predicates, catalog_name: &str, schema_name: &str, table_name: &str, @@ -192,14 +197,27 @@ impl InformationSchemaTablesBuilder { table_id: Option, engine: Option<&str>, ) { - self.catalog_names.push(Some(catalog_name)); - self.schema_names.push(Some(schema_name)); - self.table_names.push(Some(table_name)); - self.table_types.push(Some(match table_type { + let table_type = match table_type { TableType::Base => "BASE TABLE", TableType::View => "VIEW", TableType::Temporary => "LOCAL TEMPORARY", - })); + }; + + let row = [ + ("catalog_name", &Value::from(catalog_name)), + ("schema_name", &Value::from(schema_name)), + ("table_name", &Value::from(table_name)), + ("table_type", &Value::from(table_type)), + ]; + + if !predicates.eval(&row) { + return; + } + + self.catalog_names.push(Some(catalog_name)); + self.schema_names.push(Some(schema_name)); + self.table_names.push(Some(table_name)); + self.table_types.push(Some(table_type)); self.table_ids.push(table_id); self.engines.push(engine); } @@ -229,7 +247,7 @@ impl DfPartitionStream for InformationSchemaTables { schema, futures::stream::once(async move { builder - .make_tables() + .make_tables(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) From a7f0837ebb024e1b42b2a39f3fd68c981102d11e Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 20:12:22 +0800 Subject: [PATCH 04/14] fix: typo --- src/catalog/src/information_schema.rs | 2 +- src/catalog/src/information_schema/memory_table.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 
f8e379efc630..e646eb3d1bac 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -240,7 +240,7 @@ trait InformationTable { fn schema(&self) -> SchemaRef; - fn to_stream(&self, requst: ScanRequest) -> Result; + fn to_stream(&self, request: ScanRequest) -> Result; fn table_type(&self) -> TableType { TableType::Temporary diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/information_schema/memory_table.rs index 555850d44b8e..9597fc5d12e3 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/information_schema/memory_table.rs @@ -74,7 +74,7 @@ impl InformationTable for MemoryTable { self.schema.clone() } - fn to_stream(&self, _requets: ScanRequest) -> Result { + fn to_stream(&self, _request: ScanRequest) -> Result { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( From 8de298390b77c4c4d3e0e95884271f3702393e7f Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 22:16:21 +0800 Subject: [PATCH 05/14] fix: predicate evaluate --- src/catalog/src/information_schema/columns.rs | 11 +- .../src/information_schema/predicate.rs | 194 +++++++++++++----- .../src/information_schema/schemata.rs | 4 +- src/catalog/src/information_schema/tables.rs | 4 +- .../common/system/information_schema.result | 61 ++++++ .../common/system/information_schema.sql | 26 +++ 6 files changed, 236 insertions(+), 64 deletions(-) diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index d9e4e8105afb..5ca12eb73fbf 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -233,11 +233,12 @@ impl InformationSchemaColumnsBuilder { let data_type = &column_schema.data_type.name(); let row = [ - ("catalog_name", &Value::from(catalog_name)), - ("schema_name", &Value::from(schema_name)), - 
("table_name", &Value::from(table_name)), - ("semantic_type", &Value::from(semantic_type)), - ("data_type", &Value::from(data_type.as_str())), + (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_SCHEMA, &Value::from(schema_name)), + (TABLE_NAME, &Value::from(table_name)), + (COLUMN_NAME, &Value::from(column_schema.name.as_str())), + (DATA_TYPE, &Value::from(data_type.as_str())), + (SEMANTIC_TYPE, &Value::from(semantic_type)), ]; if !predicates.eval(&row) { diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 1be50a56a842..8fb68ee8b592 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -32,58 +32,63 @@ enum Predicate { } impl Predicate { - /// Evaluate the predicate with value, returns: - /// - `None` when the predicate can't run on value, - /// - `Some(true)` when the predicate is satisfied,. - /// - `Some(false)` when the predicate is not satisfied. - fn eval(&self, column: &str, value: &Value) -> Option { + /// Evaluate the predicate with the row, returns: + /// - None when the predicate can't evaluate with the row. 
+ /// - Some(true) when the predicate is satisfied, + /// - Some(false) when the predicate is satisfied, + fn eval(&self, row: &[(&str, &Value)]) -> Option { match self { Predicate::Eq(c, v) => { - if c != column { - return None; + for (column, value) in row { + if c != column { + continue; + } + return Some(v == *value); } - Some(v == value) } Predicate::NotEq(c, v) => { - if c != column { - return None; + for (column, value) in row { + if c != column { + continue; + } + return Some(v != *value); } - Some(v != value) } Predicate::InList(c, values) => { - if c != column { - return None; + for (column, value) in row { + if c != column { + continue; + } + return Some(values.iter().any(|v| v == *value)); } - Some(values.iter().any(|v| v == value)) } Predicate::And(left, right) => { - let Some(left) = left.eval(column, value) else { - return None; - }; - let Some(right) = right.eval(column, value) else { - return None; + return match (left.eval(row), right.eval(row)) { + (Some(left), Some(right)) => Some(left && right), + (Some(false), None) => Some(false), + (None, Some(false)) => Some(false), + _ => None, }; - - Some(left && right) } Predicate::Or(left, right) => { - let Some(left) = left.eval(column, value) else { - return None; - }; - let Some(right) = right.eval(column, value) else { - return None; + return match (left.eval(row), right.eval(row)) { + (Some(left), Some(right)) => Some(left || right), + (Some(true), None) => Some(true), + (None, Some(true)) => Some(true), + _ => None, }; - - Some(left || right) } Predicate::Not(p) => { - let Some(p) = p.eval(column, value) else { + let Some(b) = p.eval(row) else { return None; }; - Some(!p) + return Some(!b); } } + + // Can't evaluate predicate on the row + None } // Try to create a predicate from datafusion `Expr`, return None if fails. @@ -203,29 +208,6 @@ impl Predicates { } } - /// Evaluate the predicates with the column value, - /// returns true when all the predicates are satisfied or can't be evaluated. 
- fn eval_column(&self, column: &str, value: &Value) -> bool { - let mut result = true; - for predicate in &self.predicates { - match predicate.eval(column, value) { - Some(b) => { - result = result && b; - } - None => { - // Can't eval this predicate, continue - continue; - } - } - - if !result { - break; - } - } - - result - } - /// Evaluate the predicates with the columns and values, /// returns true when all the predicates are satisfied or can't be evaluated. pub fn eval(&self, row: &[(&str, &Value)]) -> bool { @@ -235,8 +217,15 @@ impl Predicates { } let mut result = true; - for (column, value) in row { - result = result && self.eval_column(column, value); + + for predicate in &self.predicates { + match predicate.eval(row) { + Some(b) => { + result = result && b; + } + // The predicate can't evalute on the row, continue + None => continue, + } if !result { break; @@ -251,3 +240,98 @@ impl Predicates { fn is_all_scalars(list: &[DfExpr]) -> bool { list.iter().all(|v| matches!(v, DfExpr::Literal(_))) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_predicate_eval() { + let a_col = "a".to_string(); + let b_col = "b".to_string(); + let a_value = Value::from("a_value"); + let b_value = Value::from("b_value"); + let wrong_value = Value::from("wrong_value"); + + let a_row = [(a_col.as_str(), &a_value)]; + let b_row = [("b", &wrong_value)]; + let wrong_row = [(a_col.as_str(), &wrong_value)]; + + // Predicate::Eq + let p = Predicate::Eq(a_col.clone(), a_value.clone()); + assert!(p.eval(&a_row).unwrap()); + assert!(p.eval(&b_row).is_none()); + assert!(!p.eval(&wrong_row).unwrap()); + + // Predicate::NotEq + let p = Predicate::NotEq(a_col.clone(), a_value.clone()); + assert!(!p.eval(&a_row).unwrap()); + assert!(p.eval(&b_row).is_none()); + assert!(p.eval(&wrong_row).unwrap()); + + // Predicate::InList + let p = Predicate::InList(a_col.clone(), vec![a_value.clone(), b_value.clone()]); + assert!(p.eval(&a_row).unwrap()); + 
assert!(p.eval(&b_row).is_none()); + assert!(!p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[(&a_col, &b_value)]).unwrap()); + + let p1 = Predicate::Eq(a_col.clone(), a_value.clone()); + let p2 = Predicate::Eq(b_col.clone(), b_value.clone()); + let row = [(a_col.as_str(), &a_value), (b_col.as_str(), &b_value)]; + let wrong_row = [(a_col.as_str(), &a_value), (b_col.as_str(), &wrong_value)]; + + //Predicate::And + let p = Predicate::And(Box::new(p1.clone()), Box::new(p2.clone())); + assert!(p.eval(&row).unwrap()); + assert!(!p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[]).is_none()); + assert!(p.eval(&[("c", &a_value)]).is_none()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)]) + .unwrap()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)]) + .is_none()); + assert!(!p + .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)]) + .unwrap()); + + //Predicate::Or + let p = Predicate::Or(Box::new(p1), Box::new(p2)); + assert!(p.eval(&row).unwrap()); + assert!(p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[]).is_none()); + assert!(p.eval(&[("c", &a_value)]).is_none()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)]) + .is_none()); + } + + #[test] + fn test_predicate_from_expr() { + todo!() + } + + #[test] + fn test_predicates_from_scan_request() { + todo!() + } + + #[test] + fn test_predicates_eval_row() { + todo!() + } +} diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index 40d401de3337..cc1e6d79a429 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ 
b/src/catalog/src/information_schema/schemata.rs @@ -174,8 +174,8 @@ impl InformationSchemaSchemataBuilder { let row = [ ("catalog_name", &Value::from(catalog_name)), ("schema_name", &Value::from(schema_name)), - ("charset_name", &Value::from("utf8")), - ("collation_name", &Value::from("utf8_bin")), + ("default_character_set_name", &Value::from("utf8")), + ("default_collation_name", &Value::from("utf8_bin")), ]; if !predicates.eval(&row) { diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 271cc1a88881..dd0a89833cd4 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -204,8 +204,8 @@ impl InformationSchemaTablesBuilder { }; let row = [ - ("catalog_name", &Value::from(catalog_name)), - ("schema_name", &Value::from(schema_name)), + ("table_catalog", &Value::from(catalog_name)), + ("table_schema", &Value::from(schema_name)), ("table_name", &Value::from(table_name)), ("table_type", &Value::from(table_type)), ]; diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 2243265bf3a9..890eed008cc6 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -324,6 +324,29 @@ order by table_name; | foo | +------------+ +select table_name +from information_schema.tables +where table_schema in ('my_db', 'public') +order by table_name; + ++------------+ +| table_name | ++------------+ +| foo | +| numbers | ++------------+ + +select table_name +from information_schema.tables +where table_schema not in ('my_db', 'information_schema') +order by table_name; + ++------------+ +| table_name | ++------------+ +| numbers | ++------------+ + select table_catalog, table_schema, table_name, table_type, engine from information_schema.tables where table_catalog = 'greptime' @@ -350,6 +373,22 @@ 
order by table_schema, table_name, column_name; | greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP | +---------------+--------------+------------+-------------+----------------------+---------------+ +-- test query filter for columns -- +select table_catalog, table_schema, table_name, column_name, data_type, semantic_type +from information_schema.columns +where table_catalog = 'greptime' + and (table_schema in ('public') + or + table_schema == 'my_db') +order by table_schema, table_name, column_name; + ++---------------+--------------+------------+-------------+----------------------+---------------+ +| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | ++---------------+--------------+------------+-------------+----------------------+---------------+ +| greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP | +| greptime | public | numbers | number | UInt32 | TAG | ++---------------+--------------+------------+-------------+----------------------+---------------+ + use public; Affected Rows: 0 @@ -362,6 +401,28 @@ use information_schema; Affected Rows: 0 +-- test query filter for key_column_usage -- +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | my_db 
| TIME INDEX | def | my_db | foo | ts | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | public | PRIMARY | def | public | numbers | number | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; + +++ +++ + -- schemata -- desc table schemata; diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index ef0fbdeb7578..c741a33d5252 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -24,6 +24,16 @@ from information_schema.tables where table_schema = 'my_db' order by table_name; +select 
table_name +from information_schema.tables +where table_schema in ('my_db', 'public') +order by table_name; + +select table_name +from information_schema.tables +where table_schema not in ('my_db', 'information_schema') +order by table_name; + select table_catalog, table_schema, table_name, table_type, engine from information_schema.tables where table_catalog = 'greptime' @@ -38,12 +48,28 @@ where table_catalog = 'greptime' and table_schema != 'information_schema' order by table_schema, table_name, column_name; +-- test query filter for columns -- +select table_catalog, table_schema, table_name, column_name, data_type, semantic_type +from information_schema.columns +where table_catalog = 'greptime' + and (table_schema in ('public') + or + table_schema == 'my_db') +order by table_schema, table_name, column_name; + use public; drop schema my_db; use information_schema; +-- test query filter for key_column_usage -- +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; + -- schemata -- desc table schemata; From 5fb395310820855067db16f2bbf74566be2bc363 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 3 Jan 2024 22:21:11 +0800 Subject: [PATCH 06/14] fix: typo --- src/catalog/src/information_schema/predicate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 8fb68ee8b592..cb6bbf29f111 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -87,7 +87,7 @@ impl Predicate { } } - // Can't evaluate predicate on the row + // Can't evaluate predicate with the row None } @@ -208,7 +208,7 @@ impl Predicates { } } - /// Evaluate the predicates with the columns and values, + /// Evaluate the 
predicates with the row. /// returns true when all the predicates are satisfied or can't be evaluated. pub fn eval(&self, row: &[(&str, &Value)]) -> bool { // fast path @@ -223,7 +223,7 @@ impl Predicates { Some(b) => { result = result && b; } - // The predicate can't evalute on the row, continue + // The predicate can't evaluate with the row, continue None => continue, } From 044b2bec415eedaf1d7f44bc3d068e1146768edc Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 10:24:07 +0800 Subject: [PATCH 07/14] test: predicates --- .../src/information_schema/predicate.rs | 147 +++++++++++++++++- 1 file changed, 143 insertions(+), 4 deletions(-) diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index cb6bbf29f111..8149a8a55b11 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -35,7 +35,7 @@ impl Predicate { /// Evaluate the predicate with the row, returns: /// - None when the predicate can't evaluate with the row. 
/// - Some(true) when the predicate is satisfied, - /// - Some(false) when the predicate is satisfied, + /// - Some(false) when the predicate is not satisfied, fn eval(&self, row: &[(&str, &Value)]) -> Option { match self { Predicate::Eq(c, v) => { @@ -243,6 +243,10 @@ fn is_all_scalars(list: &[DfExpr]) -> bool { #[cfg(test)] mod tests { + use datafusion::common::{Column, ScalarValue}; + use datafusion::logical_expr::expr::InList; + use datafusion::logical_expr::BinaryExpr; + use super::*; #[test] @@ -320,18 +324,153 @@ mod tests { .is_none()); } + fn column(name: &str) -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: name.to_string(), + }) + } + + fn string_literal(v: &str) -> DfExpr { + DfExpr::Literal(ScalarValue::Utf8(Some(v.to_string()))) + } + + fn match_string_value(v: &Value, expected: &str) -> bool { + matches!(v, Value::String(bs) if bs.as_utf8() == expected) + } + + fn match_string_values(vs: &[Value], expected: &[&str]) -> bool { + assert_eq!(vs.len(), expected.len()); + + let mut result = true; + for (i, v) in vs.iter().enumerate() { + result = result && match_string_value(v, expected[i]); + } + + result + } + + fn mock_exprs() -> (DfExpr, DfExpr) { + let expr1 = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(column("a")), + op: Operator::Eq, + right: Box::new(string_literal("a_value")), + }); + + let expr2 = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(column("b")), + op: Operator::NotEq, + right: Box::new(string_literal("b_value")), + }); + + (expr1, expr2) + } + #[test] fn test_predicate_from_expr() { - todo!() + let (expr1, expr2) = mock_exprs(); + + let p1 = Predicate::from_expr(expr1.clone()).unwrap(); + assert!(matches!(&p1, Predicate::Eq(column, v) if column == "a" + && match_string_value(v, "a_value"))); + + let p2 = Predicate::from_expr(expr2.clone()).unwrap(); + assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b" + && match_string_value(v, "b_value"))); + + let and_expr = DfExpr::BinaryExpr(BinaryExpr { + 
left: Box::new(expr1.clone()), + op: Operator::And, + right: Box::new(expr2.clone()), + }); + let or_expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(expr1.clone()), + op: Operator::Or, + right: Box::new(expr2.clone()), + }); + let not_expr = DfExpr::Not(Box::new(expr1.clone())); + + let and_p = Predicate::from_expr(and_expr).unwrap(); + assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2)); + let or_p = Predicate::from_expr(or_expr).unwrap(); + assert!(matches!(or_p, Predicate::Or(left, right) if *left == p1 && *right == p2)); + let not_p = Predicate::from_expr(not_expr).unwrap(); + assert!(matches!(not_p, Predicate::Not(p) if *p == p1)); + + let inlist_expr = DfExpr::InList(InList { + expr: Box::new(column("a")), + list: vec![string_literal("a1"), string_literal("a2")], + negated: false, + }); + + let inlist_p = Predicate::from_expr(inlist_expr).unwrap(); + assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a" + && match_string_values(values, &["a1", "a2"]))); + + let inlist_expr = DfExpr::InList(InList { + expr: Box::new(column("a")), + list: vec![string_literal("a1"), string_literal("a2")], + negated: true, + }); + let inlist_p = Predicate::from_expr(inlist_expr).unwrap(); + assert!(matches!(inlist_p, Predicate::Not(p) if + matches!(&*p, + Predicate::InList(c, values) if c == "a" + && match_string_values(values, &["a1", "a2"])))); } #[test] fn test_predicates_from_scan_request() { - todo!() + let predicates = Predicates::from_scan_request(&None); + assert!(predicates.predicates.is_empty()); + + let (expr1, expr2) = mock_exprs(); + + let request = ScanRequest { + filters: vec![expr1.into(), expr2.into()], + ..Default::default() + }; + let predicates = Predicates::from_scan_request(&Some(request)); + + assert_eq!(2, predicates.predicates.len()); + assert!( + matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a" + && match_string_value(v, "a_value")) + ); + assert!( + 
matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b" + && match_string_value(v, "b_value")) + ); } #[test] fn test_predicates_eval_row() { - todo!() + let wrong_row = [ + ("a", &Value::from("a_value")), + ("b", &Value::from("b_value")), + ("c", &Value::from("c_value")), + ]; + let row = [ + ("a", &Value::from("a_value")), + ("b", &Value::from("not_b_value")), + ("c", &Value::from("c_value")), + ]; + let c_row = [("c", &Value::from("c_value"))]; + + // test empty predicates, always returns true + let predicates = Predicates::from_scan_request(&None); + assert!(predicates.eval(&row)); + assert!(predicates.eval(&wrong_row)); + assert!(predicates.eval(&c_row)); + + let (expr1, expr2) = mock_exprs(); + let request = ScanRequest { + filters: vec![expr1.into(), expr2.into()], + ..Default::default() + }; + let predicates = Predicates::from_scan_request(&Some(request)); + assert!(predicates.eval(&row)); + assert!(!predicates.eval(&wrong_row)); + assert!(predicates.eval(&c_row)); } } From f8e6212ae686e5da67ca48a0f271ff13ac0f7713 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 10:38:54 +0800 Subject: [PATCH 08/14] fix: comment --- src/catalog/src/information_schema/predicate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 8149a8a55b11..2ff46cb5bd8e 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -91,7 +91,7 @@ impl Predicate { None } - // Try to create a predicate from datafusion `Expr`, return None if fails. + /// Try to create a predicate from datafusion `Expr`, return None if fails. 
fn from_expr(expr: DfExpr) -> Option { match expr { // NOT expr From 0ea1ffc56cd1d0e73eb67291e6f653cfb01fed36 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 10:45:29 +0800 Subject: [PATCH 09/14] fix: pub mod --- src/catalog/src/information_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index e646eb3d1bac..84be9576f610 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -15,7 +15,7 @@ mod columns; mod key_column_usage; mod memory_table; -pub(crate) mod predicate; +mod predicate; mod schemata; mod table_names; mod tables; From a3496c67642a318bd2ec6fb6ed327975df415822 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 14:10:44 +0800 Subject: [PATCH 10/14] docs: improve comments --- src/catalog/src/information_schema/predicate.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 2ff46cb5bd8e..10e8aeecb3cd 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -18,7 +18,7 @@ use datatypes::value::Value; use store_api::storage::ScanRequest; type ColumnName = String; -/// Predicate to filter information_schema tables stream, +/// Predicate to filter `information_schema` tables stream, /// we only support these simple predicates currently. /// TODO(dennis): supports more predicate types. #[derive(Clone, PartialEq, Eq, Debug)] @@ -33,9 +33,9 @@ enum Predicate { impl Predicate { /// Evaluate the predicate with the row, returns: - /// - None when the predicate can't evaluate with the row. - /// - Some(true) when the predicate is satisfied, - /// - Some(false) when the predicate is not satisfied, + /// - `None` when the predicate can't evaluate with the row. 
+ /// - `Some(true)` when the predicate is satisfied, + /// - `Some(false)` when the predicate is not satisfied, fn eval(&self, row: &[(&str, &Value)]) -> Option { match self { Predicate::Eq(c, v) => { @@ -91,7 +91,7 @@ impl Predicate { None } - /// Try to create a predicate from datafusion `Expr`, return None if fails. + /// Try to create a predicate from datafusion [`Expr`], return None if fails. fn from_expr(expr: DfExpr) -> Option { match expr { // NOT expr @@ -189,7 +189,7 @@ pub struct Predicates { } impl Predicates { - /// Try its best to create predicates from `ScanRequest`. + /// Try its best to create predicates from [`ScanRequest`]. pub fn from_scan_request(request: &Option) -> Predicates { if let Some(request) = request { let mut predicates = Vec::with_capacity(request.filters.len()); @@ -236,7 +236,7 @@ impl Predicates { } } -/// Returns true when the values are all `ScalarValue`. +/// Returns true when the values are all [`DfExpr::Literal`]. fn is_all_scalars(list: &[DfExpr]) -> bool { list.iter().all(|v| matches!(v, DfExpr::Literal(_))) } From b8f5a73abdf6cd9a8399fc6e3c5abf177e0a9d72 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 18:15:52 +0800 Subject: [PATCH 11/14] fix: cr comments and supports like predicate --- Cargo.lock | 1 + src/catalog/Cargo.toml | 1 + .../information_schema/key_column_usage.rs | 39 +++-- .../src/information_schema/predicate.rs | 158 +++++++++++++++++- .../src/information_schema/schemata.rs | 21 ++- src/catalog/src/information_schema/tables.rs | 27 +-- .../common/system/information_schema.result | 27 +++ .../common/system/information_schema.sql | 9 + 8 files changed, 239 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b042227a293b..0b3ece080a4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1183,6 +1183,7 @@ version = "0.5.0" dependencies = [ "api", "arc-swap", + "arrow", "arrow-schema", "async-stream", "async-trait", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml 
index 79f3603f4f19..9d9f1329d9b2 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -10,6 +10,7 @@ testing = [] [dependencies] api.workspace = true arc-swap = "1.0" +arrow.workspace = true arrow-schema.workspace = true async-stream.workspace = true async-trait = "0.1" diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index 7952a8e7f288..e77397fa089e 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -37,6 +37,13 @@ use crate::error::{ use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const CONSTRAINT_SCHEMA: &str = "constraint_schema"; +const CONSTRAINT_NAME: &str = "constraint_name"; +const TABLE_SCHEMA: &str = "table_schema"; +const TABLE_NAME: &str = "table_name"; +const COLUMN_NAME: &str = "column_name"; +const ORDINAL_POSITION: &str = "ordinal_position"; + /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. 
pub(super) struct InformationSchemaKeyColumnUsage { schema: SchemaRef, @@ -61,24 +68,16 @@ impl InformationSchemaKeyColumnUsage { false, ), ColumnSchema::new( - "constraint_schema", - ConcreteDataType::string_datatype(), - false, - ), - ColumnSchema::new( - "constraint_name", + CONSTRAINT_SCHEMA, ConcreteDataType::string_datatype(), false, ), + ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("column_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - "ordinal_position", - ConcreteDataType::uint32_datatype(), - false, - ), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false), ColumnSchema::new( "position_in_unique_constraint", ConcreteDataType::uint32_datatype(), @@ -280,12 +279,12 @@ impl InformationSchemaKeyColumnUsageBuilder { ordinal_position: u32, ) { let row = [ - ("constraint_schema", &Value::from(constraint_schema)), - ("constraint_name", &Value::from(constraint_name)), - ("table_schema", &Value::from(table_schema)), - ("table_name", &Value::from(table_name)), - ("column_name", &Value::from(column_name)), - ("ordinal_position", &Value::from(ordinal_position)), + (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)), + (CONSTRAINT_NAME, &Value::from(constraint_name)), + (TABLE_SCHEMA, &Value::from(table_schema)), + (TABLE_NAME, &Value::from(table_name)), + (COLUMN_NAME, &Value::from(column_name)), + (ORDINAL_POSITION, &Value::from(ordinal_position)), ]; if !predicates.eval(&row) { diff --git 
a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 10e8aeecb3cd..ca5551672c7e 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -12,7 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use arrow::array::StringArray; +use arrow::compute::kernels::comparison; use common_query::logical_plan::DfExpr; +use datafusion::common::ScalarValue; +use datafusion::logical_expr::expr::Like; use datafusion::logical_expr::Operator; use datatypes::value::Value; use store_api::storage::ScanRequest; @@ -24,6 +28,7 @@ type ColumnName = String; #[derive(Clone, PartialEq, Eq, Debug)] enum Predicate { Eq(ColumnName, Value), + Like(ColumnName, String, bool), NotEq(ColumnName, Value), InList(ColumnName, Vec), And(Box, Box), @@ -46,6 +51,19 @@ impl Predicate { return Some(v == *value); } } + Predicate::Like(c, pattern, case_insenstive) => { + for (column, value) in row { + if c != column { + continue; + } + + let Value::String(bs) = value else { + continue; + }; + + return like_utf8(bs.as_utf8(), pattern, case_insenstive); + } + } Predicate::NotEq(c, v) => { for (column, value) in row { if c != column { @@ -63,17 +81,29 @@ impl Predicate { } } Predicate::And(left, right) => { - return match (left.eval(row), right.eval(row)) { + let left = left.eval(row); + + // short-circuit + if matches!(left, Some(false)) { + return Some(false); + } + + return match (left, right.eval(row)) { (Some(left), Some(right)) => Some(left && right), - (Some(false), None) => Some(false), (None, Some(false)) => Some(false), _ => None, }; } Predicate::Or(left, right) => { - return match (left.eval(row), right.eval(row)) { + let left = left.eval(row); + + // short-circuit + if matches!(left, Some(true)) { + return Some(true); + } + + return match (left, right.eval(row)) { (Some(left), Some(right)) => Some(left || right), - 
(Some(true), None) => Some(true), (None, Some(true)) => Some(true), _ => None, }; @@ -102,6 +132,30 @@ impl Predicate { Some(Predicate::Not(Box::new(p))) } + // expr LIKE pattern + DfExpr::Like(Like { + negated, + expr, + pattern, + case_insensitive, + .. + }) if is_column(&expr) && is_string_literal(&pattern) => { + // Safety: ensured by guard + let DfExpr::Column(c) = *expr else { + unreachable!(); + }; + let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else { + unreachable!(); + }; + + let p = Predicate::Like(c.name, pattern, case_insensitive); + + if negated { + Some(Predicate::Not(Box::new(p))) + } else { + Some(p) + } + } // left OP right DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) { // left == right @@ -183,6 +237,34 @@ impl Predicate { } } +/// Perform SQL left LIKE right, return `None` if fail to evaluate. +/// - `s` the target string +/// - `pattern` the pattern just like '%abc' +/// - `case_insenstive` whether to perform case-insensitive like or not. 
+fn like_utf8(s: &str, pattern: &str, case_insenstive: &bool) -> Option { + let array = StringArray::from(vec![s]); + let patterns = StringArray::new_scalar(pattern); + + let Ok(booleans) = (if *case_insenstive { + comparison::ilike(&array, &patterns) + } else { + comparison::like(&array, &patterns) + }) else { + return None; + }; + + // Safty: at least one value in result + Some(booleans.value(0)) +} + +fn is_string_literal(expr: &DfExpr) -> bool { + matches!(expr, DfExpr::Literal(ScalarValue::Utf8(Some(_)))) +} + +fn is_column(expr: &DfExpr) -> bool { + matches!(expr, DfExpr::Column(_)) +} + /// A list of predicate pub struct Predicates { predicates: Vec, @@ -324,6 +406,70 @@ mod tests { .is_none()); } + #[test] + fn test_predicate_like() { + // case insenstive + let expr = DfExpr::Like(Like { + negated: false, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: true, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!( + matches!(&p, Predicate::Like(c, pattern, case_insensitive) if + c == "a" + && pattern == "%abc" + && *case_insensitive) + ); + + let match_row = [ + ("a", &Value::from("hello AbC")), + ("b", &Value::from("b value")), + ]; + let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))]; + + assert!(p.eval(&match_row).unwrap()); + assert!(!p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + + // case senstive + let expr = DfExpr::Like(Like { + negated: false, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: false, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!( + matches!(&p, Predicate::Like(c, pattern, case_insensitive) if + c == "a" + && pattern == "%abc" + && !*case_insensitive) + ); + assert!(!p.eval(&match_row).unwrap()); + assert!(!p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + + // not like + let expr = DfExpr::Like(Like { + 
negated: true, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: true, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!(!p.eval(&match_row).unwrap()); + assert!(p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + } + fn column(name: &str) -> DfExpr { DfExpr::Column(Column { relation: None, @@ -435,11 +581,11 @@ mod tests { assert_eq!(2, predicates.predicates.len()); assert!( matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a" - && match_string_value(v, "a_value")) + && match_string_value(v, "a_value")) ); assert!( matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b" - && match_string_value(v, "b_value")) + && match_string_value(v, "b_value")) ); } diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index cc1e6d79a429..eddfb142cc77 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/information_schema/schemata.rs @@ -37,6 +37,11 @@ use crate::error::{ use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const CATALOG_NAME: &str = "catalog_name"; +const SCHEMA_NAME: &str = "schema_name"; +const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name"; +const DEFAULT_COLLATION_NAME: &str = "default_collation_name"; + /// The `information_schema.schemata` table implementation. 
pub(super) struct InformationSchemaSchemata { schema: SchemaRef, @@ -55,15 +60,15 @@ impl InformationSchemaSchemata { pub(crate) fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ - ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new( - "default_character_set_name", + DEFAULT_CHARACTER_SET_NAME, ConcreteDataType::string_datatype(), false, ), ColumnSchema::new( - "default_collation_name", + DEFAULT_COLLATION_NAME, ConcreteDataType::string_datatype(), false, ), @@ -172,10 +177,10 @@ impl InformationSchemaSchemataBuilder { fn add_schema(&mut self, predicates: &Predicates, catalog_name: &str, schema_name: &str) { let row = [ - ("catalog_name", &Value::from(catalog_name)), - ("schema_name", &Value::from(schema_name)), - ("default_character_set_name", &Value::from("utf8")), - ("default_collation_name", &Value::from("utf8_bin")), + (CATALOG_NAME, &Value::from(catalog_name)), + (SCHEMA_NAME, &Value::from(schema_name)), + (DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")), + (DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")), ]; if !predicates.eval(&row) { diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index dd0a89833cd4..5320e50277f0 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -38,6 +38,13 @@ use crate::error::{ use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const TABLE_CATALOG: &str = "table_catalog"; +const TABLE_SCHEMA: &str = "table_schema"; +const TABLE_NAME: &str = "table_name"; +const TABLE_TYPE: &str = "table_type"; +const TABLE_ID: &str = "table_id"; +const ENGINE: &str = "engine"; + pub(super) struct 
InformationSchemaTables { schema: SchemaRef, catalog_name: String, @@ -55,12 +62,12 @@ impl InformationSchemaTables { pub(crate) fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ - ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_id", ConcreteDataType::uint32_datatype(), true), - ColumnSchema::new("engine", ConcreteDataType::string_datatype(), true), + ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true), ])) } @@ -204,10 +211,10 @@ impl InformationSchemaTablesBuilder { }; let row = [ - ("table_catalog", &Value::from(catalog_name)), - ("table_schema", &Value::from(schema_name)), - ("table_name", &Value::from(table_name)), - ("table_type", &Value::from(table_type)), + (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_SCHEMA, &Value::from(schema_name)), + (TABLE_NAME, &Value::from(table_name)), + (TABLE_TYPE, &Value::from(table_type)), ]; if !predicates.eval(&row) { diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 890eed008cc6..75a692b51f1e 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -336,6 +336,17 @@ order by table_name; | numbers | +------------+ +select table_name +from 
information_schema.tables +where table_schema like 'my%' +order by table_name; + ++------------+ +| table_name | ++------------+ +| foo | ++------------+ + select table_name from information_schema.tables where table_schema not in ('my_db', 'information_schema') @@ -418,6 +429,22 @@ select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; | def | public | PRIMARY | def | public | numbers | number | 1 | | | | | +--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME LIKE '%INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX'; + 
++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | public | PRIMARY | def | public | numbers | number | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; ++ diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index c741a33d5252..0ba3508aca47 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -29,6 +29,11 @@ from information_schema.tables where table_schema in ('my_db', 'public') order by table_name; +select table_name +from information_schema.tables +where table_schema like 'my%' +order by table_name; + select table_name from information_schema.tables where table_schema not in ('my_db', 'information_schema') @@ -68,6 +73,10 @@ select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; +select * from KEY_COLUMN_USAGE where 
CONSTRAINT_NAME LIKE '%INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX'; + select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; -- schemata -- From 618d4835e0d2e263aa7d1409a76436cfe6e026b5 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 18:22:05 +0800 Subject: [PATCH 12/14] chore: typo --- .../src/information_schema/key_column_usage.rs | 3 ++- src/catalog/src/information_schema/predicate.rs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index e77397fa089e..28fba3c63ced 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -39,6 +39,7 @@ use crate::CatalogManager; const CONSTRAINT_SCHEMA: &str = "constraint_schema"; const CONSTRAINT_NAME: &str = "constraint_name"; +const TABLE_CATALOG: &str = "table_catalog"; const TABLE_SCHEMA: &str = "table_schema"; const TABLE_NAME: &str = "table_name"; const COLUMN_NAME: &str = "column_name"; @@ -73,7 +74,7 @@ impl InformationSchemaKeyColumnUsage { false, ), ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false), ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false), diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index ca5551672c7e..c05b19c4ee50 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -51,7 +51,7 @@ impl Predicate 
{ return Some(v == *value); } } - Predicate::Like(c, pattern, case_insenstive) => { + Predicate::Like(c, pattern, case_insensitive) => { for (column, value) in row { if c != column { continue; @@ -61,7 +61,7 @@ impl Predicate { continue; }; - return like_utf8(bs.as_utf8(), pattern, case_insenstive); + return like_utf8(bs.as_utf8(), pattern, case_insensitive); } } Predicate::NotEq(c, v) => { @@ -240,12 +240,12 @@ impl Predicate { /// Perform SQL left LIKE right, return `None` if fail to evaluate. /// - `s` the target string /// - `pattern` the pattern just like '%abc' -/// - `case_insenstive` whether to perform case-insensitive like or not. -fn like_utf8(s: &str, pattern: &str, case_insenstive: &bool) -> Option { +/// - `case_insensitive` whether to perform case-insensitive like or not. +fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option { let array = StringArray::from(vec![s]); let patterns = StringArray::new_scalar(pattern); - let Ok(booleans) = (if *case_insenstive { + let Ok(booleans) = (if *case_insensitive { comparison::ilike(&array, &patterns) } else { comparison::like(&array, &patterns) @@ -253,7 +253,7 @@ fn like_utf8(s: &str, pattern: &str, case_insenstive: &bool) -> Option { return None; }; - // Safty: at least one value in result + // Safety: at least one value in result Some(booleans.value(0)) } @@ -408,7 +408,7 @@ mod tests { #[test] fn test_predicate_like() { - // case insenstive + // case insensitive let expr = DfExpr::Like(Like { negated: false, expr: Box::new(column("a")), @@ -435,7 +435,7 @@ mod tests { assert!(!p.eval(&unmatch_row).unwrap()); assert!(p.eval(&[]).is_none()); - // case senstive + // case sensitive let expr = DfExpr::Like(Like { negated: false, expr: Box::new(column("a")), From 91b86dabdf66954760c33d1efb1e385fb0bc0a2a Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 4 Jan 2024 18:26:20 +0800 Subject: [PATCH 13/14] fix: cargo toml format --- src/catalog/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 9d9f1329d9b2..715324e7a2db 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -10,8 +10,8 @@ testing = [] [dependencies] api.workspace = true arc-swap = "1.0" -arrow.workspace = true arrow-schema.workspace = true +arrow.workspace = true async-stream.workspace = true async-trait = "0.1" build-data = "0.1" From 452ad9db1b6ad59ffe8a2b405400abee53c84017 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Fri, 5 Jan 2024 15:02:56 +0800 Subject: [PATCH 14/14] chore: apply suggestion --- .../src/information_schema/predicate.rs | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index c05b19c4ee50..9afc83a389f5 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -298,23 +298,10 @@ impl Predicates { return true; } - let mut result = true; - - for predicate in &self.predicates { - match predicate.eval(row) { - Some(b) => { - result = result && b; - } - // The predicate can't evaluate with the row, continue - None => continue, - } - - if !result { - break; - } - } - - result + self.predicates + .iter() + .filter_map(|p| p.eval(row)) + .all(|b| b) } }