diff --git a/Cargo.lock b/Cargo.lock index 7049caad46e6..fb778e623bc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,6 +2016,7 @@ dependencies = [ name = "common-error" version = "0.12.0" dependencies = [ + "http 0.2.12", "snafu 0.8.5", "strum 0.25.0", "tonic 0.11.0", @@ -4061,6 +4062,7 @@ dependencies = [ "get-size-derive2", "get-size2", "greptime-proto", + "http 0.2.12", "hydroflow", "itertools 0.10.5", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 9729d5796d74..2156a3fcfc51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" } hex = "0.4" +http = "0.2" humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index 49eafb81d5a2..148e2c66336f 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true workspace = true [dependencies] +http.workspace = true snafu.workspace = true strum.workspace = true tonic.workspace = true diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index c5c0e6efe092..0052d70cf38e 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -18,9 +18,30 @@ pub mod ext; pub mod mock; pub mod status_code; +use http::{HeaderMap, HeaderValue}; pub use snafu; // HACK - these headers are here for shared in gRPC services. For common HTTP headers, // please define in `src/servers/src/http/header.rs`. pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg"; + +/// Create a http header map from error code and message. +/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys. +pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap { + let mut header = HeaderMap::new(); + + let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| { + HeaderValue::from_bytes( + &msg.as_bytes() + .iter() + .flat_map(|b| std::ascii::escape_default(*b)) + .collect::>(), + ) + .expect("Already escaped string should be valid ascii") + }); + + header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into()); + header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg); + header +} diff --git a/src/common/function/src/scalars/vector.rs b/src/common/function/src/scalars/vector.rs index d462b917af59..b3a6f105ad01 100644 --- a/src/common/function/src/scalars/vector.rs +++ b/src/common/function/src/scalars/vector.rs @@ -17,6 +17,7 @@ mod distance; pub(crate) mod impl_conv; mod scalar_add; mod scalar_mul; +mod vector_mul; use std::sync::Arc; @@ -38,5 +39,8 @@ impl VectorFunction { // scalar calculation registry.register(Arc::new(scalar_add::ScalarAddFunction)); registry.register(Arc::new(scalar_mul::ScalarMulFunction)); + + // vector calculation + registry.register(Arc::new(vector_mul::VectorMulFunction)); } } diff --git a/src/common/function/src/scalars/vector/vector_mul.rs b/src/common/function/src/scalars/vector/vector_mul.rs new file mode 100644 index 000000000000..02e9833623e9 --- /dev/null +++ b/src/common/function/src/scalars/vector/vector_mul.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::fmt::Display; + +use common_query::error::{InvalidFuncArgsSnafu, Result}; +use common_query::prelude::Signature; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef}; +use nalgebra::DVectorView; +use snafu::ensure; + +use crate::function::{Function, FunctionContext}; +use crate::helper; +use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit}; + +const NAME: &str = "vec_mul"; + +/// Multiplies corresponding elements of two vectors. +/// +/// # Example +/// +/// ```sql +/// SELECT vec_to_string(vec_mul("[1, 2, 3]", "[1, 2, 3]")) as result; +/// +/// +---------+ +/// | result | +/// +---------+ +/// | [1,4,9] | +/// +---------+ +/// +/// ``` +#[derive(Debug, Clone, Default)] +pub struct VectorMulFunction; + +impl Function for VectorMulFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::binary_datatype()) + } + + fn signature(&self) -> Signature { + helper::one_of_sigs2( + vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::binary_datatype(), + ], + vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::binary_datatype(), + ], + ) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 2, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly two, have: {}", + columns.len() + ), + } + ); + + let arg0 = &columns[0]; + let arg1 = &columns[1]; + + let len = arg0.len(); + let mut result = BinaryVectorBuilder::with_capacity(len); + if len == 0 { + return Ok(result.to_vector()); + } + + let arg0_const = as_veclit_if_const(arg0)?; + let arg1_const = as_veclit_if_const(arg1)?; + + for i in 0..len { + let arg0 = match arg0_const.as_ref() { + Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())), + None => as_veclit(arg0.get_ref(i))?, + }; + + let arg1 = match arg1_const.as_ref() { + Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())), + None => as_veclit(arg1.get_ref(i))?, + }; + + if let (Some(arg0), Some(arg1)) = (arg0, arg1) { + ensure!( + arg0.len() == arg1.len(), + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the vectors must match for multiplying, have: {} vs {}", + arg0.len(), + arg1.len() + ), + } + ); + let vec0 = DVectorView::from_slice(&arg0, arg0.len()); + let vec1 = DVectorView::from_slice(&arg1, arg1.len()); + let vec_res = vec1.component_mul(&vec0); + + let veclit = vec_res.as_slice(); + let binlit = veclit_to_binlit(veclit); + result.push(Some(&binlit)); + } else { + result.push_null(); + } + } + + Ok(result.to_vector()) + } +} + +impl Display for VectorMulFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", NAME.to_ascii_uppercase()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::error; + use datatypes::vectors::StringVector; + + use super::*; + + #[test] + fn test_vector_mul() { + let func 
= VectorMulFunction; + + let vec0 = vec![1.0, 2.0, 3.0]; + let vec1 = vec![1.0, 1.0]; + let (len0, len1) = (vec0.len(), vec1.len()); + let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))])); + let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))])); + + let err = func + .eval(FunctionContext::default(), &[input0, input1]) + .unwrap_err(); + + match err { + error::Error::InvalidFuncArgs { err_msg, .. } => { + assert_eq!( + err_msg, + format!( + "The length of the vectors must match for multiplying, have: {} vs {}", + len0, len1 + ) + ) + } + _ => unreachable!(), + } + + let input0 = Arc::new(StringVector::from(vec![ + Some("[1.0,2.0,3.0]".to_string()), + Some("[8.0,10.0,12.0]".to_string()), + Some("[7.0,8.0,9.0]".to_string()), + None, + ])); + + let input1 = Arc::new(StringVector::from(vec![ + Some("[1.0,1.0,1.0]".to_string()), + Some("[2.0,2.0,2.0]".to_string()), + None, + Some("[3.0,3.0,3.0]".to_string()), + ])); + + let result = func + .eval(FunctionContext::default(), &[input0, input1]) + .unwrap(); + + let result = result.as_ref(); + assert_eq!(result.len(), 4); + assert_eq!( + result.get_ref(0).as_binary().unwrap(), + Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice()) + ); + assert_eq!( + result.get_ref(1).as_binary().unwrap(), + Some(veclit_to_binlit(&[16.0, 20.0, 24.0]).as_slice()) + ); + assert!(result.get_ref(2).is_null()); + assert!(result.get_ref(3).is_null()); + } +} diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 2dcaf3ecb52a..19f3c6e55fb1 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -28,8 +28,8 @@ use snafu::{ensure, ResultExt}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; use crate::prelude::ConcreteDataType; pub use crate::schema::column_schema::{ - ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkipIndexType, SkippingIndexOptions, - COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, + ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions, + SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY, diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 7a96ab5e2bf2..74e066adc7b4 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -543,7 +543,7 @@ pub struct SkippingIndexOptions { pub granularity: u32, /// The type of the skip index. #[serde(default)] - pub index_type: SkipIndexType, + pub index_type: SkippingIndexType, } impl fmt::Display for SkippingIndexOptions { @@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions { /// Skip index types. 
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)] -pub enum SkipIndexType { +pub enum SkippingIndexType { #[default] BloomFilter, } -impl fmt::Display for SkipIndexType { +impl fmt::Display for SkippingIndexType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - SkipIndexType::BloomFilter => write!(f, "BLOOM"), + SkippingIndexType::BloomFilter => write!(f, "BLOOM"), } } } @@ -587,7 +587,7 @@ impl TryFrom> for SkippingIndexOptions { // Parse index type with default value BloomFilter let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) { Some(typ) => match typ.to_ascii_uppercase().as_str() { - "BLOOM" => SkipIndexType::BloomFilter, + "BLOOM" => SkippingIndexType::BloomFilter, _ => { return error::InvalidSkippingIndexOptionSnafu { msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"), @@ -595,7 +595,7 @@ impl TryFrom> for SkippingIndexOptions { .fail(); } }, - None => SkipIndexType::default(), + None => SkippingIndexType::default(), }; Ok(SkippingIndexOptions { diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 08867d342a74..b624eed05b9b 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -45,6 +45,7 @@ get-size2 = "0.1.2" greptime-proto.workspace = true # This fork of hydroflow is simply for keeping our dependency in our org, and pin the version # otherwise it is the same with upstream repo +http.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true lazy_static.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 6d70377cf2aa..dcdb1b1eb01a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -30,7 +30,7 @@ use common_telemetry::{debug, info, trace}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; -use itertools::Itertools; +use itertools::{EitherOrBoth, Itertools}; use meta_client::MetaClientOptions; use query::QueryEngine; use serde::{Deserialize, Serialize}; @@ -46,17 +46,19 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; -use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::util::{ + relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, +}; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{ - EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu, }; -use crate::expr::{Batch, GlobalId}; -use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; -use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; +use crate::expr::Batch; +use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; mod parse_expr; @@ -245,16 +247,26 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = - self.try_fetch_or_create_table(&table_name).await?; + let (is_ts_placeholder, proto_schema) = self + .try_fetch_existing_table(&table_name) + .await? 
+            .context(UnexpectedSnafu {
+                reason: format!("Table not found: {}", table_name.join(".")),
+            })?;
         let schema_len = proto_schema.len();
 
+        let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
         trace!(
             "Sending {} writeback requests to table {}, reqs total rows={}",
             reqs.len(),
             table_name.join("."),
             reqs.iter().map(|r| r.len()).sum::<usize>()
         );
+
+        METRIC_FLOW_ROWS
+            .with_label_values(&["out"])
+            .inc_by(total_rows as u64);
+
         let now = self.tick_manager.tick();
         for req in reqs {
             match req {
@@ -390,14 +402,12 @@ impl FlowWorkerManager {
         Ok(output)
     }
 
-    /// Fetch table info or create table from flow's schema if not exist
-    async fn try_fetch_or_create_table(
+    /// Fetch table schema and primary key from the table info source; return `None` if the table does not exist
+    async fn fetch_table_pk_schema(
         &self,
         table_name: &TableName,
-    ) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> {
-        // TODO(discord9): instead of auto build table from request schema, actually build table
-        // before `create flow` to be able to assign pk and ts etc.
-        let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self
+    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
+        if let Some(table_id) = self
             .table_info_source
             .get_table_id_from_name(table_name)
             .await?
@@ -414,97 +424,64 @@ impl FlowWorkerManager {
                 .map(|i| meta.schema.column_schemas[i].name.clone())
                 .collect_vec();
             let schema = meta.schema.column_schemas;
-            // check if the last column is the auto created timestamp column, hence the table is auto created from
-            // flow's plan type
-            let is_auto_create = {
-                let correct_name = schema
-                    .last()
-                    .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
-                    .unwrap_or(false);
-                let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1);
-                correct_name && correct_time_index
-            };
-            (primary_keys, schema, is_auto_create)
+            let time_index = meta.schema.timestamp_index;
+            Ok(Some((primary_keys, time_index, schema)))
         } else {
-            // TODO(discord9): condiser remove buggy auto create by schema
+            Ok(None)
+        }
+    }
 
-            let node_ctx = self.node_context.read().await;
-            let gid: GlobalId = node_ctx
-                .table_repr
-                .get_by_name(table_name)
-                .map(|x| x.1)
-                .unwrap();
-            let schema = node_ctx
-                .schema
-                .get(&gid)
-                .with_context(|| TableNotFoundSnafu {
-                    name: format!("Table name = {:?}", table_name),
-                })?
-            .clone();
-            // TODO(discord9): use default key from schema
-            let primary_keys = schema
-                .typ()
-                .keys
-                .first()
-                .map(|v| {
-                    v.column_indices
-                        .iter()
-                        .map(|i| {
-                            schema
-                                .get_name(*i)
-                                .clone()
-                                .unwrap_or_else(|| format!("col_{i}"))
-                        })
-                        .collect_vec()
-                })
-                .unwrap_or_default();
-            let update_at = ColumnSchema::new(
-                UPDATE_AT_TS_COL,
-                ConcreteDataType::timestamp_millisecond_datatype(),
-                true,
-            );
+    /// Return the primary keys, the schema, and whether the table has a placeholder timestamp column.
+    ///
+    /// The schema of the table comes from the flow's output plan,
+    /// adjusted to add an `update_at` column and a ts placeholder if needed.
+    async fn adjust_auto_created_table_schema(
+        &self,
+        schema: &RelationDesc,
+    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
+        // TODO(discord9): consider removing buggy auto create by schema
+
+        // TODO(discord9): use default key from schema
+        let primary_keys = schema
+            .typ()
+            .keys
+            .first()
+            .map(|v| {
+                v.column_indices
+                    .iter()
+                    .map(|i| {
+                        schema
+                            .get_name(*i)
+                            .clone()
+                            .unwrap_or_else(|| format!("col_{i}"))
+                    })
+                    .collect_vec()
+            })
+            .unwrap_or_default();
+        let update_at = ColumnSchema::new(
+            UPDATE_AT_TS_COL,
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            true,
+        );
 
-            let original_schema = schema
-                .typ()
-                .column_types
-                .clone()
-                .into_iter()
-                .enumerate()
-                .map(|(idx, typ)| {
-                    let name = schema
-                        .names
-                        .get(idx)
-                        .cloned()
-                        .flatten()
-                        .unwrap_or(format!("col_{}", idx));
-                    let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
-                    if schema.typ().time_index == Some(idx) {
-                        ret.with_time_index(true)
-                    } else {
-                        ret
-                    }
-                })
-                .collect_vec();
+        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);
 
-            let mut with_auto_added_col = original_schema.clone();
-            with_auto_added_col.push(update_at);
+        let mut with_auto_added_col = original_schema.clone();
+        with_auto_added_col.push(update_at);
 
-            // if no time index, add one as placeholder
-            let no_time_index = schema.typ().time_index.is_none();
-            if no_time_index {
-                let ts_col = ColumnSchema::new(
-                    AUTO_CREATED_PLACEHOLDER_TS_COL,
-                    ConcreteDataType::timestamp_millisecond_datatype(),
-                    true,
-                )
-                .with_time_index(true);
-                with_auto_added_col.push(ts_col);
-            }
+        // if no time index, add one as placeholder
+        let no_time_index = schema.typ().time_index.is_none();
+        if no_time_index {
+            let ts_col = ColumnSchema::new(
+                AUTO_CREATED_PLACEHOLDER_TS_COL,
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                true,
+            )
+            .with_time_index(true);
+            with_auto_added_col.push(ts_col);
+        }
 
-            (primary_keys, with_auto_added_col, no_time_index)
-        };
-        let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
-        Ok((is_ts_placeholder, proto_schema))
+        Ok((primary_keys, with_auto_added_col, no_time_index))
     }
 }
@@ -807,7 +784,85 @@ impl FlowWorkerManager {
         let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
         debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
-        node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
+
+        // check the flow's output schema against the actual table schema if the sink table exists;
+        // if it does not exist, create the sink table immediately
+        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await?
{ + let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); + + // for column schema, only `data_type` need to be check for equality + // since one can omit flow's column name when write flow query + // print a user friendly error message about mismatch and how to correct them + for (idx, zipped) in auto_schema + .iter() + .zip_longest(real_schema.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Both(auto, real) => { + if auto.data_type != real.data_type { + InvalidQuerySnafu { + reason: format!( + "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", + idx, + real.name, + auto.name, + real.data_type, + auto.data_type + ), + } + .fail()?; + } + } + EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { + // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) + continue; + } + _ => InvalidQuerySnafu { + reason: format!( + "schema length mismatched, expected {} found {}", + real_schema.len(), + auto_schema.len() + ), + } + .fail()?, + } + } + + let table_id = self + .table_info_source + .get_table_id_from_name(&sink_table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table id for table name {:?}", sink_table_name), + })?; + let table_info_value = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table info value for table id {:?}", table_id), + })?; + let real_schema = table_info_value_to_relation_desc(table_info_value)?; + node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; + } else { + // assign inferred schema to sink table + // create sink table + node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + &sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } + } let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1fa11b4d83a2..1f431ff92f64 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -138,7 +138,7 @@ impl Flownode for FlowWorkerManager { } async fn handle_inserts(&self, request: InsertRequests) -> Result { - // using try_read makesure two things: + // using try_read to ensure two things: // 1. flush wouldn't happen until inserts before it is inserted // 2. 
inserts happening concurrently with flush wouldn't be block by flush let _flush_lock = self.flush_lock.try_read(); diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 990fdd129797..5c644803ec71 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -331,12 +331,14 @@ impl FlownodeContext { } else { let global_id = self.new_global_id(); + // table id is Some meaning db must have created the table if let Some(table_id) = table_id { let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?; table_name = table_name.or(Some(known_table_name)); self.schema.insert(global_id, schema); } // if we don't have table id, it means database havn't assign one yet or we don't need it + // still update the mapping with new global id self.table_repr.insert(table_name, table_id, global_id); Ok(global_id) } @@ -358,6 +360,7 @@ impl FlownodeContext { })?; self.schema.insert(gid, schema); + Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 0454ab16b1d3..7981999f0abc 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -20,11 +20,12 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; +use crate::adapter::util::table_info_value_to_relation_desc; use crate::adapter::TableName; use crate::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; -use crate::repr::{self, ColumnType, RelationDesc, RelationType}; +use crate::repr::RelationDesc; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { @@ -121,38 +122,7 @@ impl TableSource { table_name.table_name, ]; - let raw_schema = table_info_value.table_info.meta.schema; - let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema - .column_schemas - .clone() - .into_iter() - .map(|col| { - ( - ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, - }, - Some(col.name), - ) - }) - .unzip(); - - let key = table_info_value.table_info.meta.primary_key_indices; - let keys = vec![repr::Key::from(key)]; - - let time_index = raw_schema.timestamp_index; - Ok(( - table_name, - RelationDesc { - typ: RelationType { - column_types, - keys, - time_index, - // by default table schema's column are all non-auto - auto_columns: vec![], - }, - names: col_names, - }, - )) + let desc = table_info_value_to_relation_desc(table_info_value)?; + Ok((table_name, desc)) } } diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index f2a29bec8e9e..f5d2968543f9 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -12,16 +12,153 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc;
+
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::column_def::options_from_column_schema;
-use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
+use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
 use common_error::ext::BoxedError;
+use common_meta::key::table_info::TableInfoValue;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use itertools::Itertools;
-use snafu::ResultExt;
+use operator::expr_factory::CreateExprFactory;
+use session::context::QueryContextBuilder;
+use snafu::{OptionExt, ResultExt};
+use table::table_reference::TableReference;
+
+use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
+use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
+use crate::repr::{ColumnType, RelationDesc, RelationType};
+use crate::FlowWorkerManager;
+
+impl FlowWorkerManager {
+    /// Create a table from the given schema (adjusted to add auto columns if needed); return true if the table was created
+    pub(crate) async fn create_table_from_relation(
+        &self,
+        flow_name: &str,
+        table_name: &TableName,
+        relation_desc: &RelationDesc,
+    ) -> Result<bool, Error> {
+        if self.fetch_table_pk_schema(table_name).await?.is_some() {
+            return Ok(false);
+        }
+        let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;
+
+        // create the sink table using pks, column types and is_ts_auto
+
+        let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;
+
+        // create sink table
+        let create_expr = CreateExprFactory {}
+            .create_table_expr_by_column_schemas(
+                &TableReference {
+                    catalog: &table_name[0],
+                    schema: &table_name[1],
+                    table: &table_name[2],
+                },
+                &proto_schema,
+                "mito",
+                Some(&format!("Sink table for flow {}", flow_name)),
+            )
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
 
-use crate::error::{Error, ExternalSnafu};
+        self.submit_create_sink_table_ddl(create_expr).await?;
+        Ok(true)
+    }
+
+    /// Try to fetch an existing table with its adjusted schema (auto columns added if needed)
+    pub(crate) async fn try_fetch_existing_table(
+        &self,
+        table_name: &TableName,
+    ) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
+        if let Some((primary_keys, time_index, schema)) =
+            self.fetch_table_pk_schema(table_name).await?
+        {
+            // check if the last column is the auto created timestamp column, hence the table is auto created from
+            // flow's plan type
+            let is_auto_create = {
+                let correct_name = schema
+                    .last()
+                    .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
+                    .unwrap_or(false);
+                let correct_time_index = time_index == Some(schema.len() - 1);
+                correct_name && correct_time_index
+            };
+            let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
+            Ok(Some((is_auto_create, proto_schema)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// submit a create table ddl
+    pub(crate) async fn submit_create_sink_table_ddl(
+        &self,
+        mut create_table: CreateTableExpr,
+    ) -> Result<(), Error> {
+        let stmt_exec = {
+            self.frontend_invoker
+                .read()
+                .await
+                .as_ref()
+                .map(|f| f.statement_executor())
+        }
+        .context(UnexpectedSnafu {
+            reason: "Failed to get statement executor",
+        })?;
+        let ctx = Arc::new(
+            QueryContextBuilder::default()
+                .current_catalog(create_table.catalog_name.clone())
+                .current_schema(create_table.schema_name.clone())
+                .build(),
+        );
+        stmt_exec
+            .create_table_inner(&mut create_table, None, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+
+        Ok(())
+    }
+}
+
+pub fn table_info_value_to_relation_desc(
+    table_info_value: TableInfoValue,
+) -> Result<RelationDesc, Error> {
+    let raw_schema = table_info_value.table_info.meta.schema;
+    let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
+        .column_schemas
+        .clone()
+        .into_iter()
+        .map(|col| {
+            (
+                ColumnType {
+                    nullable: col.is_nullable(),
+                    scalar_type: col.data_type,
+                },
+                Some(col.name),
+            )
+        })
+        .unzip();
+
+    let key = table_info_value.table_info.meta.primary_key_indices;
+    let keys = vec![crate::repr::Key::from(key)];
+
+    let time_index = raw_schema.timestamp_index;
+
+    Ok(RelationDesc {
+        typ: RelationType {
+            column_types,
+            keys,
+            time_index,
+            // by default table schema's column are all non-auto
+            auto_columns: vec![],
+        },
+        names: col_names,
+    })
+}
 
 pub fn from_proto_to_data_type(
     column_schema: &api::v1::ColumnSchema,
@@ -75,3 +212,29 @@ pub fn column_schemas_to_proto(
         .collect();
     Ok(ret)
 }
+
+/// Convert `RelationDesc` to `ColumnSchema` list,
+/// if the column name is not present, use `col_{idx}` as the column name
+pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> {
+    schema
+        .typ()
+        .column_types
+        .clone()
+        .into_iter()
+        .enumerate()
+        .map(|(idx, typ)| {
+            let name = schema
+                .names
+                .get(idx)
+                .cloned()
+                .flatten()
+                .unwrap_or(format!("col_{}", idx));
+            let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
+            if schema.typ().time_index == Some(idx) {
+                ret.with_time_index(true)
+            } else {
+                ret
+            }
+        })
+        .collect_vec()
+}
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index e125a2d27261..b92ec30f9f6a 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque};
 use std::rc::Rc;
 use std::sync::Arc;
 
+use common_error::ext::ErrorExt;
 use hydroflow::scheduled::graph::Hydroflow;
 use hydroflow::scheduled::handoff::TeeingHandoff;
 use hydroflow::scheduled::port::RecvPort;
@@ -25,6 +26,7 @@ use itertools::Itertools;
 use tokio::sync::Mutex;
 
 use crate::expr::{Batch, EvalError, ScalarExpr};
+use crate::metrics::METRIC_FLOW_ERRORS;
 use crate::repr::DiffRow;
 use crate::utils::ArrangeHandler;
 
@@ -185,6 +187,9 @@ impl ErrCollector {
     }
 
     pub fn push_err(&self, err: EvalError) {
+        METRIC_FLOW_ERRORS
+            .with_label_values(&[err.status_code().as_ref()])
+            .inc();
self.inner.blocking_lock().push_back(err) } diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index a94de4b6ed7b..137e024307f9 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -16,12 +16,13 @@ use std::any::Any; -use common_error::define_into_tonic_status; use common_error::ext::BoxedError; +use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, Snafu}; +use tonic::metadata::MetadataMap; use crate::adapter::FlowId; use crate::expr::EvalError; @@ -186,6 +187,20 @@ pub enum Error { }, } +/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user +pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status { + let msg = err.to_string(); + let last_err_msg = common_error::ext::StackError::last(&err).to_string(); + let code = err.status_code() as u32; + let header = from_err_code_msg_to_header(code, &last_err_msg); + + tonic::Status::with_metadata( + tonic::Code::InvalidArgument, + msg, + MetadataMap::from_headers(header), + ) +} + /// Result type for flow module pub type Result = std::result::Result; @@ -200,9 +215,8 @@ impl ErrorExt for Error { | Self::TableNotFoundMeta { .. } | Self::FlowNotFound { .. } | Self::ListFlows { .. } => StatusCode::TableNotFound, - Self::InvalidQuery { .. } | Self::Plan { .. } | Self::Datatypes { .. } => { - StatusCode::PlanQuery - } + Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, + Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 992d5c592125..b29098b9d8fa 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -14,8 +14,11 @@ //! Error handling for expression evaluation. +use std::any::Any; + use arrow_schema::ArrowError; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::DataFusionError; use datatypes::data_type::ConcreteDataType; @@ -126,3 +129,29 @@ pub enum EvalError { source: BoxedError, }, } + +impl ErrorExt for EvalError { + fn status_code(&self) -> StatusCode { + use EvalError::*; + match self { + DivisionByZero { .. } + | TypeMismatch { .. } + | TryFromValue { .. } + | DataAlreadyExpired { .. } + | InvalidArgument { .. } + | Overflow { .. } => StatusCode::InvalidArguments, + + CastValue { source, .. } | DataType { source, .. } => source.status_code(), + + Internal { .. } + | Optimize { .. } + | Arrow { .. } + | Datafusion { .. } + | External { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 119b4c5856de..f165bcadc6a4 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -30,4 +30,22 @@ lazy_static! 
{ .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); + pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_processed_rows", + "Count of rows flowing through the system", + &["direction"] + ) + .unwrap(); + pub static ref METRIC_FLOW_PROCESSING_TIME: HistogramVec = register_histogram_vec!( + "greptime_flow_processing_time", + "Time spent processing requests", + &["type"] + ) + .unwrap(); + pub static ref METRIC_FLOW_ERRORS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_errors", + "Count of errors in flow processing", + &["code"] + ) + .unwrap(); } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index d0fbb861eb24..168b5df7d0e2 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -212,6 +212,8 @@ impl RelationType { for key in &mut self.keys { key.remove_col(time_index.unwrap_or(usize::MAX)); } + // remove empty keys + self.keys.retain(|key| !key.is_empty()); self } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 1259c1175510..d22ba220441b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -50,10 +50,11 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, - ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, + ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; +use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; use crate::{Error, FlowWorkerManager, FlownodeOptions}; @@ -77,41 +78,52 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["ddl"]) + .start_timer(); + let request = request.into_inner(); self.manager .handle(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } async fn handle_mirror_request( &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["insert"]) + .start_timer(); + let request = request.into_inner(); // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define + let mut row_count = 0; let request = api::v1::region::InsertRequests { requests: request .requests .into_iter() - .map(|insert| api::v1::region::InsertRequest { - region_id: insert.region_id, - rows: insert.rows, + .map(|insert| { + insert.rows.as_ref().inspect(|x| row_count += x.rows.len()); + api::v1::region::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + } }) .collect_vec(), }; + + METRIC_FLOW_ROWS + .with_label_values(&["in"]) + .inc_by(row_count as u64); + self.manager .handle_inserts(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } } @@ -500,6 +512,10 @@ impl FrontendInvoker { requests: RowInsertRequests, ctx: QueryContextRef, ) -> 
common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_insert"]) + .start_timer(); + self.inserter .handle_row_inserts(requests, ctx, &self.statement_executor) .await @@ -512,10 +528,18 @@ impl FrontendInvoker { requests: RowDeleteRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_delete"]) + .start_timer(); + self.deleter .handle_row_deletes(requests, ctx) .await .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } + + pub fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 2d0a6f66dab8..b944e3b263e3 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -216,6 +216,7 @@ impl KeyValPlan { /// find out the column that should be time index in group exprs(which is all columns that should be keys) /// TODO(discord9): better ways to assign time index +/// for now, it will found the first column that is timestamp or has a tumble window floor function fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { group_exprs.iter().position(|expr| { matches!( @@ -224,7 +225,7 @@ fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { func: UnaryFunc::TumbleWindowFloor { .. }, expr: _ } - ) + ) || expr.typ.scalar_type.is_timestamp() }) } @@ -1482,7 +1483,7 @@ mod test { ColumnType::new(CDT::float64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), true), ]) - .with_key(vec![1]) + .with_time_index(Some(1)) .into_named(vec![ Some( "MAX(numbers_with_ts.number) - MIN(numbers_with_ts.number) / Float64(30)" @@ -1571,7 +1572,7 @@ mod test { ColumnType::new(ConcreteDataType::uint32_datatype(), true), // max ColumnType::new(ConcreteDataType::uint32_datatype(), true), // min ]) - .with_key(vec![0]) + .with_time_index(Some(0)) .into_unnamed(), ), ), diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index a8b94d5abdbc..0fef8bd38b94 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -307,7 +307,7 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {}", + "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", self.metadata.region_id, self.file_id, err ); } else { @@ -357,7 +357,7 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create bloom filter, region_id: {}, file_id: {}, err: {}", + "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", self.metadata.region_id, self.file_id, err ); } else { @@ -452,7 +452,7 @@ mod tests { ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false) .with_skipping_options(SkippingIndexOptions { granularity: 42, - index_type: SkipIndexType::BloomFilter, + index_type: SkippingIndexType::BloomFilter, }) .unwrap(); diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 1517a38a2efc..8c56800f47e7 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -17,7 +17,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::warn; -use datatypes::schema::SkipIndexType; +use datatypes::schema::SkippingIndexType; use 
index::bloom_filter::creator::BloomFilterCreator; use puffin::puffin_manager::{PuffinWriter, PutOptions}; use snafu::{ensure, ResultExt}; @@ -91,7 +91,7 @@ impl BloomFilterIndexer { })?; let options = match options { - Some(options) if options.index_type == SkipIndexType::BloomFilter => options, + Some(options) if options.index_type == SkippingIndexType::BloomFilter => options, _ => continue, }; @@ -135,7 +135,7 @@ impl BloomFilterIndexer { // clean up garbage if failed to update if let Err(err) = self.do_cleanup().await { if cfg!(any(test, feature = "test")) { - panic!("Failed to clean up index creator, err: {err}",); + panic!("Failed to clean up index creator, err: {err:?}",); } else { warn!(err; "Failed to clean up index creator"); } @@ -165,7 +165,7 @@ impl BloomFilterIndexer { // clean up garbage no matter finish successfully or not if let Err(err) = self.do_cleanup().await { if cfg!(any(test, feature = "test")) { - panic!("Failed to clean up index creator, err: {err}",); + panic!("Failed to clean up index creator, err: {err:?}",); } else { warn!(err; "Failed to clean up index creator"); } @@ -375,7 +375,7 @@ mod tests { false, ) .with_skipping_options(SkippingIndexOptions { - index_type: SkipIndexType::BloomFilter, + index_type: SkippingIndexType::BloomFilter, granularity: 2, }) .unwrap(), @@ -398,7 +398,7 @@ mod tests { false, ) .with_skipping_options(SkippingIndexOptions { - index_type: SkipIndexType::BloomFilter, + index_type: SkippingIndexType::BloomFilter, granularity: 4, }) .unwrap(), diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 4a49bc4ee84c..5b29009a033b 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -34,7 +34,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -55,7 +55,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -76,7 +76,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {}", + "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 74c693f6a5b4..025eead758ff 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -69,7 +69,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -90,7 +90,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -128,7 +128,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to finish inverted index, 
region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -165,7 +165,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -202,7 +202,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {}", + "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index e93762a6322f..c2ab33f0e13a 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -46,7 +46,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -71,7 +71,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -96,7 +96,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update bloom filter, region_id: {}, file_id: {}, err: {}", + "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 59fe87a66ec3..bc50eff161b5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -68,6 +68,7 @@ impl CreateExprFactory { table_name: &TableReference<'_>, column_schemas: &[api::v1::ColumnSchema], engine: &str, + desc: Option<&str>, ) -> Result { let column_exprs = ColumnExpr::from_column_schemas(column_schemas); let create_expr = common_grpc_expr::util::build_create_table_expr( @@ -75,7 +76,7 @@ impl CreateExprFactory { table_name, column_exprs, engine, - "Created on insertion", + desc.unwrap_or("Created on insertion"), ) .context(BuildCreateExprOnInsertionSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 466dde5425c1..6b7702f25b8b 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -870,5 +870,5 @@ fn build_create_table_expr( request_schema: &[ColumnSchema], engine: &str, ) -> Result { - CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine) + CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None) } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 74adaffd5ea8..33831ba639bb 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -56,7 +56,7 @@ futures-util.workspace = true hashbrown = "0.14" headers = "0.3" hostname = "0.3" -http = "0.2" +http.workspace = true http-body = "0.4" humantime.workspace = true humantime-serde.workspace = true diff --git a/src/servers/src/http/result/error_result.rs b/src/servers/src/http/result/error_result.rs index be5f01f9e950..3c7c718e5ba4 100644 --- a/src/servers/src/http/result/error_result.rs +++ b/src/servers/src/http/result/error_result.rs @@ -12,17 +12,15 @@ // See the License for the specific language governing 
permissions and // limitations under the License. -use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::Json; use common_error::ext::ErrorExt; +use common_error::from_err_code_msg_to_header; use common_error::status_code::StatusCode; use common_telemetry::{debug, error}; use serde::{Deserialize, Serialize}; use crate::error::status_code_to_http_status; -use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE; -use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME; #[derive(Serialize, Deserialize, Debug)] pub struct ErrorResponse { @@ -74,13 +72,16 @@ impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let code = self.code; let execution_time = self.execution_time_ms; - let mut resp = Json(self).into_response(); - resp.headers_mut() - .insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code)); - resp.headers_mut().insert( - &GREPTIME_DB_HEADER_EXECUTION_TIME, - HeaderValue::from(execution_time), + let new_header = from_err_code_msg_to_header( + code, + &format!( + "error: {}, execution_time_ms: {}", + self.error, execution_time + ), ); + let mut resp = Json(self).into_response(); + resp.headers_mut().extend(new_header); + let status = StatusCode::from_u32(code).unwrap_or(StatusCode::Unknown); let status_code = status_code_to_http_status(&status); diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result new file mode 100644 index 000000000000..e8bd5cf739f4 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -0,0 +1,161 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | 
+| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+-------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+-------------------------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') | ++--------------------+-------------------------------------------------------------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+---------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+---------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) AS sumup, ts AS event_time FROM numbers_input_basic GROUP BY ts | ++--------------------+---------------------------------------------------------------------------------------+ + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql new file mode 100644 index 000000000000..0af723770ced --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql @@ -0,0 +1,58 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME 
INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + +SHOW CREATE FLOW test_numbers_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index c70fe54fec19..e17efd74be40 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -17,6 +17,24 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -28,6 +46,24 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); | FLOW_FLUSHED | +----------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -130,6 +166,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + DROP FLOW test_wildcard_basic; Affected Rows: 0 @@ -142,6 +194,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -159,6 +228,22 @@ ADMIN FLUSH_FLOW('test_wildcard_basic'); | FLOW_FLUSHED | +-----------------------------------------+ +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + SELECT wildcard FROM out_basic; +----------+ @@ -197,6 +282,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -320,7 +422,9 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -348,6 +452,23 @@ SHOW CREATE FLOW filter_numbers_basic; | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+-----------------------------------------------------------+ +| Table | Create Table | ++-------------------+-----------------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "a" INTERVAL NULL, | +| | "b" INTERVAL NULL, | +| | "c" INTERVAL NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+-----------------------------------------------------------+ + drop flow filter_numbers_basic; Affected Rows: 0 @@ -390,6 +511,22 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE approx_rate; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | +| | "rate" DOUBLE NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO bytes_log VALUES @@ -542,6 +679,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -675,6 +829,23 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -693,21 +864,20 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS 
"ngx_country" ( | -| | "ngx_access_log.country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("ngx_access_log.country", "time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------------------+ ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ SELECT "ngx_access_log.country", @@ -824,6 +994,23 @@ HAVING Affected Rows: 0 +SHOW CREATE TABLE temp_alerts; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( | +| | "sensor_id" INT NULL, | +| | "loc" STRING NULL, | +| | "max_temp" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO temp_sensor_data VALUES @@ -963,6 +1150,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_distribution; + ++------------------+-------------------------------------------------+ +| Table | Create Table | ++------------------+-------------------------------------------------+ +| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( | +| | "stat" INT NULL, | +| | "bucket_size" INT NULL, | +| | "total_logs" BIGINT NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("stat", "bucket_size") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++------------------+-------------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -1070,6 +1276,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE requests_without_ip; + ++---------------------+----------------------------------------------------+ +| Table | Create Table | ++---------------------+----------------------------------------------------+ +| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( | +| | "service_name" STRING NULL, | +| | "val" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("service_name") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+----------------------------------------------------+ + INSERT INTO requests VALUES @@ -1269,6 +1492,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | 
++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1361,6 +1603,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1419,3 +1680,210 @@ DROP TABLE android_log; Affected Rows: 0 +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "avg_after_filter_num" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT avg_after_filter_num FROM out_num_cnt_basic; + ++----------------------+ +| avg_after_filter_num | ++----------------------+ +| 1 | ++----------------------+ + +INSERT INTO + numbers_input_basic +VALUES + (10, "2021-07-01 00:00:00.200"), + (23, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE `live_connection_log` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `connect_retry_times` DOUBLE NULL, + `connect_result` INT NULL, + `first_frame_time` DOUBLE NULL, + `record_time` TIMESTAMP TIME INDEX, + `iot_online` INT NULL, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE TABLE `live_connection_statistics_detail` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `avg_connect_retry_times` DOUBLE NULL, + `total_connect_result_ok` INT64 NULL, + `total_connect_result_fail` INT64 NULL, + `total_connect` INT64 NULL, + `conection_rate` DOUBLE NULL, + `avg_first_frame_time` DOUBLE NULL, + `max_first_frame_time` DOUBLE NULL, + `ok_conection_rate` DOUBLE NULL, + `record_time_window` TIMESTAMP TIME INDEX, + `update_at` TIMESTAMP, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE FLOW live_connection_aggregation_detail +SINK TO live_connection_statistics_detail +AS +SELECT + device_model, + connect_protocol, + connect_mode, + avg(connect_retry_times) as avg_connect_retry_times, + sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, + sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, + count(connect_result) as total_connect, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, + avg(first_frame_time) as avg_first_frame_time, + max(first_frame_time) as max_first_frame_time, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, + date_bin(INTERVAL '1 minutes', record_time) as record_time_window, +FROM live_connection_log +WHERE iot_online = 1 +GROUP BY + device_model, + connect_protocol, + connect_mode, + 
record_time_window; + +Affected Rows: 0 + +INSERT INTO + live_connection_log +VALUES + ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); + ++--------------------------------------------------------+ +| ADMIN FLUSH_FLOW('live_connection_aggregation_detail') | ++--------------------------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------------------------+ + +SELECT device_model, + connect_protocol, + connect_mode, + avg_connect_retry_times, + total_connect_result_ok, + total_connect_result_fail, + total_connect, + conection_rate, + avg_first_frame_time, + max_first_frame_time, + ok_conection_rate, + record_time_window FROM live_connection_statistics_detail; + ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ + +DROP FLOW live_connection_aggregation_detail; + +Affected Rows: 0 + +DROP TABLE live_connection_log; + +Affected Rows: 0 + +DROP TABLE live_connection_statistics_detail; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 74abbc85df22..516afa4074a3 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -13,11 +13,15 @@ FROM GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +SHOW CREATE TABLE out_num_cnt_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +SHOW CREATE TABLE out_num_cnt_basic; + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -75,6 +79,8 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + DROP FLOW test_wildcard_basic; CREATE FLOW test_wildcard_basic sink TO out_basic AS @@ -83,6 +89,9 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -92,6 +101,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_wildcard_basic'); +SHOW CREATE TABLE out_basic; + SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; @@ -112,6 +123,8 @@ SELECT FROM distinct_basic; +SHOW CREATE TABLE out_distinct_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -168,7 +181,9 @@ CREATE TABLE numbers_input_basic ( ); create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -184,6 +199,8 @@ where SHOW CREATE FLOW filter_numbers_basic; +SHOW CREATE TABLE out_num_cnt_basic; + drop flow filter_numbers_basic; drop table out_num_cnt_basic; @@ -214,6 +231,8 @@ from GROUP BY time_window; +SHOW CREATE TABLE approx_rate; + INSERT INTO bytes_log VALUES @@ -294,6 +313,8 @@ SELECT FROM ngx_access_log; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -359,6 +380,8 @@ GROUP BY country, time_window; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -437,6 +460,8 @@ GROUP BY HAVING max_temp > 100; +SHOW CREATE TABLE temp_alerts; + INSERT INTO temp_sensor_data VALUES @@ -516,6 +541,8 @@ GROUP BY time_window, bucket_size; +SHOW CREATE TABLE ngx_distribution; + INSERT INTO ngx_access_log VALUES @@ -580,6 +607,8 @@ SELECT FROM requests; +SHOW CREATE TABLE requests_without_ip; + INSERT INTO requests VALUES @@ -680,6 +709,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -732,6 +763,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -755,3 +788,128 @@ DROP FLOW calc_android_log_abnormal; DROP TABLE android_log_abnormal; DROP TABLE android_log; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +-- SQLNESS ARG restart=true +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +SELECT avg_after_filter_num FROM out_num_cnt_basic; + +INSERT INTO + numbers_input_basic +VALUES + (10, "2021-07-01 00:00:00.200"), + (23, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +DROP FLOW test_numbers_basic; +DROP TABLE numbers_input_basic; +DROP TABLE out_num_cnt_basic; + +CREATE TABLE `live_connection_log` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `connect_retry_times` DOUBLE NULL, + `connect_result` INT NULL, + `first_frame_time` DOUBLE NULL, + `record_time` TIMESTAMP TIME INDEX, + `iot_online` INT NULL, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +CREATE TABLE `live_connection_statistics_detail` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `avg_connect_retry_times` DOUBLE NULL, + `total_connect_result_ok` INT64 NULL, + `total_connect_result_fail` INT64 NULL, + `total_connect` INT64 NULL, + `conection_rate` DOUBLE NULL, + `avg_first_frame_time` DOUBLE NULL, + `max_first_frame_time` DOUBLE NULL, + `ok_conection_rate` DOUBLE NULL, + `record_time_window` TIMESTAMP TIME INDEX, + `update_at` TIMESTAMP, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +CREATE FLOW live_connection_aggregation_detail +SINK TO live_connection_statistics_detail +AS +SELECT + device_model, + connect_protocol, + connect_mode, + avg(connect_retry_times) as avg_connect_retry_times, + sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, + sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, + count(connect_result) as total_connect, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, + avg(first_frame_time) as avg_first_frame_time, + max(first_frame_time) as max_first_frame_time, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, + date_bin(INTERVAL '1 minutes', record_time) as record_time_window, +FROM live_connection_log +WHERE iot_online = 1 +GROUP BY + device_model, + connect_protocol, + connect_mode, + record_time_window; + +INSERT INTO + live_connection_log +VALUES + ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); + +SELECT device_model, + connect_protocol, + connect_mode, + avg_connect_retry_times, + total_connect_result_ok, + total_connect_result_fail, + total_connect, + conection_rate, + avg_first_frame_time, + max_first_frame_time, + ok_conection_rate, + record_time_window FROM live_connection_statistics_detail; + +DROP FLOW live_connection_aggregation_detail; +DROP TABLE live_connection_log; +DROP TABLE live_connection_statistics_detail; diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 
38fa609c960d..14e80129446d 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -322,3 +322,87 @@ drop table numbers_input_show; Affected Rows: 0 +CREATE TABLE numbers_input_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +create table out_num_cnt_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); + +Affected Rows: 0 + +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10; + +Affected Rows: 0 + +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number FROM out_num_cnt_show; + ++--------+ +| number | ++--------+ +| 15 | +| 16 | ++--------+ + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; + +Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; + +Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) + +INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +Error: 1003(Internal), Internal error: 1003 + +-- sink table stays the same since the flow error out due to column mismatch +SELECT number FROM out_num_cnt_show; + ++--------+ +| number | ++--------+ +| 15 | +| 16 | ++--------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +drop table out_num_cnt_show; + +Affected Rows: 0 + +drop table numbers_input_show; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 7348a83b5103..f445c4f254c4 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -125,3 +125,45 @@ DROP FLOW filter_numbers_show; drop table out_num_cnt_show; drop table numbers_input_show; + +CREATE TABLE numbers_input_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); +create table out_num_cnt_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); + +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10; + +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | 
+ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number FROM out_num_cnt_show; + + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; + +INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +-- sink table stays the same since the flow error out due to column mismatch +SELECT number FROM out_num_cnt_show; + +DROP FLOW filter_numbers_show; + +drop table out_num_cnt_show; + +drop table numbers_input_show; diff --git a/tests/cases/standalone/common/function/vector/vector.result b/tests/cases/standalone/common/function/vector/vector.result index 6f0205982685..0bcca4740350 100644 --- a/tests/cases/standalone/common/function/vector/vector.result +++ b/tests/cases/standalone/common/function/vector/vector.result @@ -22,3 +22,27 @@ SELECT vec_to_string(parse_vec('[]')); | [] | +--------------------------------------+ +SELECT vec_to_string(vec_mul('[1.0, 2.0]', '[3.0, 4.0]')); + ++---------------------------------------------------------------+ +| vec_to_string(vec_mul(Utf8("[1.0, 2.0]"),Utf8("[3.0, 4.0]"))) | ++---------------------------------------------------------------+ +| [3,8] | ++---------------------------------------------------------------+ + +SELECT vec_to_string(vec_mul(parse_vec('[1.0, 2.0]'), '[3.0, 4.0]')); + ++--------------------------------------------------------------------------+ +| vec_to_string(vec_mul(parse_vec(Utf8("[1.0, 2.0]")),Utf8("[3.0, 4.0]"))) | ++--------------------------------------------------------------------------+ +| [3,8] | ++--------------------------------------------------------------------------+ + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', parse_vec('[3.0, 4.0]'))); + ++--------------------------------------------------------------------------+ +| vec_to_string(vec_mul(Utf8("[1.0, 2.0]"),parse_vec(Utf8("[3.0, 4.0]")))) | ++--------------------------------------------------------------------------+ +| [3,8] | ++--------------------------------------------------------------------------+ + diff --git a/tests/cases/standalone/common/function/vector/vector.sql b/tests/cases/standalone/common/function/vector/vector.sql index 97a986916ab1..3f46fa8f2210 100644 --- a/tests/cases/standalone/common/function/vector/vector.sql +++ b/tests/cases/standalone/common/function/vector/vector.sql @@ -3,3 +3,9 @@ SELECT vec_to_string(parse_vec('[1.0, 2.0]')); SELECT vec_to_string(parse_vec('[1.0, 2.0, 3.0]')); SELECT vec_to_string(parse_vec('[]')); + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', '[3.0, 4.0]')); + +SELECT vec_to_string(vec_mul(parse_vec('[1.0, 2.0]'), '[3.0, 4.0]')); + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', parse_vec('[3.0, 4.0]'))); \ No newline at end of file
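
For reference, a minimal usage sketch of the element-wise behavior exercised by the vector.sql cases above. The expected string forms follow the same formatting shown in vector.result (e.g. "[3,8]"); the mismatched-length case is an assumption about error handling rather than an expected row, so it is left as a comment.

-- Element-wise product of two 3-element vectors; expected string form is [4,10,18].
SELECT vec_to_string(vec_mul('[1.0, 2.0, 3.0]', '[4.0, 5.0, 6.0]'));

-- Mixing a parsed vector with a string literal is accepted, as in the cases above;
-- expected string form is [1,8].
SELECT vec_to_string(vec_mul(parse_vec('[2.0, 2.0]'), '[0.5, 4.0]'));

-- Assumption: vectors of different lengths are rejected with an invalid-argument error,
-- so a call such as vec_mul('[1.0, 2.0]', '[1.0]') is expected to fail rather than return NULL.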