From cbb06cd0c635a63aed4265d0cfc100bac7f48eef Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 14 Aug 2024 11:23:49 +0800 Subject: [PATCH] feat(flow): add some metrics (#4539) * feat: add some metrics * fix: tmp rate limiter * feat: add task count metrics * refactor: use bounded channel anyway * refactor: better metrics --- Cargo.lock | 2 ++ src/flow/Cargo.toml | 2 ++ src/flow/src/adapter.rs | 42 ++++++++++++++++++++------- src/flow/src/adapter/flownode_impl.rs | 3 ++ src/flow/src/adapter/node_context.rs | 28 +++++++++++------- src/flow/src/expr/func.rs | 6 ++-- src/flow/src/lib.rs | 1 + src/flow/src/metrics.rs | 33 +++++++++++++++++++++ src/flow/src/repr.rs | 3 ++ src/flow/src/server.rs | 16 +++++----- 10 files changed, 104 insertions(+), 32 deletions(-) create mode 100644 src/flow/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 17adde5e6ab0..34da29c6727b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3832,6 +3832,7 @@ dependencies = [ "greptime-proto", "hydroflow", "itertools 0.10.5", + "lazy_static", "meta-client", "minstant", "nom", @@ -3839,6 +3840,7 @@ dependencies = [ "operator", "partition", "pretty_assertions", + "prometheus", "prost 0.12.6", "query", "serde", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 4dfc70e6c4c9..c4db341dbe7d 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -44,12 +44,14 @@ greptime-proto.workspace = true # otherwise it is the same with upstream repo hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true +lazy_static.workspace = true meta-client.workspace = true minstant = "0.1.7" nom = "7.1.3" num-traits = "0.2" operator.workspace = true partition.workspace = true +prometheus.workspace = true prost.workspace = true query.workspace = true serde.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 60336f548f24..b45fc1e88916 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -51,6 +51,9 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; use crate::expr::GlobalId; +use crate::metrics::{ + METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS, +}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; use crate::transform::sql_to_flow_plan; @@ -193,6 +196,15 @@ pub enum DiffRequest { Delete(Vec<(Row, repr::Timestamp)>), } +impl DiffRequest { + pub fn len(&self) -> usize { + match self { + Self::Insert(v) => v.len(), + Self::Delete(v) => v.len(), + } + } +} + /// iterate through the diff row and form continuous diff row with same diff type pub fn diff_row_to_request(rows: Vec) -> Vec { let mut reqs = Vec::new(); @@ -544,6 +556,7 @@ impl FlowWorkerManager { let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms let new_wait = Duration::from_millis(new_wait as u64).min(default_interval); trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt); + METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64); since_last_run = tokio::time::Instant::now(); tokio::time::sleep(new_wait).await; } @@ -575,7 +588,7 @@ impl FlowWorkerManager { } } // check row send and rows remain in send buf - let (flush_res, buf_len) = if blocking { + let (flush_res, _buf_len) = if blocking { let ctx = self.node_context.read().await; (ctx.flush_all_sender().await, ctx.get_send_buf_size().await) } else { @@ -585,16 +598,19 @@ impl FlowWorkerManager { } }; match flush_res { - Ok(r) => row_cnt += r, + Ok(r) => { + common_telemetry::trace!("Flushed {} rows", r); + row_cnt += r; + // send buf is likely to be somewhere empty now, wait + if r < BATCH_SIZE / 2 { + break; + } + } Err(err) => { common_telemetry::error!("Flush send buf errors: {:?}", err); break; } }; - // if not enough rows, break - if buf_len < BATCH_SIZE { - break; - } } Ok(row_cnt) @@ -606,13 +622,17 @@ impl FlowWorkerManager { region_id: RegionId, rows: Vec, ) -> Result<(), Error> { - debug!( - "Handling write request for region_id={:?} with {} rows", - region_id, - rows.len() - ); + let rows_len = rows.len(); let table_id = region_id.table_id(); + METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _); + let _timer = METRIC_FLOW_INSERT_ELAPSED + .with_label_values(&[table_id.to_string().as_str()]) + .start_timer(); self.node_context.read().await.send(table_id, rows).await?; + debug!( + "Handling write request for table_id={} with {} rows", + table_id, rows_len + ); Ok(()) } } diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 04a748de40a8..c2da7af95c7d 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -30,6 +30,7 @@ use store_api::storage::RegionId; use crate::adapter::FlowWorkerManager; use crate::error::InternalSnafu; +use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error { @@ -78,6 +79,7 @@ impl Flownode for FlowWorkerManager { ) .await .map_err(to_meta_err)?; + METRIC_FLOW_TASK_COUNT.inc(); Ok(FlowResponse { affected_flows: ret .map(|id| greptime_proto::v1::FlowId { id: id as u32 }) @@ -92,6 +94,7 @@ impl Flownode for FlowWorkerManager { self.remove_flow(flow_id.id as u64) .await .map_err(to_meta_err)?; + METRIC_FLOW_TASK_COUNT.dec(); Ok(Default::default()) } Some(flow_request::Body::Flush(FlushFlow { diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 812faa41d92c..99bd9f97e96e 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -15,6 +15,7 @@ //! Node context, prone to change with every incoming requests use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::debug; @@ -27,7 +28,8 @@ use crate::adapter::{FlowId, TableName, TableSource}; use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP}; +use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE; +use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -67,18 +69,20 @@ pub struct FlownodeContext { pub struct SourceSender { // TODO(discord9): make it all Vec? sender: broadcast::Sender, - send_buf_tx: mpsc::UnboundedSender>, - send_buf_rx: RwLock>>, + send_buf_tx: mpsc::Sender>, + send_buf_rx: RwLock>>, + send_buf_row_cnt: AtomicUsize, } impl Default for SourceSender { fn default() -> Self { - let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel(); + let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP); Self { // TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data sender: broadcast::Sender::new(BROADCAST_CAP * 2), send_buf_tx, send_buf_rx: RwLock::new(send_buf_rx), + send_buf_row_cnt: AtomicUsize::new(0), } } } @@ -94,15 +98,18 @@ impl SourceSender { /// until send buf is empty or broadchannel is full pub async fn try_flush(&self) -> Result { let mut row_cnt = 0; - let mut iterations = 0; - while iterations < Self::MAX_ITERATIONS { + loop { let mut send_buf = self.send_buf_rx.write().await; // if inner sender channel is empty or send buf is empty, there // is nothing to do for now, just break if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() { break; } + // TODO(discord9): send rows instead so it's just moving a point if let Some(rows) = send_buf.recv().await { + let len = rows.len(); + self.send_buf_row_cnt + .fetch_sub(len, std::sync::atomic::Ordering::SeqCst); for row in rows { self.sender .send(row) @@ -116,10 +123,10 @@ impl SourceSender { row_cnt += 1; } } - iterations += 1; } if row_cnt > 0 { debug!("Send {} rows", row_cnt); + METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _); debug!( "Remaining Send buf.len() = {}", self.send_buf_rx.read().await.len() @@ -131,13 +138,12 @@ impl SourceSender { /// return number of rows it actual send(including what's in the buffer) pub async fn send_rows(&self, rows: Vec) -> Result { - self.send_buf_tx.send(rows).map_err(|e| { + self.send_buf_tx.send(rows).await.map_err(|e| { crate::error::InternalSnafu { reason: format!("Failed to send row, error = {:?}", e), } .build() })?; - Ok(0) } } @@ -153,7 +159,8 @@ impl FlownodeContext { .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), })?; - // debug!("FlownodeContext::send: trying to send {} rows", rows.len()); + + debug!("FlownodeContext::send: trying to send {} rows", rows.len()); sender.send_rows(rows).await } @@ -169,6 +176,7 @@ impl FlownodeContext { } /// Return the sum number of rows in all send buf + /// TODO(discord9): remove this since we can't get correct row cnt anyway pub async fn get_send_buf_size(&self) -> usize { let mut sum = 0; for sender in self.source_sender.values() { diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 39b469207169..ba8cdba71c70 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -278,14 +278,12 @@ impl UnaryFunc { } Self::Cast(to) => { let arg_ty = arg.data_type(); - let res = cast(arg, to).context({ + cast(arg, to).context({ CastValueSnafu { from: arg_ty, to: to.clone(), } - }); - debug!("Cast to type: {to:?}, result: {:?}", res); - res + }) } Self::TumbleWindowFloor { window_size, diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index d01e5ea28346..99d6773789a2 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -28,6 +28,7 @@ mod compute; pub mod error; mod expr; pub mod heartbeat; +mod metrics; mod plan; mod repr; mod server; diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs new file mode 100644 index 000000000000..119b4c5856de --- /dev/null +++ b/src/flow/src/metrics.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Some of the metrics used in the flow module. + +use lazy_static::lazy_static; +use prometheus::*; + +lazy_static! { + pub static ref METRIC_FLOW_TASK_COUNT: IntGauge = + register_int_gauge!("greptime_flow_task_count", "flow task count").unwrap(); + pub static ref METRIC_FLOW_INPUT_BUF_SIZE: IntGauge = + register_int_gauge!("greptime_flow_input_buf_size", "flow input buf size").unwrap(); + pub static ref METRIC_FLOW_INSERT_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_flow_insert_elapsed", + "flow insert elapsed", + &["table_id"] + ) + .unwrap(); + pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = + register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); +} diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 06571146f606..188629e58db0 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -56,6 +56,9 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff); /// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this pub const BROADCAST_CAP: usize = 65535; +/// The maximum capacity of the send buffer, to prevent the buffer from growing too large +pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2; + pub const BATCH_SIZE: usize = BROADCAST_CAP / 2; /// Convert a value that is or can be converted to Datetime to internal timestamp diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d470eb0ad8e7..a32ca197d289 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -398,13 +398,15 @@ impl FlownodeBuilder { let (tx, rx) = oneshot::channel(); let node_id = self.opts.node_id.map(|id| id as u32); - let _handle = std::thread::spawn(move || { - let (flow_node_manager, mut worker) = - FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta); - let _ = tx.send(flow_node_manager); - info!("Flow Worker started in new thread"); - worker.run(); - }); + let _handle = std::thread::Builder::new() + .name("flow-worker".to_string()) + .spawn(move || { + let (flow_node_manager, mut worker) = + FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta); + let _ = tx.send(flow_node_manager); + info!("Flow Worker started in new thread"); + worker.run(); + }); let man = rx.await.map_err(|_e| { UnexpectedSnafu { reason: "sender is dropped, failed to create flow node manager",