diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 728e959a439c..36721f70e827 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -212,7 +212,7 @@ impl InformationSchemaFlowsBuilder { .flow_names(&catalog_name) .await; - let flow_state_size = { + let flow_stat = { let information_extension = utils::information_extension(&self.catalog_manager).unwrap(); information_extension.flow_stats().await?.clone() @@ -236,7 +236,7 @@ impl InformationSchemaFlowsBuilder { catalog_name: catalog_name.to_string(), flow_name: flow_name.to_string(), })?; - self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_state_size)?; + self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?; } self.finish() @@ -247,7 +247,7 @@ impl InformationSchemaFlowsBuilder { predicates: &Predicates, flow_id: FlowId, flow_info: FlowInfoValue, - flow_state_size: &Option, + flow_stat: &Option, ) -> Result<()> { let row = [ (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())), @@ -263,7 +263,7 @@ impl InformationSchemaFlowsBuilder { self.flow_names.push(Some(flow_info.flow_name())); self.flow_ids.push(Some(flow_id)); self.state_sizes.push( - flow_state_size + flow_stat .as_ref() .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)), ); diff --git a/src/common/meta/src/key/flow/flow_state.rs b/src/common/meta/src/key/flow/flow_state.rs index 5b43db4bf11d..eeb4b06f0132 100644 --- a/src/common/meta/src/key/flow/flow_state.rs +++ b/src/common/meta/src/key/flow/flow_state.rs @@ -115,11 +115,6 @@ pub struct FlowStateManager { in_memory: KvBackendRef, } -#[async_trait::async_trait] -pub trait LocalFlowStateReporter: Send + Sync { - async fn report(&self) -> FlowStateValue; -} - impl FlowStateManager { pub fn new(in_memory: KvBackendRef) -> Self { Self { in_memory } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 21c64cc7ee6f..671d40dcce81 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -23,7 +23,6 @@ use std::time::{Duration, Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use common_config::Configurable; use common_error::ext::BoxedError; -use common_meta::key::flow::flow_state::{FlowStat, FlowStateValue, LocalFlowStateReporter}; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -52,17 +51,16 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{ - EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowOutputTypeMismatchSnafu, InternalSnafu, - TableNotFoundSnafu, UnexpectedSnafu, + EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu, + UnexpectedSnafu, }; use crate::expr::{Batch, GlobalId}; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; -// TODO(discord9): change type mismatch information to a more user friendly message in this PR - mod flownode_impl; mod parse_expr; +mod stat; #[cfg(test)] mod tests; mod util; @@ -269,16 +267,8 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let first_row = reqs - .first() - .and_then(|r| match r { - DiffRequest::Insert(rows) => rows.first(), - DiffRequest::Delete(rows) => rows.first(), - }) - .map(|(row, _ts)| row); - let (is_ts_placeholder, proto_schema) = self - .try_fetch_or_create_table(&table_name, first_row) - .await?; + let (is_ts_placeholder, proto_schema) = + self.try_fetch_or_create_table(&table_name).await?; let schema_len = proto_schema.len(); trace!( @@ -426,7 +416,6 @@ impl FlowWorkerManager { async fn try_fetch_or_create_table( &self, table_name: &TableName, - first_row: Option<&Row>, ) -> Result<(bool, Vec), Error> { // TODO(discord9): instead of auto build table from request schema, actually build table // before `create flow` to be able to assign pk and ts etc. @@ -536,51 +525,13 @@ impl FlowWorkerManager { (primary_keys, with_auto_added_col, no_time_index) }; - if let Some(first_row) = first_row { - for (expected_type, actual_type) in schema.iter().zip(first_row.iter()) { - ensure!( - expected_type.data_type == actual_type.data_type() || actual_type.is_null(), - FlowOutputTypeMismatchSnafu { - expected: expected_type.data_type.clone(), - actual: actual_type.data_type() - } - ); - } - } - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; Ok((is_ts_placeholder, proto_schema)) } } -#[async_trait::async_trait] -impl LocalFlowStateReporter for FlowWorkerManager { - async fn report(&self) -> common_meta::key::flow::flow_state::FlowStateValue { - FlowStateValue::new(self.gen_state_report().await.state_size) - } -} - /// Flow Runtime related methods impl FlowWorkerManager { - pub async fn gen_state_report(&self) -> FlowStat { - let mut full_report = BTreeMap::new(); - for worker in self.worker_handles.iter() { - let worker = worker.lock().await; - match worker.get_state_size().await { - Ok(state_size) => { - full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v))) - } - Err(err) => { - common_telemetry::error!(err; "Get state size error"); - } - } - } - - FlowStat { - state_size: full_report, - } - } - /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back /// /// if heartbeat task is shutdown, this future will exit too diff --git a/src/flow/src/adapter/stat.rs b/src/flow/src/adapter/stat.rs new file mode 100644 index 000000000000..c719e35f3ca9 --- /dev/null +++ b/src/flow/src/adapter/stat.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use common_meta::key::flow::flow_state::FlowStat; + +use crate::FlowWorkerManager; + +impl FlowWorkerManager { + pub async fn gen_state_report(&self) -> FlowStat { + let mut full_report = BTreeMap::new(); + for worker in self.worker_handles.iter() { + let worker = worker.lock().await; + match worker.get_state_size().await { + Ok(state_size) => { + full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v))) + } + Err(err) => { + common_telemetry::error!(err; "Get flow stat size error"); + } + } + } + + FlowStat { + state_size: full_report, + } + } +} diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index b4733ef49265..a94de4b6ed7b 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -21,7 +21,6 @@ use common_error::ext::BoxedError; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; -use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; use crate::adapter::FlowId; @@ -46,18 +45,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Flow output type mismatch: expected: {:?}, actual: {:?}", - expected, - actual - ))] - FlowOutputTypeMismatch { - expected: ConcreteDataType, - actual: ConcreteDataType, - #[snafu(implicit)] - location: Location, - }, - /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] Eval { @@ -226,9 +213,7 @@ impl ErrorExt for Error { source.status_code() } Self::MetaClientInit { source, .. } => source.status_code(), - Self::ParseAddr { .. } | Self::FlowOutputTypeMismatch { .. } => { - StatusCode::InvalidArguments - } + Self::ParseAddr { .. } => StatusCode::InvalidArguments, } } diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index c4184941dabb..fa360a6de684 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -1047,15 +1047,15 @@ FROM -- Test if FLOWS table works, but don't care about the result since it vary from runs SELECT - 1 + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, FROM INFORMATION_SCHEMA.FLOWS; -+----------+ -| Int64(1) | -+----------+ -| 1 | -+----------+ ++--------------+ +| active_flows | ++--------------+ +| 1 | ++--------------+ DROP FLOW requests_long_term; diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 2e890098d095..8946c014be36 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -571,7 +571,7 @@ FROM -- Test if FLOWS table works, but don't care about the result since it vary from runs SELECT - 1 + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, FROM INFORMATION_SCHEMA.FLOWS;