diff --git a/Cargo.lock b/Cargo.lock index 7049caad46e6..fb778e623bc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,6 +2016,7 @@ dependencies = [ name = "common-error" version = "0.12.0" dependencies = [ + "http 0.2.12", "snafu 0.8.5", "strum 0.25.0", "tonic 0.11.0", @@ -4061,6 +4062,7 @@ dependencies = [ "get-size-derive2", "get-size2", "greptime-proto", + "http 0.2.12", "hydroflow", "itertools 0.10.5", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 9729d5796d74..2156a3fcfc51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" } hex = "0.4" +http = "0.2" humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index 49eafb81d5a2..148e2c66336f 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true workspace = true [dependencies] +http.workspace = true snafu.workspace = true strum.workspace = true tonic.workspace = true diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index c5c0e6efe092..0052d70cf38e 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -18,9 +18,30 @@ pub mod ext; pub mod mock; pub mod status_code; +use http::{HeaderMap, HeaderValue}; pub use snafu; // HACK - these headers are here for shared in gRPC services. For common HTTP headers, // please define in `src/servers/src/http/header.rs`. pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg"; + +/// Create an HTTP header map from an error code and message, +/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys.
+pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap { + let mut header = HeaderMap::new(); + + let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| { + HeaderValue::from_bytes( + &msg.as_bytes() + .iter() + .flat_map(|b| std::ascii::escape_default(*b)) + .collect::<Vec<u8>>(), + ) + .expect("Already escaped string should be valid ascii") + }); + + header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into()); + header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg); + header +} diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 08867d342a74..b624eed05b9b 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -45,6 +45,7 @@ get-size2 = "0.1.2" greptime-proto.workspace = true # This fork of hydroflow is simply for keeping our dependency in our org, and pin the version # otherwise it is the same with upstream repo +http.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true lazy_static.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index eeefe019c1fd..dcdb1b1eb01a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -30,7 +30,7 @@ use common_telemetry::{debug, info, trace}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; -use itertools::Itertools; +use itertools::{EitherOrBoth, Itertools}; use meta_client::MetaClientOptions; use query::QueryEngine; use serde::{Deserialize, Serialize}; @@ -46,17 +46,19 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; -use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::util::{ + relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, +}; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{ - EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu, }; -use crate::expr::{Batch, GlobalId}; +use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; -use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; +use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; mod parse_expr; @@ -245,8 +247,12 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = - self.try_fetch_or_create_table(&table_name).await?; + let (is_ts_placeholder, proto_schema) = self + .try_fetch_existing_table(&table_name) + .await?
+ .context(UnexpectedSnafu { + reason: format!("Table not found: {}", table_name.join(".")), + })?; let schema_len = proto_schema.len(); let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>(); @@ -396,14 +402,12 @@ impl FlowWorkerManager { Ok(output) } - /// Fetch table info or create table from flow's schema if not exist - async fn try_fetch_or_create_table( + /// Fetch table schema and primary key from the table info source; if the table does not exist, return None + async fn fetch_table_pk_schema( &self, table_name: &TableName, - ) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> { - // TODO(discord9): instead of auto build table from request schema, actually build table - // before `create flow` to be able to assign pk and ts etc. - let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self + ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> { + if let Some(table_id) = self .table_info_source .get_table_id_from_name(table_name) .await? @@ -420,97 +424,64 @@ impl FlowWorkerManager { .map(|i| meta.schema.column_schemas[i].name.clone()) .collect_vec(); let schema = meta.schema.column_schemas; - // check if the last column is the auto created timestamp column, hence the table is auto created from - // flow's plan type - let is_auto_create = { - let correct_name = schema - .last() - .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) - .unwrap_or(false); - let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); - correct_name && correct_time_index - }; - (primary_keys, schema, is_auto_create) + let time_index = meta.schema.timestamp_index; + Ok(Some((primary_keys, time_index, schema))) } else { - // TODO(discord9): consider removing buggy auto create by schema + Ok(None) + } + } - let node_ctx = self.node_context.read().await; - let gid: GlobalId = node_ctx - .table_repr - .get_by_name(table_name) - .map(|x| x.1) - .unwrap(); - let schema = node_ctx - .schema - .get(&gid) - .with_context(|| TableNotFoundSnafu { - name: format!("Table name = {:?}", table_name), - })? - .clone(); - // TODO(discord9): use default key from schema - let primary_keys = schema - .typ() - .keys - .first() - .map(|v| { - v.column_indices - .iter() - .map(|i| { - schema - .get_name(*i) - .clone() - .unwrap_or_else(|| format!("col_{i}")) - }) - .collect_vec() - }) - .unwrap_or_default(); - let update_at = ColumnSchema::new( - UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); + /// return (primary keys, schema, and whether the table has a placeholder timestamp column) + /// the schema of the table comes from the flow's output plan + /// + /// adjusted to add the `update_at` column and a ts placeholder if needed + async fn adjust_auto_created_table_schema( + &self, + schema: &RelationDesc, + ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> { + // TODO(discord9): consider removing buggy auto create by schema + + // TODO(discord9): use default key from schema + let primary_keys = schema + .typ() + .keys + .first() + .map(|v| { + v.column_indices + .iter() + .map(|i| { + schema + .get_name(*i) + .clone() + .unwrap_or_else(|| format!("col_{i}")) + }) + .collect_vec() + }) + .unwrap_or_default(); + let update_at = ColumnSchema::new( + UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); - let original_schema = schema - .typ() - .column_types - .clone() - .into_iter() - .enumerate() - .map(|(idx, typ)| { - let name = schema - .names - .get(idx) - .cloned() - .flatten() - .unwrap_or(format!("col_{}", idx)); - let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); - if schema.typ().time_index == Some(idx) { - ret.with_time_index(true) - } else { - ret - } - }) - .collect_vec(); + let original_schema = relation_desc_to_column_schemas_with_fallback(schema); - let mut with_auto_added_col = original_schema.clone(); - with_auto_added_col.push(update_at); + let mut with_auto_added_col = original_schema.clone(); + with_auto_added_col.push(update_at); - // if no time index, add one as placeholder - let no_time_index = schema.typ().time_index.is_none(); - if no_time_index { - let ts_col = ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true); - with_auto_added_col.push(ts_col); - } + // if no time index, add one as placeholder + let no_time_index = schema.typ().time_index.is_none(); + if no_time_index { + let ts_col = ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + with_auto_added_col.push(ts_col); + } - (primary_keys, with_auto_added_col, no_time_index) - }; - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; - Ok((is_ts_placeholder, proto_schema)) + Ok((primary_keys, with_auto_added_col, no_time_index)) } } @@ -813,7 +784,85 @@ impl FlowWorkerManager { let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + + // check the inferred schema against the actual table schema if the table exists + // if it does not exist, create the sink table immediately + if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await?
{ + let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); + + // for column schemas, only `data_type` needs to be checked for equality + // since one can omit the flow's column names when writing the flow query + // print a user-friendly error message about the mismatch and how to correct it + for (idx, zipped) in auto_schema + .iter() + .zip_longest(real_schema.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Both(auto, real) => { + if auto.data_type != real.data_type { + InvalidQuerySnafu { + reason: format!( + "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", + idx, + real.name, + auto.name, + real.data_type, + auto.data_type + ), + } + .fail()?; + } + } + EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { + // if the table is auto created, the last one or two columns should be timestamps (update_at and the ts placeholder) + continue; + } + _ => InvalidQuerySnafu { + reason: format!( + "schema length mismatched, expected {} found {}", + real_schema.len(), + auto_schema.len() + ), + } + .fail()?, + } + } + + let table_id = self + .table_info_source + .get_table_id_from_name(&sink_table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table id for table name {:?}", sink_table_name), + })?; + let table_info_value = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table info value for table id {:?}", table_id), + })?; + let real_schema = table_info_value_to_relation_desc(table_info_value)?; + node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; + } else { + // assign the inferred schema to the sink table + // and create the sink table + node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + &sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } + } let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 990fdd129797..5c644803ec71 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -331,12 +331,14 @@ impl FlownodeContext { } else { let global_id = self.new_global_id(); + // a Some table id means the db must have already created the table if let Some(table_id) = table_id { let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?; table_name = table_name.or(Some(known_table_name)); self.schema.insert(global_id, schema); } // if we don't have table id, it means database havn't assign one yet or we don't need it + // still update the mapping with the new global id self.table_repr.insert(table_name, table_id, global_id); Ok(global_id) } @@ -358,6 +360,7 @@ impl FlownodeContext { })?; self.schema.insert(gid, schema); + Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 0454ab16b1d3..7981999f0abc 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -20,11 +20,12 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; +use crate::adapter::util::table_info_value_to_relation_desc; use crate::adapter::TableName; use crate::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu,
TableNotFoundSnafu, UnexpectedSnafu, }; -use crate::repr::{self, ColumnType, RelationDesc, RelationType}; +use crate::repr::RelationDesc; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { @@ -121,38 +122,7 @@ impl TableSource { table_name.table_name, ]; - let raw_schema = table_info_value.table_info.meta.schema; - let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema - .column_schemas - .clone() - .into_iter() - .map(|col| { - ( - ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, - }, - Some(col.name), - ) - }) - .unzip(); - - let key = table_info_value.table_info.meta.primary_key_indices; - let keys = vec![repr::Key::from(key)]; - - let time_index = raw_schema.timestamp_index; - Ok(( - table_name, - RelationDesc { - typ: RelationType { - column_types, - keys, - time_index, - // by default table schema's column are all non-auto - auto_columns: vec![], - }, - names: col_names, - }, - )) + let desc = table_info_value_to_relation_desc(table_info_value)?; + Ok((table_name, desc)) } } diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index f2a29bec8e9e..f5d2968543f9 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -12,16 +12,153 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::helper::ColumnDataTypeWrapper; use api::v1::column_def::options_from_column_schema; -use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType}; +use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType}; use common_error::ext::BoxedError; +use common_meta::key::table_info::TableInfoValue; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use itertools::Itertools; -use snafu::ResultExt; +use operator::expr_factory::CreateExprFactory; +use session::context::QueryContextBuilder; +use snafu::{OptionExt, ResultExt}; +use table::table_reference::TableReference; + +use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL}; +use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; +use crate::repr::{ColumnType, RelationDesc, RelationType}; +use crate::FlowWorkerManager; + +impl FlowWorkerManager { + /// Create a table from the given schema (adjusted to add auto columns if needed); return true if the table was created + pub(crate) async fn create_table_from_relation( + &self, + flow_name: &str, + table_name: &TableName, + relation_desc: &RelationDesc, + ) -> Result<bool, Error> { + if self.fetch_table_pk_schema(table_name).await?.is_some() { + return Ok(false); + } + let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?; + + // create the sink table using pks, column types and is_ts_auto + + let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?; + + // create sink table + let create_expr = CreateExprFactory {} + .create_table_expr_by_column_schemas( + &TableReference { + catalog: &table_name[0], + schema: &table_name[1], + table: &table_name[2], + }, + &proto_schema, + "mito", + Some(&format!("Sink table for flow {}", flow_name)), + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; -use crate::error::{Error, ExternalSnafu}; + self.submit_create_sink_table_ddl(create_expr).await?; + Ok(true) + } + + /// Try to fetch the table with its adjusted schema (auto columns added if needed) + pub(crate) async fn try_fetch_existing_table( + &self, + table_name: &TableName, + ) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> { + if let Some((primary_keys, time_index, schema)) = + self.fetch_table_pk_schema(table_name).await? + { + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = time_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + Ok(Some((is_auto_create, proto_schema))) + } else { + Ok(None) + } + } + + /// Submit a create table DDL + pub(crate) async fn submit_create_sink_table_ddl( + &self, + mut create_table: CreateTableExpr, + ) -> Result<(), Error> { + let stmt_exec = { + self.frontend_invoker + .read() + .await + .as_ref() + .map(|f| f.statement_executor()) + } + .context(UnexpectedSnafu { + reason: "Failed to get statement executor", + })?; + let ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog(create_table.catalog_name.clone()) + .current_schema(create_table.schema_name.clone()) + .build(), + ); + stmt_exec + .create_table_inner(&mut create_table, None, ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(()) + } +} + +pub fn table_info_value_to_relation_desc( + table_info_value: TableInfoValue, +) -> Result<RelationDesc, Error> { + let raw_schema = table_info_value.table_info.meta.schema; + let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema + .column_schemas + .clone() + .into_iter() + .map(|col| { + ( + ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }, + Some(col.name), + ) + }) + .unzip(); + + let key = table_info_value.table_info.meta.primary_key_indices; + let keys = vec![crate::repr::Key::from(key)]; + + let time_index = raw_schema.timestamp_index; + + Ok(RelationDesc { + typ: RelationType { + column_types, + keys, + time_index, + // by default table schema's column are all non-auto + auto_columns: vec![], + }, + names: col_names, + }) +} pub fn from_proto_to_data_type( column_schema: &api::v1::ColumnSchema, @@ -75,3 +212,29 @@ pub fn column_schemas_to_proto( .collect(); Ok(ret) } + +/// Convert `RelationDesc` to a `ColumnSchema` list; +/// if a column name is not present, use `col_{idx}` as the column name +pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> { + schema + .typ() + .column_types + .clone() + .into_iter() + .enumerate() + .map(|(idx, typ)| { + let name = schema + .names + .get(idx) + .cloned() + .flatten() + .unwrap_or(format!("col_{}", idx)); + let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); + if schema.typ().time_index == Some(idx) { + ret.with_time_index(true) + } else { + ret + } + }) + .collect_vec() +} diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index a94de4b6ed7b..137e024307f9 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -16,12 +16,13 @@ use std::any::Any; -use common_error::define_into_tonic_status; use common_error::ext::BoxedError; +use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, Snafu}; +use tonic::metadata::MetadataMap; use crate::adapter::FlowId; use crate::expr::EvalError; @@ -186,6 +187,20 @@ pub enum Error { }, } +/// The outer message is the full error stack, and the inner message in the header is the last error message that can be shown
directly to the user +pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status { + let msg = err.to_string(); + let last_err_msg = common_error::ext::StackError::last(&err).to_string(); + let code = err.status_code() as u32; + let header = from_err_code_msg_to_header(code, &last_err_msg); + + tonic::Status::with_metadata( + tonic::Code::InvalidArgument, + msg, + MetadataMap::from_headers(header), + ) +} + /// Result type for flow module pub type Result<T> = std::result::Result<T, Error>; @@ -200,9 +215,8 @@ impl ErrorExt for Error { | Self::TableNotFoundMeta { .. } | Self::FlowNotFound { .. } | Self::ListFlows { .. } => StatusCode::TableNotFound, - Self::InvalidQuery { .. } | Self::Plan { .. } | Self::Datatypes { .. } => { - StatusCode::PlanQuery - } + Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, + Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index d0fbb861eb24..168b5df7d0e2 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -212,6 +212,8 @@ impl RelationType { for key in &mut self.keys { key.remove_col(time_index.unwrap_or(usize::MAX)); } + // remove empty keys + self.keys.retain(|key| !key.is_empty()); self } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 37e3249ec82a..d22ba220441b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -50,8 +50,8 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, - ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, + ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -87,10 +87,7 @@ impl flow_server::Flow for FlowService { .handle(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } async fn handle_mirror_request( @@ -126,10 +123,7 @@ impl flow_server::Flow for FlowService { .handle_inserts(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } } @@ -544,4 +538,8 @@ impl FrontendInvoker { .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } + + pub fn statement_executor(&self) -> Arc<StatementExecutor> { + self.statement_executor.clone() + } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 2d0a6f66dab8..b944e3b263e3 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -216,6 +216,7 @@ impl KeyValPlan { /// find out the column that should be time index in group exprs(which is all columns that should be keys) /// TODO(discord9): better ways to assign time index +/// for now, it will find the first column that is a timestamp or has a tumble window floor function fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> { group_exprs.iter().position(|expr| { matches!( @@ -224,7 +225,7 @@ fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> { func: UnaryFunc::TumbleWindowFloor { .. }, expr: _ } - ) + ) || expr.typ.scalar_type.is_timestamp() }) } @@ -1482,7 +1483,7 @@ mod test { ColumnType::new(CDT::float64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), true), ]) - .with_key(vec![1]) + .with_time_index(Some(1)) .into_named(vec![ Some( "MAX(numbers_with_ts.number) - MIN(numbers_with_ts.number) / Float64(30)" @@ -1571,7 +1572,7 @@ mod test { ColumnType::new(ConcreteDataType::uint32_datatype(), true), // max ColumnType::new(ConcreteDataType::uint32_datatype(), true), // min ]) - .with_key(vec![0]) + .with_time_index(Some(0)) .into_unnamed(), ), ), diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 59fe87a66ec3..bc50eff161b5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -68,6 +68,7 @@ impl CreateExprFactory { table_name: &TableReference<'_>, column_schemas: &[api::v1::ColumnSchema], engine: &str, + desc: Option<&str>, ) -> Result<CreateTableExpr> { let column_exprs = ColumnExpr::from_column_schemas(column_schemas); let create_expr = common_grpc_expr::util::build_create_table_expr( @@ -75,7 +76,7 @@ impl CreateExprFactory { table_name, column_exprs, engine, - "Created on insertion", + desc.unwrap_or("Created on insertion"), ) .context(BuildCreateExprOnInsertionSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 466dde5425c1..6b7702f25b8b 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -870,5 +870,5 @@ fn build_create_table_expr( request_schema: &[ColumnSchema], engine: &str, ) -> Result<CreateTableExpr> { - CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine) + CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None) } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 74adaffd5ea8..33831ba639bb 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -56,7 +56,7 @@ futures-util.workspace = true hashbrown = "0.14" headers = "0.3" hostname = "0.3" -http = "0.2" +http.workspace = true http-body = "0.4" humantime.workspace = true humantime-serde.workspace = true diff --git a/src/servers/src/http/result/error_result.rs b/src/servers/src/http/result/error_result.rs index be5f01f9e950..3c7c718e5ba4 100644 --- a/src/servers/src/http/result/error_result.rs +++ b/src/servers/src/http/result/error_result.rs @@ -12,17 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::Json; use common_error::ext::ErrorExt; +use common_error::from_err_code_msg_to_header; use common_error::status_code::StatusCode; use common_telemetry::{debug, error}; use serde::{Deserialize, Serialize}; use crate::error::status_code_to_http_status; -use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE; -use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME; #[derive(Serialize, Deserialize, Debug)] pub struct ErrorResponse { @@ -74,13 +72,16 @@ impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let code = self.code; let execution_time = self.execution_time_ms; - let mut resp = Json(self).into_response(); - resp.headers_mut() - .insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code)); - resp.headers_mut().insert( - &GREPTIME_DB_HEADER_EXECUTION_TIME, - HeaderValue::from(execution_time), + let new_header = from_err_code_msg_to_header( + code, + &format!( + "error: {}, execution_time_ms: {}", + self.error, execution_time + ), ); + let mut resp = Json(self).into_response(); + resp.headers_mut().extend(new_header); + let status = StatusCode::from_u32(code).unwrap_or(StatusCode::Unknown); let status_code = status_code_to_http_status(&status); diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result new file mode 100644 index 000000000000..e8bd5cf739f4 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -0,0 +1,161 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | 
++-------------------+--------------------------------------------------+ + +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+-------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+-------------------------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') | ++--------------------+-------------------------------------------------------------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+---------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+---------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) AS sumup, ts AS event_time FROM numbers_input_basic GROUP BY ts | ++--------------------+---------------------------------------------------------------------------------------+ + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql new file mode 100644 index 000000000000..0af723770ced --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql @@ -0,0 +1,58 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE 
FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + +SHOW CREATE FLOW test_numbers_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index c70fe54fec19..e17efd74be40 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -17,6 +17,24 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -28,6 +46,24 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); | FLOW_FLUSHED | +----------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -130,6 +166,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + DROP FLOW test_wildcard_basic; Affected Rows: 0 @@ -142,6 +194,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -159,6 +228,22 @@ ADMIN FLUSH_FLOW('test_wildcard_basic'); | FLOW_FLUSHED | +-----------------------------------------+ +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + SELECT wildcard FROM out_basic; +----------+ @@ -197,6 +282,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -320,7 +422,9 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -348,6 +452,23 @@ SHOW CREATE FLOW filter_numbers_basic; | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+-----------------------------------------------------------+ +| Table | Create Table | ++-------------------+-----------------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "a" INTERVAL NULL, | +| | "b" INTERVAL NULL, | +| | "c" INTERVAL NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+-----------------------------------------------------------+ + drop flow filter_numbers_basic; Affected Rows: 0 @@ -390,6 +511,22 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE approx_rate; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | +| | "rate" DOUBLE NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO bytes_log VALUES @@ -542,6 +679,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -675,6 +829,23 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -693,21 +864,20 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS 
"ngx_country" ( | -| | "ngx_access_log.country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("ngx_access_log.country", "time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------------------+ ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ SELECT "ngx_access_log.country", @@ -824,6 +994,23 @@ HAVING Affected Rows: 0 +SHOW CREATE TABLE temp_alerts; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( | +| | "sensor_id" INT NULL, | +| | "loc" STRING NULL, | +| | "max_temp" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO temp_sensor_data VALUES @@ -963,6 +1150,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_distribution; + ++------------------+-------------------------------------------------+ +| Table | Create Table | ++------------------+-------------------------------------------------+ +| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( | +| | "stat" INT NULL, | +| | "bucket_size" INT NULL, | +| | "total_logs" BIGINT NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("stat", "bucket_size") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++------------------+-------------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -1070,6 +1276,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE requests_without_ip; + ++---------------------+----------------------------------------------------+ +| Table | Create Table | ++---------------------+----------------------------------------------------+ +| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( | +| | "service_name" STRING NULL, | +| | "val" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("service_name") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+----------------------------------------------------+ + INSERT INTO requests VALUES @@ -1269,6 +1492,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | 
++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1361,6 +1603,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1419,3 +1680,210 @@ DROP TABLE android_log; Affected Rows: 0 +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "avg_after_filter_num" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT avg_after_filter_num FROM out_num_cnt_basic; + ++----------------------+ +| avg_after_filter_num | ++----------------------+ +| 1 | ++----------------------+ + +INSERT INTO + numbers_input_basic +VALUES + (10, "2021-07-01 00:00:00.200"), + (23, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE `live_connection_log` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `connect_retry_times` DOUBLE NULL, + `connect_result` INT NULL, + `first_frame_time` DOUBLE NULL, + `record_time` TIMESTAMP TIME INDEX, + `iot_online` INT NULL, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE TABLE `live_connection_statistics_detail` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `avg_connect_retry_times` DOUBLE NULL, + `total_connect_result_ok` INT64 NULL, + `total_connect_result_fail` INT64 NULL, + `total_connect` INT64 NULL, + `conection_rate` DOUBLE NULL, + `avg_first_frame_time` DOUBLE NULL, + `max_first_frame_time` DOUBLE NULL, + `ok_conection_rate` DOUBLE NULL, + `record_time_window` TIMESTAMP TIME INDEX, + `update_at` TIMESTAMP, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE FLOW live_connection_aggregation_detail +SINK TO live_connection_statistics_detail +AS +SELECT + device_model, + connect_protocol, + connect_mode, + avg(connect_retry_times) as avg_connect_retry_times, + sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, + sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, + count(connect_result) as total_connect, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, + avg(first_frame_time) as avg_first_frame_time, + max(first_frame_time) as max_first_frame_time, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, + date_bin(INTERVAL '1 minutes', record_time) as record_time_window, +FROM live_connection_log +WHERE iot_online = 1 +GROUP BY + device_model, + connect_protocol, + connect_mode, + 
record_time_window; + +Affected Rows: 0 + +INSERT INTO + live_connection_log +VALUES + ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); + ++--------------------------------------------------------+ +| ADMIN FLUSH_FLOW('live_connection_aggregation_detail') | ++--------------------------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------------------------+ + +SELECT device_model, + connect_protocol, + connect_mode, + avg_connect_retry_times, + total_connect_result_ok, + total_connect_result_fail, + total_connect, + conection_rate, + avg_first_frame_time, + max_first_frame_time, + ok_conection_rate, + record_time_window FROM live_connection_statistics_detail; + ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ + +DROP FLOW live_connection_aggregation_detail; + +Affected Rows: 0 + +DROP TABLE live_connection_log; + +Affected Rows: 0 + +DROP TABLE live_connection_statistics_detail; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 74abbc85df22..516afa4074a3 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -13,11 +13,15 @@ FROM GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +SHOW CREATE TABLE out_num_cnt_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +SHOW CREATE TABLE out_num_cnt_basic; + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -75,6 +79,8 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + DROP FLOW test_wildcard_basic; CREATE FLOW test_wildcard_basic sink TO out_basic AS @@ -83,6 +89,9 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -92,6 +101,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_wildcard_basic'); +SHOW CREATE TABLE out_basic; + SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; @@ -112,6 +123,8 @@ SELECT FROM distinct_basic; +SHOW CREATE TABLE out_distinct_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -168,7 +181,9 @@ CREATE TABLE numbers_input_basic ( ); create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -184,6 +199,8 @@ where SHOW CREATE FLOW filter_numbers_basic; +SHOW CREATE TABLE out_num_cnt_basic; + drop flow filter_numbers_basic; drop table out_num_cnt_basic; @@ -214,6 +231,8 @@ from GROUP BY time_window; +SHOW CREATE TABLE approx_rate; + INSERT INTO bytes_log VALUES @@ -294,6 +313,8 @@ SELECT FROM ngx_access_log; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -359,6 +380,8 @@ GROUP BY country, time_window; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -437,6 +460,8 @@ GROUP BY HAVING max_temp > 100; +SHOW CREATE TABLE temp_alerts; + INSERT INTO temp_sensor_data VALUES @@ -516,6 +541,8 @@ GROUP BY time_window, bucket_size; +SHOW CREATE TABLE ngx_distribution; + INSERT INTO ngx_access_log VALUES @@ -580,6 +607,8 @@ SELECT FROM requests; +SHOW CREATE TABLE requests_without_ip; + INSERT INTO requests VALUES @@ -680,6 +709,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -732,6 +763,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -755,3 +788,128 @@ DROP FLOW calc_android_log_abnormal; DROP TABLE android_log_abnormal; DROP TABLE android_log; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_numbers_basic');
+
+-- SQLNESS ARG restart=true
+INSERT INTO
+    numbers_input_basic
+VALUES
+    (20, "2021-07-01 00:00:00.200"),
+    (22, "2021-07-01 00:00:00.600");
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_numbers_basic');
+
+SELECT avg_after_filter_num FROM out_num_cnt_basic;
+
+INSERT INTO
+    numbers_input_basic
+VALUES
+    (10, "2021-07-01 00:00:00.200"),
+    (23, "2021-07-01 00:00:00.600");
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('test_numbers_basic');
+
+DROP FLOW test_numbers_basic;
+DROP TABLE numbers_input_basic;
+DROP TABLE out_num_cnt_basic;
+
+CREATE TABLE `live_connection_log`
+(
+    `device_model` STRING NULL,
+    `connect_protocol` INT NULL,
+    `connect_mode` INT NULL,
+    `connect_retry_times` DOUBLE NULL,
+    `connect_result` INT NULL,
+    `first_frame_time` DOUBLE NULL,
+    `record_time` TIMESTAMP TIME INDEX,
+    `iot_online` INT NULL,
+    PRIMARY KEY (`device_model`,`connect_protocol`),
+);
+
+CREATE TABLE `live_connection_statistics_detail`
+(
+    `device_model` STRING NULL,
+    `connect_protocol` INT NULL,
+    `connect_mode` INT NULL,
+    `avg_connect_retry_times` DOUBLE NULL,
+    `total_connect_result_ok` INT64 NULL,
+    `total_connect_result_fail` INT64 NULL,
+    `total_connect` INT64 NULL,
+    `conection_rate` DOUBLE NULL,
+    `avg_first_frame_time` DOUBLE NULL,
+    `max_first_frame_time` DOUBLE NULL,
+    `ok_conection_rate` DOUBLE NULL,
+    `record_time_window` TIMESTAMP TIME INDEX,
+    `update_at` TIMESTAMP,
+    PRIMARY KEY (`device_model`,`connect_protocol`),
+);
+
+CREATE FLOW live_connection_aggregation_detail
+SINK TO live_connection_statistics_detail
+AS
+SELECT
+    device_model,
+    connect_protocol,
+    connect_mode,
+    avg(connect_retry_times) as avg_connect_retry_times,
+    sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok,
+    sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail,
+    count(connect_result) as total_connect,
+    sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate,
+    avg(first_frame_time) as avg_first_frame_time,
+    max(first_frame_time) as max_first_frame_time,
+    sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate,
+    date_bin(INTERVAL '1 minutes', record_time) as record_time_window,
+FROM live_connection_log
+WHERE iot_online = 1
+GROUP BY
+    device_model,
+    connect_protocol,
+    connect_mode,
+    record_time_window;
+
+INSERT INTO
+    live_connection_log
+VALUES
+    ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('live_connection_aggregation_detail');
+
+SELECT device_model,
+    connect_protocol,
+    connect_mode,
+    avg_connect_retry_times,
+    total_connect_result_ok,
+    total_connect_result_fail,
+    total_connect,
+    conection_rate,
+    avg_first_frame_time,
+    max_first_frame_time,
+    ok_conection_rate,
+    record_time_window FROM live_connection_statistics_detail;
+
+DROP FLOW live_connection_aggregation_detail;
+DROP TABLE live_connection_log;
+DROP TABLE live_connection_statistics_detail;
diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result
index 38fa609c960d..14e80129446d 100644
--- a/tests/cases/standalone/common/flow/show_create_flow.result
+++ b/tests/cases/standalone/common/flow/show_create_flow.result
@@ -322,3 +322,87 @@ drop table numbers_input_show;
 
 Affected Rows: 0
 
+CREATE TABLE numbers_input_show (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+create table out_num_cnt_show (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
+    PRIMARY KEY(number),
+);
+
+Affected Rows: 0
+
+CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10;
+
+Affected Rows: 0
+
+INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
+
+Affected Rows: 3
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_numbers_show');
+
++-----------------------------------------+
+| ADMIN FLUSH_FLOW('filter_numbers_show') |
++-----------------------------------------+
+| FLOW_FLUSHED                            |
++-----------------------------------------+
+
+SELECT number FROM out_num_cnt_show;
+
++--------+
+| number |
++--------+
+| 15     |
+| 16     |
++--------+
+
+-- should mismatch
+CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
+
+Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
+
+-- should mismatch
+CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
+
+Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
+
+INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
+
+Affected Rows: 3
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_numbers_show');
+
+Error: 1003(Internal), Internal error: 1003
+
+-- sink table stays the same since the flow errors out due to column mismatch
+SELECT number FROM out_num_cnt_show;
+
++--------+
+| number |
++--------+
+| 15     |
+| 16     |
++--------+
+
+DROP FLOW filter_numbers_show;
+
+Affected Rows: 0
+
+drop table out_num_cnt_show;
+
+Affected Rows: 0
+
+drop table numbers_input_show;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql
index 7348a83b5103..f445c4f254c4 100644
--- a/tests/cases/standalone/common/flow/show_create_flow.sql
+++ b/tests/cases/standalone/common/flow/show_create_flow.sql
@@ -125,3 +125,45 @@ DROP FLOW filter_numbers_show;
 drop table out_num_cnt_show;
 
 drop table numbers_input_show;
+
+CREATE TABLE numbers_input_show (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY(number),
+    TIME INDEX(ts)
+);
+create table out_num_cnt_show (
+    number INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
+    PRIMARY KEY(number),
+);
+
+CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10;
+
+INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_numbers_show');
+
+SELECT number FROM out_num_cnt_show;
+
+
+-- should mismatch
+CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
+
+-- should mismatch
+CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
+
+INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_numbers_show');
+
+-- sink table stays the same since the flow errors out due to column mismatch
+SELECT number FROM out_num_cnt_show;
+
+DROP FLOW filter_numbers_show;
+
+drop table out_num_cnt_show;
+
+drop table numbers_input_show;
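
The two "should mismatch" cases above exercise how a flow's inferred output schema is checked against an existing sink table: judging from the recorded error ("Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch"), inferred columns are compared positionally against the sink table's columns, and a type conflict rejects the CREATE OR REPLACE FLOW. Below is a minimal sketch of that behavior, assuming the same semantics the new tests assert; the names src_t, sink_t, and f_demo are illustrative and not part of this patch.

-- Hypothetical source and sink tables, mirroring the shapes used in
-- show_create_flow.sql.
CREATE TABLE src_t (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

CREATE TABLE sink_t (
    n1 INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);

-- Expected to succeed: the single inferred column (n1: INT) lines up with
-- sink_t's first column, which is also INT.
CREATE FLOW f_demo SINK TO sink_t AS
SELECT number AS n1 FROM src_t WHERE number > 10;

-- Expected to fail: the second inferred column (n2: INT) is matched
-- positionally against sink_t's second column ts (TIMESTAMP), so the data
-- types mismatch, as in the EngineExecuteQuery error recorded above.
CREATE OR REPLACE FLOW f_demo SINK TO sink_t AS
SELECT number AS n1, number AS n2 FROM src_t WHERE number > 15;

Note that in the recorded error the column names differ ('ts' versus the inferred 'n2') while the complaint is about data types, which suggests the check keys on position and type rather than on name.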