diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index ffe9d2eb4a95..c22b9f7b4a5f 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -123,6 +123,14 @@ impl ColumnSchema { self.default_constraint.as_ref() } + /// Check if the default constraint is an impure function. + pub fn is_default_impure(&self) -> bool { + self.default_constraint + .as_ref() + .map(|c| c.is_function()) + .unwrap_or(false) + } + #[inline] pub fn metadata(&self) -> &Metadata { &self.metadata @@ -290,6 +298,15 @@ impl ColumnSchema { } } + /// Creates an impure default value for this column, only if it has an impure default constraint. + /// Otherwise, returns `Ok(None)`. + pub fn create_impure_default(&self) -> Result> { + match &self.default_constraint { + Some(c) => c.create_impure_default(&self.data_type), + None => Ok(None), + } + } + /// Retrieves the fulltext options for the column. pub fn fulltext_options(&self) -> Result> { match self.metadata.get(FULLTEXT_KEY) { diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index faf08a1bb8c3..1a2128c2009f 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -178,12 +178,63 @@ impl ColumnDefaultConstraint { } } + /// Only create default vector if it's impure, i.e., it's a function. + /// + /// This helps to delay creating constant default values to mito engine while also keeping impure default values consistent + pub fn create_impure_default_vector( + &self, + data_type: &ConcreteDataType, + num_rows: usize, + ) -> Result> { + assert!(num_rows > 0); + + match self { + ColumnDefaultConstraint::Function(expr) => { + // Functions should also ensure its return value is not null when + // is_nullable is true. + match &expr[..] { + // TODO(dennis): we only support current_timestamp right now, + // it's better to use an expression framework in the future. 
+ CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => { + create_current_timestamp_vector(data_type, num_rows).map(Some) + } + _ => error::UnsupportedDefaultExprSnafu { expr }.fail(), + } + } + ColumnDefaultConstraint::Value(_) => Ok(None), + } + } + + /// Only create default value if it's impure, i.e., it's a function. + /// + /// This helps to delay creating constant default values to mito engine while also keeping impure default values consistent + pub fn create_impure_default(&self, data_type: &ConcreteDataType) -> Result> { + match self { + ColumnDefaultConstraint::Function(expr) => { + // Functions should also ensure its return value is not null when + // is_nullable is true. + match &expr[..] { + CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => { + create_current_timestamp(data_type).map(Some) + } + _ => error::UnsupportedDefaultExprSnafu { expr }.fail(), + } + } + ColumnDefaultConstraint::Value(_) => Ok(None), + } + } + /// Returns true if this constraint might creates NULL. fn maybe_null(&self) -> bool { // Once we support more functions, we may return true if given function // could return null. matches!(self, ColumnDefaultConstraint::Value(Value::Null)) } + + /// Returns true if this constraint is a function. 
+ pub fn is_function(&self) -> bool { + matches!(self, ColumnDefaultConstraint::Function(_)) + } } fn create_current_timestamp(data_type: &ConcreteDataType) -> Result { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index f46f0d544021..7f4a4267cb14 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -24,6 +24,7 @@ use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; use common_telemetry::{debug, trace}; +use datatypes::value::Value; use itertools::Itertools; use snafu::{IntoError, OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -178,14 +179,32 @@ impl Flownode for FlowWorkerManager { .table_from_id(&table_id) .await .map_err(to_meta_err(snafu::location!()))?; + let default_vals = table_schema + .default_values + .iter() + .zip(table_schema.relation_desc.typ().column_types.iter()) + .map(|(v, ty)| { + v.as_ref().and_then(|v| { + match v.create_default(ty.scalar_type(), ty.nullable()) { + Ok(v) => Some(v), + Err(err) => { + common_telemetry::error!(err; "Failed to create default value"); + None + } + } + }) + }) + .collect_vec(); + let table_types = table_schema + .relation_desc .typ() .column_types .clone() .into_iter() .map(|t| t.scalar_type) .collect_vec(); - let table_col_names = table_schema.names; + let table_col_names = table_schema.relation_desc.names; let table_col_names = table_col_names .iter().enumerate() .map(|(idx,name)| match name { @@ -202,31 +221,35 @@ impl Flownode for FlowWorkerManager { .enumerate() .map(|(i, name)| (&name.column_name, i)), ); - let fetch_order: Vec = table_col_names + + let fetch_order: Vec = table_col_names .iter() - .map(|col_name| { + .zip(default_vals.into_iter()) + .map(|(col_name, col_default_val)| { name_to_col .get(col_name) .copied() + .map(FetchFromRow::Idx) + .or_else(|| col_default_val.clone().map(FetchFromRow::Default)) 
.with_context(|| UnexpectedSnafu { - err_msg: format!("Column not found: {}", col_name), + err_msg: format!( + "Column not found: {}, default_value: {:?}", + col_name, col_default_val + ), }) }) .try_collect()?; - if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) { - trace!("Reordering columns: {:?}", fetch_order) - } + + trace!("Reordering columns: {:?}", fetch_order); (table_types, fetch_order) }; + // TODO(discord9): use column instead of row let rows: Vec = rows_proto .into_iter() .map(|r| { let r = repr::Row::from(r); - let reordered = fetch_order - .iter() - .map(|&i| r.inner[i].clone()) - .collect_vec(); + let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec(); repr::Row::new(reordered) }) .map(|r| (r, now, 1)) @@ -258,3 +281,20 @@ impl Flownode for FlowWorkerManager { Ok(Default::default()) } } + +/// Simple helper enum for fetching value from row with default value +#[derive(Debug, Clone)] +enum FetchFromRow { + Idx(usize), + Default(Value), +} + +impl FetchFromRow { + /// Panic if idx is out of bound + fn fetch(&self, row: &repr::Row) -> Value { + match self { + FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(), + FetchFromRow::Default(v) => v.clone(), + } + } +} diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 070a3946c27e..d0cc5ffce5bf 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -353,7 +353,7 @@ impl FlownodeContext { name: name.join("."), })?; let schema = self.table_source.table(name).await?; - Ok((id, schema)) + Ok((id, schema.relation_desc)) } /// Assign a global id to a table, if already assigned, return the existing global id diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 41ef218feb10..c40a9ce3608c 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -17,6 +17,8 @@ use common_error::ext::BoxedError; use 
common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; +use datatypes::schema::ColumnDefaultConstraint; +use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -27,6 +29,32 @@ use crate::error::{ }; use crate::repr::RelationDesc; +/// Table description: the relation desc plus one default-constraint slot per column (`None` when the column has no default), which is the minimal table information flow needs +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TableDesc { + pub relation_desc: RelationDesc, + pub default_values: Vec<Option<ColumnDefaultConstraint>>, +} + +impl TableDesc { + pub fn new( + relation_desc: RelationDesc, + default_values: Vec<Option<ColumnDefaultConstraint>>, + ) -> Self { + Self { + relation_desc, + default_values, + } + } + + pub fn new_no_default(relation_desc: RelationDesc) -> Self { + Self { + default_values: vec![None; relation_desc.typ().column_types.len()], + relation_desc, + } + } +} + /// Table source but for flow, provide table schema by table name/id #[async_trait::async_trait] pub trait FlowTableSource: Send + Sync + std::fmt::Debug { @@ -34,11 +62,11 @@ pub trait FlowTableSource: Send + Sync + std::fmt::Debug { async fn table_id_from_name(&self, name: &TableName) -> Result; /// Get the table schema by table name - async fn table(&self, name: &TableName) -> Result { + async fn table(&self, name: &TableName) -> Result { let id = self.table_id_from_name(name).await?; self.table_from_id(&id).await } - async fn table_from_id(&self, table_id: &TableId) -> Result; + async fn table_from_id(&self, table_id: &TableId) -> Result; } /// managed table source information, query from table info manager and table name manager @@ -51,7 +79,7 @@ pub struct ManagedTableSource { #[async_trait::async_trait] impl FlowTableSource for ManagedTableSource { - async fn table_from_id(&self, table_id: &TableId) -> Result { + async fn table_from_id(&self, table_id: &TableId) -> Result { let table_info_value = self .get_table_info_value(table_id) .await? 
@@ -175,7 +203,7 @@ impl ManagedTableSource { pub async fn get_table_name_schema( &self, table_id: &TableId, - ) -> Result<(TableName, RelationDesc), Error> { + ) -> Result<(TableName, TableDesc), Error> { let table_info_value = self .get_table_info_value(table_id) .await? @@ -219,7 +247,7 @@ pub(crate) mod test { use crate::repr::{ColumnType, RelationType}; pub struct FlowDummyTableSource { - pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>, + pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>, id_to_idx: HashMap, name_to_idx: HashMap, } @@ -234,8 +262,10 @@ pub(crate) mod test { "public".to_string(), "numbers".to_string(), ], - RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]) - .into_named(vec![Some("number".to_string())]), + TableDesc::new_no_default( + RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]) + .into_named(vec![Some("number".to_string())]), + ), ), ( 1025, @@ -244,11 +274,13 @@ pub(crate) mod test { "public".to_string(), "numbers_with_ts".to_string(), ], - RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), - ColumnType::new(CDT::timestamp_millisecond_datatype(), false), - ]) - .into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + TableDesc::new_no_default( + RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::timestamp_millisecond_datatype(), false), + ]) + .into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + ), ), ]; let id_to_idx = id_names_to_desc @@ -271,7 +303,7 @@ pub(crate) mod test { #[async_trait::async_trait] impl FlowTableSource for FlowDummyTableSource { - async fn table_from_id(&self, table_id: &TableId) -> Result { + async fn table_from_id(&self, table_id: &TableId) -> Result { let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu { name: format!("Table id = {:?}, couldn't found table desc", table_id), })?; diff --git a/src/flow/src/adapter/util.rs 
b/src/flow/src/adapter/util.rs index f5d2968543f9..b851cf0e70bc 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -27,6 +27,7 @@ use session::context::QueryContextBuilder; use snafu::{OptionExt, ResultExt}; use table::table_reference::TableReference; +use crate::adapter::table_source::TableDesc; use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL}; use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; use crate::repr::{ColumnType, RelationDesc, RelationType}; @@ -126,7 +127,7 @@ impl FlowWorkerManager { pub fn table_info_value_to_relation_desc( table_info_value: TableInfoValue, -) -> Result { +) -> Result { let raw_schema = table_info_value.table_info.meta.schema; let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema .column_schemas @@ -147,8 +148,7 @@ pub fn table_info_value_to_relation_desc( let keys = vec![crate::repr::Key::from(key)]; let time_index = raw_schema.timestamp_index; - - Ok(RelationDesc { + let relation_desc = RelationDesc { typ: RelationType { column_types, keys, @@ -157,7 +157,14 @@ pub fn table_info_value_to_relation_desc( auto_columns: vec![], }, names: col_names, - }) + }; + let default_values = raw_schema + .column_schemas + .iter() + .map(|c| c.default_constraint().cloned()) + .collect_vec(); + + Ok(TableDesc::new(relation_desc, default_values)) } pub fn from_proto_to_data_type( diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 0820d99337ec..9f55c45804c5 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -925,6 +925,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Unexpected impure default value with region_id: {}, column: {}, default_value: {}", + region_id, + column, + default_value + ))] + UnexpectedImpureDefault { + #[snafu(implicit)] + location: Location, + region_id: RegionId, + column: String, + default_value: String, + }, } pub type Result = std::result::Result; @@ -964,7 +978,8 @@ impl ErrorExt for Error { | 
InvalidParquet { .. } | OperateAbortedIndex { .. } | UnexpectedReplay { .. } - | IndexEncodeNull { .. } => StatusCode::Unexpected, + | IndexEncodeNull { .. } + | UnexpectedImpureDefault { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c6dfcbe6c716..0cd59aa1d98c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -42,7 +42,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, - FlushRegionSnafu, InvalidRequestSnafu, Result, + FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu, }; use crate::manifest::action::RegionEdit; use crate::memtable::MemtableId; @@ -333,6 +333,14 @@ impl WriteRequest { } OpType::Put => { // For put requests, we use the default value from column schema. + if column.column_schema.is_default_impure() { + UnexpectedImpureDefaultSnafu { + region_id: self.region_id, + column: &column.column_schema.name, + default_value: format!("{:?}", column.column_schema.default_constraint()), + } + .fail()? 
+ } column .column_schema .create_default() @@ -1039,6 +1047,57 @@ mod tests { check_invalid_request(&err, r#"unknown columns: ["k1"]"#); } + #[test] + fn test_fill_impure_columns_err() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_default_constraint(Some(ColumnDefaultConstraint::Function( + "now()".to_string(), + ))) + .unwrap(), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() + }; + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + assert!(request + .fill_missing_columns(&metadata) + .unwrap_err() + .to_string() + .contains("Unexpected impure default value with region_id")); + } + #[test] fn test_fill_missing_columns() { let rows = Rows { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 9aed4e43ec27..47525d13cad1 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -47,7 +47,7 @@ use store_api::metric_engine_consts::{ }; use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; use store_api::storage::{RegionId, TableId}; -use table::metadata::TableInfoRef; +use table::metadata::TableInfo; use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY}; use 
table::table_reference::TableReference; use table::TableRef; @@ -59,7 +59,9 @@ use crate::error::{ use crate::expr_factory::CreateExprFactory; use crate::region_req_factory::RegionRequestFactory; use crate::req_convert::common::preprocess_row_insert_requests; -use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion}; +use crate::req_convert::insert::{ + fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, +}; use crate::statement::StatementExecutor; pub struct Inserter { @@ -202,18 +204,26 @@ impl Inserter { }); validate_column_count_match(&requests)?; - let (tables_info, instant_table_ids) = self + let CreateAlterTableResult { + instant_table_ids, + table_infos, + } = self .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor) .await?; + + let name_to_info = table_infos + .values() + .map(|info| (info.name.clone(), info.clone())) + .collect::>(); let inserts = RowToRegion::new( - tables_info, + name_to_info, instant_table_ids, self.partition_manager.as_ref(), ) .convert(requests) .await?; - self.do_request(inserts, &ctx).await + self.do_request(inserts, &table_infos, &ctx).await } /// Handles row inserts request with metric engine. 
@@ -238,7 +248,10 @@ impl Inserter { .await?; // check and create logical tables - let (tables_info, instant_table_ids) = self + let CreateAlterTableResult { + instant_table_ids, + table_infos, + } = self .create_or_alter_tables_on_demand( &requests, &ctx, @@ -246,11 +259,15 @@ impl Inserter { statement_executor, ) .await?; - let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager) + let name_to_info = table_infos + .values() + .map(|info| (info.name.clone(), info.clone())) + .collect::>(); + let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager) .convert(requests) .await?; - self.do_request(inserts, &ctx).await + self.do_request(inserts, &table_infos, &ctx).await } pub async fn handle_table_insert( @@ -271,7 +288,10 @@ impl Inserter { .convert(request) .await?; - self.do_request(inserts, &ctx).await + let table_infos = + HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); + + self.do_request(inserts, &table_infos, &ctx).await } pub async fn handle_statement_insert( @@ -279,12 +299,15 @@ impl Inserter { insert: &Insert, ctx: &QueryContextRef, ) -> Result { - let inserts = + let (inserts, table_info) = StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx) .convert(insert, ctx) .await?; - self.do_request(inserts, ctx).await + let table_infos = + HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); + + self.do_request(inserts, &table_infos, ctx).await } } @@ -292,8 +315,12 @@ impl Inserter { async fn do_request( &self, requests: InstantAndNormalInsertRequests, + table_infos: &HashMap>, ctx: &QueryContextRef, ) -> Result { + // Fill impure default values in the request + let requests = fill_reqs_with_impure_default(table_infos, requests)?; + let write_cost = write_meter!( ctx.current_catalog(), ctx.current_schema(), @@ -497,14 +524,15 @@ impl Inserter { ctx: &QueryContextRef, auto_create_table_type: AutoCreateTableType, 
statement_executor: &StatementExecutor, - ) -> Result<(HashMap, HashSet)> { + ) -> Result { let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND .with_label_values(&[auto_create_table_type.as_str()]) .start_timer(); let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); - let mut tables_info = HashMap::with_capacity(requests.inserts.len()); + + let mut table_infos = HashMap::new(); // If `auto_create_table` hint is disabled, skip creating/altering tables. let auto_create_table_hint = ctx .extension(AUTO_CREATE_TABLE_KEY) @@ -533,9 +561,13 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - tables_info.insert(table_info.name.clone(), table_info); + table_infos.insert(table_info.table_id(), table.table_info()); } - return Ok((tables_info, instant_table_ids)); + let ret = CreateAlterTableResult { + instant_table_ids, + table_infos, + }; + return Ok(ret); } let mut create_tables = vec![]; @@ -549,7 +581,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - tables_info.insert(table_info.name.clone(), table_info); + table_infos.insert(table_info.table_id(), table.table_info()); if let Some(alter_expr) = self.get_alter_table_expr_on_demand(req, &table, ctx)? 
{ @@ -577,7 +609,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - tables_info.insert(table_info.name.clone(), table_info); + table_infos.insert(table_info.table_id(), table.table_info()); } } if !alter_tables.is_empty() { @@ -600,7 +632,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - tables_info.insert(table_info.name.clone(), table_info); + table_infos.insert(table_info.table_id(), table.table_info()); } for alter_expr in alter_tables.into_iter() { statement_executor @@ -610,7 +642,10 @@ impl Inserter { } } - Ok((tables_info, instant_table_ids)) + Ok(CreateAlterTableResult { + instant_table_ids, + table_infos, + }) } async fn create_physical_table_on_demand( @@ -872,3 +907,11 @@ fn build_create_table_expr( ) -> Result { CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None) } + +/// Result of `create_or_alter_tables_on_demand`. +struct CreateAlterTableResult { + /// table ids of ttl=instant tables. + instant_table_ids: HashSet, + /// Table Info of the created tables. + table_infos: HashMap>, +} diff --git a/src/operator/src/req_convert/insert.rs b/src/operator/src/req_convert/insert.rs index 51984c4de034..88d402b0ac65 100644 --- a/src/operator/src/req_convert/insert.rs +++ b/src/operator/src/req_convert/insert.rs @@ -13,12 +13,14 @@ // limitations under the License. 
mod column_to_row; +mod fill_impure_default; mod row_to_region; mod stmt_to_region; mod table_to_region; use api::v1::SemanticType; pub use column_to_row::ColumnToRow; +pub use fill_impure_default::fill_reqs_with_impure_default; pub use row_to_region::RowToRegion; use snafu::{OptionExt, ResultExt}; pub use stmt_to_region::StatementToRegion; diff --git a/src/operator/src/req_convert/insert/fill_impure_default.rs b/src/operator/src/req_convert/insert/fill_impure_default.rs new file mode 100644 index 000000000000..d473d55dfb7f --- /dev/null +++ b/src/operator/src/req_convert/insert/fill_impure_default.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Util functions to help with fill impure default values columns in request + +use std::sync::Arc; + +use ahash::{HashMap, HashMapExt, HashSet}; +use datatypes::schema::ColumnSchema; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; +use table::metadata::{TableInfo, TableInfoRef}; + +use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu}; +use crate::expr_factory::column_schemas_to_defs; +use crate::insert::InstantAndNormalInsertRequests; + +/// Find all columns that have impure default values +pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec { + let columns = table_info.meta.schema.column_schemas(); + columns + .iter() + .filter(|column| column.is_default_impure()) + .cloned() + .collect() +} + +/// Fill impure default values in the request +pub struct ImpureDefaultFiller { + impure_columns: HashMap)>, +} + +impl ImpureDefaultFiller { + pub fn new(table_info: TableInfoRef) -> Result { + let impure_column_list = find_all_impure_columns(&table_info); + let pks = &table_info.meta.primary_key_indices; + let pk_names = pks + .iter() + .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string()) + .collect::>(); + let mut impure_columns = HashMap::new(); + for column in impure_column_list { + let default_value = column + .create_impure_default() + .with_context(|_| ConvertColumnDefaultConstraintSnafu { + column_name: column.name.clone(), + })? 
+ .with_context(|| UnexpectedSnafu { + violated: format!( + "Expect default value to be impure, found {:?}", + column.default_constraint() + ), + })?; + let grpc_default_value = api::helper::to_proto_value(default_value); + let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0); + let grpc_column_schema = api::v1::ColumnSchema { + column_name: def.name, + datatype: def.data_type, + semantic_type: def.semantic_type, + datatype_extension: def.datatype_extension, + options: def.options, + }; + impure_columns.insert( + grpc_column_schema.column_name.clone(), + (grpc_column_schema, grpc_default_value), + ); + } + Ok(Self { impure_columns }) + } + + /// Fill impure default values in the request + pub fn fill_rows(&self, rows: &mut api::v1::Rows) { + let impure_columns_in_reqs: HashSet<_> = rows + .schema + .iter() + .filter_map(|schema| { + if self.impure_columns.contains_key(&schema.column_name) { + Some(&schema.column_name) + } else { + None + } + }) + .collect(); + + if self.impure_columns.len() == impure_columns_in_reqs.len() { + return; + } + + let (schema_append, row_append): (Vec<_>, Vec<_>) = self + .impure_columns + .iter() + .filter_map(|(name, (schema, val))| { + if !impure_columns_in_reqs.contains(name) { + Some((schema.clone(), val.clone().unwrap_or_default())) + } else { + None + } + }) + .unzip(); + + rows.schema.extend(schema_append); + for row in rows.rows.iter_mut() { + row.values.extend_from_slice(row_append.as_slice()); + } + } +} + +/// Fill impure default values in the request(only for normal insert requests, since instant insert can be filled in flownode directly as a single source of truth) +pub fn fill_reqs_with_impure_default( + table_infos: &HashMap>, + mut inserts: InstantAndNormalInsertRequests, +) -> Result { + let fillers = table_infos + .iter() + .map(|(table_id, table_info)| { + let table_id = *table_id; + ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler)) + }) + .collect::>>()?; + + let 
normal_inserts = &mut inserts.normal_requests; + for request in normal_inserts.requests.iter_mut() { + let region_id = RegionId::from(request.region_id); + let table_id = region_id.table_id(); + let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu { + violated: format!("impure default filler for table_id: {} not found", table_id), + })?; + + if let Some(rows) = &mut request.rows { + filler.fill_rows(rows); + } + } + Ok(inserts) +} + +#[cfg(test)] +mod tests { + use api::v1::value::ValueData; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; + use datatypes::value::Value; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + + use super::*; + + /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills DEFAULT now(), col2 int32]`. + fn new_test_schema() -> Schema { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true) + .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function( + "now()".to_string(), + ))) + .unwrap(), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value( + Value::from(1i32), + ))) + .unwrap(), + ]; + SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap() + } + + pub fn new_table_info() -> TableInfo { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + TableInfoBuilder::default() + .table_id(10) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap() + } + + fn column_schema_to_proto( + column_schema: &[ColumnSchema], + pk_names: &[String], + ) -> Vec { + 
column_schemas_to_defs(column_schema.to_vec(), pk_names) + .unwrap() + .into_iter() + .map(|def| api::v1::ColumnSchema { + column_name: def.name, + datatype: def.data_type, + semantic_type: def.semantic_type, + datatype_extension: def.datatype_extension, + options: def.options, + }) + .collect() + } + + #[test] + fn test_impure_append() { + let row = api::v1::Row { + values: vec![api::v1::Value { + value_data: Some(ValueData::I32Value(42)), + }], + }; + let schema = new_test_schema().column_schemas()[0].clone(); + let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]); + + let mut rows = api::v1::Rows { + schema: col_schemas, + rows: vec![row], + }; + + let info = new_table_info(); + let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap(); + filler.fill_rows(&mut rows); + + assert_eq!(rows.schema[1].column_name, "ts"); + assert!(rows.schema.len() == 2 && rows.rows[0].values.len() == 2); + } +} diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index cd48b4fca54e..cb1b0057308d 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -25,6 +25,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use sql::statements; use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; +use table::metadata::TableInfoRef; use table::TableRef; use crate::error::{ @@ -61,7 +62,7 @@ impl<'a> StatementToRegion<'a> { &self, stmt: &Insert, query_ctx: &QueryContextRef, - ) -> Result { + ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> { let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?; let table = self.get_table(&catalog, &schema, &table_name).await?; let table_schema = table.schema(); @@ -137,15 +138,21 @@ impl<'a> StatementToRegion<'a> { .await?; let requests = RegionInsertRequests { requests }; if table_info.is_ttl_instant_table() { - 
Ok(InstantAndNormalInsertRequests { - normal_requests: Default::default(), - instant_requests: requests, - }) + Ok(( + InstantAndNormalInsertRequests { + normal_requests: Default::default(), + instant_requests: requests, + }, + table_info, + )) } else { - Ok(InstantAndNormalInsertRequests { - normal_requests: requests, - instant_requests: Default::default(), - }) + Ok(( + InstantAndNormalInsertRequests { + normal_requests: requests, + instant_requests: Default::default(), + }, + table_info, + )) } } diff --git a/tests/cases/standalone/common/flow/flow_ins_default.result b/tests/cases/standalone/common/flow/flow_ins_default.result new file mode 100644 index 000000000000..37c04e022d7a --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_ins_default.result @@ -0,0 +1,70 @@ +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE approx_rate ( + rate DOUBLE, + time_window TIMESTAMP, + update_at TIMESTAMP, + TIME INDEX(time_window) +); + +Affected Rows: 0 + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO + bytes_log (byte) +VALUES + (NULL), + (300); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('find_approx_rate') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +-- since ts is default to now(), omit it when querying +SELECT + rate +FROM + approx_rate; + ++------+ +| rate | ++------+ +| 0.0 | ++------+ + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +DROP TABLE bytes_log; + +Affected Rows: 0 + +DROP TABLE approx_rate; + +Affected Rows: 0 + diff --git 
a/tests/cases/standalone/common/flow/flow_ins_default.sql b/tests/cases/standalone/common/flow/flow_ins_default.sql new file mode 100644 index 000000000000..f7ab96cc701b --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_ins_default.sql @@ -0,0 +1,41 @@ +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +CREATE TABLE approx_rate ( + rate DOUBLE, + time_window TIMESTAMP, + update_at TIMESTAMP, + TIME INDEX(time_window) +); + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +INSERT INTO + bytes_log (byte) +VALUES + (NULL), + (300); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + +-- since ts is default to now(), omit it when querying +SELECT + rate +FROM + approx_rate; + +DROP FLOW find_approx_rate; +DROP TABLE bytes_log; +DROP TABLE approx_rate;