From 9be9347892c266a6c196c39d162e320c4f8b1e60 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Dec 2024 19:48:30 +0800 Subject: [PATCH 01/22] feat: find time window lower bound --- src/flow/src/error.rs | 10 +- src/flow/src/expr.rs | 1 + src/flow/src/expr/scalar.rs | 3 + src/flow/src/expr/utils.rs | 232 ++++++++++++++++++++++++++++++ src/flow/src/lib.rs | 3 + src/flow/src/test_utils.rs | 154 ++++++++++++++++++++ src/flow/src/transform.rs | 134 +---------------- src/flow/src/transform/aggr.rs | 2 +- src/flow/src/transform/expr.rs | 2 +- src/flow/src/transform/literal.rs | 2 +- src/flow/src/transform/plan.rs | 2 +- 11 files changed, 406 insertions(+), 139 deletions(-) create mode 100644 src/flow/src/expr/utils.rs create mode 100644 src/flow/src/test_utils.rs diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 137e024307f9..f12b15593c22 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -20,9 +20,9 @@ use common_error::ext::BoxedError; use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; -use common_telemetry::common_error::status_code::StatusCode; -use snafu::{Location, Snafu}; +use common_telemetry::common_error::status_code::StatusCode;\ use tonic::metadata::MetadataMap; +use snafu::{Location, ResultExt, Snafu}; use crate::adapter::FlowId; use crate::expr::EvalError; @@ -237,3 +237,9 @@ impl ErrorExt for Error { } define_into_tonic_status!(Error); + +impl From for Error { + fn from(e: EvalError) -> Self { + Err::<(), _>(e).context(EvalSnafu).unwrap_err() + } +} diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 5dde62b43a69..80baede40cc0 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -22,6 +22,7 @@ mod linear; pub(crate) mod relation; mod scalar; mod signature; +mod utils; use arrow::compute::FilterBuilder; use datatypes::prelude::{ConcreteDataType, DataType}; diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index a6e00cce5bdd..94af5e0e425a 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -311,6 +311,9 @@ impl ScalarExpr { } /// Eval this expression with the given values. + /// + /// TODO(discord9): add tests to make sure `eval_batch` is the same as `eval` in + /// most cases pub fn eval(&self, values: &[Value]) -> Result { match self { ScalarExpr::Column(index) => Ok(values[*index].clone()), diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs new file mode 100644 index 000000000000..c171f350a2b5 --- /dev/null +++ b/src/flow/src/expr/utils.rs @@ -0,0 +1,232 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use datatypes::value::Value; +use snafu::{ensure, OptionExt}; + +use crate::error::UnexpectedSnafu; +use crate::expr::ScalarExpr; +use crate::Result; + +/// Find the lower bound of time window in given `expr` and `current` timestamp. +/// +/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`, +/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound +/// of current time window given the current timestamp +/// +/// if return None, meaning this time window have no lower bound +pub fn find_time_window_lower_bound( + expr: &ScalarExpr, + ts_col: &ScalarExpr, + current: common_time::Timestamp, +) -> Result> { + let ScalarExpr::Column(ts_col_idx) = ts_col.clone() else { + UnexpectedSnafu { + reason: format!("Expected column expression but got {ts_col:?}"), + } + .fail()? + }; + let all_ref_columns = expr.get_all_ref_columns(); + if !all_ref_columns.contains(&ts_col_idx) { + UnexpectedSnafu { + reason: format!( + "Expected column {} to be referenced in expression {expr:?}", + ts_col_idx + ), + } + .fail()? + } + if all_ref_columns.len() > 1 { + UnexpectedSnafu { + reason: format!( + "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}" + ), + } + .fail()? + } + let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]); + + let mut rewrited_expr = expr.clone(); + + rewrited_expr.permute_map(&permute_map)?; + + fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result { + let val = expr.eval(values)?; + if let Value::Timestamp(ts) = val { + Ok(ts) + } else { + UnexpectedSnafu { + reason: format!("Expected timestamp in expression {expr:?} but got {val:?}"), + } + .fail()? + } + } + + let cur_time_window = eval_to_timestamp(&rewrited_expr, &[current.into()])?; + + // search to find the lower bound + let mut offset: i64 = 1; + let lower_bound; + let mut upper_bound = Some(current); + // first expontial probe to found a range for binary search + loop { + let Some(next_val) = current.value().checked_sub(offset) else { + // no lower bound + return Ok(None); + }; + + let prev_time_probe = common_time::Timestamp::new(next_val, current.unit()); + + let prev_time_window = eval_to_timestamp(&rewrited_expr, &[prev_time_probe.into()])?; + + if prev_time_window < cur_time_window { + lower_bound = Some(prev_time_probe); + break; + } else if prev_time_window == cur_time_window { + upper_bound = Some(prev_time_probe); + } else { + UnexpectedSnafu{ + reason: format!("Unsupported time window expression {rewrited_expr:?}, expect monotonic increasing for time window expression {expr:?}"), + }.fail()? + } + + let Some(new_offset) = offset.checked_mul(2) else { + // no lower bound + return Ok(None); + }; + offset = new_offset; + } + + // binary search for the lower bound + + ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{ + reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"), + }); + + let output_unit = lower_bound.expect("should have lower bound").unit(); + + let mut low = lower_bound.expect("should have lower bound").value(); + let mut high = upper_bound.expect("should have upper bound").value(); + while low < high { + let mid = (low + high) / 2; + let mid_probe = common_time::Timestamp::new(mid, output_unit); + let mid_time_window = eval_to_timestamp(&rewrited_expr, &[mid_probe.into()])?; + if mid_time_window < cur_time_window { + low = mid + 1; + } else if mid_time_window == cur_time_window { + high = mid; + } else { + UnexpectedSnafu { + reason: format!("Binary search failed for time window expression {expr:?}"), + } + .fail()? + } + } + + let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit); + + Ok(Some(final_lower_bound_for_time_window)) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::plan::{Plan, TypedPlan}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + + #[tokio::test] + async fn test_timewindow_lower_bound() { + let testcases = vec![ + ( + ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:00:00.000", + "2021-07-01 00:00:00.000", + ), + // test edge cases + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:05:00.000", + "2021-07-01 00:05:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999999", + "2021-07-01 00:00:00.000", + ), + ]; + let engine = create_test_query_engine(); + + for (args, current, expected) in testcases { + let sql = if let Some(origin) = args.2 { + format!( + "SELECT date_bin({}, {}, '{origin}') FROM numbers_with_ts;", + args.0, args.1 + ) + } else { + format!( + "SELECT date_bin({}, {}) FROM numbers_with_ts;", + args.0, args.1 + ) + }; + let plan = sql_to_substrait(engine.clone(), &sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let expr = { + let mfp = flow_plan.plan; + let Plan::Mfp { mfp, .. } = mfp else { + unreachable!() + }; + mfp.expressions[0].clone() + }; + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let res = find_time_window_lower_bound(&expr, &ScalarExpr::Column(1), current).unwrap(); + + let expected = Some(common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!(res, expected); + } + } +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 8d6a881fa066..8d07369afa55 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -38,6 +38,9 @@ mod server; mod transform; mod utils; +#[cfg(test)] +mod test_utils; + pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs new file mode 100644 index 000000000000..585fa233016c --- /dev/null +++ b/src/flow/src/test_utils.rs @@ -0,0 +1,154 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use catalog::RegisterTableRequest; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; +use datatypes::data_type::ConcreteDataType as CDT; +use datatypes::prelude::*; +use datatypes::schema::Schema; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef}; +use itertools::Itertools; +use prost::Message; +use query::parser::QueryLanguageParser; +use query::query_engine::DefaultSerializer; +use query::QueryEngine; +use session::context::QueryContext; +/// note here we are using the `substrait_proto_df` crate from the `substrait` module and +/// rename it to `substrait_proto` +use substrait::substrait_proto_df as substrait_proto; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; +use substrait_proto::proto; +use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; +use table::test_util::MemTable; + +use crate::adapter::node_context::IdToNameMap; +use crate::adapter::FlownodeContext; +use crate::df_optimizer::apply_df_optimizer; +use crate::expr::GlobalId; +use crate::repr::{ColumnType, RelationType}; +use crate::transform::register_function_to_query_engine; + +pub fn create_test_ctx() -> FlownodeContext { + let mut schemas = HashMap::new(); + let mut tri_map = IdToNameMap::new(); + { + let gid = GlobalId::User(0); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; + let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); + + tri_map.insert(Some(name.clone()), Some(1024), gid); + schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); + } + + { + let gid = GlobalId::User(1); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]; + let schema = RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::timestamp_millisecond_datatype(), false), + ]); + schemas.insert( + gid, + schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + ); + tri_map.insert(Some(name.clone()), Some(1025), gid); + } + + FlownodeContext { + schema: schemas, + table_repr: tri_map, + query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), + ..Default::default() + } +} + +pub fn create_test_query_engine() -> Arc { + let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); + let req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: NUMBERS_TABLE_NAME.to_string(), + table_id: NUMBERS_TABLE_ID, + table: NumbersTable::table(NUMBERS_TABLE_ID), + }; + catalog_list.register_table_sync(req).unwrap(); + + let schema = vec![ + datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), + datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false), + ]; + let mut columns = vec![]; + let numbers = (1..=10).collect_vec(); + let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); + columns.push(column); + + let ts = (1..=10).collect_vec(); + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); + ts.into_iter() + .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) + .count(); + let column: VectorRef = builder.to_vector_cloned(); + columns.push(column); + + let schema = Arc::new(Schema::new(schema)); + let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table("numbers_with_ts", recordbatch); + + let req_with_ts = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "numbers_with_ts".to_string(), + table_id: 1024, + table, + }; + catalog_list.register_table_sync(req_with_ts).unwrap(); + + let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); + + let engine = factory.query_engine(); + register_function_to_query_engine(&engine); + + assert_eq!("datafusion", engine.name()); + engine +} + +pub async fn sql_to_substrait(engine: Arc, sql: &str) -> proto::Plan { + // let engine = create_test_query_engine(); + let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); + let plan = engine + .planner() + .plan(&stmt, QueryContext::arc()) + .await + .unwrap(); + let plan = apply_df_optimizer(plan).await.unwrap(); + + // encode then decode so to rely on the impl of conversion from logical plan to substrait plan + let bytes = DFLogicalSubstraitConvertor {} + .encode(&plan, DefaultSerializer) + .unwrap(); + + proto::Plan::decode(bytes).unwrap() +} diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 94878115cf8b..e2a9c2ff8059 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -152,144 +152,12 @@ impl common_function::function::Function for TumbleFunction { #[cfg(test)] mod test { - use std::sync::Arc; - use catalog::RegisterTableRequest; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; - use datatypes::prelude::*; - use datatypes::schema::Schema; - use datatypes::timestamp::TimestampMillisecond; - use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef}; - use itertools::Itertools; - use prost::Message; use query::parser::QueryLanguageParser; - use query::query_engine::DefaultSerializer; - use query::QueryEngine; use session::context::QueryContext; - use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; - use substrait_proto::proto; - use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; - use table::test_util::MemTable; - use super::*; - use crate::adapter::node_context::IdToNameMap; use crate::df_optimizer::apply_df_optimizer; - use crate::expr::GlobalId; - use crate::repr::{ColumnType, RelationType}; - - pub fn create_test_ctx() -> FlownodeContext { - let mut schemas = HashMap::new(); - let mut tri_map = IdToNameMap::new(); - { - let gid = GlobalId::User(0); - let name = [ - "greptime".to_string(), - "public".to_string(), - "numbers".to_string(), - ]; - let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - - tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); - } - - { - let gid = GlobalId::User(1); - let name = [ - "greptime".to_string(), - "public".to_string(), - "numbers_with_ts".to_string(), - ]; - let schema = RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), - ColumnType::new(CDT::timestamp_millisecond_datatype(), false), - ]); - schemas.insert( - gid, - schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), - ); - tri_map.insert(Some(name.clone()), Some(1025), gid); - } - - FlownodeContext { - schema: schemas, - table_repr: tri_map, - query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), - ..Default::default() - } - } - - pub fn create_test_query_engine() -> Arc { - let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); - let req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: NUMBERS_TABLE_NAME.to_string(), - table_id: NUMBERS_TABLE_ID, - table: NumbersTable::table(NUMBERS_TABLE_ID), - }; - catalog_list.register_table_sync(req).unwrap(); - - let schema = vec![ - datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), - datatypes::schema::ColumnSchema::new( - "ts", - CDT::timestamp_millisecond_datatype(), - false, - ), - ]; - let mut columns = vec![]; - let numbers = (1..=10).collect_vec(); - let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); - columns.push(column); - - let ts = (1..=10).collect_vec(); - let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); - ts.into_iter() - .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) - .count(); - let column: VectorRef = builder.to_vector_cloned(); - columns.push(column); - - let schema = Arc::new(Schema::new(schema)); - let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::table("numbers_with_ts", recordbatch); - - let req_with_ts = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "numbers_with_ts".to_string(), - table_id: 1024, - table, - }; - catalog_list.register_table_sync(req_with_ts).unwrap(); - - let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); - - let engine = factory.query_engine(); - register_function_to_query_engine(&engine); - - assert_eq!("datafusion", engine.name()); - engine - } - - pub async fn sql_to_substrait(engine: Arc, sql: &str) -> proto::Plan { - // let engine = create_test_query_engine(); - let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); - let plan = engine - .planner() - .plan(&stmt, QueryContext::arc()) - .await - .unwrap(); - let plan = apply_df_optimizer(plan).await.unwrap(); - - // encode then decode so to rely on the impl of conversion from logical plan to substrait plan - let bytes = DFLogicalSubstraitConvertor {} - .encode(&plan, DefaultSerializer) - .unwrap(); - - proto::Plan::decode(bytes).unwrap() - } + use crate::test_utils::create_test_query_engine; /// TODO(discord9): add more illegal sql tests #[tokio::test] diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index b944e3b263e3..33ee1fa284b9 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -358,7 +358,7 @@ mod test { use crate::expr::{BinaryFunc, DfScalarFunction, GlobalId, RawDfScalarFn}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; use crate::transform::CDT; #[tokio::test] diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index de05b018ac51..b461b5e5e1b3 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -545,7 +545,7 @@ mod test { use crate::expr::{GlobalId, MapFilterProject}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter #[tokio::test] diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs index ffb62ff14dba..7ad5db6f46d3 100644 --- a/src/flow/src/transform/literal.rs +++ b/src/flow/src/transform/literal.rs @@ -346,7 +346,7 @@ mod test { use super::*; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// test if literal in substrait plan can be correctly converted to flow plan #[tokio::test] async fn test_literal() { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index ad5fc2f58dc2..872342edc8b3 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -238,7 +238,7 @@ mod test { use crate::expr::GlobalId; use crate::plan::{Plan, TypedPlan}; use crate::repr::{ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; use crate::transform::CDT; #[tokio::test] From de526ed7d6ca7d81e1bee6e07a46183fb70d0d14 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Dec 2024 19:49:35 +0800 Subject: [PATCH 02/22] chore: typo --- src/flow/src/expr/utils.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index c171f350a2b5..21f0124f3317 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -59,9 +59,9 @@ pub fn find_time_window_lower_bound( } let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]); - let mut rewrited_expr = expr.clone(); + let mut rewrote_expr = expr.clone(); - rewrited_expr.permute_map(&permute_map)?; + rewrote_expr.permute_map(&permute_map)?; fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result { let val = expr.eval(values)?; @@ -75,7 +75,7 @@ pub fn find_time_window_lower_bound( } } - let cur_time_window = eval_to_timestamp(&rewrited_expr, &[current.into()])?; + let cur_time_window = eval_to_timestamp(&rewrote_expr, &[current.into()])?; // search to find the lower bound let mut offset: i64 = 1; @@ -90,7 +90,7 @@ pub fn find_time_window_lower_bound( let prev_time_probe = common_time::Timestamp::new(next_val, current.unit()); - let prev_time_window = eval_to_timestamp(&rewrited_expr, &[prev_time_probe.into()])?; + let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?; if prev_time_window < cur_time_window { lower_bound = Some(prev_time_probe); @@ -99,7 +99,7 @@ pub fn find_time_window_lower_bound( upper_bound = Some(prev_time_probe); } else { UnexpectedSnafu{ - reason: format!("Unsupported time window expression {rewrited_expr:?}, expect monotonic increasing for time window expression {expr:?}"), + reason: format!("Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}"), }.fail()? } @@ -123,7 +123,7 @@ pub fn find_time_window_lower_bound( while low < high { let mid = (low + high) / 2; let mid_probe = common_time::Timestamp::new(mid, output_unit); - let mid_time_window = eval_to_timestamp(&rewrited_expr, &[mid_probe.into()])?; + let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?; if mid_time_window < cur_time_window { low = mid + 1; } else if mid_time_window == cur_time_window { From 165a210a244b1684de59439e6fde094399edb697 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Dec 2024 19:59:39 +0800 Subject: [PATCH 03/22] chore: clippy --- src/flow/src/expr/utils.rs | 41 ++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index 21f0124f3317..723d0fe41039 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::BTreeMap; use datatypes::value::Value; -use snafu::{ensure, OptionExt}; +use snafu::ensure; use crate::error::UnexpectedSnafu; use crate::expr::ScalarExpr; @@ -92,15 +93,22 @@ pub fn find_time_window_lower_bound( let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?; - if prev_time_window < cur_time_window { - lower_bound = Some(prev_time_probe); - break; - } else if prev_time_window == cur_time_window { - upper_bound = Some(prev_time_probe); - } else { - UnexpectedSnafu{ - reason: format!("Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}"), - }.fail()? + match prev_time_window.cmp(&cur_time_window) { + Ordering::Less => { + lower_bound = Some(prev_time_probe); + break; + } + Ordering::Equal => { + upper_bound = Some(prev_time_probe); + } + Ordering::Greater => { + UnexpectedSnafu { + reason: format!( + "Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}" + ), + } + .fail()? + } } let Some(new_offset) = offset.checked_mul(2) else { @@ -124,15 +132,14 @@ pub fn find_time_window_lower_bound( let mid = (low + high) / 2; let mid_probe = common_time::Timestamp::new(mid, output_unit); let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?; - if mid_time_window < cur_time_window { - low = mid + 1; - } else if mid_time_window == cur_time_window { - high = mid; - } else { - UnexpectedSnafu { + + match mid_time_window.cmp(&cur_time_window) { + Ordering::Less => low = mid + 1, + Ordering::Equal => high = mid, + Ordering::Greater => UnexpectedSnafu { reason: format!("Binary search failed for time window expression {expr:?}"), } - .fail()? + .fail()?, } } From 1f5ef092cbb8a88dfd0753515119c2d3dfdf9f67 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Dec 2024 21:12:22 +0800 Subject: [PATCH 04/22] feat: find in plan --- src/flow/src/expr/utils.rs | 49 +++++++++++++++++++++++++++- src/flow/src/plan.rs | 65 ++++++++++++++++++++++++++++++++++++- src/flow/src/plan/reduce.rs | 14 +++++++- 3 files changed, 125 insertions(+), 3 deletions(-) diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index 723d0fe41039..9edab2081701 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -16,12 +16,59 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use datatypes::value::Value; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use crate::error::UnexpectedSnafu; use crate::expr::ScalarExpr; +use crate::plan::TypedPlan; use crate::Result; +/// Find lower bound for time `current` in given `plan` +/// +/// if `plan` doesn't contain a `TIME INDEX` column, return `None` +pub fn find_time_lower_bound_for_plan( + plan: &TypedPlan, + current: common_time::Timestamp, +) -> Result> { + let typ = plan.schema.typ(); + let Some(mut time_index) = typ.time_index else { + return Ok(None); + }; + + let mut cur_plan = plan; + let mut expr_time_index = None; + + while let Some(input) = cur_plan.plan.get_first_input_plan() { + // follow upward and find deepest time index expr that is not a column ref + expr_time_index = Some(input.plan.get_nth_expr(time_index).cloned().context( + UnexpectedSnafu { + reason: "Failed to find time index expr", + }, + )?); + if let Some(ScalarExpr::Column(i)) = expr_time_index { + time_index = i; + } else { + break; + } + cur_plan = input; + } + + let expr_time_index = expr_time_index.context(UnexpectedSnafu { + reason: "Failed to find time index expr", + })?; + + let ts_col = expr_time_index + .get_all_ref_columns() + .first() + .cloned() + .context(UnexpectedSnafu { + reason: "Failed to find time index column", + })?; + let ts_col = ScalarExpr::Column(ts_col); + + find_time_window_lower_bound(&expr_time_index, &ts_col, current) +} + /// Find the lower bound of time window in given `expr` and `current` timestamp. /// /// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`, diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index e1cf22e621ec..d20685083447 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -19,7 +19,7 @@ mod join; mod reduce; use crate::error::Error; -use crate::expr::{Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr}; +use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr}; use crate::plan::join::JoinPlan; pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; use crate::repr::{DiffRow, RelationDesc}; @@ -184,6 +184,69 @@ pub enum Plan { }, } +impl Plan { + /// Get nth expr using column ref + pub fn get_nth_expr(&self, n: usize) -> Option<&ScalarExpr> { + match self { + Self::Mfp { mfp, .. } => mfp.expressions.get(n), + Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n), + _ => None, + } + } + + /// Get the first input plan if exists + pub fn get_first_input_plan(&self) -> Option<&TypedPlan> { + match self { + Plan::Let { value, .. } => Some(value), + Plan::Mfp { input, .. } => Some(input), + Plan::Reduce { input, .. } => Some(input), + Plan::Join { inputs, .. } => inputs.first(), + Plan::Union { inputs, .. } => inputs.first(), + _ => None, + } + } + + /// Find all the used collection in the plan + pub fn find_used_collection(&self) -> BTreeSet { + fn recur_find_use(plan: &Plan, used: &mut BTreeSet) { + match plan { + Plan::Get { id } => { + match id { + Id::Local(_) => (), + Id::Global(g) => { + used.insert(*g); + } + }; + } + Plan::Let { value, body, .. } => { + recur_find_use(&value.plan, used); + recur_find_use(&body.plan, used); + } + Plan::Mfp { input, .. } => { + recur_find_use(&input.plan, used); + } + Plan::Reduce { input, .. } => { + recur_find_use(&input.plan, used); + } + Plan::Join { inputs, .. } => { + for input in inputs { + recur_find_use(&input.plan, used); + } + } + Plan::Union { inputs, .. } => { + for input in inputs { + recur_find_use(&input.plan, used); + } + } + _ => {} + } + } + let mut ret = Default::default(); + recur_find_use(self, &mut ret); + ret + } +} + impl Plan { pub fn with_types(self, schema: RelationDesc) -> TypedPlan { TypedPlan { schema, plan: self } diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index 1edd0c40dd55..7fa6c237e45e 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::expr::{AggregateExpr, SafeMfpPlan}; +use crate::expr::{AggregateExpr, SafeMfpPlan, ScalarExpr}; /// Describe how to extract key-value pair from a `Row` #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -23,6 +23,18 @@ pub struct KeyValPlan { pub val_plan: SafeMfpPlan, } +impl KeyValPlan { + /// Get nth expr using column ref + pub fn get_nth_expr(&self, n: usize) -> Option<&ScalarExpr> { + let key_len = self.key_plan.expressions.len(); + if n < key_len { + return self.key_plan.expressions.get(n); + } else { + return self.val_plan.expressions.get(n - key_len); + } + } +} + /// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and /// basic aggregates(for other aggregate functions) and mixed aggregate #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] From 318a9158896e1412d64198dd44ffaa6f8943f995 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Dec 2024 21:14:16 +0800 Subject: [PATCH 05/22] chore: clippy --- src/flow/src/plan/reduce.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index 7fa6c237e45e..bcbf68772e89 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -28,9 +28,9 @@ impl KeyValPlan { pub fn get_nth_expr(&self, n: usize) -> Option<&ScalarExpr> { let key_len = self.key_plan.expressions.len(); if n < key_len { - return self.key_plan.expressions.get(n); + self.key_plan.expressions.get(n) } else { - return self.val_plan.expressions.get(n - key_len); + self.val_plan.expressions.get(n - key_len) } } } From 06a1cb9236db9a3e478f195580d28f6a8ef99f07 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 17 Dec 2024 14:09:57 +0800 Subject: [PATCH 06/22] test: auto get plan time window lower bound --- src/flow/src/expr/linear.rs | 9 +++++ src/flow/src/expr/utils.rs | 68 +++++++++++++++++++++++++++++++++---- src/flow/src/plan.rs | 4 +-- src/flow/src/plan/reduce.rs | 12 +++---- 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index f96d7827b6bd..b416ed0b25ec 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -94,6 +94,15 @@ impl MapFilterProject { } } + pub fn get_nth_expr(&self, n: usize) -> Option { + let idx = *self.projection.get(n)?; + if idx < self.input_arity { + Some(ScalarExpr::Column(idx)) + } else { + self.expressions.get(idx - self.input_arity).cloned() + } + } + /// The number of columns expected in the output row. pub fn output_arity(&self) -> usize { self.projection.len() diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index 9edab2081701..b0b69910073b 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -26,7 +26,7 @@ use crate::Result; /// Find lower bound for time `current` in given `plan` /// /// if `plan` doesn't contain a `TIME INDEX` column, return `None` -pub fn find_time_lower_bound_for_plan( +pub fn find_plan_time_lower_bound( plan: &TypedPlan, current: common_time::Timestamp, ) -> Result> { @@ -36,21 +36,26 @@ pub fn find_time_lower_bound_for_plan( }; let mut cur_plan = plan; - let mut expr_time_index = None; + let mut expr_time_index; - while let Some(input) = cur_plan.plan.get_first_input_plan() { + loop { // follow upward and find deepest time index expr that is not a column ref - expr_time_index = Some(input.plan.get_nth_expr(time_index).cloned().context( + expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context( UnexpectedSnafu { reason: "Failed to find time index expr", }, )?); + if let Some(ScalarExpr::Column(i)) = expr_time_index { time_index = i; } else { break; } - cur_plan = input; + if let Some(input) = cur_plan.plan.get_first_input_plan() { + cur_plan = input; + } else { + break; + } } let expr_time_index = expr_time_index.context(UnexpectedSnafu { @@ -197,13 +202,64 @@ pub fn find_time_window_lower_bound( #[cfg(test)] mod test { + use pretty_assertions::assert_eq; + use super::*; use crate::plan::{Plan, TypedPlan}; use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + #[tokio::test] + async fn test_plan_time_window_lower_bound() { + let testcases = [ + // no time index + ( + "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", + "2021-07-01 00:01:01.000", + None, + ), + // time index + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other fields + ( + "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other pks + ( + "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + ]; + let engine = create_test_query_engine(); + + for (sql, current, expected) in &testcases[3..] { + let plan = sql_to_substrait(engine.clone(), sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let expected = + expected.map(|expected| common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!( + find_plan_time_lower_bound(&flow_plan, current).unwrap(), + expected + ); + } + } + #[tokio::test] async fn test_timewindow_lower_bound() { - let testcases = vec![ + let testcases = [ ( ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")), "2021-07-01 00:01:01.000", diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index d20685083447..0ad367d49a31 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -186,9 +186,9 @@ pub enum Plan { impl Plan { /// Get nth expr using column ref - pub fn get_nth_expr(&self, n: usize) -> Option<&ScalarExpr> { + pub fn get_nth_expr(&self, n: usize) -> Option { match self { - Self::Mfp { mfp, .. } => mfp.expressions.get(n), + Self::Mfp { mfp, .. } => mfp.get_nth_expr(n), Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n), _ => None, } diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index bcbf68772e89..65a83756b57f 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -25,13 +25,11 @@ pub struct KeyValPlan { impl KeyValPlan { /// Get nth expr using column ref - pub fn get_nth_expr(&self, n: usize) -> Option<&ScalarExpr> { - let key_len = self.key_plan.expressions.len(); - if n < key_len { - self.key_plan.expressions.get(n) - } else { - self.val_plan.expressions.get(n - key_len) - } + pub fn get_nth_expr(&self, n: usize) -> Option { + self.key_plan.get_nth_expr(n).or_else(|| { + self.val_plan + .get_nth_expr(n - self.key_plan.projection.len()) + }) } } From 2103e18ce4a15fc742c0308319436fd969204448 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 17 Dec 2024 15:13:26 +0800 Subject: [PATCH 07/22] WIP: refill flow --- src/flow/src/adapter.rs | 2 ++ src/flow/src/adapter/refill.rs | 13 +++++++++++++ src/flow/src/server.rs | 4 ++++ 3 files changed, 19 insertions(+) create mode 100644 src/flow/src/adapter/refill.rs diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index dcdb1b1eb01a..26d0b0f89a1b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -68,6 +68,8 @@ mod tests; mod util; mod worker; +mod refill; + pub(crate) mod node_context; mod table_source; diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs new file mode 100644 index 000000000000..59f3388c4861 --- /dev/null +++ b/src/flow/src/adapter/refill.rs @@ -0,0 +1,13 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d22ba220441b..c2c15eef6c23 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -316,6 +316,10 @@ impl FlownodeBuilder { Ok(instance) } + async fn start_refill_flows(&self, manager: &FlowWorkerManagerRef) -> Result<(), Error> { + todo!() + } + /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) /// /// or recover all existing flow tasks if in standalone mode(nodeid is None) From c30cb9259f1550c5b26e16899008f06ec411d5f7 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 18 Dec 2024 15:55:00 +0800 Subject: [PATCH 08/22] TODO: impl refill tasks --- src/flow/src/adapter/refill.rs | 12 ++++++++++++ src/flow/src/server.rs | 4 ---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 59f3388c4861..ba891e87a35a 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -11,3 +11,15 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use crate::adapter::FlowWorkerManagerRef; +use crate::{Error, FlownodeBuilder}; + +impl FlownodeBuilder { + /// Create a series of tasks to refill flow, will be transfer to flownode if + /// + /// tasks havn't completed, and will show up in `flows` table + async fn start_refill_flows(&self, manager: &FlowWorkerManagerRef) -> Result<(), Error> { + todo!() + } +} diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index c2c15eef6c23..d22ba220441b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -316,10 +316,6 @@ impl FlownodeBuilder { Ok(instance) } - async fn start_refill_flows(&self, manager: &FlowWorkerManagerRef) -> Result<(), Error> { - todo!() - } - /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) /// /// or recover all existing flow tasks if in standalone mode(nodeid is None) From e04f9ed47683cc566b46f374695439806171755d Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 19 Dec 2024 14:37:48 +0800 Subject: [PATCH 09/22] WIP: RefillTask --- src/flow/src/adapter/refill.rs | 57 +++++++++++++++++++++++++++++++++- src/flow/src/server.rs | 4 +++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index ba891e87a35a..5dfc643bbf24 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -12,8 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{BTreeSet, VecDeque}; + +use common_query::OutputData; +use datatypes::value::Value; +use table::metadata::TableId; + +use crate::adapter::table_source::TableSource; use crate::adapter::FlowWorkerManagerRef; -use crate::{Error, FlownodeBuilder}; +use crate::error::UnexpectedSnafu; +use crate::{Error, FlownodeBuilder, FrontendInvoker}; impl FlownodeBuilder { /// Create a series of tasks to refill flow, will be transfer to flownode if @@ -23,3 +31,50 @@ impl FlownodeBuilder { todo!() } } + +/// Task to refill flow with given table id and a time range +pub struct RefillTask { + table_id: TableId, + output_data: OutputData, +} + +impl RefillTask { + /// Query with "select * from table WHERE time >= range_start and time < range_end" + pub async fn create( + invoker: &FrontendInvoker, + table_id: TableId, + time_range: ( + common_time::timestamp::Timestamp, + common_time::timestamp::Timestamp, + ), + time_col_name: &str, + table_src: &TableSource, + ) -> Result { + let (table_name, table_schmea) = table_src.get_table_name_schema(&table_id).await?; + let all_col_names: BTreeSet<_> = table_schmea + .iter_names() + .flatten() + .map(|s| s.as_str()) + .collect(); + + if !all_col_names.contains(time_col_name) { + UnexpectedSnafu { + reason: format!( + "Can't find column {} in table {} while refill flow", + time_col_name, + table_name.join(".") + ), + } + .fail()?; + } + + let sql = format!( + "select * from {0} where {1} >= {2} and {1} < {3}", + table_name.join("."), + time_col_name, + Value::from(time_range.0), + Value::from(time_range.1), + ); + todo!() + } +} diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d22ba220441b..623feafa61ca 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -504,6 +504,10 @@ impl FrontendInvoker { let invoker = FrontendInvoker::new(inserter, deleter, statement_executor); Ok(invoker) } + + pub fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } impl FrontendInvoker { From 062270eeded470d718f1af36180d7f90f3fd99a1 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 19 Dec 2024 17:56:11 +0800 Subject: [PATCH 10/22] feat: create refill task --- src/flow/src/adapter/refill.rs | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 5dfc643bbf24..7153d2684afd 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -14,13 +14,18 @@ use std::collections::{BTreeSet, VecDeque}; +use common_error::ext::BoxedError; use common_query::OutputData; use datatypes::value::Value; +use query::parser::QueryLanguageParser; +use session::context::QueryContext; +use snafu::ResultExt; use table::metadata::TableId; use crate::adapter::table_source::TableSource; use crate::adapter::FlowWorkerManagerRef; use crate::error::UnexpectedSnafu; +use crate::expr::error::ExternalSnafu; use crate::{Error, FlownodeBuilder, FrontendInvoker}; impl FlownodeBuilder { @@ -35,7 +40,7 @@ impl FlownodeBuilder { /// Task to refill flow with given table id and a time range pub struct RefillTask { table_id: TableId, - output_data: OutputData, + output_data: common_query::Output, } impl RefillTask { @@ -75,6 +80,25 @@ impl RefillTask { Value::from(time_range.0), Value::from(time_range.1), ); - todo!() + + // we don't need information from query context in this query so a default query context is enough + let query_ctx = QueryContext::arc(); + + let stmt = QueryLanguageParser::parse_sql(&sql, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let stmt_exec = invoker.statement_executor(); + + let output_data = stmt_exec + .execute_stmt(stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(RefillTask { + table_id, + output_data, + }) } } From 25d50e50f372532bb3858676c5f292f2c5b8fe95 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 20 Dec 2024 11:52:14 +0800 Subject: [PATCH 11/22] refactor: QueryStream --- src/flow/src/adapter/flownode_impl.rs | 1 + src/flow/src/adapter/node_context.rs | 5 ++++ src/flow/src/adapter/refill.rs | 42 ++++++++++++++++++++++++--- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1f431ff92f64..2843b6538769 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use super::util::from_proto_to_data_type; +use super::FlownodeContext; use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 5c644803ec71..06d6336e90a9 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use common_recordbatch::RecordBatch; use common_telemetry::trace; use datatypes::prelude::ConcreteDataType; use session::context::QueryContext; @@ -159,6 +160,10 @@ impl SourceSender { Ok(0) } + + pub async fn send_record_batchs(&self, batch: RecordBatch) -> Result { + todo!() + } } impl FlownodeContext { diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 7153d2684afd..ae2aa13cfe65 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, VecDeque}; +use std::collections::BTreeSet; use common_error::ext::BoxedError; -use common_query::OutputData; +use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::value::Value; use query::parser::QueryLanguageParser; use session::context::QueryContext; @@ -40,7 +40,38 @@ impl FlownodeBuilder { /// Task to refill flow with given table id and a time range pub struct RefillTask { table_id: TableId, - output_data: common_query::Output, + output_stream: SendableRecordBatchStream, +} + +/// Query stream of RefillTask, simply wrap RecordBatches and RecordBatchStream and check output is not `AffectedRows` +enum QueryStream { + Batches { batches: RecordBatches }, + Stream { stream: SendableRecordBatchStream }, +} + +impl TryFrom for QueryStream { + type Error = Error; + fn try_from(value: common_query::Output) -> Result { + match value.data { + common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }), + common_query::OutputData::RecordBatches(batches) => { + Ok(QueryStream::Batches { batches }) + } + _ => UnexpectedSnafu { + reason: format!("Unexpected output data type: {:?}", value.data), + } + .fail(), + } + } +} + +impl QueryStream { + fn try_into_stream(self) -> Result { + match self { + Self::Batches { batches } => Ok(batches.as_stream()), + Self::Stream { stream } => Ok(stream), + } + } } impl RefillTask { @@ -96,9 +127,12 @@ impl RefillTask { .map_err(BoxedError::new) .context(ExternalSnafu)?; + let output_stream = QueryStream::try_from(output_data)?; + let output_stream = output_stream.try_into_stream()?; + Ok(RefillTask { table_id, - output_data, + output_stream, }) } } From 7b4d9f66997c75bc9511b03cfa5bfc9f3ef47dab Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 20 Dec 2024 12:22:49 +0800 Subject: [PATCH 12/22] chore: typo --- src/flow/src/adapter/refill.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index ae2aa13cfe65..1bb176317343 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -86,8 +86,8 @@ impl RefillTask { time_col_name: &str, table_src: &TableSource, ) -> Result { - let (table_name, table_schmea) = table_src.get_table_name_schema(&table_id).await?; - let all_col_names: BTreeSet<_> = table_schmea + let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?; + let all_col_names: BTreeSet<_> = table_schema .iter_names() .flatten() .map(|s| s.as_str()) From c1e088adc450b19e4069b8c09868e206e4833b92 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 20 Dec 2024 15:53:32 +0800 Subject: [PATCH 13/22] feat: handle refill inserts --- src/flow/src/adapter/flownode_impl.rs | 1 - src/flow/src/adapter/node_context.rs | 32 +++++-- src/flow/src/adapter/refill.rs | 125 +++++++++++++++++++++++--- src/flow/src/expr.rs | 10 +++ src/flow/src/plan.rs | 2 + src/flow/src/server.rs | 41 ++++++--- 6 files changed, 180 insertions(+), 31 deletions(-) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 2843b6538769..1f431ff92f64 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -29,7 +29,6 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use super::util::from_proto_to_data_type; -use super::FlownodeContext; use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 06d6336e90a9..e38e7898f4ef 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -138,13 +138,13 @@ impl SourceSender { rows: Vec, batch_datatypes: &[ConcreteDataType], ) -> Result { - METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _); + let rows_len = rows.len(); + METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _); while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { tokio::task::yield_now().await; } // row count metrics is approx so relaxed order is ok - self.send_buf_row_cnt - .fetch_add(rows.len(), Ordering::SeqCst); + self.send_buf_row_cnt.fetch_add(rows_len, Ordering::SeqCst); let batch = Batch::try_from_rows_with_types( rows.into_iter().map(|(row, _, _)| row).collect(), batch_datatypes, @@ -158,11 +158,21 @@ impl SourceSender { .build() })?; - Ok(0) + Ok(rows_len) } - pub async fn send_record_batchs(&self, batch: RecordBatch) -> Result { - todo!() + /// send record batch + pub async fn send_record_batch(&self, batch: RecordBatch) -> Result { + let row_cnt = batch.num_rows(); + let batch = Batch::from(batch); + + self.send_buf_tx.send(batch).await.map_err(|e| { + crate::error::InternalSnafu { + reason: format!("Failed to send batch, error = {:?}", e), + } + .build() + })?; + Ok(row_cnt) } } @@ -185,6 +195,16 @@ impl FlownodeContext { sender.send_rows(rows, batch_datatypes).await } + pub async fn send_rb(&self, table_id: TableId, batch: RecordBatch) -> Result { + let sender = self + .source_sender + .get(&table_id) + .with_context(|| TableNotFoundSnafu { + name: table_id.to_string(), + })?; + sender.send_record_batch(batch).await + } + /// flush all sender's buf /// /// return numbers being sent diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 1bb176317343..9f1ead8ef675 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -15,17 +15,19 @@ use std::collections::BTreeSet; use common_error::ext::BoxedError; -use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream}; use datatypes::value::Value; +use futures::{Stream, StreamExt}; use query::parser::QueryLanguageParser; use session::context::QueryContext; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use crate::adapter::table_source::TableSource; use crate::adapter::FlowWorkerManagerRef; -use crate::error::UnexpectedSnafu; +use crate::error::{FlowNotFoundSnafu, UnexpectedSnafu}; use crate::expr::error::ExternalSnafu; +use crate::repr::RelationDesc; use crate::{Error, FlownodeBuilder, FrontendInvoker}; impl FlownodeBuilder { @@ -33,6 +35,34 @@ impl FlownodeBuilder { /// /// tasks havn't completed, and will show up in `flows` table async fn start_refill_flows(&self, manager: &FlowWorkerManagerRef) -> Result<(), Error> { + let Some(nodeid) = manager.node_id else { + UnexpectedSnafu { + reason: "node id is not set", + } + .fail()? + }; + let nodeid = nodeid as u64; + + let flow_ids = self.get_all_flow_ids(Some(nodeid)).await?; + for flow_id in flow_ids { + let info = self + .flow_metadata_manager() + .flow_info_manager() + .get(flow_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(FlowNotFoundSnafu { id: flow_id })?; + let expire_after = info.expire_after(); + // TODO(discord9): better way to get last point + let now = manager.tick_manager.tick(); + let time_range = expire_after.map(|e| { + ( + common_time::Timestamp::new_millisecond(now - e), + common_time::Timestamp::new_millisecond(now), + ) + }); + } todo!() } } @@ -40,6 +70,7 @@ impl FlownodeBuilder { /// Task to refill flow with given table id and a time range pub struct RefillTask { table_id: TableId, + table_schema: RelationDesc, output_stream: SendableRecordBatchStream, } @@ -79,10 +110,7 @@ impl RefillTask { pub async fn create( invoker: &FrontendInvoker, table_id: TableId, - time_range: ( - common_time::timestamp::Timestamp, - common_time::timestamp::Timestamp, - ), + time_range: Option<(common_time::Timestamp, common_time::Timestamp)>, time_col_name: &str, table_src: &TableSource, ) -> Result { @@ -104,13 +132,17 @@ impl RefillTask { .fail()?; } - let sql = format!( - "select * from {0} where {1} >= {2} and {1} < {3}", - table_name.join("."), - time_col_name, - Value::from(time_range.0), - Value::from(time_range.1), - ); + let sql = if let Some(time_range) = time_range { + format!( + "select * from {0} where {1} >= {2} and {1} < {3}", + table_name.join("."), + time_col_name, + Value::from(time_range.0), + Value::from(time_range.1), + ) + } else { + format!("select * from {0}", table_name.join(".")) + }; // we don't need information from query context in this query so a default query context is enough let query_ctx = QueryContext::arc(); @@ -132,7 +164,72 @@ impl RefillTask { Ok(RefillTask { table_id, + table_schema, output_stream, }) } + + /// handle refill insert requests + /// + /// TODO(discord9): add a back pressure mechanism + pub async fn handle_refill_inserts( + &mut self, + manager: FlowWorkerManagerRef, + ) -> Result<(), Error> { + while let Some(rb) = self.output_stream.next().await { + let rb = match rb { + Ok(rb) => rb, + Err(err) => Err(err).map_err(BoxedError::new).context(ExternalSnafu)?, + }; + self.validate_schema(&rb)?; + + // send rb into flow node + manager + .node_context + .read() + .await + .send_rb(self.table_id, rb) + .await?; + } + Ok(()) + } + + /// validate that incoming batch's schema is the same as table schema(by comparing types&names) + fn validate_schema(&self, rb: &RecordBatch) -> Result<(), Error> { + let rb_schema = &rb.schema; + let table_schema = &self.table_schema; + if rb_schema.column_schemas().len() != table_schema.len()? { + UnexpectedSnafu { + reason: "rb schema len != table schema len", + } + .fail()?; + } + for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() { + let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type); + let (table_name, table_ty) = ( + table_schema.names[i].as_ref(), + &table_schema.typ().column_types[i].scalar_type, + ); + if Some(rb_name) != table_name.map(|c| c.as_str()) { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema name {} != expected table schema name {:?}", + rb_name, table_name + ), + } + .fail()?; + } + + if rb_ty != table_ty { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema type {:?} != expected table schema type {:?}", + rb_ty, table_ty + ), + } + .fail()?; + } + } + Ok(()) + } } diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 80baede40cc0..ea05821498f6 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -55,6 +55,16 @@ pub struct Batch { diffs: Option, } +impl From for Batch { + fn from(value: common_recordbatch::RecordBatch) -> Self { + Self { + batch: value.columns().to_vec(), + row_count: value.num_rows(), + diffs: None, + } + } +} + impl PartialEq for Batch { fn eq(&self, other: &Self) -> bool { let mut batch_eq = true; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 0ad367d49a31..f6b33c46cb5c 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -18,6 +18,8 @@ mod join; mod reduce; +use std::collections::BTreeSet; + use crate::error::Error; use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr}; use crate::plan::join::JoinPlan; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 623feafa61ca..b4ba8a535471 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -273,6 +273,18 @@ impl FlownodeBuilder { } } + pub fn table_meta(&self) -> TableMetadataManagerRef { + self.table_meta.clone() + } + + pub fn catalog_manager(&self) -> CatalogManagerRef { + self.catalog_manager.clone() + } + + pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef { + self.flow_metadata_manager.clone() + } + pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self { let (sender, receiver) = SizeReportSender::new(); Self { @@ -316,22 +328,16 @@ impl FlownodeBuilder { Ok(instance) } - /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) - /// - /// or recover all existing flow tasks if in standalone mode(nodeid is None) - /// - /// TODO(discord9): persistent flow tasks with internal state - async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result { - let nodeid = self.opts.node_id; - let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid { - let to_be_recover = self + pub(crate) async fn get_all_flow_ids(&self, nodeid: Option) -> Result, Error> { + let ret = if let Some(nodeid) = nodeid { + let flow_ids_one_node = self .flow_metadata_manager .flownode_flow_manager() .flows(nodeid) .try_collect::>() .await .context(ListFlowsSnafu { id: Some(nodeid) })?; - to_be_recover.into_iter().map(|(id, _)| id).collect() + flow_ids_one_node.into_iter().map(|(id, _)| id).collect() } else { let all_catalogs = self .catalog_manager @@ -355,6 +361,21 @@ impl FlownodeBuilder { } all_flow_ids }; + + Ok(ret) + } + + /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) + /// + /// or recover all existing flow tasks if in standalone mode(nodeid is None) + /// + /// TODO(discord9): persistent flow tasks with internal state + pub(crate) async fn recover_flows( + &self, + manager: &FlowWorkerManagerRef, + ) -> Result { + let nodeid = self.opts.node_id; + let to_be_recovered = self.get_all_flow_ids(nodeid).await?; let cnt = to_be_recovered.len(); // TODO(discord9): recover in parallel From 01c88280928bfa8f9c0565924e9ddacd340e84ed Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 23 Dec 2024 14:15:14 +0800 Subject: [PATCH 14/22] feat: refill task --- src/flow/src/adapter.rs | 35 ++++++++- src/flow/src/adapter/refill.rs | 103 ++++++++++++++++++++++----- src/flow/src/adapter/table_source.rs | 24 +++++++ 3 files changed, 145 insertions(+), 17 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 26d0b0f89a1b..03c2167a70c9 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use common_config::Configurable; use common_error::ext::BoxedError; +use common_meta::key::flow::flow_info::FlowInfoValue; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -37,7 +38,7 @@ use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; -use session::context::QueryContext; +use session::context::{QueryContext, QueryContextBuilder}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; @@ -696,6 +697,38 @@ pub struct CreateFlowArgs { pub query_ctx: Option, } +impl CreateFlowArgs { + pub fn from_flow_info( + flow_id: FlowId, + info: FlowInfoValue, + create_if_not_exists: bool, + or_replace: bool, + ) -> Self { + let sink_table_name = [ + info.sink_table_name().catalog_name.clone(), + info.sink_table_name().schema_name.clone(), + info.sink_table_name().table_name.clone(), + ]; + let args = CreateFlowArgs { + flow_id: flow_id as _, + sink_table_name, + source_table_ids: info.source_table_ids().to_vec(), + create_if_not_exists, + or_replace, + expire_after: info.expire_after(), + comment: Some(info.comment().clone()), + sql: info.raw_sql().clone(), + flow_options: info.options().clone(), + query_ctx: Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().clone()) + .build(), + ), + }; + args + } +} + /// Create&Remove flow impl FlowWorkerManager { /// remove a flow by it's id diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 9f1ead8ef675..2e0be6011974 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -16,34 +16,42 @@ use std::collections::BTreeSet; use common_error::ext::BoxedError; use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream}; +use common_runtime::JoinHandle; use datatypes::value::Value; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use query::parser::QueryLanguageParser; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; +use super::FlowId; use crate::adapter::table_source::TableSource; -use crate::adapter::FlowWorkerManagerRef; +use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{FlowNotFoundSnafu, UnexpectedSnafu}; use crate::expr::error::ExternalSnafu; use crate::repr::RelationDesc; use crate::{Error, FlownodeBuilder, FrontendInvoker}; impl FlownodeBuilder { - /// Create a series of tasks to refill flow, will be transfer to flownode if - /// - /// tasks havn't completed, and will show up in `flows` table - async fn start_refill_flows(&self, manager: &FlowWorkerManagerRef) -> Result<(), Error> { - let Some(nodeid) = manager.node_id else { - UnexpectedSnafu { - reason: "node id is not set", - } - .fail()? - }; - let nodeid = nodeid as u64; + /// Create a series of tasks to refill flow + async fn create_refill_flow_tasks( + &self, + manager: &FlowWorkerManagerRef, + ) -> Result, Error> { + let nodeid = manager.node_id.map(|c| c as u64); - let flow_ids = self.get_all_flow_ids(Some(nodeid)).await?; + let frontend_invoker = + manager + .frontend_invoker + .read() + .await + .clone() + .context(UnexpectedSnafu { + reason: "frontend invoker is not set", + })?; + + let flow_ids = self.get_all_flow_ids(nodeid).await?; + let mut refill_tasks = Vec::new(); for flow_id in flow_ids { let info = self .flow_metadata_manager() @@ -62,13 +70,74 @@ impl FlownodeBuilder { common_time::Timestamp::new_millisecond(now), ) }); + for src_table in info.source_table_ids() { + let time_index_col = manager + .table_info_source + .get_time_index_column_from_table_id(src_table) + .await?; + let time_index_name = time_index_col.name; + let task = RefillTask::create( + &frontend_invoker, + flow_id as u64, + *src_table, + time_range, + &time_index_name, + &manager.table_info_source, + ) + .await?; + refill_tasks.push(task); + } + } + Ok(refill_tasks) + } + + /// Starting to refill flows, if any error occurs, will rebuild the flow and retry + pub(crate) async fn starting_refill_flows( + &self, + manager: &FlowWorkerManagerRef, + ) -> Result>>, Error> { + let tasks = self.create_refill_flow_tasks(manager).await?; + // TODO(discord9): add a back pressure mechanism + let mut handles = Vec::new(); + for mut task in tasks { + let flow_metadata_manager = self.flow_metadata_manager(); + let manager = manager.clone(); + let handle: JoinHandle> = common_runtime::spawn_global(async move { + // if failed to refill, will rebuild the flow without refill + match task.handle_refill_inserts(manager.clone()).await { + Ok(()) => { + common_telemetry::info!( + "Successfully refill flow: flow_id={}", + task.flow_id + ); + } + Err(err) => { + common_telemetry::error!(err; "Failed to refill flow(id={}), will rebuild the flow with clean state", task.flow_id); + + let flow_id = task.flow_id; + manager.remove_flow(flow_id).await?; + let info = flow_metadata_manager + .flow_info_manager() + .get(flow_id as u32) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(FlowNotFoundSnafu { id: flow_id })?; + let args = CreateFlowArgs::from_flow_info(flow_id, info, true, true); + manager.create_flow(args).await?; + } + } + Ok(()) + }); + handles.push(handle); } - todo!() + Ok(handles) } } /// Task to refill flow with given table id and a time range pub struct RefillTask { + flow_id: FlowId, table_id: TableId, table_schema: RelationDesc, output_stream: SendableRecordBatchStream, @@ -109,6 +178,7 @@ impl RefillTask { /// Query with "select * from table WHERE time >= range_start and time < range_end" pub async fn create( invoker: &FrontendInvoker, + flow_id: FlowId, table_id: TableId, time_range: Option<(common_time::Timestamp, common_time::Timestamp)>, time_col_name: &str, @@ -163,6 +233,7 @@ impl RefillTask { let output_stream = output_stream.try_into_stream()?; Ok(RefillTask { + flow_id, table_id, table_schema, output_stream, @@ -179,7 +250,7 @@ impl RefillTask { while let Some(rb) = self.output_stream.next().await { let rb = match rb { Ok(rb) => rb, - Err(err) => Err(err).map_err(BoxedError::new).context(ExternalSnafu)?, + Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?, }; self.validate_schema(&rb)?; diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 7981999f0abc..8b8ecd662ff2 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -42,6 +42,30 @@ impl TableSource { } } + pub async fn get_time_index_column_from_table_id( + &self, + table_id: &TableId, + ) -> Result { + let info = self + .table_info_manager + .get(*table_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(UnexpectedSnafu { + reason: format!("Table id = {:?}, couldn't found table info", table_id), + })?; + let raw_schema = &info.table_info.meta.schema; + let Some(ts_index) = raw_schema.timestamp_index else { + UnexpectedSnafu { + reason: format!("Table id = {:?}, couldn't found timestamp index", table_id), + } + .fail()? + }; + let col_schema = raw_schema.column_schemas[ts_index].clone(); + Ok(col_schema) + } + pub async fn get_table_id_from_proto_name( &self, name: &greptime_proto::v1::TableName, From 917aaa19dab38f570529b9b15725b835edaf6cac Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 24 Dec 2024 15:50:39 +0800 Subject: [PATCH 15/22] feat: refill impl --- src/cmd/src/flownode.rs | 16 +- src/cmd/src/standalone.rs | 21 +- src/flow/src/adapter.rs | 5 + src/flow/src/adapter/node_context.rs | 11 + src/flow/src/adapter/refill.rs | 382 +++++++++++++++++---------- src/flow/src/error.rs | 20 ++ src/flow/src/expr.rs | 1 + src/flow/src/expr/utils.rs | 9 +- src/flow/src/server.rs | 182 ++++++++++--- src/frontend/src/instance.rs | 4 + tests-integration/src/standalone.rs | 9 +- 11 files changed, 461 insertions(+), 199 deletions(-) diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index b399bf37f70d..1d21f54645fa 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -21,6 +21,7 @@ use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; use common_config::Configurable; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; @@ -38,8 +39,8 @@ use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, - MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, + BuildCacheRegistrySnafu, BuildCliSnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, + MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -301,7 +302,7 @@ impl StartCommand { Plugins::new(), table_metadata_manager, catalog_manager.clone(), - flow_metadata_manager, + flow_metadata_manager.clone(), ) .with_heartbeat_task(heartbeat_task); @@ -316,7 +317,7 @@ impl StartCommand { let client = Arc::new(NodeClients::new(channel_config)); let invoker = FrontendInvoker::build_from( - flownode.flow_worker_manager().clone(), + None, catalog_manager.clone(), cached_meta_backend.clone(), layered_cache_registry.clone(), @@ -330,6 +331,13 @@ impl StartCommand { .set_frontend_invoker(invoker) .await; + flownode + .flow_worker_manager() + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .map_err(BoxedError::new) + .context(BuildCliSnafu)?; + Ok(Instance::new(flownode, guard)) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e3675a7db7c1..98153952d804 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, - InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + BuildCacheRegistrySnafu, BuildCliSnafu, CreateDirSnafu, IllegalConfigSnafu, + InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, + Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; @@ -580,7 +580,7 @@ impl StartCommand { layered_cache_registry.clone(), table_metadata_manager, table_meta_allocator, - flow_metadata_manager, + flow_metadata_manager.clone(), flow_meta_allocator, ) .await?; @@ -602,7 +602,7 @@ impl StartCommand { let flow_worker_manager = flownode.flow_worker_manager(); // flow server need to be able to use frontend to write insert requests back let invoker = FrontendInvoker::build_from( - flow_worker_manager.clone(), + Some(frontend.query_engine()), catalog_manager.clone(), kv_backend.clone(), layered_cache_registry.clone(), @@ -613,6 +613,15 @@ impl StartCommand { .context(StartFlownodeSnafu)?; flow_worker_manager.set_frontend_invoker(invoker).await; + if let Err(err) = flow_worker_manager + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .map_err(BoxedError::new) + .context(BuildCliSnafu) + { + common_telemetry::error!(err; "failed to refill flow"); + } + let (tx, _rx) = broadcast::channel(1); let servers = Services::new(opts, Arc::new(frontend.clone()), plugins) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 03c2167a70c9..944b38d5159e 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -46,6 +46,7 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; +use crate::adapter::refill::RefillTask; use crate::adapter::table_source::TableSource; use crate::adapter::util::{ relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, @@ -136,6 +137,7 @@ pub struct FlowWorkerManager { frontend_invoker: RwLock>, /// contains mapping from table name to global id, and table schema node_context: RwLock, + refill_tasks: RwLock>, flow_err_collectors: RwLock>, src_send_buf_lens: RwLock>>, tick_manager: FlowTickManager, @@ -174,6 +176,7 @@ impl FlowWorkerManager { table_info_source: srv_map, frontend_invoker: RwLock::new(None), node_context: RwLock::new(node_context), + refill_tasks: Default::default(), flow_err_collectors: Default::default(), src_send_buf_lens: Default::default(), tick_manager, @@ -818,6 +821,8 @@ impl FlowWorkerManager { // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; + node_ctx.add_flow_plan(flow_id, flow_plan.clone()); + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); // check schema against actual table schema if exists diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index e38e7898f4ef..38d8a8fc45a4 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -31,6 +31,7 @@ use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::{Batch, GlobalId}; use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE; +use crate::plan::TypedPlan; use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP}; /// A context that holds the information of the dataflow @@ -40,6 +41,7 @@ pub struct FlownodeContext { pub source_to_tasks: BTreeMap>, /// mapping from task to sink table, useful for sending data back to the client when a task is done running pub flow_to_sink: BTreeMap, + pub flow_plans: BTreeMap, pub sink_to_flow: BTreeMap, /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender /// @@ -240,6 +242,14 @@ impl FlownodeContext { self.sink_to_flow.insert(sink_table_name, task_id); } + pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) { + self.flow_plans.insert(task_id, plan); + } + + pub fn get_flow_plan(&self, task_id: &FlowId) -> Option { + self.flow_plans.get(task_id).cloned() + } + /// remove flow from worker context pub fn remove_flow(&mut self, task_id: FlowId) { if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) { @@ -251,6 +261,7 @@ impl FlownodeContext { self.source_sender.remove(source_table_id); } } + self.flow_plans.remove(&task_id); } /// try add source sender, if already exist, do nothing diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 2e0be6011974..7d727745fe70 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -13,48 +13,56 @@ // limitations under the License. use std::collections::BTreeSet; +use std::sync::Arc; +use catalog::CatalogManagerRef; use common_error::ext::BoxedError; +use common_meta::key::flow::FlowMetadataManagerRef; use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream}; use common_runtime::JoinHandle; use datatypes::value::Value; use futures::StreamExt; use query::parser::QueryLanguageParser; -use session::context::QueryContext; +use session::context::QueryContextBuilder; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; -use super::FlowId; +use super::{FlowId, FlowWorkerManager}; use crate::adapter::table_source::TableSource; -use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; -use crate::error::{FlowNotFoundSnafu, UnexpectedSnafu}; +use crate::adapter::FlowWorkerManagerRef; +use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu}; use crate::expr::error::ExternalSnafu; +use crate::expr::find_plan_time_window_expr_lower_bound; use crate::repr::RelationDesc; -use crate::{Error, FlownodeBuilder, FrontendInvoker}; +use crate::server::get_all_flow_ids; +use crate::{Error, FrontendInvoker}; + +impl FlowWorkerManager { + /// Create and start refill flow tasks in background + pub async fn create_and_start_refill_flow_tasks( + self: &FlowWorkerManagerRef, + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, + ) -> Result<(), Error> { + let tasks = self + .create_refill_flow_tasks(flow_metadata_manager, catalog_manager) + .await?; + self.starting_refill_flows(tasks).await?; + Ok(()) + } -impl FlownodeBuilder { /// Create a series of tasks to refill flow - async fn create_refill_flow_tasks( + pub async fn create_refill_flow_tasks( &self, - manager: &FlowWorkerManagerRef, + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, ) -> Result, Error> { - let nodeid = manager.node_id.map(|c| c as u64); + let nodeid = self.node_id.map(|c| c as u64); - let frontend_invoker = - manager - .frontend_invoker - .read() - .await - .clone() - .context(UnexpectedSnafu { - reason: "frontend invoker is not set", - })?; - - let flow_ids = self.get_all_flow_ids(nodeid).await?; + let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?; let mut refill_tasks = Vec::new(); for flow_id in flow_ids { - let info = self - .flow_metadata_manager() + let info = flow_metadata_manager .flow_info_manager() .get(flow_id) .await @@ -63,26 +71,32 @@ impl FlownodeBuilder { .context(FlowNotFoundSnafu { id: flow_id })?; let expire_after = info.expire_after(); // TODO(discord9): better way to get last point - let now = manager.tick_manager.tick(); - let time_range = expire_after.map(|e| { - ( - common_time::Timestamp::new_millisecond(now - e), - common_time::Timestamp::new_millisecond(now), - ) - }); + let now = self.tick_manager.tick(); + let plan = self + .node_context + .read() + .await + .get_flow_plan(&FlowId::from(flow_id)) + .context(FlowNotFoundSnafu { id: flow_id })?; + let time_range = if let Some(expire_after) = expire_after { + let low_bound = common_time::Timestamp::new_millisecond(now - expire_after); + let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?; + real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now))) + } else { + None + }; for src_table in info.source_table_ids() { - let time_index_col = manager + let time_index_col = self .table_info_source .get_time_index_column_from_table_id(src_table) .await?; let time_index_name = time_index_col.name; let task = RefillTask::create( - &frontend_invoker, flow_id as u64, *src_table, time_range, &time_index_name, - &manager.table_info_source, + &self.table_info_source, ) .await?; refill_tasks.push(task); @@ -93,54 +107,169 @@ impl FlownodeBuilder { /// Starting to refill flows, if any error occurs, will rebuild the flow and retry pub(crate) async fn starting_refill_flows( - &self, - manager: &FlowWorkerManagerRef, - ) -> Result>>, Error> { - let tasks = self.create_refill_flow_tasks(manager).await?; + self: &FlowWorkerManagerRef, + tasks: Vec, + ) -> Result<(), Error> { // TODO(discord9): add a back pressure mechanism - let mut handles = Vec::new(); + let frontend_invoker = + self.frontend_invoker + .read() + .await + .clone() + .context(UnexpectedSnafu { + reason: "frontend invoker is not set", + })?; + for mut task in tasks { - let flow_metadata_manager = self.flow_metadata_manager(); - let manager = manager.clone(); - let handle: JoinHandle> = common_runtime::spawn_global(async move { - // if failed to refill, will rebuild the flow without refill - match task.handle_refill_inserts(manager.clone()).await { - Ok(()) => { - common_telemetry::info!( - "Successfully refill flow: flow_id={}", - task.flow_id - ); - } - Err(err) => { - common_telemetry::error!(err; "Failed to refill flow(id={}), will rebuild the flow with clean state", task.flow_id); - - let flow_id = task.flow_id; - manager.remove_flow(flow_id).await?; - let info = flow_metadata_manager - .flow_info_manager() - .get(flow_id as u32) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? - .context(FlowNotFoundSnafu { id: flow_id })?; - let args = CreateFlowArgs::from_flow_info(flow_id, info, true, true); - manager.create_flow(args).await?; - } - } - Ok(()) - }); - handles.push(handle); + task.start_running(self.clone(), &frontend_invoker).await?; + // TODO(discord9): save refill tasks to a map and check if it's finished when necessary + // i.e. when system table need query it's state + self.refill_tasks + .write() + .await + .insert(task.data.flow_id, task); } - Ok(handles) + Ok(()) } } /// Task to refill flow with given table id and a time range pub struct RefillTask { + data: TaskData, + state: TaskState<()>, +} + +#[derive(Clone)] +struct TaskData { flow_id: FlowId, table_id: TableId, table_schema: RelationDesc, - output_stream: SendableRecordBatchStream, +} + +impl TaskData { + /// validate that incoming batch's schema is the same as table schema(by comparing types&names) + fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> { + let rb_schema = &rb.schema; + if rb_schema.column_schemas().len() != table_schema.len()? { + UnexpectedSnafu { + reason: "rb schema len != table schema len", + } + .fail()?; + } + for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() { + let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type); + let (table_name, table_ty) = ( + table_schema.names[i].as_ref(), + &table_schema.typ().column_types[i].scalar_type, + ); + if Some(rb_name) != table_name.map(|c| c.as_str()) { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema name {} != expected table schema name {:?}", + rb_name, table_name + ), + } + .fail()?; + } + + if rb_ty != table_ty { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema type {:?} != expected table schema type {:?}", + rb_ty, table_ty + ), + } + .fail()?; + } + } + Ok(()) + } +} + +/// Refill task state +enum TaskState { + /// Task is not started + Prepared { sql: String }, + /// Task is running + Running { + handle: JoinHandle>, + }, + /// Task is finished + Finished { res: Result }, +} + +impl TaskState { + fn new(sql: String) -> Self { + Self::Prepared { sql } + } +} + +mod test_send { + use std::collections::BTreeMap; + + use tokio::sync::RwLock; + + use super::*; + fn is_send() {} + fn foo() { + is_send::>(); + is_send::(); + is_send::>(); + is_send::>>(); + } +} + +impl TaskState<()> { + /// check if task is finished + async fn is_finished(&mut self) -> Result { + match self { + Self::Finished { .. } => Ok(true), + Self::Running { handle } => Ok(if handle.is_finished() { + *self = Self::Finished { + res: handle.await.context(JoinTaskSnafu)?, + }; + true + } else { + false + }), + _ => Ok(false), + } + } + + fn start_running( + &mut self, + task_data: &TaskData, + manager: FlowWorkerManagerRef, + mut output_stream: SendableRecordBatchStream, + ) -> Result<(), Error> { + let data = (*task_data).clone(); + let handle: JoinHandle> = common_runtime::spawn_global(async move { + while let Some(rb) = output_stream.next().await { + let rb = match rb { + Ok(rb) => rb, + Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?, + }; + TaskData::validate_schema(&data.table_schema, &rb)?; + + // send rb into flow node + manager + .node_context + .read() + .await + .send_rb(data.table_id, rb) + .await?; + } + common_telemetry::info!( + "Refill successful for source table_id={}, flow_id={}", + data.table_id, + data.flow_id + ); + Ok(()) + }); + *self = Self::Running { handle }; + + Ok(()) + } } /// Query stream of RefillTask, simply wrap RecordBatches and RecordBatchStream and check output is not `AffectedRows` @@ -177,7 +306,6 @@ impl QueryStream { impl RefillTask { /// Query with "select * from table WHERE time >= range_start and time < range_end" pub async fn create( - invoker: &FrontendInvoker, flow_id: FlowId, table_id: TableId, time_range: Option<(common_time::Timestamp, common_time::Timestamp)>, @@ -214,17 +342,50 @@ impl RefillTask { format!("select * from {0}", table_name.join(".")) }; + Ok(RefillTask { + data: TaskData { + flow_id, + table_id, + table_schema, + }, + state: TaskState::new(sql), + }) + } + + /// Start running the task in background, non-blocking + pub async fn start_running( + &mut self, + manager: FlowWorkerManagerRef, + invoker: &FrontendInvoker, + ) -> Result<(), Error> { + let TaskState::Prepared { sql } = &mut self.state else { + UnexpectedSnafu { + reason: "task is not prepared", + } + .fail()? + }; + // we don't need information from query context in this query so a default query context is enough - let query_ctx = QueryContext::arc(); + let query_ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog("greptime".to_string()) + .current_schema("public".to_string()) + .build(), + ); - let stmt = QueryLanguageParser::parse_sql(&sql, &query_ctx) + let stmt_exec = invoker.statement_executor(); + + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let plan = stmt_exec + .plan(&stmt, query_ctx.clone()) + .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - - let stmt_exec = invoker.statement_executor(); let output_data = stmt_exec - .execute_stmt(stmt, query_ctx) + .exec_plan(plan, query_ctx) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -232,75 +393,12 @@ impl RefillTask { let output_stream = QueryStream::try_from(output_data)?; let output_stream = output_stream.try_into_stream()?; - Ok(RefillTask { - flow_id, - table_id, - table_schema, - output_stream, - }) - } - - /// handle refill insert requests - /// - /// TODO(discord9): add a back pressure mechanism - pub async fn handle_refill_inserts( - &mut self, - manager: FlowWorkerManagerRef, - ) -> Result<(), Error> { - while let Some(rb) = self.output_stream.next().await { - let rb = match rb { - Ok(rb) => rb, - Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?, - }; - self.validate_schema(&rb)?; - - // send rb into flow node - manager - .node_context - .read() - .await - .send_rb(self.table_id, rb) - .await?; - } + self.state + .start_running(&self.data, manager, output_stream)?; Ok(()) } - /// validate that incoming batch's schema is the same as table schema(by comparing types&names) - fn validate_schema(&self, rb: &RecordBatch) -> Result<(), Error> { - let rb_schema = &rb.schema; - let table_schema = &self.table_schema; - if rb_schema.column_schemas().len() != table_schema.len()? { - UnexpectedSnafu { - reason: "rb schema len != table schema len", - } - .fail()?; - } - for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() { - let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type); - let (table_name, table_ty) = ( - table_schema.names[i].as_ref(), - &table_schema.typ().column_types[i].scalar_type, - ); - if Some(rb_name) != table_name.map(|c| c.as_str()) { - UnexpectedSnafu { - reason: format!( - "incoming batch's schema name {} != expected table schema name {:?}", - rb_name, table_name - ), - } - .fail()?; - } - - if rb_ty != table_ty { - UnexpectedSnafu { - reason: format!( - "incoming batch's schema type {:?} != expected table schema type {:?}", - rb_ty, table_ty - ), - } - .fail()?; - } - } - Ok(()) + pub async fn is_finished(&mut self) -> Result { + self.state.is_finished().await } } diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index f12b15593c22..11a79a050b21 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -46,6 +46,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to query"))] + RequestQuery { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to find table route for table id {}", table_id))] + FindTableRoute { + table_id: u32, + #[snafu(implicit)] + location: Location, + source: partition::error::Error, + }, + /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] Eval { @@ -221,6 +236,11 @@ impl ErrorExt for Error { Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported } + + Self::FindTableRoute { source, .. } => source.status_code(), + + Self::RequestQuery { source, .. } => source.status_code(), + Self::External { source, .. } => source.status_code(), Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index ea05821498f6..a81c16ca79f5 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -37,6 +37,7 @@ pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan}; pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc}; pub(crate) use scalar::{ScalarExpr, TypedExpr}; use snafu::{ensure, ResultExt}; +pub(crate) use utils::find_plan_time_window_expr_lower_bound; use crate::expr::error::{ArrowSnafu, DataTypeSnafu}; use crate::repr::Diff; diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index b0b69910073b..84019c958979 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -23,10 +23,13 @@ use crate::expr::ScalarExpr; use crate::plan::TypedPlan; use crate::Result; -/// Find lower bound for time `current` in given `plan` +/// Find lower bound for time `current` in given `plan` for the time window expr. +/// +/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`, +/// return `Some("2021-07-01 00:00:00.000")` /// /// if `plan` doesn't contain a `TIME INDEX` column, return `None` -pub fn find_plan_time_lower_bound( +pub fn find_plan_time_window_expr_lower_bound( plan: &TypedPlan, current: common_time::Timestamp, ) -> Result> { @@ -251,7 +254,7 @@ mod test { expected.map(|expected| common_time::Timestamp::from_str(expected, None).unwrap()); assert_eq!( - find_plan_time_lower_bound(&flow_plan, current).unwrap(), + find_plan_time_window_expr_lower_bound(&flow_plan, current).unwrap(), expected ); } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index b4ba8a535471..dcb04888d249 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -28,16 +28,21 @@ use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; +use common_query::request::QueryRequest; use common_query::Output; +use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing::info; use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use operator::delete::Deleter; use operator::insert::Inserter; +use operator::request::Requester; use operator::statement::StatementExecutor; -use partition::manager::PartitionRuleManager; -use query::{QueryEngine, QueryEngineFactory}; +use operator::table::TableMutationOperator; +use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; +use query::region_query::RegionQueryHandler; +use query::{QueryEngine, QueryEngineFactory, QueryEngineRef}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::server::Server; use session::context::{QueryContextBuilder, QueryContextRef}; @@ -52,6 +57,8 @@ use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + FindTableRouteSnafu, + RequestQuerySnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -328,43 +335,6 @@ impl FlownodeBuilder { Ok(instance) } - pub(crate) async fn get_all_flow_ids(&self, nodeid: Option) -> Result, Error> { - let ret = if let Some(nodeid) = nodeid { - let flow_ids_one_node = self - .flow_metadata_manager - .flownode_flow_manager() - .flows(nodeid) - .try_collect::>() - .await - .context(ListFlowsSnafu { id: Some(nodeid) })?; - flow_ids_one_node.into_iter().map(|(id, _)| id).collect() - } else { - let all_catalogs = self - .catalog_manager - .catalog_names() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let mut all_flow_ids = vec![]; - for catalog in all_catalogs { - let flows = self - .flow_metadata_manager - .flow_name_manager() - .flow_names(&catalog) - .await - .try_collect::>() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id())); - } - all_flow_ids - }; - - Ok(ret) - } - /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) /// /// or recover all existing flow tasks if in standalone mode(nodeid is None) @@ -375,7 +345,8 @@ impl FlownodeBuilder { manager: &FlowWorkerManagerRef, ) -> Result { let nodeid = self.opts.node_id; - let to_be_recovered = self.get_all_flow_ids(nodeid).await?; + let to_be_recovered = + get_all_flow_ids(&self.flow_metadata_manager, &self.catalog_manager, nodeid).await?; let cnt = to_be_recovered.len(); // TODO(discord9): recover in parallel @@ -455,6 +426,7 @@ impl FlownodeBuilder { } } +#[derive(Clone)] pub struct FrontendInvoker { inserter: Arc, deleter: Arc, @@ -474,8 +446,11 @@ impl FrontendInvoker { } } + /// Build a frontend invoker, + /// + /// if `query_engine` is not specified, will build one without ability to execute ddl or flow, only insert/delete and query pub async fn build_from( - flow_worker_manager: FlowWorkerManagerRef, + query_engine: Option, catalog_manager: CatalogManagerRef, kv_backend: KvBackendRef, layered_cache_registry: LayeredCacheRegistryRef, @@ -510,7 +485,35 @@ impl FrontendInvoker { node_manager.clone(), )); - let query_engine = flow_worker_manager.query_engine.clone(); + let query_engine = if let Some(query_engine) = query_engine { + query_engine + } else { + // make frontend like query engine + let region_query_handler = + FlownodeRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone()); + + let requester = Arc::new(Requester::new( + catalog_manager.clone(), + partition_manager.clone(), + node_manager.clone(), + )); + + let table_mutation_handler = Arc::new(TableMutationOperator::new( + inserter.clone(), + deleter.clone(), + requester, + )); + + QueryEngineFactory::new( + catalog_manager.clone(), + Some(region_query_handler.clone()), + Some(table_mutation_handler), + None, + None, + true, + ) + .query_engine() + }; let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), @@ -568,3 +571,98 @@ impl FrontendInvoker { self.statement_executor.clone() } } + +/// get all flow ids in this flownode +pub(crate) async fn get_all_flow_ids( + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, + nodeid: Option, +) -> Result, Error> { + let ret = if let Some(nodeid) = nodeid { + let flow_ids_one_node = flow_metadata_manager + .flownode_flow_manager() + .flows(nodeid) + .try_collect::>() + .await + .context(ListFlowsSnafu { id: Some(nodeid) })?; + flow_ids_one_node.into_iter().map(|(id, _)| id).collect() + } else { + let all_catalogs = catalog_manager + .catalog_names() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let mut all_flow_ids = vec![]; + for catalog in all_catalogs { + let flows = flow_metadata_manager + .flow_name_manager() + .flow_names(&catalog) + .await + .try_collect::>() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id())); + } + all_flow_ids + }; + + Ok(ret) +} + +/// a makeshift region query handler so that flownode can proactively query for +/// data it want +pub(crate) struct FlownodeRegionQueryHandler { + partition_manager: PartitionRuleManagerRef, + node_manager: NodeManagerRef, +} + +impl FlownodeRegionQueryHandler { + pub fn arc( + partition_manager: PartitionRuleManagerRef, + node_manager: NodeManagerRef, + ) -> Arc { + Arc::new(Self { + partition_manager, + node_manager, + }) + } +} + +#[async_trait::async_trait] +impl RegionQueryHandler for FlownodeRegionQueryHandler { + async fn do_get( + &self, + request: QueryRequest, + ) -> query::error::Result { + self.do_get_inner(request) + .await + .map_err(BoxedError::new) + .context(query::error::RegionQuerySnafu) + } +} + +impl FlownodeRegionQueryHandler { + async fn do_get_inner( + &self, + request: QueryRequest, + ) -> Result { + let region_id = request.region_id; + + let peer = &self + .partition_manager + .find_region_leader(region_id) + .await + .context(FindTableRouteSnafu { + table_id: region_id.table_id(), + })?; + + let client = self.node_manager.datanode(peer).await; + + client + .handle_query(request) + .await + .context(RequestQuerySnafu) + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index be17ee810fc0..fea67e31b854 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -196,6 +196,10 @@ impl Instance { pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + + pub fn query_engine(&self) -> QueryEngineRef { + self.query_engine.clone() + } } #[async_trait] diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index cc2458fa99cb..50c7127d74b8 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -209,7 +209,7 @@ impl GreptimeDbStandaloneBuilder { memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), table_metadata_manager, table_metadata_allocator, - flow_metadata_manager, + flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator, region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, @@ -235,7 +235,7 @@ impl GreptimeDbStandaloneBuilder { let flow_worker_manager = flownode.flow_worker_manager(); let invoker = flow::FrontendInvoker::build_from( - flow_worker_manager.clone(), + Some(instance.query_engine()), catalog_manager.clone(), kv_backend.clone(), cache_registry.clone(), @@ -248,6 +248,11 @@ impl GreptimeDbStandaloneBuilder { flow_worker_manager.set_frontend_invoker(invoker).await; + flow_worker_manager + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .unwrap(); + procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap(); From 149951b7e0991c68825e3d3cce670b29f84dadf8 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 24 Dec 2024 17:22:48 +0800 Subject: [PATCH 16/22] dbg: more debug logs --- src/flow/src/adapter/refill.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 7d727745fe70..2e388dfaa2ab 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -85,6 +85,13 @@ impl FlowWorkerManager { } else { None }; + + common_telemetry::debug!( + "Time range for refill flow_id={} is {:?}", + flow_id, + time_range + ); + for src_table in info.source_table_ids() { let time_index_col = self .table_info_source From 08385300f06ba952749d3b18a04c37288dee7895 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 24 Dec 2024 20:48:27 +0800 Subject: [PATCH 17/22] fix: hang due to mis calc row cnt --- src/flow/src/adapter/node_context.rs | 32 ++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 38d8a8fc45a4..0ceca02a63ef 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use common_recordbatch::RecordBatch; use common_telemetry::trace; +use common_telemetry::tracing_subscriber::field::debug; use datatypes::prelude::ConcreteDataType; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; @@ -109,7 +110,17 @@ impl SourceSender { // TODO(discord9): send rows instead so it's just moving a point if let Some(batch) = send_buf.recv().await { let len = batch.row_count(); - self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + let prev = self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + if prev < len { + InternalSnafu { + reason: format!( + "send_buf_row_cnt underflow, prev = {}, current = {}", + prev, + prev.wrapping_sub(len) + ), + } + .fail()? + } row_cnt += len; self.sender .send(batch) @@ -142,9 +153,24 @@ impl SourceSender { ) -> Result { let rows_len = rows.len(); METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _); - while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { + let mut retry = 0; + let max_retry = 100; + while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 && retry < max_retry { + common_telemetry::debug!("Send buf is full, waiting for it to be flushed"); + retry += 1; tokio::task::yield_now().await; } + + if retry >= max_retry { + return crate::error::InternalSnafu { + reason: format!( + "Send buf is full(len={}), fail to send", + self.send_buf_row_cnt.load(Ordering::SeqCst) + ), + } + .fail()?; + } + // row count metrics is approx so relaxed order is ok self.send_buf_row_cnt.fetch_add(rows_len, Ordering::SeqCst); let batch = Batch::try_from_rows_with_types( @@ -168,6 +194,8 @@ impl SourceSender { let row_cnt = batch.num_rows(); let batch = Batch::from(batch); + self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst); + self.send_buf_tx.send(batch).await.map_err(|e| { crate::error::InternalSnafu { reason: format!("Failed to send batch, error = {:?}", e), From a60338584fe651e5511d6672e58c434f8e28bbbe Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 25 Dec 2024 20:01:48 +0800 Subject: [PATCH 18/22] tests: sqlness --- .../common/flow/flow_rebuild.result | 4 ++-- tests/runner/src/env.rs | 24 ++++++++++--------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index 67fd43a03288..0ecc1726eea1 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -463,7 +463,7 @@ SELECT wildcard FROM out_basic; +----------+ | wildcard | +----------+ -| 3 | +| 6 | +----------+ DROP TABLE input_basic; @@ -561,7 +561,7 @@ SELECT wildcard FROM out_basic; +----------+ | wildcard | +----------+ -| 3 | +| 5 | +----------+ DROP FLOW test_wildcard_basic; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 81bbe2fb0b07..4046f7df3828 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -482,6 +482,7 @@ impl Env { Env::stop_server(server_process); } } + if is_full_restart { if let Some(mut metasrv_process) = db.metasrv_process.lock().expect("poisoned lock").take() @@ -493,11 +494,12 @@ impl Env { { Env::stop_server(&mut frontend_process); } - if let Some(mut flownode_process) = - db.flownode_process.lock().expect("poisoned lock").take() - { - Env::stop_server(&mut flownode_process); - } + } + + if let Some(mut flownode_process) = + db.flownode_process.lock().expect("poisoned lock").take() + { + Env::stop_server(&mut flownode_process); } } @@ -531,13 +533,13 @@ impl Env { .lock() .expect("lock poisoned") .replace(frontend); - - let flownode = self.start_server("flownode", &db.ctx, false).await; - db.flownode_process - .lock() - .expect("lock poisoned") - .replace(flownode); } + let flownode = self.start_server("flownode", &db.ctx, false).await; + db.flownode_process + .lock() + .expect("lock poisoned") + .replace(flownode); + processes }; From 0e11812e4cdec6807fc356e2ca76847d39207d53 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 26 Dec 2024 14:16:07 +0800 Subject: [PATCH 19/22] fix: handle src table missing --- src/cmd/src/flownode.rs | 7 +++-- src/flow/src/adapter.rs | 6 +++++ src/flow/src/adapter/node_context.rs | 1 - src/flow/src/adapter/refill.rs | 16 ++++++++++- src/flow/src/adapter/table_source.rs | 10 +++++++ src/flow/src/server.rs | 27 +++++++++++++++---- .../common/flow/show_create_flow.result | 4 +++ 7 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 1d21f54645fa..695787e7f1c2 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -331,12 +331,15 @@ impl StartCommand { .set_frontend_invoker(invoker) .await; - flownode + if let Err(err) = flownode .flow_worker_manager() .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) .await .map_err(BoxedError::new) - .context(BuildCliSnafu)?; + .context(BuildCliSnafu) + { + common_telemetry::error!(?err, "Failed to create and start refill flow tasks"); + } Ok(Instance::new(flownode, guard)) } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 944b38d5159e..0099e8c1780d 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -137,6 +137,7 @@ pub struct FlowWorkerManager { frontend_invoker: RwLock>, /// contains mapping from table name to global id, and table schema node_context: RwLock, + /// Contains all refill tasks refill_tasks: RwLock>, flow_err_collectors: RwLock>, src_send_buf_lens: RwLock>>, @@ -734,6 +735,11 @@ impl CreateFlowArgs { /// Create&Remove flow impl FlowWorkerManager { + /// Get table info source + pub fn table_info_source(&self) -> &TableSource { + &self.table_info_source + } + /// remove a flow by it's id pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 0ceca02a63ef..3943ff44ed36 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use common_recordbatch::RecordBatch; use common_telemetry::trace; -use common_telemetry::tracing_subscriber::field::debug; use datatypes::prelude::ConcreteDataType; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 2e388dfaa2ab..324bb38bfe6c 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -20,6 +20,7 @@ use common_error::ext::BoxedError; use common_meta::key::flow::FlowMetadataManagerRef; use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream}; use common_runtime::JoinHandle; +use common_telemetry::error; use datatypes::value::Value; use futures::StreamExt; use query::parser::QueryLanguageParser; @@ -61,7 +62,7 @@ impl FlowWorkerManager { let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?; let mut refill_tasks = Vec::new(); - for flow_id in flow_ids { + 'flow_id_loop: for flow_id in flow_ids { let info = flow_metadata_manager .flow_info_manager() .get(flow_id) @@ -69,6 +70,19 @@ impl FlowWorkerManager { .map_err(BoxedError::new) .context(ExternalSnafu)? .context(FlowNotFoundSnafu { id: flow_id })?; + + // TODO(discord9): also check flow is already running + for src_table in info.source_table_ids() { + // check if source table still exists + if !self.table_info_source.check_table_exist(src_table).await? { + error!( + "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary", + src_table, flow_id + ); + continue 'flow_id_loop; + } + } + let expire_after = info.expire_after(); // TODO(discord9): better way to get last point let now = self.tick_manager.tick(); diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 8b8ecd662ff2..9882f65e7229 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -99,6 +99,16 @@ impl TableSource { Ok(ret) } + pub async fn check_table_exist(&self, table_id: &TableId) -> Result { + Ok(self + .table_info_manager + .get(*table_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .is_some()) + } + /// query metasrv about the table name and table id pub async fn get_table_name(&self, table_id: &TableId) -> Result { self.table_info_manager diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index dcb04888d249..c413201183e8 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -25,7 +25,7 @@ use common_error::ext::BoxedError; use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManagerRef; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::{FlowId, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::request::QueryRequest; @@ -339,18 +339,20 @@ impl FlownodeBuilder { /// /// or recover all existing flow tasks if in standalone mode(nodeid is None) /// + /// return all flow ids that are successfully recovered + /// /// TODO(discord9): persistent flow tasks with internal state pub(crate) async fn recover_flows( &self, manager: &FlowWorkerManagerRef, - ) -> Result { + ) -> Result, Error> { let nodeid = self.opts.node_id; let to_be_recovered = get_all_flow_ids(&self.flow_metadata_manager, &self.catalog_manager, nodeid).await?; - let cnt = to_be_recovered.len(); + let mut did_recover = vec![]; // TODO(discord9): recover in parallel - for flow_id in to_be_recovered { + 'flow_id_loop: for flow_id in to_be_recovered { let info = self .flow_metadata_manager .flow_info_manager() @@ -365,6 +367,20 @@ impl FlownodeBuilder { info.sink_table_name().schema_name.clone(), info.sink_table_name().table_name.clone(), ]; + + let source_table_ids = info.source_table_ids().to_vec(); + + for src in &source_table_ids { + if !manager.table_info_source().check_table_exist(src).await? { + common_telemetry::error!( + "source table_id={:?} not found, skip recover flow_id={:?}", + src, + flow_id + ); + continue 'flow_id_loop; + } + } + let args = CreateFlowArgs { flow_id: flow_id as _, sink_table_name, @@ -385,9 +401,10 @@ impl FlownodeBuilder { ), }; manager.create_flow(args).await?; + did_recover.push(flow_id); } - Ok(cnt) + Ok(did_recover) } /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`, diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 14e80129446d..02a3efb997c4 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -303,7 +303,11 @@ SELECT * FROM out_num_cnt_show; +--------+-------------------------+ | -2 | 1970-01-01T00:00:00.002 | | -1 | 1970-01-01T00:00:00.003 | +| 3 | 1970-01-01T00:00:00.001 | | 4 | 1970-01-01T00:00:00.002 | +| 4 | 1970-01-01T00:00:00.004 | +| 5 | 1970-01-01T00:00:00.004 | +| 10 | 1970-01-01T00:00:00 | | 10 | 1970-01-01T00:00:00.003 | | 11 | 1970-01-01T00:00:00.004 | | 15 | 1970-01-01T00:00:00.001 | From 294c4c5c9ae47fe1eb150d44e2b7e138c3e76ed5 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 26 Dec 2024 14:32:20 +0800 Subject: [PATCH 20/22] chore: after rebase --- src/flow/src/error.rs | 4 ++-- src/flow/src/server.rs | 11 +++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 11a79a050b21..e78e0d6fa33c 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -20,9 +20,9 @@ use common_error::ext::BoxedError; use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; -use common_telemetry::common_error::status_code::StatusCode;\ -use tonic::metadata::MetadataMap; +use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, ResultExt, Snafu}; +use tonic::metadata::MetadataMap; use crate::adapter::FlowId; use crate::expr::EvalError; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index c413201183e8..4e01958b1c4f 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -55,10 +55,9 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, - ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, - FindTableRouteSnafu, - RequestQuerySnafu, + to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FindTableRouteSnafu, + FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, RequestQuerySnafu, ShutdownServerSnafu, + StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -583,10 +582,6 @@ impl FrontendInvoker { .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } - - pub fn statement_executor(&self) -> Arc { - self.statement_executor.clone() - } } /// get all flow ids in this flownode From ebe053dbf3fd202544104a8f3e12ae9b0d199b1f Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 26 Dec 2024 14:56:00 +0800 Subject: [PATCH 21/22] refactor: more resilent flow recover --- src/flow/src/server.rs | 47 ++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 4e01958b1c4f..88195481a5cb 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -31,7 +31,7 @@ use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::request::QueryRequest; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing::info; +use common_telemetry::{info, warn}; use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; @@ -45,7 +45,7 @@ use query::region_query::RegionQueryHandler; use query::{QueryEngine, QueryEngineFactory, QueryEngineRef}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::server::Server; -use session::context::{QueryContextBuilder, QueryContextRef}; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; @@ -361,12 +361,6 @@ impl FlownodeBuilder { .context(ExternalSnafu)? .context(FlowNotFoundSnafu { id: flow_id })?; - let sink_table_name = [ - info.sink_table_name().catalog_name.clone(), - info.sink_table_name().schema_name.clone(), - info.sink_table_name().table_name.clone(), - ]; - let source_table_ids = info.source_table_ids().to_vec(); for src in &source_table_ids { @@ -380,27 +374,22 @@ impl FlownodeBuilder { } } - let args = CreateFlowArgs { - flow_id: flow_id as _, - sink_table_name, - source_table_ids: info.source_table_ids().to_vec(), - // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist) - // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true - // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that) - create_if_not_exists: true, - or_replace: true, - expire_after: info.expire_after(), - comment: Some(info.comment().clone()), - sql: info.raw_sql().clone(), - flow_options: info.options().clone(), - query_ctx: Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), - }; - manager.create_flow(args).await?; - did_recover.push(flow_id); + let args = CreateFlowArgs::from_flow_info(flow_id as _, info, true, true); + + match manager.create_flow(args).await { + Ok(Some(res)) => { + did_recover.push(res as FlowId); + } + Ok(None) => { + warn!( + "Failed to recover flow_id={:?}, flow already exists", + flow_id + ); + } + Err(err) => { + common_telemetry::error!(err; "Failed to recover flow_id={:?}", flow_id); + } + } } Ok(did_recover) From 70d2465fe430c6e369c9734fd1ae29965b90fefd Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 26 Dec 2024 16:03:23 +0800 Subject: [PATCH 22/22] todo: adjust flow's time index --- src/flow/src/adapter.rs | 185 ++++++++++-------- src/flow/src/adapter/node_context.rs | 1 + .../standalone/common/flow/flow_refill.result | 0 .../standalone/common/flow/flow_refill.sql | 1 + 4 files changed, 107 insertions(+), 80 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_refill.result create mode 100644 tests/cases/standalone/common/flow/flow_refill.sql diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 0099e8c1780d..401edd4e3b8b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -60,6 +60,7 @@ use crate::error::{ }; use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::plan::TypedPlan; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; @@ -753,6 +754,108 @@ impl FlowWorkerManager { Ok(()) } + /// adjust flow plan's time index to match real table schema + async fn fix_time_index_for_flow_plan( + &self, + flow_plan: &TypedPlan, + real_schema: &[ColumnSchema], + ) -> Result { + todo!() + } + + ///// check schema against actual table schema if exists + /// if not exist create sink table immediately + async fn valid_or_create_sink_table( + &self, + flow_id: FlowId, + flow_plan: &TypedPlan, + sink_table_name: &TableName, + node_ctx: &mut FlownodeContext, + ) -> Result<(), Error> { + if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(sink_table_name).await? { + let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); + + // for column schema, only `data_type` need to be check for equality + // since one can omit flow's column name when write flow query + // print a user friendly error message about mismatch and how to correct them + for (idx, zipped) in auto_schema + .iter() + .zip_longest(real_schema.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Both(auto, real) => { + if auto.data_type != real.data_type { + InvalidQuerySnafu { + reason: format!( + "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", + idx, + real.name, + auto.name, + real.data_type, + auto.data_type + ), + } + .fail()?; + } + } + EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { + // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) + continue; + } + _ => InvalidQuerySnafu { + reason: format!( + "schema length mismatched, expected {} found {}", + real_schema.len(), + auto_schema.len() + ), + } + .fail()?, + } + } + + let table_id = self + .table_info_source + .get_table_id_from_name(sink_table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table id for table name {:?}", sink_table_name), + })?; + let table_info_value = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table info value for table id {:?}", table_id), + })?; + let real_schema = table_info_value_to_relation_desc(table_info_value)?; + node_ctx.assign_table_schema(sink_table_name, real_schema.clone())?; + } else { + // assign inferred schema to sink table + // create sink table + node_ctx.assign_table_schema(sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } + } + + node_ctx.add_flow_plan(flow_id, flow_plan.clone()); + + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); + + Ok(()) + } + /// Return task id if a new task is created, otherwise return None /// /// steps to create task: @@ -827,88 +930,10 @@ impl FlowWorkerManager { // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; - node_ctx.add_flow_plan(flow_id, flow_plan.clone()); - - debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - // check schema against actual table schema if exists // if not exist create sink table immediately - if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? { - let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); - - // for column schema, only `data_type` need to be check for equality - // since one can omit flow's column name when write flow query - // print a user friendly error message about mismatch and how to correct them - for (idx, zipped) in auto_schema - .iter() - .zip_longest(real_schema.iter()) - .enumerate() - { - match zipped { - EitherOrBoth::Both(auto, real) => { - if auto.data_type != real.data_type { - InvalidQuerySnafu { - reason: format!( - "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", - idx, - real.name, - auto.name, - real.data_type, - auto.data_type - ), - } - .fail()?; - } - } - EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { - // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) - continue; - } - _ => InvalidQuerySnafu { - reason: format!( - "schema length mismatched, expected {} found {}", - real_schema.len(), - auto_schema.len() - ), - } - .fail()?, - } - } - - let table_id = self - .table_info_source - .get_table_id_from_name(&sink_table_name) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table id for table name {:?}", sink_table_name), - })?; - let table_info_value = self - .table_info_source - .get_table_info_value(&table_id) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table info value for table id {:?}", table_id), - })?; - let real_schema = table_info_value_to_relation_desc(table_info_value)?; - node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; - } else { - // assign inferred schema to sink table - // create sink table - node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; - let did_create = self - .create_table_from_relation( - &format!("flow-id={flow_id}"), - &sink_table_name, - &flow_plan.schema, - ) - .await?; - if !did_create { - UnexpectedSnafu { - reason: format!("Failed to create table {:?}", sink_table_name), - } - .fail()?; - } - } + self.valid_or_create_sink_table(flow_id, &flow_plan, &sink_table_name, &mut node_ctx) + .await?; let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 3943ff44ed36..b4022cbdd1c0 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -269,6 +269,7 @@ impl FlownodeContext { self.sink_to_flow.insert(sink_table_name, task_id); } + /// add flow plan to worker context pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) { self.flow_plans.insert(task_id, plan); } diff --git a/tests/cases/standalone/common/flow/flow_refill.result b/tests/cases/standalone/common/flow/flow_refill.result new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/cases/standalone/common/flow/flow_refill.sql b/tests/cases/standalone/common/flow/flow_refill.sql new file mode 100644 index 000000000000..e545576e5f76 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_refill.sql @@ -0,0 +1 @@ +-- testing flow refill after reboot \ No newline at end of file