From 4051be4214a52cea619c4945ee8160d6997162a6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 25 Dec 2024 18:57:21 +0800 Subject: [PATCH 1/4] feat: add some critical metrics to flownode (#5235) * feat: add some critical metrics to flownode Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/flow/src/adapter.rs | 8 ++++++- src/flow/src/adapter/flownode_impl.rs | 2 +- src/flow/src/compute/types.rs | 5 +++++ src/flow/src/expr/error.rs | 31 +++++++++++++++++++++++++- src/flow/src/metrics.rs | 18 +++++++++++++++ src/flow/src/server.rs | 32 ++++++++++++++++++++++++--- 6 files changed, 90 insertions(+), 6 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 6d70377cf2aa..eeefe019c1fd 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -55,7 +55,7 @@ use crate::error::{ UnexpectedSnafu, }; use crate::expr::{Batch, GlobalId}; -use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; mod flownode_impl; @@ -249,12 +249,18 @@ impl FlowWorkerManager { self.try_fetch_or_create_table(&table_name).await?; let schema_len = proto_schema.len(); + let total_rows = reqs.iter().map(|r| r.len()).sum::(); trace!( "Sending {} writeback requests to table {}, reqs total rows={}", reqs.len(), table_name.join("."), reqs.iter().map(|r| r.len()).sum::() ); + + METRIC_FLOW_ROWS + .with_label_values(&["out"]) + .inc_by(total_rows as u64); + let now = self.tick_manager.tick(); for req in reqs { match req { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1fa11b4d83a2..1f431ff92f64 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -138,7 +138,7 @@ impl Flownode for FlowWorkerManager { } async fn handle_inserts(&self, request: InsertRequests) -> Result { - // using try_read makesure two things: + // using try_read to ensure two things: // 1. flush wouldn't happen until inserts before it is inserted // 2. inserts happening concurrently with flush wouldn't be block by flush let _flush_lock = self.flush_lock.try_read(); diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs index e125a2d27261..b92ec30f9f6a 100644 --- a/src/flow/src/compute/types.rs +++ b/src/flow/src/compute/types.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::rc::Rc; use std::sync::Arc; +use common_error::ext::ErrorExt; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::handoff::TeeingHandoff; use hydroflow::scheduled::port::RecvPort; @@ -25,6 +26,7 @@ use itertools::Itertools; use tokio::sync::Mutex; use crate::expr::{Batch, EvalError, ScalarExpr}; +use crate::metrics::METRIC_FLOW_ERRORS; use crate::repr::DiffRow; use crate::utils::ArrangeHandler; @@ -185,6 +187,9 @@ impl ErrCollector { } pub fn push_err(&self, err: EvalError) { + METRIC_FLOW_ERRORS + .with_label_values(&[err.status_code().as_ref()]) + .inc(); self.inner.blocking_lock().push_back(err) } diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 992d5c592125..b29098b9d8fa 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -14,8 +14,11 @@ //! Error handling for expression evaluation. 
+use std::any::Any; + use arrow_schema::ArrowError; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::DataFusionError; use datatypes::data_type::ConcreteDataType; @@ -126,3 +129,29 @@ pub enum EvalError { source: BoxedError, }, } + +impl ErrorExt for EvalError { + fn status_code(&self) -> StatusCode { + use EvalError::*; + match self { + DivisionByZero { .. } + | TypeMismatch { .. } + | TryFromValue { .. } + | DataAlreadyExpired { .. } + | InvalidArgument { .. } + | Overflow { .. } => StatusCode::InvalidArguments, + + CastValue { source, .. } | DataType { source, .. } => source.status_code(), + + Internal { .. } + | Optimize { .. } + | Arrow { .. } + | Datafusion { .. } + | External { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 119b4c5856de..f165bcadc6a4 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -30,4 +30,22 @@ lazy_static! { .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); + pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_processed_rows", + "Count of rows flowing through the system", + &["direction"] + ) + .unwrap(); + pub static ref METRIC_FLOW_PROCESSING_TIME: HistogramVec = register_histogram_vec!( + "greptime_flow_processing_time", + "Time spent processing requests", + &["type"] + ) + .unwrap(); + pub static ref METRIC_FLOW_ERRORS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_errors", + "Count of errors in flow processing", + &["code"] + ) + .unwrap(); } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 1259c1175510..37e3249ec82a 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -54,6 +54,7 @@ use crate::error::{ ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; +use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; use crate::{Error, FlowWorkerManager, FlownodeOptions}; @@ -77,6 +78,10 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["ddl"]) + .start_timer(); + let request = request.into_inner(); self.manager .handle(request) @@ -92,18 +97,31 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["insert"]) + .start_timer(); + let request = request.into_inner(); // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define + let mut row_count = 0; let request = api::v1::region::InsertRequests { requests: request .requests .into_iter() - .map(|insert| api::v1::region::InsertRequest { - region_id: insert.region_id, - rows: insert.rows, + .map(|insert| { + insert.rows.as_ref().inspect(|x| row_count += x.rows.len()); + api::v1::region::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + } }) .collect_vec(), }; + + METRIC_FLOW_ROWS + .with_label_values(&["in"]) + .inc_by(row_count as u64); + self.manager .handle_inserts(request) .await @@ -500,6 +518,10 @@ impl FrontendInvoker { requests: 
RowInsertRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_insert"]) + .start_timer(); + self.inserter .handle_row_inserts(requests, ctx, &self.statement_executor) .await @@ -512,6 +534,10 @@ impl FrontendInvoker { requests: RowDeleteRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_delete"]) + .start_timer(); + self.deleter .handle_row_deletes(requests, ctx) .await From abf34b845c1a6800b7c6edb768c498d2ac2bc8ef Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 25 Dec 2024 21:42:37 +0800 Subject: [PATCH 2/4] feat(flow): check sink table mismatch on flow creation (#5112) * tests: more mismatch errors * feat: check sink table schema if exists&prompt nice err msg * chore: rm unused variant * chore: fmt * chore: cargo clippy * feat: check schema on create * feat: better err msg when mismatch * tests: fix a schema mismatch * todo: create sink table * feat: create sink table * fix: find time index * tests: auto created sink table * fix: remove empty keys * refactor: per review * chore: fmt * test: sqlness * chore: after rebase --- Cargo.lock | 2 + Cargo.toml | 1 + src/common/error/Cargo.toml | 1 + src/common/error/src/lib.rs | 21 + src/flow/Cargo.toml | 1 + src/flow/src/adapter.rs | 247 +++++---- src/flow/src/adapter/node_context.rs | 3 + src/flow/src/adapter/table_source.rs | 38 +- src/flow/src/adapter/util.rs | 169 +++++- src/flow/src/error.rs | 22 +- src/flow/src/repr/relation.rs | 2 + src/flow/src/server.rs | 18 +- src/flow/src/transform/aggr.rs | 7 +- src/operator/src/expr_factory.rs | 3 +- src/operator/src/insert.rs | 2 +- src/servers/Cargo.toml | 2 +- src/servers/src/http/result/error_result.rs | 19 +- .../common/flow/flow_auto_sink_table.result | 161 ++++++ .../common/flow/flow_auto_sink_table.sql | 58 ++ .../standalone/common/flow/flow_basic.result | 500 +++++++++++++++++- .../standalone/common/flow/flow_basic.sql | 160 +++++- .../common/flow/show_create_flow.result | 84 +++ .../common/flow/show_create_flow.sql | 42 ++ 23 files changed, 1381 insertions(+), 182 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_auto_sink_table.result create mode 100644 tests/cases/standalone/common/flow/flow_auto_sink_table.sql diff --git a/Cargo.lock b/Cargo.lock index 7049caad46e6..fb778e623bc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,6 +2016,7 @@ dependencies = [ name = "common-error" version = "0.12.0" dependencies = [ + "http 0.2.12", "snafu 0.8.5", "strum 0.25.0", "tonic 0.11.0", @@ -4061,6 +4062,7 @@ dependencies = [ "get-size-derive2", "get-size2", "greptime-proto", + "http 0.2.12", "hydroflow", "itertools 0.10.5", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 9729d5796d74..2156a3fcfc51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" } hex = "0.4" +http = "0.2" humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml index 49eafb81d5a2..148e2c66336f 100644 --- a/src/common/error/Cargo.toml +++ b/src/common/error/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true workspace = true [dependencies] +http.workspace = true snafu.workspace = true strum.workspace = true tonic.workspace 
= true diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index c5c0e6efe092..0052d70cf38e 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -18,9 +18,30 @@ pub mod ext; pub mod mock; pub mod status_code; +use http::{HeaderMap, HeaderValue}; pub use snafu; // HACK - these headers are here for shared in gRPC services. For common HTTP headers, // please define in `src/servers/src/http/header.rs`. pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg"; + +/// Create a http header map from error code and message. +/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys. +pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap { + let mut header = HeaderMap::new(); + + let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| { + HeaderValue::from_bytes( + &msg.as_bytes() + .iter() + .flat_map(|b| std::ascii::escape_default(*b)) + .collect::>(), + ) + .expect("Already escaped string should be valid ascii") + }); + + header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into()); + header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg); + header +} diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 08867d342a74..b624eed05b9b 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -45,6 +45,7 @@ get-size2 = "0.1.2" greptime-proto.workspace = true # This fork of hydroflow is simply for keeping our dependency in our org, and pin the version # otherwise it is the same with upstream repo +http.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true lazy_static.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index eeefe019c1fd..dcdb1b1eb01a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -30,7 +30,7 @@ use common_telemetry::{debug, info, trace}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; -use itertools::Itertools; +use itertools::{EitherOrBoth, Itertools}; use meta_client::MetaClientOptions; use query::QueryEngine; use serde::{Deserialize, Serialize}; @@ -46,17 +46,19 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; -use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::util::{ + relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, +}; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{ - EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu, }; -use crate::expr::{Batch, GlobalId}; +use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; -use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; +use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; mod parse_expr; @@ -245,8 +247,12 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = - self.try_fetch_or_create_table(&table_name).await?; + let (is_ts_placeholder, 
proto_schema) = self + .try_fetch_existing_table(&table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Table not found: {}", table_name.join(".")), + })?; let schema_len = proto_schema.len(); let total_rows = reqs.iter().map(|r| r.len()).sum::(); @@ -396,14 +402,12 @@ impl FlowWorkerManager { Ok(output) } - /// Fetch table info or create table from flow's schema if not exist - async fn try_fetch_or_create_table( + /// Fetch table schema and primary key from table info source, if table not exist return None + async fn fetch_table_pk_schema( &self, table_name: &TableName, - ) -> Result<(bool, Vec), Error> { - // TODO(discord9): instead of auto build table from request schema, actually build table - // before `create flow` to be able to assign pk and ts etc. - let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self + ) -> Result, Option, Vec)>, Error> { + if let Some(table_id) = self .table_info_source .get_table_id_from_name(table_name) .await? @@ -420,97 +424,64 @@ impl FlowWorkerManager { .map(|i| meta.schema.column_schemas[i].name.clone()) .collect_vec(); let schema = meta.schema.column_schemas; - // check if the last column is the auto created timestamp column, hence the table is auto created from - // flow's plan type - let is_auto_create = { - let correct_name = schema - .last() - .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) - .unwrap_or(false); - let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); - correct_name && correct_time_index - }; - (primary_keys, schema, is_auto_create) + let time_index = meta.schema.timestamp_index; + Ok(Some((primary_keys, time_index, schema))) } else { - // TODO(discord9): condiser remove buggy auto create by schema + Ok(None) + } + } - let node_ctx = self.node_context.read().await; - let gid: GlobalId = node_ctx - .table_repr - .get_by_name(table_name) - .map(|x| x.1) - .unwrap(); - let schema = node_ctx - .schema - .get(&gid) - .with_context(|| TableNotFoundSnafu { - name: format!("Table name = {:?}", table_name), - })? 
- .clone(); - // TODO(discord9): use default key from schema - let primary_keys = schema - .typ() - .keys - .first() - .map(|v| { - v.column_indices - .iter() - .map(|i| { - schema - .get_name(*i) - .clone() - .unwrap_or_else(|| format!("col_{i}")) - }) - .collect_vec() - }) - .unwrap_or_default(); - let update_at = ColumnSchema::new( - UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); + /// return (primary keys, schema and if the table have a placeholder timestamp column) + /// schema of the table comes from flow's output plan + /// + /// adjust to add `update_at` column and ts placeholder if needed + async fn adjust_auto_created_table_schema( + &self, + schema: &RelationDesc, + ) -> Result<(Vec, Vec, bool), Error> { + // TODO(discord9): condiser remove buggy auto create by schema + + // TODO(discord9): use default key from schema + let primary_keys = schema + .typ() + .keys + .first() + .map(|v| { + v.column_indices + .iter() + .map(|i| { + schema + .get_name(*i) + .clone() + .unwrap_or_else(|| format!("col_{i}")) + }) + .collect_vec() + }) + .unwrap_or_default(); + let update_at = ColumnSchema::new( + UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); - let original_schema = schema - .typ() - .column_types - .clone() - .into_iter() - .enumerate() - .map(|(idx, typ)| { - let name = schema - .names - .get(idx) - .cloned() - .flatten() - .unwrap_or(format!("col_{}", idx)); - let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); - if schema.typ().time_index == Some(idx) { - ret.with_time_index(true) - } else { - ret - } - }) - .collect_vec(); + let original_schema = relation_desc_to_column_schemas_with_fallback(schema); - let mut with_auto_added_col = original_schema.clone(); - with_auto_added_col.push(update_at); + let mut with_auto_added_col = original_schema.clone(); + with_auto_added_col.push(update_at); - // if no time index, add one as placeholder - let no_time_index = schema.typ().time_index.is_none(); - if no_time_index { - let ts_col = ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true); - with_auto_added_col.push(ts_col); - } + // if no time index, add one as placeholder + let no_time_index = schema.typ().time_index.is_none(); + if no_time_index { + let ts_col = ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + with_auto_added_col.push(ts_col); + } - (primary_keys, with_auto_added_col, no_time_index) - }; - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; - Ok((is_ts_placeholder, proto_schema)) + Ok((primary_keys, with_auto_added_col, no_time_index)) } } @@ -813,7 +784,85 @@ impl FlowWorkerManager { let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + + // check schema against actual table schema if exists + // if not exist create sink table immediately + if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? 
{ + let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); + + // for column schema, only `data_type` need to be check for equality + // since one can omit flow's column name when write flow query + // print a user friendly error message about mismatch and how to correct them + for (idx, zipped) in auto_schema + .iter() + .zip_longest(real_schema.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Both(auto, real) => { + if auto.data_type != real.data_type { + InvalidQuerySnafu { + reason: format!( + "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", + idx, + real.name, + auto.name, + real.data_type, + auto.data_type + ), + } + .fail()?; + } + } + EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { + // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) + continue; + } + _ => InvalidQuerySnafu { + reason: format!( + "schema length mismatched, expected {} found {}", + real_schema.len(), + auto_schema.len() + ), + } + .fail()?, + } + } + + let table_id = self + .table_info_source + .get_table_id_from_name(&sink_table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table id for table name {:?}", sink_table_name), + })?; + let table_info_value = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table info value for table id {:?}", table_id), + })?; + let real_schema = table_info_value_to_relation_desc(table_info_value)?; + node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; + } else { + // assign inferred schema to sink table + // create sink table + node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + &sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } + } let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 990fdd129797..5c644803ec71 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -331,12 +331,14 @@ impl FlownodeContext { } else { let global_id = self.new_global_id(); + // table id is Some meaning db must have created the table if let Some(table_id) = table_id { let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?; table_name = table_name.or(Some(known_table_name)); self.schema.insert(global_id, schema); } // if we don't have table id, it means database havn't assign one yet or we don't need it + // still update the mapping with new global id self.table_repr.insert(table_name, table_id, global_id); Ok(global_id) } @@ -358,6 +360,7 @@ impl FlownodeContext { })?; self.schema.insert(gid, schema); + Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 0454ab16b1d3..7981999f0abc 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -20,11 +20,12 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; +use crate::adapter::util::table_info_value_to_relation_desc; use crate::adapter::TableName; use crate::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, 
TableNotFoundSnafu, UnexpectedSnafu, }; -use crate::repr::{self, ColumnType, RelationDesc, RelationType}; +use crate::repr::RelationDesc; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { @@ -121,38 +122,7 @@ impl TableSource { table_name.table_name, ]; - let raw_schema = table_info_value.table_info.meta.schema; - let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema - .column_schemas - .clone() - .into_iter() - .map(|col| { - ( - ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, - }, - Some(col.name), - ) - }) - .unzip(); - - let key = table_info_value.table_info.meta.primary_key_indices; - let keys = vec![repr::Key::from(key)]; - - let time_index = raw_schema.timestamp_index; - Ok(( - table_name, - RelationDesc { - typ: RelationType { - column_types, - keys, - time_index, - // by default table schema's column are all non-auto - auto_columns: vec![], - }, - names: col_names, - }, - )) + let desc = table_info_value_to_relation_desc(table_info_value)?; + Ok((table_name, desc)) } } diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index f2a29bec8e9e..f5d2968543f9 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -12,16 +12,153 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::helper::ColumnDataTypeWrapper; use api::v1::column_def::options_from_column_schema; -use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType}; +use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType}; use common_error::ext::BoxedError; +use common_meta::key::table_info::TableInfoValue; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use itertools::Itertools; -use snafu::ResultExt; +use operator::expr_factory::CreateExprFactory; +use session::context::QueryContextBuilder; +use snafu::{OptionExt, ResultExt}; +use table::table_reference::TableReference; + +use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL}; +use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; +use crate::repr::{ColumnType, RelationDesc, RelationType}; +use crate::FlowWorkerManager; + +impl FlowWorkerManager { + /// Create table from given schema(will adjust to add auto column if needed), return true if table is created + pub(crate) async fn create_table_from_relation( + &self, + flow_name: &str, + table_name: &TableName, + relation_desc: &RelationDesc, + ) -> Result { + if self.fetch_table_pk_schema(table_name).await?.is_some() { + return Ok(false); + } + let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?; + + //create sink table using pks, column types and is_ts_auto + + let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?; + + // create sink table + let create_expr = CreateExprFactory {} + .create_table_expr_by_column_schemas( + &TableReference { + catalog: &table_name[0], + schema: &table_name[1], + table: &table_name[2], + }, + &proto_schema, + "mito", + Some(&format!("Sink table for flow {}", flow_name)), + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; -use crate::error::{Error, ExternalSnafu}; + self.submit_create_sink_table_ddl(create_expr).await?; + Ok(true) + } + + /// Try fetch table with adjusted schema(added auto column if needed) + pub(crate) async fn try_fetch_existing_table( + &self, + table_name: &TableName, + ) -> Result)>, Error> { + if let Some((primary_keys, time_index, 
schema)) = + self.fetch_table_pk_schema(table_name).await? + { + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = time_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + Ok(Some((is_auto_create, proto_schema))) + } else { + Ok(None) + } + } + + /// submit a create table ddl + pub(crate) async fn submit_create_sink_table_ddl( + &self, + mut create_table: CreateTableExpr, + ) -> Result<(), Error> { + let stmt_exec = { + self.frontend_invoker + .read() + .await + .as_ref() + .map(|f| f.statement_executor()) + } + .context(UnexpectedSnafu { + reason: "Failed to get statement executor", + })?; + let ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog(create_table.catalog_name.clone()) + .current_schema(create_table.schema_name.clone()) + .build(), + ); + stmt_exec + .create_table_inner(&mut create_table, None, ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(()) + } +} + +pub fn table_info_value_to_relation_desc( + table_info_value: TableInfoValue, +) -> Result { + let raw_schema = table_info_value.table_info.meta.schema; + let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema + .column_schemas + .clone() + .into_iter() + .map(|col| { + ( + ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }, + Some(col.name), + ) + }) + .unzip(); + + let key = table_info_value.table_info.meta.primary_key_indices; + let keys = vec![crate::repr::Key::from(key)]; + + let time_index = raw_schema.timestamp_index; + + Ok(RelationDesc { + typ: RelationType { + column_types, + keys, + time_index, + // by default table schema's column are all non-auto + auto_columns: vec![], + }, + names: col_names, + }) +} pub fn from_proto_to_data_type( column_schema: &api::v1::ColumnSchema, @@ -75,3 +212,29 @@ pub fn column_schemas_to_proto( .collect(); Ok(ret) } + +/// Convert `RelationDesc` to `ColumnSchema` list, +/// if the column name is not present, use `col_{idx}` as the column name +pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec { + schema + .typ() + .column_types + .clone() + .into_iter() + .enumerate() + .map(|(idx, typ)| { + let name = schema + .names + .get(idx) + .cloned() + .flatten() + .unwrap_or(format!("col_{}", idx)); + let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); + if schema.typ().time_index == Some(idx) { + ret.with_time_index(true) + } else { + ret + } + }) + .collect_vec() +} diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index a94de4b6ed7b..137e024307f9 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -16,12 +16,13 @@ use std::any::Any; -use common_error::define_into_tonic_status; use common_error::ext::BoxedError; +use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, Snafu}; +use tonic::metadata::MetadataMap; use crate::adapter::FlowId; use crate::expr::EvalError; @@ -186,6 +187,20 @@ pub enum Error { }, } +/// the outer message is the full error stack, and inner message in header is the last error message that can be show 
directly to user +pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status { + let msg = err.to_string(); + let last_err_msg = common_error::ext::StackError::last(&err).to_string(); + let code = err.status_code() as u32; + let header = from_err_code_msg_to_header(code, &last_err_msg); + + tonic::Status::with_metadata( + tonic::Code::InvalidArgument, + msg, + MetadataMap::from_headers(header), + ) +} + /// Result type for flow module pub type Result = std::result::Result; @@ -200,9 +215,8 @@ impl ErrorExt for Error { | Self::TableNotFoundMeta { .. } | Self::FlowNotFound { .. } | Self::ListFlows { .. } => StatusCode::TableNotFound, - Self::InvalidQuery { .. } | Self::Plan { .. } | Self::Datatypes { .. } => { - StatusCode::PlanQuery - } + Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, + Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index d0fbb861eb24..168b5df7d0e2 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -212,6 +212,8 @@ impl RelationType { for key in &mut self.keys { key.remove_col(time_index.unwrap_or(usize::MAX)); } + // remove empty keys + self.keys.retain(|key| !key.is_empty()); self } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 37e3249ec82a..d22ba220441b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -50,8 +50,8 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, - ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, + ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -87,10 +87,7 @@ impl flow_server::Flow for FlowService { .handle(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } async fn handle_mirror_request( @@ -126,10 +123,7 @@ impl flow_server::Flow for FlowService { .handle_inserts(request) .await .map(Response::new) - .map_err(|e| { - let msg = format!("failed to handle request: {:?}", e); - Status::internal(msg) - }) + .map_err(to_status_with_last_err) } } @@ -544,4 +538,8 @@ impl FrontendInvoker { .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } + + pub fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 2d0a6f66dab8..b944e3b263e3 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -216,6 +216,7 @@ impl KeyValPlan { /// find out the column that should be time index in group exprs(which is all columns that should be keys) /// TODO(discord9): better ways to assign time index +/// for now, it will found the first column that is timestamp or has a tumble window floor function fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { group_exprs.iter().position(|expr| { matches!( @@ -224,7 +225,7 @@ fn 
find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { func: UnaryFunc::TumbleWindowFloor { .. }, expr: _ } - ) + ) || expr.typ.scalar_type.is_timestamp() }) } @@ -1482,7 +1483,7 @@ mod test { ColumnType::new(CDT::float64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), true), ]) - .with_key(vec![1]) + .with_time_index(Some(1)) .into_named(vec![ Some( "MAX(numbers_with_ts.number) - MIN(numbers_with_ts.number) / Float64(30)" @@ -1571,7 +1572,7 @@ mod test { ColumnType::new(ConcreteDataType::uint32_datatype(), true), // max ColumnType::new(ConcreteDataType::uint32_datatype(), true), // min ]) - .with_key(vec![0]) + .with_time_index(Some(0)) .into_unnamed(), ), ), diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 59fe87a66ec3..bc50eff161b5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -68,6 +68,7 @@ impl CreateExprFactory { table_name: &TableReference<'_>, column_schemas: &[api::v1::ColumnSchema], engine: &str, + desc: Option<&str>, ) -> Result { let column_exprs = ColumnExpr::from_column_schemas(column_schemas); let create_expr = common_grpc_expr::util::build_create_table_expr( @@ -75,7 +76,7 @@ impl CreateExprFactory { table_name, column_exprs, engine, - "Created on insertion", + desc.unwrap_or("Created on insertion"), ) .context(BuildCreateExprOnInsertionSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 466dde5425c1..6b7702f25b8b 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -870,5 +870,5 @@ fn build_create_table_expr( request_schema: &[ColumnSchema], engine: &str, ) -> Result { - CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine) + CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None) } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 74adaffd5ea8..33831ba639bb 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -56,7 +56,7 @@ futures-util.workspace = true hashbrown = "0.14" headers = "0.3" hostname = "0.3" -http = "0.2" +http.workspace = true http-body = "0.4" humantime.workspace = true humantime-serde.workspace = true diff --git a/src/servers/src/http/result/error_result.rs b/src/servers/src/http/result/error_result.rs index be5f01f9e950..3c7c718e5ba4 100644 --- a/src/servers/src/http/result/error_result.rs +++ b/src/servers/src/http/result/error_result.rs @@ -12,17 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::Json; use common_error::ext::ErrorExt; +use common_error::from_err_code_msg_to_header; use common_error::status_code::StatusCode; use common_telemetry::{debug, error}; use serde::{Deserialize, Serialize}; use crate::error::status_code_to_http_status; -use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE; -use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME; #[derive(Serialize, Deserialize, Debug)] pub struct ErrorResponse { @@ -74,13 +72,16 @@ impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let code = self.code; let execution_time = self.execution_time_ms; - let mut resp = Json(self).into_response(); - resp.headers_mut() - .insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code)); - resp.headers_mut().insert( - &GREPTIME_DB_HEADER_EXECUTION_TIME, - HeaderValue::from(execution_time), + let new_header = from_err_code_msg_to_header( + code, + &format!( + "error: {}, execution_time_ms: {}", + self.error, execution_time + ), ); + let mut resp = Json(self).into_response(); + resp.headers_mut().extend(new_header); + let status = StatusCode::from_u32(code).unwrap_or(StatusCode::Unknown); let status_code = status_code_to_http_status(&status); diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result new file mode 100644 index 000000000000..e8bd5cf739f4 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -0,0 +1,161 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | 
++-------------------+--------------------------------------------------+ + +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+-------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+-------------------------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') | ++--------------------+-------------------------------------------------------------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + ++--------------------+---------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++--------------------+---------------------------------------------------------------------------------------+ +| test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT sum(number) AS sumup, ts AS event_time FROM numbers_input_basic GROUP BY ts | ++--------------------+---------------------------------------------------------------------------------------+ + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "sumup" BIGINT NULL, | +| | "event_time" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("event_time") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql new file mode 100644 index 000000000000..0af723770ced --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql @@ -0,0 +1,58 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE 
FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE out_num_cnt_basic; + +SHOW CREATE FLOW test_numbers_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) as sumup, ts as event_time +FROM + numbers_input_basic +GROUP BY + ts; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- SQLNESS ARG restart=true +SHOW CREATE FLOW test_numbers_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index c70fe54fec19..e17efd74be40 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -17,6 +17,24 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -28,6 +46,24 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); | FLOW_FLUSHED | +----------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -130,6 +166,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + DROP FLOW test_wildcard_basic; Affected Rows: 0 @@ -142,6 +194,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -159,6 +228,22 @@ ADMIN FLUSH_FLOW('test_wildcard_basic'); | FLOW_FLUSHED | +-----------------------------------------+ +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + SELECT wildcard FROM out_basic; +----------+ @@ -197,6 +282,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -320,7 +422,9 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -348,6 +452,23 @@ SHOW CREATE FLOW filter_numbers_basic; | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+-----------------------------------------------------------+ +| Table | Create Table | ++-------------------+-----------------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "a" INTERVAL NULL, | +| | "b" INTERVAL NULL, | +| | "c" INTERVAL NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+-----------------------------------------------------------+ + drop flow filter_numbers_basic; Affected Rows: 0 @@ -390,6 +511,22 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE approx_rate; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | +| | "rate" DOUBLE NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO bytes_log VALUES @@ -542,6 +679,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -675,6 +829,23 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -693,21 +864,20 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS 
"ngx_country" ( | -| | "ngx_access_log.country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("ngx_access_log.country", "time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------------------+ ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ SELECT "ngx_access_log.country", @@ -824,6 +994,23 @@ HAVING Affected Rows: 0 +SHOW CREATE TABLE temp_alerts; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( | +| | "sensor_id" INT NULL, | +| | "loc" STRING NULL, | +| | "max_temp" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO temp_sensor_data VALUES @@ -963,6 +1150,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_distribution; + ++------------------+-------------------------------------------------+ +| Table | Create Table | ++------------------+-------------------------------------------------+ +| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( | +| | "stat" INT NULL, | +| | "bucket_size" INT NULL, | +| | "total_logs" BIGINT NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("stat", "bucket_size") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++------------------+-------------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -1070,6 +1276,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE requests_without_ip; + ++---------------------+----------------------------------------------------+ +| Table | Create Table | ++---------------------+----------------------------------------------------+ +| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( | +| | "service_name" STRING NULL, | +| | "val" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("service_name") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+----------------------------------------------------+ + INSERT INTO requests VALUES @@ -1269,6 +1492,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | 
++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1361,6 +1603,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1419,3 +1680,210 @@ DROP TABLE android_log; Affected Rows: 0 +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +Affected Rows: 0 + +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "avg_after_filter_num" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +-- SQLNESS ARG restart=true +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT avg_after_filter_num FROM out_num_cnt_basic; + ++----------------------+ +| avg_after_filter_num | ++----------------------+ +| 1 | ++----------------------+ + +INSERT INTO + numbers_input_basic +VALUES + (10, "2021-07-01 00:00:00.200"), + (23, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +CREATE TABLE `live_connection_log` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `connect_retry_times` DOUBLE NULL, + `connect_result` INT NULL, + `first_frame_time` DOUBLE NULL, + `record_time` TIMESTAMP TIME INDEX, + `iot_online` INT NULL, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE TABLE `live_connection_statistics_detail` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `avg_connect_retry_times` DOUBLE NULL, + `total_connect_result_ok` INT64 NULL, + `total_connect_result_fail` INT64 NULL, + `total_connect` INT64 NULL, + `conection_rate` DOUBLE NULL, + `avg_first_frame_time` DOUBLE NULL, + `max_first_frame_time` DOUBLE NULL, + `ok_conection_rate` DOUBLE NULL, + `record_time_window` TIMESTAMP TIME INDEX, + `update_at` TIMESTAMP, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +Affected Rows: 0 + +CREATE FLOW live_connection_aggregation_detail +SINK TO live_connection_statistics_detail +AS +SELECT + device_model, + connect_protocol, + connect_mode, + avg(connect_retry_times) as avg_connect_retry_times, + sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, + sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, + count(connect_result) as total_connect, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, + avg(first_frame_time) as avg_first_frame_time, + max(first_frame_time) as max_first_frame_time, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, + date_bin(INTERVAL '1 minutes', record_time) as record_time_window, +FROM live_connection_log +WHERE iot_online = 1 +GROUP BY + device_model, + connect_protocol, + connect_mode, + 
record_time_window; + +Affected Rows: 0 + +INSERT INTO + live_connection_log +VALUES + ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); + ++--------------------------------------------------------+ +| ADMIN FLUSH_FLOW('live_connection_aggregation_detail') | ++--------------------------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------------------------+ + +SELECT device_model, + connect_protocol, + connect_mode, + avg_connect_retry_times, + total_connect_result_ok, + total_connect_result_fail, + total_connect, + conection_rate, + avg_first_frame_time, + max_first_frame_time, + ok_conection_rate, + record_time_window FROM live_connection_statistics_detail; + ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ +| STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 | ++--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ + +DROP FLOW live_connection_aggregation_detail; + +Affected Rows: 0 + +DROP TABLE live_connection_log; + +Affected Rows: 0 + +DROP TABLE live_connection_statistics_detail; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 74abbc85df22..516afa4074a3 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -13,11 +13,15 @@ FROM GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +SHOW CREATE TABLE out_num_cnt_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +SHOW CREATE TABLE out_num_cnt_basic; + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -75,6 +79,8 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + DROP FLOW test_wildcard_basic; CREATE FLOW test_wildcard_basic sink TO out_basic AS @@ -83,6 +89,9 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -92,6 +101,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_wildcard_basic'); +SHOW CREATE TABLE out_basic; + SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; @@ -112,6 +123,8 @@ SELECT FROM distinct_basic; +SHOW CREATE TABLE out_distinct_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -168,7 +181,9 @@ CREATE TABLE numbers_input_basic ( ); create table out_num_cnt_basic ( - number INT, + a INTERVAL, + b INTERVAL, + c INTERVAL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX ); @@ -184,6 +199,8 @@ where SHOW CREATE FLOW filter_numbers_basic; +SHOW CREATE TABLE out_num_cnt_basic; + drop flow filter_numbers_basic; drop table out_num_cnt_basic; @@ -214,6 +231,8 @@ from GROUP BY time_window; +SHOW CREATE TABLE approx_rate; + INSERT INTO bytes_log VALUES @@ -294,6 +313,8 @@ SELECT FROM ngx_access_log; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -359,6 +380,8 @@ GROUP BY country, time_window; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -437,6 +460,8 @@ GROUP BY HAVING max_temp > 100; +SHOW CREATE TABLE temp_alerts; + INSERT INTO temp_sensor_data VALUES @@ -516,6 +541,8 @@ GROUP BY time_window, bucket_size; +SHOW CREATE TABLE ngx_distribution; + INSERT INTO ngx_access_log VALUES @@ -580,6 +607,8 @@ SELECT FROM requests; +SHOW CREATE TABLE requests_without_ip; + INSERT INTO requests VALUES @@ -680,6 +709,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -732,6 +763,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -755,3 +788,128 @@ DROP FLOW calc_android_log_abnormal; DROP TABLE android_log_abnormal; DROP TABLE android_log; + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num +FROM + numbers_input_basic; + +SHOW CREATE TABLE out_num_cnt_basic; + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +-- SQLNESS ARG restart=true +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +SELECT avg_after_filter_num FROM out_num_cnt_basic; + +INSERT INTO + numbers_input_basic +VALUES + (10, "2021-07-01 00:00:00.200"), + (23, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +DROP FLOW test_numbers_basic; +DROP TABLE numbers_input_basic; +DROP TABLE out_num_cnt_basic; + +CREATE TABLE `live_connection_log` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `connect_retry_times` DOUBLE NULL, + `connect_result` INT NULL, + `first_frame_time` DOUBLE NULL, + `record_time` TIMESTAMP TIME INDEX, + `iot_online` INT NULL, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +CREATE TABLE `live_connection_statistics_detail` +( + `device_model` STRING NULL, + `connect_protocol` INT NULL, + `connect_mode` INT NULL, + `avg_connect_retry_times` DOUBLE NULL, + `total_connect_result_ok` INT64 NULL, + `total_connect_result_fail` INT64 NULL, + `total_connect` INT64 NULL, + `conection_rate` DOUBLE NULL, + `avg_first_frame_time` DOUBLE NULL, + `max_first_frame_time` DOUBLE NULL, + `ok_conection_rate` DOUBLE NULL, + `record_time_window` TIMESTAMP TIME INDEX, + `update_at` TIMESTAMP, + PRIMARY KEY (`device_model`,`connect_protocol`), +); + +CREATE FLOW live_connection_aggregation_detail +SINK TO live_connection_statistics_detail +AS +SELECT + device_model, + connect_protocol, + connect_mode, + avg(connect_retry_times) as avg_connect_retry_times, + sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, + sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, + count(connect_result) as total_connect, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, + avg(first_frame_time) as avg_first_frame_time, + max(first_frame_time) as max_first_frame_time, + sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, + date_bin(INTERVAL '1 minutes', record_time) as record_time_window, +FROM live_connection_log +WHERE iot_online = 1 +GROUP BY + device_model, + connect_protocol, + connect_mode, + record_time_window; + +INSERT INTO + live_connection_log +VALUES + ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); + +SELECT device_model, + connect_protocol, + connect_mode, + avg_connect_retry_times, + total_connect_result_ok, + total_connect_result_fail, + total_connect, + conection_rate, + avg_first_frame_time, + max_first_frame_time, + ok_conection_rate, + record_time_window FROM live_connection_statistics_detail; + +DROP FLOW live_connection_aggregation_detail; +DROP TABLE live_connection_log; +DROP TABLE live_connection_statistics_detail; diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 
38fa609c960d..14e80129446d 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -322,3 +322,87 @@ drop table numbers_input_show; Affected Rows: 0 +CREATE TABLE numbers_input_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +create table out_num_cnt_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); + +Affected Rows: 0 + +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10; + +Affected Rows: 0 + +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT number FROM out_num_cnt_show; + ++--------+ +| number | ++--------+ +| 15 | +| 16 | ++--------+ + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; + +Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; + +Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) + +INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +Error: 1003(Internal), Internal error: 1003 + +-- sink table stays the same since the flow error out due to column mismatch +SELECT number FROM out_num_cnt_show; + ++--------+ +| number | ++--------+ +| 15 | +| 16 | ++--------+ + +DROP FLOW filter_numbers_show; + +Affected Rows: 0 + +drop table out_num_cnt_show; + +Affected Rows: 0 + +drop table numbers_input_show; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 7348a83b5103..f445c4f254c4 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -125,3 +125,45 @@ DROP FLOW filter_numbers_show; drop table out_num_cnt_show; drop table numbers_input_show; + +CREATE TABLE numbers_input_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); +create table out_num_cnt_show ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX, + PRIMARY KEY(number), +); + +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10; + +INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | 
+ADMIN FLUSH_FLOW('filter_numbers_show'); + +SELECT number FROM out_num_cnt_show; + + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; + +-- should mismatch +CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; + +INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('filter_numbers_show'); + +-- sink table stays the same since the flow error out due to column mismatch +SELECT number FROM out_num_cnt_show; + +DROP FLOW filter_numbers_show; + +drop table out_num_cnt_show; + +drop table numbers_input_show; From 039989f77bea142c9f9767dfa0b85ed388c1695c Mon Sep 17 00:00:00 2001 From: Lin Yihai Date: Wed, 25 Dec 2024 22:17:22 +0800 Subject: [PATCH 3/4] feat: Add `vec_mul` function. (#5205) --- src/common/function/src/scalars/vector.rs | 4 + .../function/src/scalars/vector/vector_mul.rs | 205 ++++++++++++++++++ .../common/function/vector/vector.result | 24 ++ .../common/function/vector/vector.sql | 6 + 4 files changed, 239 insertions(+) create mode 100644 src/common/function/src/scalars/vector/vector_mul.rs diff --git a/src/common/function/src/scalars/vector.rs b/src/common/function/src/scalars/vector.rs index d462b917af59..b3a6f105ad01 100644 --- a/src/common/function/src/scalars/vector.rs +++ b/src/common/function/src/scalars/vector.rs @@ -17,6 +17,7 @@ mod distance; pub(crate) mod impl_conv; mod scalar_add; mod scalar_mul; +mod vector_mul; use std::sync::Arc; @@ -38,5 +39,8 @@ impl VectorFunction { // scalar calculation registry.register(Arc::new(scalar_add::ScalarAddFunction)); registry.register(Arc::new(scalar_mul::ScalarMulFunction)); + + // vector calculation + registry.register(Arc::new(vector_mul::VectorMulFunction)); } } diff --git a/src/common/function/src/scalars/vector/vector_mul.rs b/src/common/function/src/scalars/vector/vector_mul.rs new file mode 100644 index 000000000000..02e9833623e9 --- /dev/null +++ b/src/common/function/src/scalars/vector/vector_mul.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::fmt::Display; + +use common_query::error::{InvalidFuncArgsSnafu, Result}; +use common_query::prelude::Signature; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef}; +use nalgebra::DVectorView; +use snafu::ensure; + +use crate::function::{Function, FunctionContext}; +use crate::helper; +use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit}; + +const NAME: &str = "vec_mul"; + +/// Multiplies corresponding elements of two vectors. 
+/// +/// # Example +/// +/// ```sql +/// SELECT vec_to_string(vec_mul("[1, 2, 3]", "[1, 2, 3]")) as result; +/// +/// +---------+ +/// | result | +/// +---------+ +/// | [1,4,9] | +/// +---------+ +/// +/// ``` +#[derive(Debug, Clone, Default)] +pub struct VectorMulFunction; + +impl Function for VectorMulFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::binary_datatype()) + } + + fn signature(&self) -> Signature { + helper::one_of_sigs2( + vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::binary_datatype(), + ], + vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::binary_datatype(), + ], + ) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 2, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly two, have: {}", + columns.len() + ), + } + ); + + let arg0 = &columns[0]; + let arg1 = &columns[1]; + + let len = arg0.len(); + let mut result = BinaryVectorBuilder::with_capacity(len); + if len == 0 { + return Ok(result.to_vector()); + } + + let arg0_const = as_veclit_if_const(arg0)?; + let arg1_const = as_veclit_if_const(arg1)?; + + for i in 0..len { + let arg0 = match arg0_const.as_ref() { + Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())), + None => as_veclit(arg0.get_ref(i))?, + }; + + let arg1 = match arg1_const.as_ref() { + Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())), + None => as_veclit(arg1.get_ref(i))?, + }; + + if let (Some(arg0), Some(arg1)) = (arg0, arg1) { + ensure!( + arg0.len() == arg1.len(), + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the vectors must match for multiplying, have: {} vs {}", + arg0.len(), + arg1.len() + ), + } + ); + let vec0 = DVectorView::from_slice(&arg0, arg0.len()); + let vec1 = DVectorView::from_slice(&arg1, arg1.len()); + let vec_res = vec1.component_mul(&vec0); + + let veclit = vec_res.as_slice(); + let binlit = veclit_to_binlit(veclit); + result.push(Some(&binlit)); + } else { + result.push_null(); + } + } + + Ok(result.to_vector()) + } +} + +impl Display for VectorMulFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", NAME.to_ascii_uppercase()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::error; + use datatypes::vectors::StringVector; + + use super::*; + + #[test] + fn test_vector_mul() { + let func = VectorMulFunction; + + let vec0 = vec![1.0, 2.0, 3.0]; + let vec1 = vec![1.0, 1.0]; + let (len0, len1) = (vec0.len(), vec1.len()); + let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))])); + let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))])); + + let err = func + .eval(FunctionContext::default(), &[input0, input1]) + .unwrap_err(); + + match err { + error::Error::InvalidFuncArgs { err_msg, .. 
} => { + assert_eq!( + err_msg, + format!( + "The length of the vectors must match for multiplying, have: {} vs {}", + len0, len1 + ) + ) + } + _ => unreachable!(), + } + + let input0 = Arc::new(StringVector::from(vec![ + Some("[1.0,2.0,3.0]".to_string()), + Some("[8.0,10.0,12.0]".to_string()), + Some("[7.0,8.0,9.0]".to_string()), + None, + ])); + + let input1 = Arc::new(StringVector::from(vec![ + Some("[1.0,1.0,1.0]".to_string()), + Some("[2.0,2.0,2.0]".to_string()), + None, + Some("[3.0,3.0,3.0]".to_string()), + ])); + + let result = func + .eval(FunctionContext::default(), &[input0, input1]) + .unwrap(); + + let result = result.as_ref(); + assert_eq!(result.len(), 4); + assert_eq!( + result.get_ref(0).as_binary().unwrap(), + Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice()) + ); + assert_eq!( + result.get_ref(1).as_binary().unwrap(), + Some(veclit_to_binlit(&[16.0, 20.0, 24.0]).as_slice()) + ); + assert!(result.get_ref(2).is_null()); + assert!(result.get_ref(3).is_null()); + } +} diff --git a/tests/cases/standalone/common/function/vector/vector.result b/tests/cases/standalone/common/function/vector/vector.result index 6f0205982685..0bcca4740350 100644 --- a/tests/cases/standalone/common/function/vector/vector.result +++ b/tests/cases/standalone/common/function/vector/vector.result @@ -22,3 +22,27 @@ SELECT vec_to_string(parse_vec('[]')); | [] | +--------------------------------------+ +SELECT vec_to_string(vec_mul('[1.0, 2.0]', '[3.0, 4.0]')); + ++---------------------------------------------------------------+ +| vec_to_string(vec_mul(Utf8("[1.0, 2.0]"),Utf8("[3.0, 4.0]"))) | ++---------------------------------------------------------------+ +| [3,8] | ++---------------------------------------------------------------+ + +SELECT vec_to_string(vec_mul(parse_vec('[1.0, 2.0]'), '[3.0, 4.0]')); + ++--------------------------------------------------------------------------+ +| vec_to_string(vec_mul(parse_vec(Utf8("[1.0, 2.0]")),Utf8("[3.0, 4.0]"))) | ++--------------------------------------------------------------------------+ +| [3,8] | ++--------------------------------------------------------------------------+ + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', parse_vec('[3.0, 4.0]'))); + ++--------------------------------------------------------------------------+ +| vec_to_string(vec_mul(Utf8("[1.0, 2.0]"),parse_vec(Utf8("[3.0, 4.0]")))) | ++--------------------------------------------------------------------------+ +| [3,8] | ++--------------------------------------------------------------------------+ + diff --git a/tests/cases/standalone/common/function/vector/vector.sql b/tests/cases/standalone/common/function/vector/vector.sql index 97a986916ab1..3f46fa8f2210 100644 --- a/tests/cases/standalone/common/function/vector/vector.sql +++ b/tests/cases/standalone/common/function/vector/vector.sql @@ -3,3 +3,9 @@ SELECT vec_to_string(parse_vec('[1.0, 2.0]')); SELECT vec_to_string(parse_vec('[1.0, 2.0, 3.0]')); SELECT vec_to_string(parse_vec('[]')); + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', '[3.0, 4.0]')); + +SELECT vec_to_string(vec_mul(parse_vec('[1.0, 2.0]'), '[3.0, 4.0]')); + +SELECT vec_to_string(vec_mul('[1.0, 2.0]', parse_vec('[3.0, 4.0]'))); \ No newline at end of file From a9f21915efacacd7ac10d76bf2caa342776f2490 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 22:30:07 +0800 Subject: [PATCH 4/4] feat(bloom-filter): integrate indexer with mito2 (#5236) * feat(bloom-filter): integrate indexer with mito2 Signed-off-by: Zhenchi * rename skippingindextype Signed-off-by: 
Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/datatypes/src/schema.rs | 2 +- src/datatypes/src/schema/column_schema.rs | 12 +- src/index/src/bloom_filter/creator.rs | 6 +- .../bloom_filter/creator/finalize_segment.rs | 8 +- src/index/src/bloom_filter/reader.rs | 2 +- src/mito2/src/compaction/compactor.rs | 14 +- src/mito2/src/error.rs | 25 +- src/mito2/src/flush.rs | 14 +- src/mito2/src/sst/file.rs | 7 + src/mito2/src/sst/index.rs | 151 ++++- src/mito2/src/sst/index/bloom_filter.rs | 17 + .../src/sst/index/bloom_filter/creator.rs | 530 ++++++++++++++++++ .../sst/index/{inverted_index => }/codec.rs | 0 .../src/sst/index/fulltext_index/creator.rs | 16 +- src/mito2/src/sst/index/indexer/abort.rs | 26 +- src/mito2/src/sst/index/indexer/finish.rs | 73 ++- src/mito2/src/sst/index/indexer/update.rs | 32 +- src/mito2/src/sst/index/intermediate.rs | 157 ++++++ src/mito2/src/sst/index/inverted_index.rs | 1 - .../index/inverted_index/applier/builder.rs | 2 +- .../src/sst/index/inverted_index/creator.rs | 9 +- .../inverted_index/creator/temp_provider.rs | 182 ------ 22 files changed, 1032 insertions(+), 254 deletions(-) create mode 100644 src/mito2/src/sst/index/bloom_filter.rs create mode 100644 src/mito2/src/sst/index/bloom_filter/creator.rs rename src/mito2/src/sst/index/{inverted_index => }/codec.rs (100%) delete mode 100644 src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index c537a4608b42..19f3c6e55fb1 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -29,7 +29,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R use crate::prelude::ConcreteDataType; pub use crate::schema::column_schema::{ ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions, - COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, + SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY, diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 7a96ab5e2bf2..74e066adc7b4 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -543,7 +543,7 @@ pub struct SkippingIndexOptions { pub granularity: u32, /// The type of the skip index. #[serde(default)] - pub index_type: SkipIndexType, + pub index_type: SkippingIndexType, } impl fmt::Display for SkippingIndexOptions { @@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions { /// Skip index types. 
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)] -pub enum SkipIndexType { +pub enum SkippingIndexType { #[default] BloomFilter, } -impl fmt::Display for SkipIndexType { +impl fmt::Display for SkippingIndexType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - SkipIndexType::BloomFilter => write!(f, "BLOOM"), + SkippingIndexType::BloomFilter => write!(f, "BLOOM"), } } } @@ -587,7 +587,7 @@ impl TryFrom> for SkippingIndexOptions { // Parse index type with default value BloomFilter let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) { Some(typ) => match typ.to_ascii_uppercase().as_str() { - "BLOOM" => SkipIndexType::BloomFilter, + "BLOOM" => SkippingIndexType::BloomFilter, _ => { return error::InvalidSkippingIndexOptionSnafu { msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"), @@ -595,7 +595,7 @@ impl TryFrom> for SkippingIndexOptions { .fail(); } }, - None => SkipIndexType::default(), + None => SkippingIndexType::default(), }; Ok(SkippingIndexOptions { diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index f8c54239645b..da95334782a7 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -73,7 +73,7 @@ impl BloomFilterCreator { /// `rows_per_segment` <= 0 pub fn new( rows_per_segment: usize, - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -252,7 +252,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); @@ -322,7 +322,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs index 091b1ee6aac0..e97652f5fc6a 100644 --- a/src/index/src/bloom_filter/creator/finalize_segment.rs +++ b/src/index/src/bloom_filter/creator/finalize_segment.rs @@ -43,7 +43,7 @@ pub struct FinalizedBloomFilterStorage { intermediate_prefix: String, /// The provider for intermediate Bloom filter files. - intermediate_provider: Box, + intermediate_provider: Arc, /// The memory usage of the in-memory Bloom filters. memory_usage: usize, @@ -59,7 +59,7 @@ pub struct FinalizedBloomFilterStorage { impl FinalizedBloomFilterStorage { /// Creates a new `FinalizedBloomFilterStorage`. pub fn new( - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -132,7 +132,7 @@ impl FinalizedBloomFilterStorage { /// Drains the storage and returns a stream of finalized Bloom filter segments. 
pub async fn drain( &mut self, - ) -> Result> + '_>>> { + ) -> Result> + Send + '_>>> { // FAST PATH: memory only if self.intermediate_file_id_counter == 0 { return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok)))); @@ -257,7 +257,7 @@ mod tests { let global_memory_usage = Arc::new(AtomicUsize::new(0)); let global_memory_usage_threshold = Some(1024 * 1024); // 1MB - let provider = Box::new(mock_provider); + let provider = Arc::new(mock_provider); let mut storage = FinalizedBloomFilterStorage::new( provider, global_memory_usage.clone(), diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 788afe033124..6dc592100fcf 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -190,7 +190,7 @@ mod tests { let mut writer = Cursor::new(vec![]); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e2499140fd61..e7d5e779b675 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -21,7 +21,6 @@ use common_telemetry::{info, warn}; use common_time::TimeToLive; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -41,7 +40,7 @@ use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; -use crate::sst::file::{FileMeta, IndexType}; +use crate::sst::file::FileMeta; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -336,16 +335,7 @@ impl Compactor for DefaultCompactor { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1baffd4a7fa1..3a6f368bacd8 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -816,8 +816,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to retrieve fulltext options from column metadata"))] - FulltextOptions { + #[snafu(display("Failed to retrieve index options from column metadata"))] + IndexOptions { #[snafu(implicit)] location: Location, source: datatypes::error::Error, @@ -904,6 +904,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to push value to bloom filter"))] + PushBloomFilterValue { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to finish bloom filter"))] + BloomFilterFinish { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + 
location: Location, + }, } pub type Result = std::result::Result; @@ -1029,7 +1043,7 @@ impl ErrorExt for Error { UnsupportedOperation { .. } => StatusCode::Unsupported, RemoteCompaction { .. } => StatusCode::Unexpected, - FulltextOptions { source, .. } => source.status_code(), + IndexOptions { source, .. } => source.status_code(), CreateFulltextCreator { source, .. } => source.status_code(), CastVector { source, .. } => source.status_code(), FulltextPushText { source, .. } @@ -1039,7 +1053,12 @@ impl ErrorExt for Error { RegionBusy { .. } => StatusCode::RegionBusy, GetSchemaMetadata { source, .. } => source.status_code(), Timeout { .. } => StatusCode::Cancelled, + DecodeArrowRowGroup { .. } => StatusCode::Internal, + + PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => { + source.status_code() + } } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index b522f225f9f0..64a739068ad9 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{debug, error, info, trace}; -use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use strum::IntoStaticStr; @@ -45,7 +44,7 @@ use crate::request::{ SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; -use crate::sst::file::{FileId, FileMeta, IndexType}; +use crate::sst::file::{FileId, FileMeta}; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -378,16 +377,7 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 5a9932ab433b..844d3e5d08f8 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -143,6 +143,8 @@ pub enum IndexType { InvertedIndex, /// Full-text index. FulltextIndex, + /// Bloom filter. + BloomFilter, } impl FileMeta { @@ -156,6 +158,11 @@ impl FileMeta { self.available_indexes.contains(&IndexType::FulltextIndex) } + /// Returns true if the file has a bloom filter + pub fn bloom_filter_available(&self) -> bool { + self.available_indexes.contains(&IndexType::BloomFilter) + } + /// Returns the size of the inverted index file pub fn inverted_index_size(&self) -> Option { if self.available_indexes.len() == 1 && self.inverted_index_available() { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1972f3d7abb6..b6eac91e56d9 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
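// A minimal sketch of how a column opts in to the bloom-filter skipping index that the
// modules below implement; it mirrors the `with_skipping_options` calls used by the tests
// further down in this file. The column name "tag" and granularity 2 are illustrative
// values only, not part of the patch.
fn _example_bloom_filter_column() -> datatypes::schema::ColumnSchema {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions, SkippingIndexType};

    // Declaring a skipping index of type BloomFilter on a string column; the indexer
    // later picks this column up via `skipping_index_options()` when building the SST.
    ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false)
        .with_skipping_options(SkippingIndexOptions {
            granularity: 2,
            index_type: SkippingIndexType::BloomFilter,
        })
        .expect("valid skipping index options")
}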
+pub(crate) mod bloom_filter; +mod codec; pub(crate) mod fulltext_index; mod indexer; pub(crate) mod intermediate; @@ -22,8 +24,10 @@ pub(crate) mod store; use std::num::NonZeroUsize; +use bloom_filter::creator::BloomFilterIndexer; use common_telemetry::{debug, warn}; use puffin_manager::SstPuffinManager; +use smallvec::SmallVec; use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; @@ -33,13 +37,14 @@ use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; -use crate::sst::file::FileId; +use crate::sst::file::{FileId, IndexType}; use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::inverted_index::creator::InvertedIndexer; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; +pub(crate) const TYPE_BLOOM_FILTER: &str = "bloom_filter"; /// Output of the index creation. #[derive(Debug, Clone, Default)] @@ -50,6 +55,24 @@ pub struct IndexOutput { pub inverted_index: InvertedIndexOutput, /// Fulltext index output. pub fulltext_index: FulltextIndexOutput, + /// Bloom filter output. + pub bloom_filter: BloomFilterOutput, +} + +impl IndexOutput { + pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> { + let mut indexes = SmallVec::new(); + if self.inverted_index.is_available() { + indexes.push(IndexType::InvertedIndex); + } + if self.fulltext_index.is_available() { + indexes.push(IndexType::FulltextIndex); + } + if self.bloom_filter.is_available() { + indexes.push(IndexType::BloomFilter); + } + indexes + } } /// Base output of the index creation. @@ -73,6 +96,8 @@ impl IndexBaseOutput { pub type InvertedIndexOutput = IndexBaseOutput; /// Output of the fulltext index creation. pub type FulltextIndexOutput = IndexBaseOutput; +/// Output of the bloom filter creation. +pub type BloomFilterOutput = IndexBaseOutput; /// The index creator that hides the error handling details. 
#[derive(Default)] @@ -86,6 +111,8 @@ pub struct Indexer { last_mem_inverted_index: usize, fulltext_indexer: Option, last_mem_fulltext_index: usize, + bloom_filter_indexer: Option, + last_mem_bloom_filter: usize, } impl Indexer { @@ -129,6 +156,15 @@ impl Indexer { .with_label_values(&[TYPE_FULLTEXT_INDEX]) .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64); self.last_mem_fulltext_index = fulltext_mem; + + let bloom_filter_mem = self + .bloom_filter_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()); + INDEX_CREATE_MEMORY_USAGE + .with_label_values(&[TYPE_BLOOM_FILTER]) + .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64); + self.last_mem_bloom_filter = bloom_filter_mem; } } @@ -158,7 +194,11 @@ impl<'a> IndexerBuilder<'a> { indexer.inverted_indexer = self.build_inverted_indexer(); indexer.fulltext_indexer = self.build_fulltext_indexer().await; - if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() { + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + if indexer.inverted_indexer.is_none() + && indexer.fulltext_indexer.is_none() + && indexer.bloom_filter_indexer.is_none() + { indexer.abort().await; return Indexer::default(); } @@ -266,7 +306,7 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {}", + "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", self.metadata.region_id, self.file_id, err ); } else { @@ -278,6 +318,53 @@ impl<'a> IndexerBuilder<'a> { None } + + fn build_bloom_filter_indexer(&self) -> Option { + let create = true; // TODO(zhongzc): add config for bloom filter + + if !create { + debug!( + "Skip creating bloom filter due to config, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return None; + } + + let mem_limit = Some(16 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let indexer = BloomFilterIndexer::new( + self.file_id, + self.metadata, + self.intermediate_manager.clone(), + mem_limit, + ); + + let err = match indexer { + Ok(indexer) => { + if indexer.is_none() { + debug!( + "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + return indexer; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.metadata.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to create bloom filter, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + + None + } } #[cfg(test)] @@ -286,7 +373,9 @@ mod tests { use api::v1::SemanticType; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, FulltextOptions}; + use datatypes::schema::{ + ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType, + }; use object_store::services::Memory; use object_store::ObjectStore; use puffin_manager::PuffinManagerFactory; @@ -298,12 +387,14 @@ mod tests { struct MetaConfig { with_tag: bool, with_fulltext: bool, + with_skipping_bloom: bool, } fn mock_region_metadata( MetaConfig { with_tag, with_fulltext, + with_skipping_bloom, }: MetaConfig, ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); @@ -354,6 +445,24 @@ mod tests { builder.push_column_metadata(column); } + if with_skipping_bloom { + let column_schema = + 
ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false) + .with_skipping_options(SkippingIndexOptions { + granularity: 42, + index_type: SkippingIndexType::BloomFilter, + }) + .unwrap(); + + let column = ColumnMetadata { + column_schema, + semantic_type: SemanticType::Field, + column_id: 5, + }; + + builder.push_column_metadata(column); + } + Arc::new(builder.build().unwrap()) } @@ -374,6 +483,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -392,6 +502,7 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); } #[tokio::test] @@ -403,6 +514,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -456,6 +568,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: false, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -474,10 +587,12 @@ mod tests { assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: false, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -486,7 +601,7 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), @@ -496,6 +611,31 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + with_skipping_bloom: false, + }); + let indexer = IndexerBuilder { + op_type: OperationType::Flush, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -507,6 +647,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs new file mode 100644 index 000000000000..347195a3b16b --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod creator; + +const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs new file mode 100644 index 000000000000..8c56800f47e7 --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -0,0 +1,530 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use common_telemetry::warn; +use datatypes::schema::SkippingIndexType; +use index::bloom_filter::creator::BloomFilterCreator; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::error::{ + BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, + PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, +}; +use crate::read::Batch; +use crate::row_converter::SortField; +use crate::sst::file::FileId; +use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; +use crate::sst::index::TYPE_BLOOM_FILTER; + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// The indexer for the bloom filter index. +pub struct BloomFilterIndexer { + /// The bloom filter creators. + creators: HashMap, + + /// The provider for intermediate files. + temp_file_provider: Arc, + + /// Codec for decoding primary keys. + codec: IndexValuesCodec, + + /// Whether the indexing process has been aborted. + aborted: bool, + + /// The statistics of the indexer. + stats: Statistics, + + /// The global memory usage. + global_memory_usage: Arc, +} + +impl BloomFilterIndexer { + /// Creates a new bloom filter indexer. 
+ pub fn new( + sst_file_id: FileId, + metadata: &RegionMetadataRef, + intermediate_manager: IntermediateManager, + memory_usage_threshold: Option, + ) -> Result> { + let mut creators = HashMap::new(); + + let temp_file_provider = Arc::new(TempFileProvider::new( + IntermediateLocation::new(&metadata.region_id, &sst_file_id), + intermediate_manager, + )); + let global_memory_usage = Arc::new(AtomicUsize::new(0)); + + for column in &metadata.column_metadatas { + let options = + column + .column_schema + .skipping_index_options() + .context(IndexOptionsSnafu { + column_name: &column.column_schema.name, + })?; + + let options = match options { + Some(options) if options.index_type == SkippingIndexType::BloomFilter => options, + _ => continue, + }; + + let creator = BloomFilterCreator::new( + options.granularity as _, + temp_file_provider.clone(), + global_memory_usage.clone(), + memory_usage_threshold, + ); + creators.insert(column.column_id, creator); + } + + if creators.is_empty() { + return Ok(None); + } + + let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); + let indexer = Self { + creators, + temp_file_provider, + codec, + aborted: false, + stats: Statistics::new(TYPE_BLOOM_FILTER), + global_memory_usage, + }; + Ok(Some(indexer)) + } + + /// Updates index with a batch of rows. + /// Garbage will be cleaned up if failed to update. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn update(&mut self, batch: &Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() { + return Ok(()); + } + + if let Err(update_err) = self.do_update(batch).await { + // clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err:?}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Finishes index creation and cleans up garbage. + /// Returns the number of rows and bytes written. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.stats.row_count() == 0 { + // no IO is performed, no garbage to clean up, just return + return Ok((0, 0)); + } + + let finish_res = self.do_finish(puffin_writer).await; + // clean up garbage no matter finish successfully or not + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err:?}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + + finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) + } + + /// Aborts index creation and clean up garbage. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + self.aborted = true; + + self.do_cleanup().await + } + + async fn do_update(&mut self, batch: &Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + // Tags + for ((col_id, _), field, value) in self.codec.decode(batch.primary_key())? 
{ + let Some(creator) = self.creators.get_mut(col_id) else { + continue; + }; + let elems = value + .map(|v| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(v.as_value_ref(), field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + creator + .push_n_row_elems(n, elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + + // Fields + for field in batch.fields() { + let Some(creator) = self.creators.get_mut(&field.column_id) else { + continue; + }; + + let sort_field = SortField::new(field.data.data_type()); + for i in 0..n { + let value = field.data.get_ref(i); + let elems = (!value.is_null()) + .then(|| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + + creator + .push_row_elems(elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + } + Ok(()) + } + + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { + let mut guard = self.stats.record_finish(); + + for (id, creator) in &mut self.creators { + let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?; + guard.inc_byte_count(written_bytes); + } + + Ok(()) + } + + async fn do_cleanup(&mut self) -> Result<()> { + let mut _guard = self.stats.record_cleanup(); + + self.creators.clear(); + self.temp_file_provider.cleanup().await + } + + /// Data flow of finishing index: + /// + /// ```text + /// (In Memory Buffer) + /// ┌──────┐ + /// ┌─────────────┐ │ PIPE │ + /// │ │ write index data │ │ + /// │ IndexWriter ├──────────────────►│ tx │ + /// │ │ │ │ + /// └─────────────┘ │ │ + /// ┌─────────────────┤ rx │ + /// ┌─────────────┐ │ read as blob └──────┘ + /// │ │ │ + /// │ PuffinWriter├─┤ + /// │ │ │ copy to file ┌──────┐ + /// └─────────────┘ └────────────────►│ File │ + /// └──────┘ + /// ``` + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish_single_creator( + col_id: &ColumnId, + creator: &mut BloomFilterCreator, + puffin_writer: &mut SstPuffinWriter, + ) -> Result { + let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + + let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id); + let (index_finish, puffin_add_blob) = futures::join!( + creator.finish(tx.compat_write()), + puffin_writer.put_blob(&blob_name, rx.compat(), PutOptions::default()) + ); + + match ( + puffin_add_blob.context(PuffinAddBlobSnafu), + index_finish.context(BloomFilterFinishSnafu), + ) { + (Err(e1), Err(e2)) => BiErrorsSnafu { + first: Box::new(e1), + second: Box::new(e2), + } + .fail()?, + + (Ok(_), e @ Err(_)) => e?, + (e @ Err(_), Ok(_)) => e.map(|_| ())?, + (Ok(written_bytes), Ok(_)) => { + return Ok(written_bytes); + } + } + + Ok(0) + } + + /// Returns the memory usage of the indexer. + pub fn memory_usage(&self) -> usize { + self.global_memory_usage + .load(std::sync::atomic::Ordering::Relaxed) + } + + /// Returns the column ids to be indexed. 
+    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
+        self.creators.keys().copied()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter;
+
+    use api::v1::SemanticType;
+    use datatypes::data_type::ConcreteDataType;
+    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
+    use datatypes::value::ValueRef;
+    use datatypes::vectors::{UInt64Vector, UInt8Vector};
+    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
+    use object_store::services::Memory;
+    use object_store::ObjectStore;
+    use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
+    use store_api::storage::RegionId;
+
+    use super::*;
+    use crate::read::BatchColumn;
+    use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+    use crate::sst::index::puffin_manager::PuffinManagerFactory;
+
+    fn mock_object_store() -> ObjectStore {
+        ObjectStore::new(Memory::default()).unwrap().finish()
+    }
+
+    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
+        IntermediateManager::init_fs(path).await.unwrap()
+    }
+
+    /// tag_str:
+    /// - type: string
+    /// - index: bloom filter
+    /// - granularity: 2
+    /// - column_id: 1
+    ///
+    /// ts:
+    /// - type: timestamp
+    /// - index: time index
+    /// - column_id: 2
+    ///
+    /// field_u64:
+    /// - type: uint64
+    /// - index: bloom filter
+    /// - granularity: 4
+    /// - column_id: 3
+    fn mock_region_metadata() -> RegionMetadataRef {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "tag_str",
+                    ConcreteDataType::string_datatype(),
+                    false,
+                )
+                .with_skipping_options(SkippingIndexOptions {
+                    index_type: SkippingIndexType::BloomFilter,
+                    granularity: 2,
+                })
+                .unwrap(),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "field_u64",
+                    ConcreteDataType::uint64_datatype(),
+                    false,
+                )
+                .with_skipping_options(SkippingIndexOptions {
+                    index_type: SkippingIndexType::BloomFilter,
+                    granularity: 4,
+                })
+                .unwrap(),
+                semantic_type: SemanticType::Field,
+                column_id: 3,
+            })
+            .primary_key(vec![1]);
+
+        Arc::new(builder.build().unwrap())
+    }
+
+    fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
+        let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
+        let codec = McmpRowCodec::new(fields);
+        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
+        let primary_key = codec.encode(row.into_iter()).unwrap();
+
+        let u64_field = BatchColumn {
+            column_id: 3,
+            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
+        };
+        let num_rows = u64_field.data.len();
+
+        Batch::new(
+            primary_key,
+            Arc::new(UInt64Vector::from_iter_values(
+                iter::repeat(0).take(num_rows),
+            )),
+            Arc::new(UInt64Vector::from_iter_values(
+                iter::repeat(0).take(num_rows),
+            )),
+            Arc::new(UInt8Vector::from_iter_values(
+                iter::repeat(1).take(num_rows),
+            )),
+            vec![u64_field],
+        )
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_bloom_filter_indexer() {
+        let prefix = "test_bloom_filter_indexer_";
+        let object_store = mock_object_store();
+        let intm_mgr = new_intm_mgr(prefix).await;
+        let region_metadata = mock_region_metadata();
+        let memory_usage_threshold = Some(1024);
+
+        let mut indexer = BloomFilterIndexer::new(
+            FileId::random(),
+            &region_metadata,
+            intm_mgr,
+            memory_usage_threshold,
+        )
+        .unwrap()
+        .unwrap();
+
+        // push 20 rows
+        let batch = new_batch("tag1", 0..10);
+        indexer.update(&batch).await.unwrap();
+
+        let batch = new_batch("tag2", 10..20);
+        indexer.update(&batch).await.unwrap();
+
+        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
+        let puffin_manager = factory.build(object_store);
+
+        let index_file_name = "index_file";
+        let mut puffin_writer = puffin_manager.writer(index_file_name).await.unwrap();
+        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
+        assert_eq!(row_count, 20);
+        assert!(byte_count > 0);
+        puffin_writer.finish().await.unwrap();
+
+        let puffin_reader = puffin_manager.reader(index_file_name).await.unwrap();
+
+        // tag_str
+        {
+            let blob_guard = puffin_reader
+                .blob("greptime-bloom-filter-v1-1")
+                .await
+                .unwrap();
+            let reader = blob_guard.reader().await.unwrap();
+            let mut bloom_filter = BloomFilterReaderImpl::new(reader);
+            let metadata = bloom_filter.metadata().await.unwrap();
+
+            assert_eq!(metadata.bloom_filter_segments.len(), 10);
+            for i in 0..5 {
+                let bf = bloom_filter
+                    .bloom_filter(&metadata.bloom_filter_segments[i])
+                    .await
+                    .unwrap();
+                assert!(bf.contains(b"tag1"));
+            }
+            for i in 5..10 {
+                let bf = bloom_filter
+                    .bloom_filter(&metadata.bloom_filter_segments[i])
+                    .await
+                    .unwrap();
+                assert!(bf.contains(b"tag2"));
+            }
+        }
+
+        // field_u64
+        {
+            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());
+
+            let blob_guard = puffin_reader
+                .blob("greptime-bloom-filter-v1-3")
+                .await
+                .unwrap();
+            let reader = blob_guard.reader().await.unwrap();
+            let mut bloom_filter = BloomFilterReaderImpl::new(reader);
+            let metadata = bloom_filter.metadata().await.unwrap();
+
+            assert_eq!(metadata.bloom_filter_segments.len(), 5);
+            for i in 0u64..20 {
+                let bf = bloom_filter
+                    .bloom_filter(&metadata.bloom_filter_segments[i as usize / 4])
+                    .await
+                    .unwrap();
+                let mut buf = vec![];
+                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
+                    .unwrap();
+
+                assert!(bf.contains(&buf));
+            }
+        }
+    }
+}
diff --git a/src/mito2/src/sst/index/inverted_index/codec.rs b/src/mito2/src/sst/index/codec.rs
similarity index 100%
rename from src/mito2/src/sst/index/inverted_index/codec.rs
rename to src/mito2/src/sst/index/codec.rs
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs
index 416e39d9dd5e..41fa15bd7c72 100644
--- a/src/mito2/src/sst/index/fulltext_index/creator.rs
+++ b/src/mito2/src/sst/index/fulltext_index/creator.rs
@@ -27,8 +27,7 @@ use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
 
 use crate::error::{
     CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
-    FulltextOptionsSnafu, FulltextPushTextSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
-    Result,
+    FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result,
 };
 use crate::read::Batch;
 use crate::sst::file::FileId;
@@ -61,13 +60,12 @@ impl FulltextIndexer {
         let mut creators = HashMap::new();
 
         for column in &metadata.column_metadatas {
-            let options =
-                column
-                    .column_schema
-                    .fulltext_options()
-                    .context(FulltextOptionsSnafu {
-                        column_name: &column.column_schema.name,
-                    })?;
+            let options = column
+                .column_schema
+                .fulltext_options()
+                .context(IndexOptionsSnafu {
+                    column_name:
&column.column_schema.name, + })?; // Relax the type constraint here as many types can be casted to string. diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 68034d48fb29..5b29009a033b 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -20,6 +20,7 @@ impl Indexer { pub(crate) async fn do_abort(&mut self) { self.do_abort_inverted_index().await; self.do_abort_fulltext_index().await; + self.do_abort_bloom_filter().await; self.puffin_manager = None; } @@ -33,7 +34,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to abort inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -54,7 +55,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to abort full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -64,4 +65,25 @@ impl Indexer { ); } } + + async fn do_abort_bloom_filter(&mut self) { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return; + }; + let Err(err) = indexer.abort().await else { + return; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index a0157a9b66f4..025eead758ff 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -15,11 +15,14 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; +use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; -use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput}; +use crate::sst::index::{ + BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput, +}; impl Indexer { pub(crate) async fn do_finish(&mut self) -> IndexOutput { @@ -46,6 +49,12 @@ impl Indexer { return IndexOutput::default(); } + let success = self.do_finish_bloom_filter(&mut writer, &mut output).await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + output.file_size = self.do_finish_puffin_writer(writer).await; output } @@ -60,7 +69,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -81,7 +90,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -119,7 +128,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish inverted index, region_id: 
{}, file_id: {}, err: {}", + "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -156,7 +165,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to finish full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -169,6 +178,43 @@ impl Indexer { false } + async fn do_finish_bloom_filter( + &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return true; + }; + + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_bloom_filter_output( + &mut index_output.bloom_filter, + row_count, + byte_count, + &indexer, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + fn fill_inverted_index_output( &mut self, output: &mut InvertedIndexOutput, @@ -202,4 +248,21 @@ impl Indexer { output.row_count = row_count; output.columns = indexer.column_ids().collect(); } + + fn fill_bloom_filter_output( + &mut self, + output: &mut BloomFilterOutput, + row_count: RowCount, + byte_count: ByteCount, + indexer: &BloomFilterIndexer, + ) { + debug!( + "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = indexer.column_ids().collect(); + } } diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index c08f171bb415..c2ab33f0e13a 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -29,6 +29,9 @@ impl Indexer { if !self.do_update_fulltext_index(batch).await { self.do_abort().await; } + if !self.do_update_bloom_filter(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. @@ -43,7 +46,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to update inverted index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -68,7 +71,7 @@ impl Indexer { if cfg!(any(test, feature = "test")) { panic!( - "Failed to update full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to update full-text index, region_id: {}, file_id: {}, err: {:?}", self.region_id, self.file_id, err ); } else { @@ -80,4 +83,29 @@ impl Indexer { false } + + /// Returns false if the update failed. 
+    async fn do_update_bloom_filter(&mut self, batch: &Batch) -> bool {
+        let Some(creator) = self.bloom_filter_indexer.as_mut() else {
+            return true;
+        };
+
+        let Err(err) = creator.update(batch).await else {
+            return true;
+        };
+
+        if cfg!(any(test, feature = "test")) {
+            panic!(
+                "Failed to update bloom filter, region_id: {}, file_id: {}, err: {:?}",
+                self.region_id, self.file_id, err
+            );
+        } else {
+            warn!(
+                err; "Failed to update bloom filter, region_id: {}, file_id: {}",
+                self.region_id, self.file_id,
+            );
+        }
+
+        false
+    }
 }
diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs
index d0da804c745b..fd8845f96ac3 100644
--- a/src/mito2/src/sst/index/intermediate.rs
+++ b/src/mito2/src/sst/index/intermediate.rs
@@ -14,13 +14,25 @@
 
 use std::path::PathBuf;
 
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
 use common_telemetry::warn;
+use futures::{AsyncRead, AsyncWrite};
+use index::error as index_error;
+use index::error::Result as IndexResult;
+use index::external_provider::ExternalTempFileProvider;
 use object_store::util::{self, normalize_dir};
+use snafu::ResultExt;
 use store_api::storage::{ColumnId, RegionId};
 use uuid::Uuid;
 
 use crate::access_layer::new_fs_cache_store;
 use crate::error::Result;
+use crate::metrics::{
+    INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
+    INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
+    INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
+};
 use crate::sst::file::FileId;
 use crate::sst::index::store::InstrumentedStore;
 
@@ -129,14 +141,105 @@ impl IntermediateLocation {
     }
 }
 
+/// `TempFileProvider` implements `ExternalTempFileProvider`.
+/// It uses `InstrumentedStore` to create and read intermediate files.
+pub(crate) struct TempFileProvider {
+    /// Provides the location of intermediate files.
+    location: IntermediateLocation,
+    /// Provides the store to access intermediate files.
+    manager: IntermediateManager,
+}
+
+#[async_trait]
+impl ExternalTempFileProvider for TempFileProvider {
+    async fn create(
+        &self,
+        file_group: &str,
+        file_id: &str,
+    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
+        let path = self.location.file_path(file_group, file_id);
+        let writer = self
+            .manager
+            .store()
+            .writer(
+                &path,
+                &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
+                &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
+                &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
+            )
+            .await
+            .map_err(BoxedError::new)
+            .context(index_error::ExternalSnafu)?;
+        Ok(Box::new(writer))
+    }
+
+    async fn read_all(
+        &self,
+        file_group: &str,
+    ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin>)>> {
+        let file_group_path = self.location.file_group_path(file_group);
+        let entries = self
+            .manager
+            .store()
+            .list(&file_group_path)
+            .await
+            .map_err(BoxedError::new)
+            .context(index_error::ExternalSnafu)?;
+        let mut readers = Vec::with_capacity(entries.len());
+
+        for entry in entries {
+            if entry.metadata().is_dir() {
+                warn!("Unexpected entry in index creation dir: {:?}", entry.path());
+                continue;
+            }
+
+            let im_file_id = self.location.im_file_id_from_path(entry.path());
+
+            let reader = self
+                .manager
+                .store()
+                .reader(
+                    entry.path(),
+                    &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
+                    &INDEX_INTERMEDIATE_READ_OP_TOTAL,
+                    &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
+                )
+                .await
+                .map_err(BoxedError::new)
+                .context(index_error::ExternalSnafu)?;
+            readers.push((im_file_id, Box::new(reader) as _));
+        }
+
+        Ok(readers)
+    }
+}
+
+impl TempFileProvider {
+    /// Creates a new `TempFileProvider`.
+ pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { + Self { location, manager } + } + + /// Removes all intermediate files. + pub async fn cleanup(&self) -> Result<()> { + self.manager + .store() + .remove_all(self.location.dir_to_cleanup()) + .await + } +} + #[cfg(test)] mod tests { use std::ffi::OsStr; use common_test_util::temp_dir; + use futures::{AsyncReadExt, AsyncWriteExt}; use regex::Regex; + use store_api::storage::RegionId; use super::*; + use crate::sst::file::FileId; #[tokio::test] async fn test_manager() { @@ -212,4 +315,58 @@ mod tests { .is_match(&pi.next().unwrap().to_string_lossy())); // fulltext path assert!(pi.next().is_none()); } + + #[tokio::test] + async fn test_temp_file_provider_basic() { + let temp_dir = temp_dir::create_temp_dir("intermediate"); + let path = temp_dir.path().display().to_string(); + + let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); + let store = IntermediateManager::init_fs(path).await.unwrap(); + let provider = TempFileProvider::new(location.clone(), store); + + let file_group = "tag0"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_id = "0000000100"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"world").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_group = "tag1"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"foo").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let readers = provider.read_all("tag0").await.unwrap(); + assert_eq!(readers.len(), 2); + for (_, mut reader) in readers { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert!(matches!(buf.as_slice(), b"hello" | b"world")); + } + let readers = provider.read_all("tag1").await.unwrap(); + assert_eq!(readers.len(), 1); + let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"foo"); + + provider.cleanup().await.unwrap(); + + assert!(provider + .manager + .store() + .list(location.dir_to_cleanup()) + .await + .unwrap() + .is_empty()); + } } diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs index d325f735a431..73dca4ac47f2 100644 --- a/src/mito2/src/sst/index/inverted_index.rs +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -13,7 +13,6 @@ // limitations under the License. 
pub(crate) mod applier; -mod codec; pub(crate) mod creator; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index c2f90b293003..e14bb89bd1c9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -37,8 +37,8 @@ use crate::cache::file_cache::FileCacheRef; use crate::cache::index::inverted_index::InvertedIndexCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; +use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::inverted_index::applier::InvertedIndexApplier; -use crate::sst::index::inverted_index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; /// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan. diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 0076322fccbd..138035d554a1 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod temp_provider; - use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -38,9 +36,10 @@ use crate::error::{ use crate::read::Batch; use crate::row_converter::SortField; use crate::sst::file::FileId; -use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; -use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec}; -use crate::sst::index::inverted_index::creator::temp_provider::TempFileProvider; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; diff --git a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs deleted file mode 100644 index 1822f3119459..000000000000 --- a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-
-use async_trait::async_trait;
-use common_error::ext::BoxedError;
-use common_telemetry::warn;
-use futures::{AsyncRead, AsyncWrite};
-use index::error as index_error;
-use index::error::Result as IndexResult;
-use index::external_provider::ExternalTempFileProvider;
-use snafu::ResultExt;
-
-use crate::error::Result;
-use crate::metrics::{
-    INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
-    INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
-    INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
-};
-use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
-
-/// `TempFileProvider` implements `ExternalTempFileProvider`.
-/// It uses `InstrumentedStore` to create and read intermediate files.
-pub(crate) struct TempFileProvider {
-    /// Provides the location of intermediate files.
-    location: IntermediateLocation,
-    /// Provides store to access to intermediate files.
-    manager: IntermediateManager,
-}
-
-#[async_trait]
-impl ExternalTempFileProvider for TempFileProvider {
-    async fn create(
-        &self,
-        file_group: &str,
-        file_id: &str,
-    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
-        let path = self.location.file_path(file_group, file_id);
-        let writer = self
-            .manager
-            .store()
-            .writer(
-                &path,
-                &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
-                &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
-                &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
-            )
-            .await
-            .map_err(BoxedError::new)
-            .context(index_error::ExternalSnafu)?;
-        Ok(Box::new(writer))
-    }
-
-    async fn read_all(
-        &self,
-        file_group: &str,
-    ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin>)>> {
-        let file_group_path = self.location.file_group_path(file_group);
-        let entries = self
-            .manager
-            .store()
-            .list(&file_group_path)
-            .await
-            .map_err(BoxedError::new)
-            .context(index_error::ExternalSnafu)?;
-        let mut readers = Vec::with_capacity(entries.len());
-
-        for entry in entries {
-            if entry.metadata().is_dir() {
-                warn!("Unexpected entry in index creation dir: {:?}", entry.path());
-                continue;
-            }
-
-            let im_file_id = self.location.im_file_id_from_path(entry.path());
-
-            let reader = self
-                .manager
-                .store()
-                .reader(
-                    entry.path(),
-                    &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
-                    &INDEX_INTERMEDIATE_READ_OP_TOTAL,
-                    &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
-                )
-                .await
-                .map_err(BoxedError::new)
-                .context(index_error::ExternalSnafu)?;
-            readers.push((im_file_id, Box::new(reader) as _));
-        }
-
-        Ok(readers)
-    }
-}
-
-impl TempFileProvider {
-    /// Creates a new `TempFileProvider`.
-    pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
-        Self { location, manager }
-    }
-
-    /// Removes all intermediate files.
- pub async fn cleanup(&self) -> Result<()> { - self.manager - .store() - .remove_all(self.location.dir_to_cleanup()) - .await - } -} - -#[cfg(test)] -mod tests { - use common_test_util::temp_dir; - use futures::{AsyncReadExt, AsyncWriteExt}; - use store_api::storage::RegionId; - - use super::*; - use crate::sst::file::FileId; - - #[tokio::test] - async fn test_temp_file_provider_basic() { - let temp_dir = temp_dir::create_temp_dir("intermediate"); - let path = temp_dir.path().display().to_string(); - - let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); - let store = IntermediateManager::init_fs(path).await.unwrap(); - let provider = TempFileProvider::new(location.clone(), store); - - let file_group = "tag0"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_id = "0000000100"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"world").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_group = "tag1"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"foo").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let readers = provider.read_all("tag0").await.unwrap(); - assert_eq!(readers.len(), 2); - for (_, mut reader) in readers { - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert!(matches!(buf.as_slice(), b"hello" | b"world")); - } - let readers = provider.read_all("tag1").await.unwrap(); - assert_eq!(readers.len(), 1); - let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"foo"); - - provider.cleanup().await.unwrap(); - - assert!(provider - .manager - .store() - .list(location.dir_to_cleanup()) - .await - .unwrap() - .is_empty()); - } -}
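For context on the `do_finish_single_creator` change above: the bloom filter bytes are streamed through an in-memory duplex pipe so they can be copied into the puffin blob as they are produced, rather than buffered in full or staged in a temporary file. The following standalone sketch reduces that pattern to plain byte copying; it is not code from this patch, it assumes only the `tokio` crate ("rt", "macros", "io-util" features), and `produce`/`consume` are illustrative stand-ins for `creator.finish(tx)` and `puffin_writer.put_blob(.., rx, ..)`.

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Bounded in-memory pipe, playing the role of the PIPE buffer in the diagram.
    let (mut tx, mut rx) = tokio::io::duplex(8 * 1024);

    // Producer half: streams the "index" bytes into the pipe, then closes its end
    // so the reader observes EOF (as `creator.finish(tx)` does for the real index).
    let produce = async move {
        tx.write_all(b"serialized index bytes").await?;
        tx.shutdown().await
    };

    // Consumer half: drains the pipe into a blob, the way the puffin writer copies
    // the stream into the target file.
    let consume = async move {
        let mut blob = Vec::new();
        rx.read_to_end(&mut blob).await.map(|_| blob)
    };

    // Both halves must be polled concurrently; a bounded pipe can deadlock if the
    // producer is awaited to completion before the consumer starts reading.
    let (write_res, read_res) = tokio::join!(produce, consume);
    write_res?;
    let blob = read_res?;
    assert_eq!(&blob[..], &b"serialized index bytes"[..]);
    Ok(())
}
```

Joining the two futures mirrors the `futures::join!` call in the patch: because the pipe is bounded, the writer is back-pressured by the reader, and neither side can make progress on its own.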