Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add some critical metrics to flownode #5235

Merged
merged 2 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::error::{
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};

mod flownode_impl;
Expand Down Expand Up @@ -249,12 +249,18 @@ impl FlowWorkerManager {
self.try_fetch_or_create_table(&table_name).await?;
let schema_len = proto_schema.len();

let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
trace!(
"Sending {} writeback requests to table {}, reqs total rows={}",
reqs.len(),
table_name.join("."),
reqs.iter().map(|r| r.len()).sum::<usize>()
);

METRIC_FLOW_ROWS
.with_label_values(&["out"])
.inc_by(total_rows as u64);

let now = self.tick_manager.tick();
for req in reqs {
match req {
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Flownode for FlowWorkerManager {
}

async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
// using try_read makesure two things:
// using try_read to ensure two things:
// 1. flush wouldn't happen until inserts before it is inserted
// 2. inserts happening concurrently with flush wouldn't be block by flush
let _flush_lock = self.flush_lock.try_read();
Expand Down
5 changes: 5 additions & 0 deletions src/flow/src/compute/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;

use common_error::ext::ErrorExt;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
Expand All @@ -25,6 +26,7 @@ use itertools::Itertools;
use tokio::sync::Mutex;

use crate::expr::{Batch, EvalError, ScalarExpr};
use crate::metrics::METRIC_FLOW_ERRORS;
use crate::repr::DiffRow;
use crate::utils::ArrangeHandler;

Expand Down Expand Up @@ -185,6 +187,9 @@ impl ErrCollector {
}

pub fn push_err(&self, err: EvalError) {
METRIC_FLOW_ERRORS
.with_label_values(&[err.status_code().as_ref()])
.inc();
self.inner.blocking_lock().push_back(err)
}

Expand Down
31 changes: 30 additions & 1 deletion src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

//! Error handling for expression evaluation.

use std::any::Any;

use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::DataFusionError;
use datatypes::data_type::ConcreteDataType;
Expand Down Expand Up @@ -126,3 +129,29 @@ pub enum EvalError {
source: BoxedError,
},
}

impl ErrorExt for EvalError {
fn status_code(&self) -> StatusCode {
use EvalError::*;
match self {
DivisionByZero { .. }
| TypeMismatch { .. }
| TryFromValue { .. }
| DataAlreadyExpired { .. }
| InvalidArgument { .. }
| Overflow { .. } => StatusCode::InvalidArguments,

CastValue { source, .. } | DataType { source, .. } => source.status_code(),

Internal { .. }
| Optimize { .. }
| Arrow { .. }
| Datafusion { .. }
| External { .. } => StatusCode::Internal,
}
}

fn as_any(&self) -> &dyn Any {
self
}
}
18 changes: 18 additions & 0 deletions src/flow/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,22 @@ lazy_static! {
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows",
"Count of rows flowing through the system",
&["direction"]
)
.unwrap();
pub static ref METRIC_FLOW_PROCESSING_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_processing_time",
"Time spent processing requests",
&["type"]
)
.unwrap();
pub static ref METRIC_FLOW_ERRORS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_errors",
"Count of errors in flow processing",
&["code"]
)
.unwrap();
}
32 changes: 29 additions & 3 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::error::{
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
Expand All @@ -77,6 +78,10 @@ impl flow_server::Flow for FlowService {
&self,
request: Request<FlowRequest>,
) -> Result<Response<FlowResponse>, Status> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["ddl"])
.start_timer();

let request = request.into_inner();
self.manager
.handle(request)
Expand All @@ -92,18 +97,31 @@ impl flow_server::Flow for FlowService {
&self,
request: Request<InsertRequests>,
) -> Result<Response<FlowResponse>, Status> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["insert"])
.start_timer();

let request = request.into_inner();
// TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
let mut row_count = 0;
let request = api::v1::region::InsertRequests {
requests: request
.requests
.into_iter()
.map(|insert| api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
.map(|insert| {
insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
api::v1::region::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
}
})
.collect_vec(),
};

METRIC_FLOW_ROWS
.with_label_values(&["in"])
.inc_by(row_count as u64);

self.manager
.handle_inserts(request)
.await
Expand Down Expand Up @@ -500,6 +518,10 @@ impl FrontendInvoker {
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["output_insert"])
.start_timer();

self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor)
.await
Expand All @@ -512,6 +534,10 @@ impl FrontendInvoker {
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let _timer = METRIC_FLOW_PROCESSING_TIME
.with_label_values(&["output_delete"])
.start_timer();

self.deleter
.handle_row_deletes(requests, ctx)
.await
Expand Down
Loading