Skip to content

Commit

Permalink
feat: add some critical metrics to flownode (#5235)
Browse files Browse the repository at this point in the history
* feat: add some critical metrics to flownode

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored and evenyag committed Jan 3, 2025
1 parent 327d165 commit c22ca3e
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 6 deletions.
8 changes: 7 additions & 1 deletion src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::error::{
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};

mod flownode_impl;
Expand Down Expand Up @@ -249,12 +249,18 @@ impl FlowWorkerManager {
self.try_fetch_or_create_table(&table_name).await?;
let schema_len = proto_schema.len();

let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
trace!(
"Sending {} writeback requests to table {}, reqs total rows={}",
reqs.len(),
table_name.join("."),
reqs.iter().map(|r| r.len()).sum::<usize>()
);

METRIC_FLOW_ROWS
.with_label_values(&["out"])
.inc_by(total_rows as u64);

let now = self.tick_manager.tick();
for req in reqs {
match req {
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Flownode for FlowWorkerManager {
}

async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
// using try_read makesure two things:
// using try_read to ensure two things:
// 1. flush wouldn't happen until inserts before it is inserted
// 2. inserts happening concurrently with flush wouldn't be block by flush
let _flush_lock = self.flush_lock.try_read();
Expand Down
5 changes: 5 additions & 0 deletions src/flow/src/compute/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;

use common_error::ext::ErrorExt;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
Expand All @@ -25,6 +26,7 @@ use itertools::Itertools;
use tokio::sync::Mutex;

use crate::expr::{Batch, EvalError, ScalarExpr};
use crate::metrics::METRIC_FLOW_ERRORS;
use crate::repr::DiffRow;
use crate::utils::ArrangeHandler;

Expand Down Expand Up @@ -185,6 +187,9 @@ impl ErrCollector {
}

pub fn push_err(&self, err: EvalError) {
METRIC_FLOW_ERRORS
.with_label_values(&[err.status_code().as_ref()])
.inc();
self.inner.blocking_lock().push_back(err)
}

Expand Down
31 changes: 30 additions & 1 deletion src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

//! Error handling for expression evaluation.
use std::any::Any;

use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::DataFusionError;
use datatypes::data_type::ConcreteDataType;
Expand Down Expand Up @@ -126,3 +129,29 @@ pub enum EvalError {
source: BoxedError,
},
}

impl ErrorExt for EvalError {
fn status_code(&self) -> StatusCode {
use EvalError::*;
match self {
DivisionByZero { .. }
| TypeMismatch { .. }
| TryFromValue { .. }
| DataAlreadyExpired { .. }
| InvalidArgument { .. }
| Overflow { .. } => StatusCode::InvalidArguments,

CastValue { source, .. } | DataType { source, .. } => source.status_code(),

Internal { .. }
| Optimize { .. }
| Arrow { .. }
| Datafusion { .. }
| External { .. } => StatusCode::Internal,
}
}

fn as_any(&self) -> &dyn Any {
self
}
}
18 changes: 18 additions & 0 deletions src/flow/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,22 @@ lazy_static! {
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows",
"Count of rows flowing through the system",
&["direction"]
)
.unwrap();
pub static ref METRIC_FLOW_PROCESSING_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_processing_time",
"Time spent processing requests",
&["type"]
)
.unwrap();
pub static ref METRIC_FLOW_ERRORS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_errors",
"Count of errors in flow processing",
&["code"]
)
.unwrap();
}
32 changes: 29 additions & 3 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::error::{
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
Expand All @@ -77,6 +78,10 @@ impl flow_server::Flow for FlowService {
&self,
request: Request<FlowRequest>,
) -> Result<Response<FlowResponse>, Status> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["ddl"])
.start_timer();

let request = request.into_inner();
self.manager
.handle(request)
Expand All @@ -92,18 +97,31 @@ impl flow_server::Flow for FlowService {
&self,
request: Request<InsertRequests>,
) -> Result<Response<FlowResponse>, Status> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["insert"])
.start_timer();

let request = request.into_inner();
// TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
let mut row_count = 0;
let request = api::v1::region::InsertRequests {
requests: request
.requests
.into_iter()
.map(|insert| api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
.map(|insert| {
insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
}
})
.collect_vec(),
};

METRIC_FLOW_ROWS
.with_label_values(&["in"])
.inc_by(row_count as u64);

self.manager
.handle_inserts(request)
.await
Expand Down Expand Up @@ -500,6 +518,10 @@ impl FrontendInvoker {
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["output_insert"])
.start_timer();

self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor)
.await
Expand All @@ -512,6 +534,10 @@ impl FrontendInvoker {
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["output_delete"])
.start_timer();

self.deleter
.handle_row_deletes(requests, ctx)
.await
Expand Down

0 comments on commit c22ca3e

Please sign in to comment.