diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 6d70377cf2aa..eeefe019c1fd 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -55,7 +55,7 @@ use crate::error::{ UnexpectedSnafu, }; use crate::expr::{Batch, GlobalId}; -use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; +use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; mod flownode_impl; @@ -249,12 +249,18 @@ impl FlowWorkerManager { self.try_fetch_or_create_table(&table_name).await?; let schema_len = proto_schema.len(); + let total_rows = reqs.iter().map(|r| r.len()).sum::(); trace!( "Sending {} writeback requests to table {}, reqs total rows={}", reqs.len(), table_name.join("."), reqs.iter().map(|r| r.len()).sum::() ); + + METRIC_FLOW_ROWS + .with_label_values(&["out"]) + .inc_by(total_rows as u64); + let now = self.tick_manager.tick(); for req in reqs { match req { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1fa11b4d83a2..1f431ff92f64 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -138,7 +138,7 @@ impl Flownode for FlowWorkerManager { } async fn handle_inserts(&self, request: InsertRequests) -> Result { - // using try_read makesure two things: + // using try_read to ensure two things: // 1. flush wouldn't happen until inserts before it is inserted // 2. inserts happening concurrently with flush wouldn't be block by flush let _flush_lock = self.flush_lock.try_read(); diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs index e125a2d27261..b92ec30f9f6a 100644 --- a/src/flow/src/compute/types.rs +++ b/src/flow/src/compute/types.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::rc::Rc; use std::sync::Arc; +use common_error::ext::ErrorExt; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::handoff::TeeingHandoff; use hydroflow::scheduled::port::RecvPort; @@ -25,6 +26,7 @@ use itertools::Itertools; use tokio::sync::Mutex; use crate::expr::{Batch, EvalError, ScalarExpr}; +use crate::metrics::METRIC_FLOW_ERRORS; use crate::repr::DiffRow; use crate::utils::ArrangeHandler; @@ -185,6 +187,9 @@ impl ErrCollector { } pub fn push_err(&self, err: EvalError) { + METRIC_FLOW_ERRORS + .with_label_values(&[err.status_code().as_ref()]) + .inc(); self.inner.blocking_lock().push_back(err) } diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 992d5c592125..b29098b9d8fa 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -14,8 +14,11 @@ //! Error handling for expression evaluation. +use std::any::Any; + use arrow_schema::ArrowError; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::DataFusionError; use datatypes::data_type::ConcreteDataType; @@ -126,3 +129,29 @@ pub enum EvalError { source: BoxedError, }, } + +impl ErrorExt for EvalError { + fn status_code(&self) -> StatusCode { + use EvalError::*; + match self { + DivisionByZero { .. } + | TypeMismatch { .. } + | TryFromValue { .. } + | DataAlreadyExpired { .. } + | InvalidArgument { .. } + | Overflow { .. } => StatusCode::InvalidArguments, + + CastValue { source, .. } | DataType { source, .. } => source.status_code(), + + Internal { .. } + | Optimize { .. } + | Arrow { .. } + | Datafusion { .. } + | External { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 119b4c5856de..f165bcadc6a4 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -30,4 +30,22 @@ lazy_static! { .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); + pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_processed_rows", + "Count of rows flowing through the system", + &["direction"] + ) + .unwrap(); + pub static ref METRIC_FLOW_PROCESSING_TIME: HistogramVec = register_histogram_vec!( + "greptime_flow_processing_time", + "Time spent processing requests", + &["type"] + ) + .unwrap(); + pub static ref METRIC_FLOW_ERRORS: IntCounterVec = register_int_counter_vec!( + "greptime_flow_errors", + "Count of errors in flow processing", + &["code"] + ) + .unwrap(); } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 1259c1175510..37e3249ec82a 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -54,6 +54,7 @@ use crate::error::{ ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; +use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; use crate::{Error, FlowWorkerManager, FlownodeOptions}; @@ -77,6 +78,10 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["ddl"]) + .start_timer(); + let request = request.into_inner(); self.manager .handle(request) @@ -92,18 +97,31 @@ impl flow_server::Flow for FlowService { &self, request: Request, ) -> Result, Status> { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["insert"]) + .start_timer(); + let request = request.into_inner(); // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define + let mut row_count = 0; let request = api::v1::region::InsertRequests { requests: request .requests .into_iter() - .map(|insert| api::v1::region::InsertRequest { - region_id: insert.region_id, - rows: insert.rows, + .map(|insert| { + insert.rows.as_ref().inspect(|x| row_count += x.rows.len()); + api::v1::region::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + } }) .collect_vec(), }; + + METRIC_FLOW_ROWS + .with_label_values(&["in"]) + .inc_by(row_count as u64); + self.manager .handle_inserts(request) .await @@ -500,6 +518,10 @@ impl FrontendInvoker { requests: RowInsertRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_insert"]) + .start_timer(); + self.inserter .handle_row_inserts(requests, ctx, &self.statement_executor) .await @@ -512,6 +534,10 @@ impl FrontendInvoker { requests: RowDeleteRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + let _timer = METRIC_FLOW_PROCESSING_TIME + .with_label_values(&["output_delete"]) + .start_timer(); + self.deleter .handle_row_deletes(requests, ctx) .await