diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index b399bf37f70d..695787e7f1c2 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -21,6 +21,7 @@ use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; use common_config::Configurable; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; @@ -38,8 +39,8 @@ use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, - MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, + BuildCacheRegistrySnafu, BuildCliSnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, + MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -301,7 +302,7 @@ impl StartCommand { Plugins::new(), table_metadata_manager, catalog_manager.clone(), - flow_metadata_manager, + flow_metadata_manager.clone(), ) .with_heartbeat_task(heartbeat_task); @@ -316,7 +317,7 @@ impl StartCommand { let client = Arc::new(NodeClients::new(channel_config)); let invoker = FrontendInvoker::build_from( - flownode.flow_worker_manager().clone(), + None, catalog_manager.clone(), cached_meta_backend.clone(), layered_cache_registry.clone(), @@ -330,6 +331,16 @@ impl StartCommand { .set_frontend_invoker(invoker) .await; + if let Err(err) = flownode + .flow_worker_manager() + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .map_err(BoxedError::new) + .context(BuildCliSnafu) + { + common_telemetry::error!(?err, "Failed to create and start refill flow tasks"); + } + Ok(Instance::new(flownode, guard)) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e3675a7db7c1..98153952d804 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, - InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + BuildCacheRegistrySnafu, BuildCliSnafu, CreateDirSnafu, IllegalConfigSnafu, + InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, + Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; @@ -580,7 +580,7 @@ impl StartCommand { layered_cache_registry.clone(), table_metadata_manager, table_meta_allocator, - flow_metadata_manager, + flow_metadata_manager.clone(), flow_meta_allocator, ) .await?; @@ -602,7 +602,7 @@ impl StartCommand { let flow_worker_manager = flownode.flow_worker_manager(); // flow server need to be able to use frontend to write insert requests back let invoker = FrontendInvoker::build_from( - flow_worker_manager.clone(), + Some(frontend.query_engine()), catalog_manager.clone(), kv_backend.clone(), layered_cache_registry.clone(), @@ -613,6 +613,15 @@ impl StartCommand { .context(StartFlownodeSnafu)?; flow_worker_manager.set_frontend_invoker(invoker).await; + if let Err(err) = flow_worker_manager + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .map_err(BoxedError::new) + .context(BuildCliSnafu) + { + common_telemetry::error!(err; "failed to refill flow"); + } + let (tx, _rx) = broadcast::channel(1); let servers = Services::new(opts, Arc::new(frontend.clone()), plugins) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index dcdb1b1eb01a..401edd4e3b8b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use common_config::Configurable; use common_error::ext::BoxedError; +use common_meta::key::flow::flow_info::FlowInfoValue; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -37,7 +38,7 @@ use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; -use session::context::QueryContext; +use session::context::{QueryContext, QueryContextBuilder}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; @@ -45,6 +46,7 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; +use crate::adapter::refill::RefillTask; use crate::adapter::table_source::TableSource; use crate::adapter::util::{ relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, @@ -58,6 +60,7 @@ use crate::error::{ }; use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::plan::TypedPlan; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; mod flownode_impl; @@ -68,6 +71,8 @@ mod tests; mod util; mod worker; +mod refill; + pub(crate) mod node_context; mod table_source; @@ -133,6 +138,8 @@ pub struct FlowWorkerManager { frontend_invoker: RwLock>, /// contains mapping from table name to global id, and table schema node_context: RwLock, + /// Contains all refill tasks + refill_tasks: RwLock>, flow_err_collectors: RwLock>, src_send_buf_lens: RwLock>>, tick_manager: FlowTickManager, @@ -171,6 +178,7 @@ impl FlowWorkerManager { table_info_source: srv_map, frontend_invoker: RwLock::new(None), node_context: RwLock::new(node_context), + refill_tasks: Default::default(), flow_err_collectors: Default::default(), src_send_buf_lens: Default::default(), tick_manager, @@ -694,8 +702,45 @@ pub struct CreateFlowArgs { pub query_ctx: Option, } +impl CreateFlowArgs { + pub fn from_flow_info( + flow_id: FlowId, + info: FlowInfoValue, + create_if_not_exists: bool, + or_replace: bool, + ) -> Self { + let sink_table_name = [ + info.sink_table_name().catalog_name.clone(), + info.sink_table_name().schema_name.clone(), + info.sink_table_name().table_name.clone(), + ]; + let args = CreateFlowArgs { + flow_id: flow_id as _, + sink_table_name, + source_table_ids: info.source_table_ids().to_vec(), + create_if_not_exists, + or_replace, + expire_after: info.expire_after(), + comment: Some(info.comment().clone()), + sql: info.raw_sql().clone(), + flow_options: info.options().clone(), + query_ctx: Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().clone()) + .build(), + ), + }; + args + } +} + /// Create&Remove flow impl FlowWorkerManager { + /// Get table info source + pub fn table_info_source(&self) -> &TableSource { + &self.table_info_source + } + /// remove a flow by it's id pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { @@ -709,6 +754,108 @@ impl FlowWorkerManager { Ok(()) } + /// adjust flow plan's time index to match real table schema + async fn fix_time_index_for_flow_plan( + &self, + flow_plan: &TypedPlan, + real_schema: &[ColumnSchema], + ) -> Result { + todo!() + } + + ///// check schema against actual table schema if exists + /// if not exist create sink table immediately + async fn valid_or_create_sink_table( + &self, + flow_id: FlowId, + flow_plan: &TypedPlan, + sink_table_name: &TableName, + node_ctx: &mut FlownodeContext, + ) -> Result<(), Error> { + if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(sink_table_name).await? { + let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); + + // for column schema, only `data_type` need to be check for equality + // since one can omit flow's column name when write flow query + // print a user friendly error message about mismatch and how to correct them + for (idx, zipped) in auto_schema + .iter() + .zip_longest(real_schema.iter()) + .enumerate() + { + match zipped { + EitherOrBoth::Both(auto, real) => { + if auto.data_type != real.data_type { + InvalidQuerySnafu { + reason: format!( + "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", + idx, + real.name, + auto.name, + real.data_type, + auto.data_type + ), + } + .fail()?; + } + } + EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { + // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) + continue; + } + _ => InvalidQuerySnafu { + reason: format!( + "schema length mismatched, expected {} found {}", + real_schema.len(), + auto_schema.len() + ), + } + .fail()?, + } + } + + let table_id = self + .table_info_source + .get_table_id_from_name(sink_table_name) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table id for table name {:?}", sink_table_name), + })?; + let table_info_value = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .context(UnexpectedSnafu { + reason: format!("Can't get table info value for table id {:?}", table_id), + })?; + let real_schema = table_info_value_to_relation_desc(table_info_value)?; + node_ctx.assign_table_schema(sink_table_name, real_schema.clone())?; + } else { + // assign inferred schema to sink table + // create sink table + node_ctx.assign_table_schema(sink_table_name, flow_plan.schema.clone())?; + let did_create = self + .create_table_from_relation( + &format!("flow-id={flow_id}"), + sink_table_name, + &flow_plan.schema, + ) + .await?; + if !did_create { + UnexpectedSnafu { + reason: format!("Failed to create table {:?}", sink_table_name), + } + .fail()?; + } + } + + node_ctx.add_flow_plan(flow_id, flow_plan.clone()); + + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); + + Ok(()) + } + /// Return task id if a new task is created, otherwise return None /// /// steps to create task: @@ -783,86 +930,10 @@ impl FlowWorkerManager { // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; - debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); - // check schema against actual table schema if exists // if not exist create sink table immediately - if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? { - let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema); - - // for column schema, only `data_type` need to be check for equality - // since one can omit flow's column name when write flow query - // print a user friendly error message about mismatch and how to correct them - for (idx, zipped) in auto_schema - .iter() - .zip_longest(real_schema.iter()) - .enumerate() - { - match zipped { - EitherOrBoth::Both(auto, real) => { - if auto.data_type != real.data_type { - InvalidQuerySnafu { - reason: format!( - "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}", - idx, - real.name, - auto.name, - real.data_type, - auto.data_type - ), - } - .fail()?; - } - } - EitherOrBoth::Right(real) if real.data_type.is_timestamp() => { - // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder) - continue; - } - _ => InvalidQuerySnafu { - reason: format!( - "schema length mismatched, expected {} found {}", - real_schema.len(), - auto_schema.len() - ), - } - .fail()?, - } - } - - let table_id = self - .table_info_source - .get_table_id_from_name(&sink_table_name) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table id for table name {:?}", sink_table_name), - })?; - let table_info_value = self - .table_info_source - .get_table_info_value(&table_id) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table info value for table id {:?}", table_id), - })?; - let real_schema = table_info_value_to_relation_desc(table_info_value)?; - node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; - } else { - // assign inferred schema to sink table - // create sink table - node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; - let did_create = self - .create_table_from_relation( - &format!("flow-id={flow_id}"), - &sink_table_name, - &flow_plan.schema, - ) - .await?; - if !did_create { - UnexpectedSnafu { - reason: format!("Failed to create table {:?}", sink_table_name), - } - .fail()?; - } - } + self.valid_or_create_sink_table(flow_id, &flow_plan, &sink_table_name, &mut node_ctx) + .await?; let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 5c644803ec71..b4022cbdd1c0 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use common_recordbatch::RecordBatch; use common_telemetry::trace; use datatypes::prelude::ConcreteDataType; use session::context::QueryContext; @@ -30,6 +31,7 @@ use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::{Batch, GlobalId}; use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE; +use crate::plan::TypedPlan; use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP}; /// A context that holds the information of the dataflow @@ -39,6 +41,7 @@ pub struct FlownodeContext { pub source_to_tasks: BTreeMap>, /// mapping from task to sink table, useful for sending data back to the client when a task is done running pub flow_to_sink: BTreeMap, + pub flow_plans: BTreeMap, pub sink_to_flow: BTreeMap, /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender /// @@ -106,7 +109,17 @@ impl SourceSender { // TODO(discord9): send rows instead so it's just moving a point if let Some(batch) = send_buf.recv().await { let len = batch.row_count(); - self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + let prev = self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + if prev < len { + InternalSnafu { + reason: format!( + "send_buf_row_cnt underflow, prev = {}, current = {}", + prev, + prev.wrapping_sub(len) + ), + } + .fail()? + } row_cnt += len; self.sender .send(batch) @@ -137,13 +150,28 @@ impl SourceSender { rows: Vec, batch_datatypes: &[ConcreteDataType], ) -> Result { - METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _); - while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { + let rows_len = rows.len(); + METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _); + let mut retry = 0; + let max_retry = 100; + while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 && retry < max_retry { + common_telemetry::debug!("Send buf is full, waiting for it to be flushed"); + retry += 1; tokio::task::yield_now().await; } + + if retry >= max_retry { + return crate::error::InternalSnafu { + reason: format!( + "Send buf is full(len={}), fail to send", + self.send_buf_row_cnt.load(Ordering::SeqCst) + ), + } + .fail()?; + } + // row count metrics is approx so relaxed order is ok - self.send_buf_row_cnt - .fetch_add(rows.len(), Ordering::SeqCst); + self.send_buf_row_cnt.fetch_add(rows_len, Ordering::SeqCst); let batch = Batch::try_from_rows_with_types( rows.into_iter().map(|(row, _, _)| row).collect(), batch_datatypes, @@ -157,7 +185,23 @@ impl SourceSender { .build() })?; - Ok(0) + Ok(rows_len) + } + + /// send record batch + pub async fn send_record_batch(&self, batch: RecordBatch) -> Result { + let row_cnt = batch.num_rows(); + let batch = Batch::from(batch); + + self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst); + + self.send_buf_tx.send(batch).await.map_err(|e| { + crate::error::InternalSnafu { + reason: format!("Failed to send batch, error = {:?}", e), + } + .build() + })?; + Ok(row_cnt) } } @@ -180,6 +224,16 @@ impl FlownodeContext { sender.send_rows(rows, batch_datatypes).await } + pub async fn send_rb(&self, table_id: TableId, batch: RecordBatch) -> Result { + let sender = self + .source_sender + .get(&table_id) + .with_context(|| TableNotFoundSnafu { + name: table_id.to_string(), + })?; + sender.send_record_batch(batch).await + } + /// flush all sender's buf /// /// return numbers being sent @@ -215,6 +269,15 @@ impl FlownodeContext { self.sink_to_flow.insert(sink_table_name, task_id); } + /// add flow plan to worker context + pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) { + self.flow_plans.insert(task_id, plan); + } + + pub fn get_flow_plan(&self, task_id: &FlowId) -> Option { + self.flow_plans.get(task_id).cloned() + } + /// remove flow from worker context pub fn remove_flow(&mut self, task_id: FlowId) { if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) { @@ -226,6 +289,7 @@ impl FlownodeContext { self.source_sender.remove(source_table_id); } } + self.flow_plans.remove(&task_id); } /// try add source sender, if already exist, do nothing diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs new file mode 100644 index 000000000000..324bb38bfe6c --- /dev/null +++ b/src/flow/src/adapter/refill.rs @@ -0,0 +1,425 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::sync::Arc; + +use catalog::CatalogManagerRef; +use common_error::ext::BoxedError; +use common_meta::key::flow::FlowMetadataManagerRef; +use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream}; +use common_runtime::JoinHandle; +use common_telemetry::error; +use datatypes::value::Value; +use futures::StreamExt; +use query::parser::QueryLanguageParser; +use session::context::QueryContextBuilder; +use snafu::{OptionExt, ResultExt}; +use table::metadata::TableId; + +use super::{FlowId, FlowWorkerManager}; +use crate::adapter::table_source::TableSource; +use crate::adapter::FlowWorkerManagerRef; +use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu}; +use crate::expr::error::ExternalSnafu; +use crate::expr::find_plan_time_window_expr_lower_bound; +use crate::repr::RelationDesc; +use crate::server::get_all_flow_ids; +use crate::{Error, FrontendInvoker}; + +impl FlowWorkerManager { + /// Create and start refill flow tasks in background + pub async fn create_and_start_refill_flow_tasks( + self: &FlowWorkerManagerRef, + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, + ) -> Result<(), Error> { + let tasks = self + .create_refill_flow_tasks(flow_metadata_manager, catalog_manager) + .await?; + self.starting_refill_flows(tasks).await?; + Ok(()) + } + + /// Create a series of tasks to refill flow + pub async fn create_refill_flow_tasks( + &self, + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, + ) -> Result, Error> { + let nodeid = self.node_id.map(|c| c as u64); + + let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?; + let mut refill_tasks = Vec::new(); + 'flow_id_loop: for flow_id in flow_ids { + let info = flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(FlowNotFoundSnafu { id: flow_id })?; + + // TODO(discord9): also check flow is already running + for src_table in info.source_table_ids() { + // check if source table still exists + if !self.table_info_source.check_table_exist(src_table).await? { + error!( + "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary", + src_table, flow_id + ); + continue 'flow_id_loop; + } + } + + let expire_after = info.expire_after(); + // TODO(discord9): better way to get last point + let now = self.tick_manager.tick(); + let plan = self + .node_context + .read() + .await + .get_flow_plan(&FlowId::from(flow_id)) + .context(FlowNotFoundSnafu { id: flow_id })?; + let time_range = if let Some(expire_after) = expire_after { + let low_bound = common_time::Timestamp::new_millisecond(now - expire_after); + let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?; + real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now))) + } else { + None + }; + + common_telemetry::debug!( + "Time range for refill flow_id={} is {:?}", + flow_id, + time_range + ); + + for src_table in info.source_table_ids() { + let time_index_col = self + .table_info_source + .get_time_index_column_from_table_id(src_table) + .await?; + let time_index_name = time_index_col.name; + let task = RefillTask::create( + flow_id as u64, + *src_table, + time_range, + &time_index_name, + &self.table_info_source, + ) + .await?; + refill_tasks.push(task); + } + } + Ok(refill_tasks) + } + + /// Starting to refill flows, if any error occurs, will rebuild the flow and retry + pub(crate) async fn starting_refill_flows( + self: &FlowWorkerManagerRef, + tasks: Vec, + ) -> Result<(), Error> { + // TODO(discord9): add a back pressure mechanism + let frontend_invoker = + self.frontend_invoker + .read() + .await + .clone() + .context(UnexpectedSnafu { + reason: "frontend invoker is not set", + })?; + + for mut task in tasks { + task.start_running(self.clone(), &frontend_invoker).await?; + // TODO(discord9): save refill tasks to a map and check if it's finished when necessary + // i.e. when system table need query it's state + self.refill_tasks + .write() + .await + .insert(task.data.flow_id, task); + } + Ok(()) + } +} + +/// Task to refill flow with given table id and a time range +pub struct RefillTask { + data: TaskData, + state: TaskState<()>, +} + +#[derive(Clone)] +struct TaskData { + flow_id: FlowId, + table_id: TableId, + table_schema: RelationDesc, +} + +impl TaskData { + /// validate that incoming batch's schema is the same as table schema(by comparing types&names) + fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> { + let rb_schema = &rb.schema; + if rb_schema.column_schemas().len() != table_schema.len()? { + UnexpectedSnafu { + reason: "rb schema len != table schema len", + } + .fail()?; + } + for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() { + let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type); + let (table_name, table_ty) = ( + table_schema.names[i].as_ref(), + &table_schema.typ().column_types[i].scalar_type, + ); + if Some(rb_name) != table_name.map(|c| c.as_str()) { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema name {} != expected table schema name {:?}", + rb_name, table_name + ), + } + .fail()?; + } + + if rb_ty != table_ty { + UnexpectedSnafu { + reason: format!( + "incoming batch's schema type {:?} != expected table schema type {:?}", + rb_ty, table_ty + ), + } + .fail()?; + } + } + Ok(()) + } +} + +/// Refill task state +enum TaskState { + /// Task is not started + Prepared { sql: String }, + /// Task is running + Running { + handle: JoinHandle>, + }, + /// Task is finished + Finished { res: Result }, +} + +impl TaskState { + fn new(sql: String) -> Self { + Self::Prepared { sql } + } +} + +mod test_send { + use std::collections::BTreeMap; + + use tokio::sync::RwLock; + + use super::*; + fn is_send() {} + fn foo() { + is_send::>(); + is_send::(); + is_send::>(); + is_send::>>(); + } +} + +impl TaskState<()> { + /// check if task is finished + async fn is_finished(&mut self) -> Result { + match self { + Self::Finished { .. } => Ok(true), + Self::Running { handle } => Ok(if handle.is_finished() { + *self = Self::Finished { + res: handle.await.context(JoinTaskSnafu)?, + }; + true + } else { + false + }), + _ => Ok(false), + } + } + + fn start_running( + &mut self, + task_data: &TaskData, + manager: FlowWorkerManagerRef, + mut output_stream: SendableRecordBatchStream, + ) -> Result<(), Error> { + let data = (*task_data).clone(); + let handle: JoinHandle> = common_runtime::spawn_global(async move { + while let Some(rb) = output_stream.next().await { + let rb = match rb { + Ok(rb) => rb, + Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?, + }; + TaskData::validate_schema(&data.table_schema, &rb)?; + + // send rb into flow node + manager + .node_context + .read() + .await + .send_rb(data.table_id, rb) + .await?; + } + common_telemetry::info!( + "Refill successful for source table_id={}, flow_id={}", + data.table_id, + data.flow_id + ); + Ok(()) + }); + *self = Self::Running { handle }; + + Ok(()) + } +} + +/// Query stream of RefillTask, simply wrap RecordBatches and RecordBatchStream and check output is not `AffectedRows` +enum QueryStream { + Batches { batches: RecordBatches }, + Stream { stream: SendableRecordBatchStream }, +} + +impl TryFrom for QueryStream { + type Error = Error; + fn try_from(value: common_query::Output) -> Result { + match value.data { + common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }), + common_query::OutputData::RecordBatches(batches) => { + Ok(QueryStream::Batches { batches }) + } + _ => UnexpectedSnafu { + reason: format!("Unexpected output data type: {:?}", value.data), + } + .fail(), + } + } +} + +impl QueryStream { + fn try_into_stream(self) -> Result { + match self { + Self::Batches { batches } => Ok(batches.as_stream()), + Self::Stream { stream } => Ok(stream), + } + } +} + +impl RefillTask { + /// Query with "select * from table WHERE time >= range_start and time < range_end" + pub async fn create( + flow_id: FlowId, + table_id: TableId, + time_range: Option<(common_time::Timestamp, common_time::Timestamp)>, + time_col_name: &str, + table_src: &TableSource, + ) -> Result { + let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?; + let all_col_names: BTreeSet<_> = table_schema + .iter_names() + .flatten() + .map(|s| s.as_str()) + .collect(); + + if !all_col_names.contains(time_col_name) { + UnexpectedSnafu { + reason: format!( + "Can't find column {} in table {} while refill flow", + time_col_name, + table_name.join(".") + ), + } + .fail()?; + } + + let sql = if let Some(time_range) = time_range { + format!( + "select * from {0} where {1} >= {2} and {1} < {3}", + table_name.join("."), + time_col_name, + Value::from(time_range.0), + Value::from(time_range.1), + ) + } else { + format!("select * from {0}", table_name.join(".")) + }; + + Ok(RefillTask { + data: TaskData { + flow_id, + table_id, + table_schema, + }, + state: TaskState::new(sql), + }) + } + + /// Start running the task in background, non-blocking + pub async fn start_running( + &mut self, + manager: FlowWorkerManagerRef, + invoker: &FrontendInvoker, + ) -> Result<(), Error> { + let TaskState::Prepared { sql } = &mut self.state else { + UnexpectedSnafu { + reason: "task is not prepared", + } + .fail()? + }; + + // we don't need information from query context in this query so a default query context is enough + let query_ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog("greptime".to_string()) + .current_schema("public".to_string()) + .build(), + ); + + let stmt_exec = invoker.statement_executor(); + + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let plan = stmt_exec + .plan(&stmt, query_ctx.clone()) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let output_data = stmt_exec + .exec_plan(plan, query_ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let output_stream = QueryStream::try_from(output_data)?; + let output_stream = output_stream.try_into_stream()?; + + self.state + .start_running(&self.data, manager, output_stream)?; + Ok(()) + } + + pub async fn is_finished(&mut self) -> Result { + self.state.is_finished().await + } +} diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 7981999f0abc..9882f65e7229 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -42,6 +42,30 @@ impl TableSource { } } + pub async fn get_time_index_column_from_table_id( + &self, + table_id: &TableId, + ) -> Result { + let info = self + .table_info_manager + .get(*table_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(UnexpectedSnafu { + reason: format!("Table id = {:?}, couldn't found table info", table_id), + })?; + let raw_schema = &info.table_info.meta.schema; + let Some(ts_index) = raw_schema.timestamp_index else { + UnexpectedSnafu { + reason: format!("Table id = {:?}, couldn't found timestamp index", table_id), + } + .fail()? + }; + let col_schema = raw_schema.column_schemas[ts_index].clone(); + Ok(col_schema) + } + pub async fn get_table_id_from_proto_name( &self, name: &greptime_proto::v1::TableName, @@ -75,6 +99,16 @@ impl TableSource { Ok(ret) } + pub async fn check_table_exist(&self, table_id: &TableId) -> Result { + Ok(self + .table_info_manager + .get(*table_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .is_some()) + } + /// query metasrv about the table name and table id pub async fn get_table_name(&self, table_id: &TableId) -> Result { self.table_info_manager diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 137e024307f9..e78e0d6fa33c 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -21,7 +21,7 @@ use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; -use snafu::{Location, Snafu}; +use snafu::{Location, ResultExt, Snafu}; use tonic::metadata::MetadataMap; use crate::adapter::FlowId; @@ -46,6 +46,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to query"))] + RequestQuery { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to find table route for table id {}", table_id))] + FindTableRoute { + table_id: u32, + #[snafu(implicit)] + location: Location, + source: partition::error::Error, + }, + /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] Eval { @@ -221,6 +236,11 @@ impl ErrorExt for Error { Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported } + + Self::FindTableRoute { source, .. } => source.status_code(), + + Self::RequestQuery { source, .. } => source.status_code(), + Self::External { source, .. } => source.status_code(), Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { @@ -237,3 +257,9 @@ impl ErrorExt for Error { } define_into_tonic_status!(Error); + +impl From for Error { + fn from(e: EvalError) -> Self { + Err::<(), _>(e).context(EvalSnafu).unwrap_err() + } +} diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 5dde62b43a69..a81c16ca79f5 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -22,6 +22,7 @@ mod linear; pub(crate) mod relation; mod scalar; mod signature; +mod utils; use arrow::compute::FilterBuilder; use datatypes::prelude::{ConcreteDataType, DataType}; @@ -36,6 +37,7 @@ pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan}; pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc}; pub(crate) use scalar::{ScalarExpr, TypedExpr}; use snafu::{ensure, ResultExt}; +pub(crate) use utils::find_plan_time_window_expr_lower_bound; use crate::expr::error::{ArrowSnafu, DataTypeSnafu}; use crate::repr::Diff; @@ -54,6 +56,16 @@ pub struct Batch { diffs: Option, } +impl From for Batch { + fn from(value: common_recordbatch::RecordBatch) -> Self { + Self { + batch: value.columns().to_vec(), + row_count: value.num_rows(), + diffs: None, + } + } +} + impl PartialEq for Batch { fn eq(&self, other: &Self) -> bool { let mut batch_eq = true; diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index f96d7827b6bd..b416ed0b25ec 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -94,6 +94,15 @@ impl MapFilterProject { } } + pub fn get_nth_expr(&self, n: usize) -> Option { + let idx = *self.projection.get(n)?; + if idx < self.input_arity { + Some(ScalarExpr::Column(idx)) + } else { + self.expressions.get(idx - self.input_arity).cloned() + } + } + /// The number of columns expected in the output row. pub fn output_arity(&self) -> usize { self.projection.len() diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index a6e00cce5bdd..94af5e0e425a 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -311,6 +311,9 @@ impl ScalarExpr { } /// Eval this expression with the given values. + /// + /// TODO(discord9): add tests to make sure `eval_batch` is the same as `eval` in + /// most cases pub fn eval(&self, values: &[Value]) -> Result { match self { ScalarExpr::Column(index) => Ok(values[*index].clone()), diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs new file mode 100644 index 000000000000..84019c958979 --- /dev/null +++ b/src/flow/src/expr/utils.rs @@ -0,0 +1,345 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::BTreeMap; + +use datatypes::value::Value; +use snafu::{ensure, OptionExt}; + +use crate::error::UnexpectedSnafu; +use crate::expr::ScalarExpr; +use crate::plan::TypedPlan; +use crate::Result; + +/// Find lower bound for time `current` in given `plan` for the time window expr. +/// +/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`, +/// return `Some("2021-07-01 00:00:00.000")` +/// +/// if `plan` doesn't contain a `TIME INDEX` column, return `None` +pub fn find_plan_time_window_expr_lower_bound( + plan: &TypedPlan, + current: common_time::Timestamp, +) -> Result> { + let typ = plan.schema.typ(); + let Some(mut time_index) = typ.time_index else { + return Ok(None); + }; + + let mut cur_plan = plan; + let mut expr_time_index; + + loop { + // follow upward and find deepest time index expr that is not a column ref + expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context( + UnexpectedSnafu { + reason: "Failed to find time index expr", + }, + )?); + + if let Some(ScalarExpr::Column(i)) = expr_time_index { + time_index = i; + } else { + break; + } + if let Some(input) = cur_plan.plan.get_first_input_plan() { + cur_plan = input; + } else { + break; + } + } + + let expr_time_index = expr_time_index.context(UnexpectedSnafu { + reason: "Failed to find time index expr", + })?; + + let ts_col = expr_time_index + .get_all_ref_columns() + .first() + .cloned() + .context(UnexpectedSnafu { + reason: "Failed to find time index column", + })?; + let ts_col = ScalarExpr::Column(ts_col); + + find_time_window_lower_bound(&expr_time_index, &ts_col, current) +} + +/// Find the lower bound of time window in given `expr` and `current` timestamp. +/// +/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`, +/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound +/// of current time window given the current timestamp +/// +/// if return None, meaning this time window have no lower bound +pub fn find_time_window_lower_bound( + expr: &ScalarExpr, + ts_col: &ScalarExpr, + current: common_time::Timestamp, +) -> Result> { + let ScalarExpr::Column(ts_col_idx) = ts_col.clone() else { + UnexpectedSnafu { + reason: format!("Expected column expression but got {ts_col:?}"), + } + .fail()? + }; + let all_ref_columns = expr.get_all_ref_columns(); + if !all_ref_columns.contains(&ts_col_idx) { + UnexpectedSnafu { + reason: format!( + "Expected column {} to be referenced in expression {expr:?}", + ts_col_idx + ), + } + .fail()? + } + if all_ref_columns.len() > 1 { + UnexpectedSnafu { + reason: format!( + "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}" + ), + } + .fail()? + } + let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]); + + let mut rewrote_expr = expr.clone(); + + rewrote_expr.permute_map(&permute_map)?; + + fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result { + let val = expr.eval(values)?; + if let Value::Timestamp(ts) = val { + Ok(ts) + } else { + UnexpectedSnafu { + reason: format!("Expected timestamp in expression {expr:?} but got {val:?}"), + } + .fail()? + } + } + + let cur_time_window = eval_to_timestamp(&rewrote_expr, &[current.into()])?; + + // search to find the lower bound + let mut offset: i64 = 1; + let lower_bound; + let mut upper_bound = Some(current); + // first expontial probe to found a range for binary search + loop { + let Some(next_val) = current.value().checked_sub(offset) else { + // no lower bound + return Ok(None); + }; + + let prev_time_probe = common_time::Timestamp::new(next_val, current.unit()); + + let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?; + + match prev_time_window.cmp(&cur_time_window) { + Ordering::Less => { + lower_bound = Some(prev_time_probe); + break; + } + Ordering::Equal => { + upper_bound = Some(prev_time_probe); + } + Ordering::Greater => { + UnexpectedSnafu { + reason: format!( + "Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}" + ), + } + .fail()? + } + } + + let Some(new_offset) = offset.checked_mul(2) else { + // no lower bound + return Ok(None); + }; + offset = new_offset; + } + + // binary search for the lower bound + + ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{ + reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"), + }); + + let output_unit = lower_bound.expect("should have lower bound").unit(); + + let mut low = lower_bound.expect("should have lower bound").value(); + let mut high = upper_bound.expect("should have upper bound").value(); + while low < high { + let mid = (low + high) / 2; + let mid_probe = common_time::Timestamp::new(mid, output_unit); + let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?; + + match mid_time_window.cmp(&cur_time_window) { + Ordering::Less => low = mid + 1, + Ordering::Equal => high = mid, + Ordering::Greater => UnexpectedSnafu { + reason: format!("Binary search failed for time window expression {expr:?}"), + } + .fail()?, + } + } + + let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit); + + Ok(Some(final_lower_bound_for_time_window)) +} + +#[cfg(test)] +mod test { + use pretty_assertions::assert_eq; + + use super::*; + use crate::plan::{Plan, TypedPlan}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + + #[tokio::test] + async fn test_plan_time_window_lower_bound() { + let testcases = [ + // no time index + ( + "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", + "2021-07-01 00:01:01.000", + None, + ), + // time index + ( + "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other fields + ( + "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + // time index with other pks + ( + "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;", + "2021-07-01 00:01:01.000", + Some("2021-07-01 00:00:00.000"), + ), + ]; + let engine = create_test_query_engine(); + + for (sql, current, expected) in &testcases[3..] { + let plan = sql_to_substrait(engine.clone(), sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let expected = + expected.map(|expected| common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!( + find_plan_time_window_expr_lower_bound(&flow_plan, current).unwrap(), + expected + ); + } + } + + #[tokio::test] + async fn test_timewindow_lower_bound() { + let testcases = [ + ( + ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:01:01.000", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:00:00.000", + "2021-07-01 00:00:00.000", + ), + // test edge cases + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:05:00.000", + "2021-07-01 00:05:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999", + "2021-07-01 00:00:00.000", + ), + ( + ("'5 minutes'", "ts", None), + "2021-07-01 00:04:59.999999999999999", + "2021-07-01 00:00:00.000", + ), + ]; + let engine = create_test_query_engine(); + + for (args, current, expected) in testcases { + let sql = if let Some(origin) = args.2 { + format!( + "SELECT date_bin({}, {}, '{origin}') FROM numbers_with_ts;", + args.0, args.1 + ) + } else { + format!( + "SELECT date_bin({}, {}) FROM numbers_with_ts;", + args.0, args.1 + ) + }; + let plan = sql_to_substrait(engine.clone(), &sql).await; + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) + .await + .unwrap(); + + let expr = { + let mfp = flow_plan.plan; + let Plan::Mfp { mfp, .. } = mfp else { + unreachable!() + }; + mfp.expressions[0].clone() + }; + + let current = common_time::Timestamp::from_str(current, None).unwrap(); + + let res = find_time_window_lower_bound(&expr, &ScalarExpr::Column(1), current).unwrap(); + + let expected = Some(common_time::Timestamp::from_str(expected, None).unwrap()); + + assert_eq!(res, expected); + } + } +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 8d6a881fa066..8d07369afa55 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -38,6 +38,9 @@ mod server; mod transform; mod utils; +#[cfg(test)] +mod test_utils; + pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index e1cf22e621ec..f6b33c46cb5c 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -18,8 +18,10 @@ mod join; mod reduce; +use std::collections::BTreeSet; + use crate::error::Error; -use crate::expr::{Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr}; +use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr}; use crate::plan::join::JoinPlan; pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; use crate::repr::{DiffRow, RelationDesc}; @@ -184,6 +186,69 @@ pub enum Plan { }, } +impl Plan { + /// Get nth expr using column ref + pub fn get_nth_expr(&self, n: usize) -> Option { + match self { + Self::Mfp { mfp, .. } => mfp.get_nth_expr(n), + Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n), + _ => None, + } + } + + /// Get the first input plan if exists + pub fn get_first_input_plan(&self) -> Option<&TypedPlan> { + match self { + Plan::Let { value, .. } => Some(value), + Plan::Mfp { input, .. } => Some(input), + Plan::Reduce { input, .. } => Some(input), + Plan::Join { inputs, .. } => inputs.first(), + Plan::Union { inputs, .. } => inputs.first(), + _ => None, + } + } + + /// Find all the used collection in the plan + pub fn find_used_collection(&self) -> BTreeSet { + fn recur_find_use(plan: &Plan, used: &mut BTreeSet) { + match plan { + Plan::Get { id } => { + match id { + Id::Local(_) => (), + Id::Global(g) => { + used.insert(*g); + } + }; + } + Plan::Let { value, body, .. } => { + recur_find_use(&value.plan, used); + recur_find_use(&body.plan, used); + } + Plan::Mfp { input, .. } => { + recur_find_use(&input.plan, used); + } + Plan::Reduce { input, .. } => { + recur_find_use(&input.plan, used); + } + Plan::Join { inputs, .. } => { + for input in inputs { + recur_find_use(&input.plan, used); + } + } + Plan::Union { inputs, .. } => { + for input in inputs { + recur_find_use(&input.plan, used); + } + } + _ => {} + } + } + let mut ret = Default::default(); + recur_find_use(self, &mut ret); + ret + } +} + impl Plan { pub fn with_types(self, schema: RelationDesc) -> TypedPlan { TypedPlan { schema, plan: self } diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index 1edd0c40dd55..65a83756b57f 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::expr::{AggregateExpr, SafeMfpPlan}; +use crate::expr::{AggregateExpr, SafeMfpPlan, ScalarExpr}; /// Describe how to extract key-value pair from a `Row` #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -23,6 +23,16 @@ pub struct KeyValPlan { pub val_plan: SafeMfpPlan, } +impl KeyValPlan { + /// Get nth expr using column ref + pub fn get_nth_expr(&self, n: usize) -> Option { + self.key_plan.get_nth_expr(n).or_else(|| { + self.val_plan + .get_nth_expr(n - self.key_plan.projection.len()) + }) + } +} + /// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and /// basic aggregates(for other aggregate functions) and mixed aggregate #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d22ba220441b..88195481a5cb 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -25,22 +25,27 @@ use common_error::ext::BoxedError; use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManagerRef; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::{FlowId, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; +use common_query::request::QueryRequest; use common_query::Output; -use common_telemetry::tracing::info; +use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::{info, warn}; use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use operator::delete::Deleter; use operator::insert::Inserter; +use operator::request::Requester; use operator::statement::StatementExecutor; -use partition::manager::PartitionRuleManager; -use query::{QueryEngine, QueryEngineFactory}; +use operator::table::TableMutationOperator; +use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; +use query::region_query::RegionQueryHandler; +use query::{QueryEngine, QueryEngineFactory, QueryEngineRef}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::server::Server; -use session::context::{QueryContextBuilder, QueryContextRef}; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; @@ -50,8 +55,9 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, - ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FindTableRouteSnafu, + FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, RequestQuerySnafu, ShutdownServerSnafu, + StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -273,6 +279,18 @@ impl FlownodeBuilder { } } + pub fn table_meta(&self) -> TableMetadataManagerRef { + self.table_meta.clone() + } + + pub fn catalog_manager(&self) -> CatalogManagerRef { + self.catalog_manager.clone() + } + + pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef { + self.flow_metadata_manager.clone() + } + pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self { let (sender, receiver) = SizeReportSender::new(); Self { @@ -320,45 +338,20 @@ impl FlownodeBuilder { /// /// or recover all existing flow tasks if in standalone mode(nodeid is None) /// + /// return all flow ids that are successfully recovered + /// /// TODO(discord9): persistent flow tasks with internal state - async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result { + pub(crate) async fn recover_flows( + &self, + manager: &FlowWorkerManagerRef, + ) -> Result, Error> { let nodeid = self.opts.node_id; - let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid { - let to_be_recover = self - .flow_metadata_manager - .flownode_flow_manager() - .flows(nodeid) - .try_collect::>() - .await - .context(ListFlowsSnafu { id: Some(nodeid) })?; - to_be_recover.into_iter().map(|(id, _)| id).collect() - } else { - let all_catalogs = self - .catalog_manager - .catalog_names() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let mut all_flow_ids = vec![]; - for catalog in all_catalogs { - let flows = self - .flow_metadata_manager - .flow_name_manager() - .flow_names(&catalog) - .await - .try_collect::>() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id())); - } - all_flow_ids - }; - let cnt = to_be_recovered.len(); + let to_be_recovered = + get_all_flow_ids(&self.flow_metadata_manager, &self.catalog_manager, nodeid).await?; + let mut did_recover = vec![]; // TODO(discord9): recover in parallel - for flow_id in to_be_recovered { + 'flow_id_loop: for flow_id in to_be_recovered { let info = self .flow_metadata_manager .flow_info_manager() @@ -368,34 +361,38 @@ impl FlownodeBuilder { .context(ExternalSnafu)? .context(FlowNotFoundSnafu { id: flow_id })?; - let sink_table_name = [ - info.sink_table_name().catalog_name.clone(), - info.sink_table_name().schema_name.clone(), - info.sink_table_name().table_name.clone(), - ]; - let args = CreateFlowArgs { - flow_id: flow_id as _, - sink_table_name, - source_table_ids: info.source_table_ids().to_vec(), - // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist) - // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true - // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that) - create_if_not_exists: true, - or_replace: true, - expire_after: info.expire_after(), - comment: Some(info.comment().clone()), - sql: info.raw_sql().clone(), - flow_options: info.options().clone(), - query_ctx: Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), - }; - manager.create_flow(args).await?; + let source_table_ids = info.source_table_ids().to_vec(); + + for src in &source_table_ids { + if !manager.table_info_source().check_table_exist(src).await? { + common_telemetry::error!( + "source table_id={:?} not found, skip recover flow_id={:?}", + src, + flow_id + ); + continue 'flow_id_loop; + } + } + + let args = CreateFlowArgs::from_flow_info(flow_id as _, info, true, true); + + match manager.create_flow(args).await { + Ok(Some(res)) => { + did_recover.push(res as FlowId); + } + Ok(None) => { + warn!( + "Failed to recover flow_id={:?}, flow already exists", + flow_id + ); + } + Err(err) => { + common_telemetry::error!(err; "Failed to recover flow_id={:?}", flow_id); + } + } } - Ok(cnt) + Ok(did_recover) } /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`, @@ -434,6 +431,7 @@ impl FlownodeBuilder { } } +#[derive(Clone)] pub struct FrontendInvoker { inserter: Arc, deleter: Arc, @@ -453,8 +451,11 @@ impl FrontendInvoker { } } + /// Build a frontend invoker, + /// + /// if `query_engine` is not specified, will build one without ability to execute ddl or flow, only insert/delete and query pub async fn build_from( - flow_worker_manager: FlowWorkerManagerRef, + query_engine: Option, catalog_manager: CatalogManagerRef, kv_backend: KvBackendRef, layered_cache_registry: LayeredCacheRegistryRef, @@ -489,7 +490,35 @@ impl FrontendInvoker { node_manager.clone(), )); - let query_engine = flow_worker_manager.query_engine.clone(); + let query_engine = if let Some(query_engine) = query_engine { + query_engine + } else { + // make frontend like query engine + let region_query_handler = + FlownodeRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone()); + + let requester = Arc::new(Requester::new( + catalog_manager.clone(), + partition_manager.clone(), + node_manager.clone(), + )); + + let table_mutation_handler = Arc::new(TableMutationOperator::new( + inserter.clone(), + deleter.clone(), + requester, + )); + + QueryEngineFactory::new( + catalog_manager.clone(), + Some(region_query_handler.clone()), + Some(table_mutation_handler), + None, + None, + true, + ) + .query_engine() + }; let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), @@ -504,6 +533,10 @@ impl FrontendInvoker { let invoker = FrontendInvoker::new(inserter, deleter, statement_executor); Ok(invoker) } + + pub fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } impl FrontendInvoker { @@ -538,8 +571,99 @@ impl FrontendInvoker { .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) } +} - pub fn statement_executor(&self) -> Arc { - self.statement_executor.clone() +/// get all flow ids in this flownode +pub(crate) async fn get_all_flow_ids( + flow_metadata_manager: &FlowMetadataManagerRef, + catalog_manager: &CatalogManagerRef, + nodeid: Option, +) -> Result, Error> { + let ret = if let Some(nodeid) = nodeid { + let flow_ids_one_node = flow_metadata_manager + .flownode_flow_manager() + .flows(nodeid) + .try_collect::>() + .await + .context(ListFlowsSnafu { id: Some(nodeid) })?; + flow_ids_one_node.into_iter().map(|(id, _)| id).collect() + } else { + let all_catalogs = catalog_manager + .catalog_names() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let mut all_flow_ids = vec![]; + for catalog in all_catalogs { + let flows = flow_metadata_manager + .flow_name_manager() + .flow_names(&catalog) + .await + .try_collect::>() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id())); + } + all_flow_ids + }; + + Ok(ret) +} + +/// a makeshift region query handler so that flownode can proactively query for +/// data it want +pub(crate) struct FlownodeRegionQueryHandler { + partition_manager: PartitionRuleManagerRef, + node_manager: NodeManagerRef, +} + +impl FlownodeRegionQueryHandler { + pub fn arc( + partition_manager: PartitionRuleManagerRef, + node_manager: NodeManagerRef, + ) -> Arc { + Arc::new(Self { + partition_manager, + node_manager, + }) + } +} + +#[async_trait::async_trait] +impl RegionQueryHandler for FlownodeRegionQueryHandler { + async fn do_get( + &self, + request: QueryRequest, + ) -> query::error::Result { + self.do_get_inner(request) + .await + .map_err(BoxedError::new) + .context(query::error::RegionQuerySnafu) + } +} + +impl FlownodeRegionQueryHandler { + async fn do_get_inner( + &self, + request: QueryRequest, + ) -> Result { + let region_id = request.region_id; + + let peer = &self + .partition_manager + .find_region_leader(region_id) + .await + .context(FindTableRouteSnafu { + table_id: region_id.table_id(), + })?; + + let client = self.node_manager.datanode(peer).await; + + client + .handle_query(request) + .await + .context(RequestQuerySnafu) } } diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs new file mode 100644 index 000000000000..585fa233016c --- /dev/null +++ b/src/flow/src/test_utils.rs @@ -0,0 +1,154 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use catalog::RegisterTableRequest; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; +use datatypes::data_type::ConcreteDataType as CDT; +use datatypes::prelude::*; +use datatypes::schema::Schema; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef}; +use itertools::Itertools; +use prost::Message; +use query::parser::QueryLanguageParser; +use query::query_engine::DefaultSerializer; +use query::QueryEngine; +use session::context::QueryContext; +/// note here we are using the `substrait_proto_df` crate from the `substrait` module and +/// rename it to `substrait_proto` +use substrait::substrait_proto_df as substrait_proto; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; +use substrait_proto::proto; +use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; +use table::test_util::MemTable; + +use crate::adapter::node_context::IdToNameMap; +use crate::adapter::FlownodeContext; +use crate::df_optimizer::apply_df_optimizer; +use crate::expr::GlobalId; +use crate::repr::{ColumnType, RelationType}; +use crate::transform::register_function_to_query_engine; + +pub fn create_test_ctx() -> FlownodeContext { + let mut schemas = HashMap::new(); + let mut tri_map = IdToNameMap::new(); + { + let gid = GlobalId::User(0); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; + let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); + + tri_map.insert(Some(name.clone()), Some(1024), gid); + schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); + } + + { + let gid = GlobalId::User(1); + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]; + let schema = RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::timestamp_millisecond_datatype(), false), + ]); + schemas.insert( + gid, + schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + ); + tri_map.insert(Some(name.clone()), Some(1025), gid); + } + + FlownodeContext { + schema: schemas, + table_repr: tri_map, + query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), + ..Default::default() + } +} + +pub fn create_test_query_engine() -> Arc { + let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); + let req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: NUMBERS_TABLE_NAME.to_string(), + table_id: NUMBERS_TABLE_ID, + table: NumbersTable::table(NUMBERS_TABLE_ID), + }; + catalog_list.register_table_sync(req).unwrap(); + + let schema = vec![ + datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), + datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false), + ]; + let mut columns = vec![]; + let numbers = (1..=10).collect_vec(); + let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); + columns.push(column); + + let ts = (1..=10).collect_vec(); + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); + ts.into_iter() + .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) + .count(); + let column: VectorRef = builder.to_vector_cloned(); + columns.push(column); + + let schema = Arc::new(Schema::new(schema)); + let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table("numbers_with_ts", recordbatch); + + let req_with_ts = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "numbers_with_ts".to_string(), + table_id: 1024, + table, + }; + catalog_list.register_table_sync(req_with_ts).unwrap(); + + let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); + + let engine = factory.query_engine(); + register_function_to_query_engine(&engine); + + assert_eq!("datafusion", engine.name()); + engine +} + +pub async fn sql_to_substrait(engine: Arc, sql: &str) -> proto::Plan { + // let engine = create_test_query_engine(); + let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); + let plan = engine + .planner() + .plan(&stmt, QueryContext::arc()) + .await + .unwrap(); + let plan = apply_df_optimizer(plan).await.unwrap(); + + // encode then decode so to rely on the impl of conversion from logical plan to substrait plan + let bytes = DFLogicalSubstraitConvertor {} + .encode(&plan, DefaultSerializer) + .unwrap(); + + proto::Plan::decode(bytes).unwrap() +} diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 94878115cf8b..e2a9c2ff8059 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -152,144 +152,12 @@ impl common_function::function::Function for TumbleFunction { #[cfg(test)] mod test { - use std::sync::Arc; - use catalog::RegisterTableRequest; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; - use datatypes::prelude::*; - use datatypes::schema::Schema; - use datatypes::timestamp::TimestampMillisecond; - use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef}; - use itertools::Itertools; - use prost::Message; use query::parser::QueryLanguageParser; - use query::query_engine::DefaultSerializer; - use query::QueryEngine; use session::context::QueryContext; - use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; - use substrait_proto::proto; - use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; - use table::test_util::MemTable; - use super::*; - use crate::adapter::node_context::IdToNameMap; use crate::df_optimizer::apply_df_optimizer; - use crate::expr::GlobalId; - use crate::repr::{ColumnType, RelationType}; - - pub fn create_test_ctx() -> FlownodeContext { - let mut schemas = HashMap::new(); - let mut tri_map = IdToNameMap::new(); - { - let gid = GlobalId::User(0); - let name = [ - "greptime".to_string(), - "public".to_string(), - "numbers".to_string(), - ]; - let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - - tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); - } - - { - let gid = GlobalId::User(1); - let name = [ - "greptime".to_string(), - "public".to_string(), - "numbers_with_ts".to_string(), - ]; - let schema = RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), - ColumnType::new(CDT::timestamp_millisecond_datatype(), false), - ]); - schemas.insert( - gid, - schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), - ); - tri_map.insert(Some(name.clone()), Some(1025), gid); - } - - FlownodeContext { - schema: schemas, - table_repr: tri_map, - query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), - ..Default::default() - } - } - - pub fn create_test_query_engine() -> Arc { - let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); - let req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: NUMBERS_TABLE_NAME.to_string(), - table_id: NUMBERS_TABLE_ID, - table: NumbersTable::table(NUMBERS_TABLE_ID), - }; - catalog_list.register_table_sync(req).unwrap(); - - let schema = vec![ - datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false), - datatypes::schema::ColumnSchema::new( - "ts", - CDT::timestamp_millisecond_datatype(), - false, - ), - ]; - let mut columns = vec![]; - let numbers = (1..=10).collect_vec(); - let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); - columns.push(column); - - let ts = (1..=10).collect_vec(); - let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); - ts.into_iter() - .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) - .count(); - let column: VectorRef = builder.to_vector_cloned(); - columns.push(column); - - let schema = Arc::new(Schema::new(schema)); - let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::table("numbers_with_ts", recordbatch); - - let req_with_ts = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "numbers_with_ts".to_string(), - table_id: 1024, - table, - }; - catalog_list.register_table_sync(req_with_ts).unwrap(); - - let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); - - let engine = factory.query_engine(); - register_function_to_query_engine(&engine); - - assert_eq!("datafusion", engine.name()); - engine - } - - pub async fn sql_to_substrait(engine: Arc, sql: &str) -> proto::Plan { - // let engine = create_test_query_engine(); - let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); - let plan = engine - .planner() - .plan(&stmt, QueryContext::arc()) - .await - .unwrap(); - let plan = apply_df_optimizer(plan).await.unwrap(); - - // encode then decode so to rely on the impl of conversion from logical plan to substrait plan - let bytes = DFLogicalSubstraitConvertor {} - .encode(&plan, DefaultSerializer) - .unwrap(); - - proto::Plan::decode(bytes).unwrap() - } + use crate::test_utils::create_test_query_engine; /// TODO(discord9): add more illegal sql tests #[tokio::test] diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index b944e3b263e3..33ee1fa284b9 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -358,7 +358,7 @@ mod test { use crate::expr::{BinaryFunc, DfScalarFunction, GlobalId, RawDfScalarFn}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; use crate::transform::CDT; #[tokio::test] diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index de05b018ac51..b461b5e5e1b3 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -545,7 +545,7 @@ mod test { use crate::expr::{GlobalId, MapFilterProject}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter #[tokio::test] diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs index ffb62ff14dba..7ad5db6f46d3 100644 --- a/src/flow/src/transform/literal.rs +++ b/src/flow/src/transform/literal.rs @@ -346,7 +346,7 @@ mod test { use super::*; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// test if literal in substrait plan can be correctly converted to flow plan #[tokio::test] async fn test_literal() { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index ad5fc2f58dc2..872342edc8b3 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -238,7 +238,7 @@ mod test { use crate::expr::GlobalId; use crate::plan::{Plan, TypedPlan}; use crate::repr::{ColumnType, RelationType}; - use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait}; use crate::transform::CDT; #[tokio::test] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index be17ee810fc0..fea67e31b854 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -196,6 +196,10 @@ impl Instance { pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + + pub fn query_engine(&self) -> QueryEngineRef { + self.query_engine.clone() + } } #[async_trait] diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index cc2458fa99cb..50c7127d74b8 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -209,7 +209,7 @@ impl GreptimeDbStandaloneBuilder { memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), table_metadata_manager, table_metadata_allocator, - flow_metadata_manager, + flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator, region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, @@ -235,7 +235,7 @@ impl GreptimeDbStandaloneBuilder { let flow_worker_manager = flownode.flow_worker_manager(); let invoker = flow::FrontendInvoker::build_from( - flow_worker_manager.clone(), + Some(instance.query_engine()), catalog_manager.clone(), kv_backend.clone(), cache_registry.clone(), @@ -248,6 +248,11 @@ impl GreptimeDbStandaloneBuilder { flow_worker_manager.set_frontend_invoker(invoker).await; + flow_worker_manager + .create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _)) + .await + .unwrap(); + procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap(); diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index 67fd43a03288..0ecc1726eea1 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -463,7 +463,7 @@ SELECT wildcard FROM out_basic; +----------+ | wildcard | +----------+ -| 3 | +| 6 | +----------+ DROP TABLE input_basic; @@ -561,7 +561,7 @@ SELECT wildcard FROM out_basic; +----------+ | wildcard | +----------+ -| 3 | +| 5 | +----------+ DROP FLOW test_wildcard_basic; diff --git a/tests/cases/standalone/common/flow/flow_refill.result b/tests/cases/standalone/common/flow/flow_refill.result new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/cases/standalone/common/flow/flow_refill.sql b/tests/cases/standalone/common/flow/flow_refill.sql new file mode 100644 index 000000000000..e545576e5f76 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_refill.sql @@ -0,0 +1 @@ +-- testing flow refill after reboot \ No newline at end of file diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 14e80129446d..02a3efb997c4 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -303,7 +303,11 @@ SELECT * FROM out_num_cnt_show; +--------+-------------------------+ | -2 | 1970-01-01T00:00:00.002 | | -1 | 1970-01-01T00:00:00.003 | +| 3 | 1970-01-01T00:00:00.001 | | 4 | 1970-01-01T00:00:00.002 | +| 4 | 1970-01-01T00:00:00.004 | +| 5 | 1970-01-01T00:00:00.004 | +| 10 | 1970-01-01T00:00:00 | | 10 | 1970-01-01T00:00:00.003 | | 11 | 1970-01-01T00:00:00.004 | | 15 | 1970-01-01T00:00:00.001 | diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 81bbe2fb0b07..4046f7df3828 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -482,6 +482,7 @@ impl Env { Env::stop_server(server_process); } } + if is_full_restart { if let Some(mut metasrv_process) = db.metasrv_process.lock().expect("poisoned lock").take() @@ -493,11 +494,12 @@ impl Env { { Env::stop_server(&mut frontend_process); } - if let Some(mut flownode_process) = - db.flownode_process.lock().expect("poisoned lock").take() - { - Env::stop_server(&mut flownode_process); - } + } + + if let Some(mut flownode_process) = + db.flownode_process.lock().expect("poisoned lock").take() + { + Env::stop_server(&mut flownode_process); } } @@ -531,13 +533,13 @@ impl Env { .lock() .expect("lock poisoned") .replace(frontend); - - let flownode = self.start_server("flownode", &db.ctx, false).await; - db.flownode_process - .lock() - .expect("lock poisoned") - .replace(flownode); } + let flownode = self.start_server("flownode", &db.ctx, false).await; + db.flownode_process + .lock() + .expect("lock poisoned") + .replace(flownode); + processes };