
Commit

feat(flow): add some metrics (#4539)
* feat: add some metrics

* fix: tmp rate limiter

* feat: add task count metrics

* refactor: use bounded channel anyway

* refactor: better metrics
discord9 authored Aug 14, 2024
1 parent b59a93d commit cbb06cd
Showing 10 changed files with 104 additions and 32 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/flow/Cargo.toml
@@ -44,12 +44,14 @@ greptime-proto.workspace = true
# otherwise it is the same with upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
operator.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
serde.workspace = true
42 changes: 31 additions & 11 deletions src/flow/src/adapter.rs
@@ -51,6 +51,9 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::metrics::{
METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS,
};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;

@@ -193,6 +196,15 @@ pub enum DiffRequest {
Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
pub fn len(&self) -> usize {
match self {
Self::Insert(v) => v.len(),
Self::Delete(v) => v.len(),
}
}
}

/// iterate through the diff row and form continuous diff row with same diff type
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
let mut reqs = Vec::new();
@@ -544,6 +556,7 @@ impl FlowWorkerManager {
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}
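For intuition, the adaptive interval above targets the time needed to accumulate one batch at the recently observed ingest speed, capped at the default interval. A minimal, self-contained sketch of the same formula with made-up numbers (the real `default_interval` and speed come from the runtime, not from here):

```rust
use std::time::Duration;

fn main() {
    // BATCH_SIZE = BROADCAST_CAP / 2 = 32767, per src/flow/src/repr.rs.
    const BATCH_SIZE: usize = 65535 / 2;
    let default_interval = Duration::from_millis(1000); // assumed default
    let avg_spd = 100_000; // assumed average ingest speed, rows per second

    // Milliseconds needed to fill one batch at the observed speed,
    // capped so a slow stream never waits longer than the default interval.
    let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1usize);
    let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
    assert_eq!(new_wait, Duration::from_millis(327)); // 32_767_000 / 100_000
    println!("next flow run in {:?}", new_wait);
}
```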
@@ -575,7 +588,7 @@ impl FlowWorkerManager {
}
}
// check row send and rows remain in send buf
-let (flush_res, buf_len) = if blocking {
+let (flush_res, _buf_len) = if blocking {
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
@@ -585,16 +598,19 @@
}
};
match flush_res {
-Ok(r) => row_cnt += r,
+Ok(r) => {
+common_telemetry::trace!("Flushed {} rows", r);
+row_cnt += r;
+// send buf is likely to be somewhere empty now, wait
+if r < BATCH_SIZE / 2 {
+break;
+}
+}
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
-// if not enough rows, break
-if buf_len < BATCH_SIZE {
-break;
-}
}

Ok(row_cnt)
@@ -606,13 +622,17 @@
region_id: RegionId,
rows: Vec<DiffRow>,
) -> Result<(), Error> {
-debug!(
-"Handling write request for region_id={:?} with {} rows",
-region_id,
-rows.len()
-);
+let rows_len = rows.len();
let table_id = region_id.table_id();
+METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _);
+let _timer = METRIC_FLOW_INSERT_ELAPSED
+.with_label_values(&[table_id.to_string().as_str()])
+.start_timer();
self.node_context.read().await.send(table_id, rows).await?;
+debug!(
+"Handling write request for table_id={} with {} rows",
+table_id, rows_len
+);
Ok(())
}
}
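The `_timer` guard above relies on the prometheus crate's drop-based timing: `start_timer()` returns a `HistogramTimer` that records the elapsed seconds into the histogram when it goes out of scope. A minimal sketch of the same pattern; the metric name here is illustrative, not the one registered in this commit:

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    static ref INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_flow_insert_elapsed",
        "insert elapsed (seconds)",
        &["table_id"]
    )
    .unwrap();
}

fn handle_write(table_id: u32) {
    // The HistogramTimer returned here observes the elapsed time
    // when it is dropped, i.e. when this scope ends.
    let _timer = INSERT_ELAPSED
        .with_label_values(&[table_id.to_string().as_str()])
        .start_timer();
    // ... the actual write handling would happen here ...
}

fn main() {
    handle_write(42);
}
```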
3 changes: 3 additions & 0 deletions src/flow/src/adapter/flownode_impl.rs
@@ -30,6 +30,7 @@ use store_api::storage::RegionId;

use crate::adapter::FlowWorkerManager;
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};

fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
@@ -78,6 +79,7 @@ impl Flownode for FlowWorkerManager {
)
.await
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
@@ -92,6 +94,7 @@
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
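One detail worth noting in this file: the gauge is incremented only after a flow is created successfully and decremented only after a remove succeeds, so failed requests never skew the count. A stripped-down sketch of that pairing; the names and error type are placeholders, not code from this commit:

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    static ref TASK_COUNT: IntGauge =
        register_int_gauge!("example_flow_task_count", "example task count").unwrap();
}

fn create_task() -> Result<(), String> {
    do_create()?; // bail out first: a failed create is never counted
    TASK_COUNT.inc();
    Ok(())
}

fn remove_task() -> Result<(), String> {
    do_remove()?; // likewise, only a successful remove decrements
    TASK_COUNT.dec();
    Ok(())
}

fn do_create() -> Result<(), String> { Ok(()) }
fn do_remove() -> Result<(), String> { Ok(()) }

fn main() {
    create_task().and_then(|_| remove_task()).unwrap();
    assert_eq!(TASK_COUNT.get(), 0);
}
```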
28 changes: 18 additions & 10 deletions src/flow/src/adapter/node_context.rs
@@ -15,6 +15,7 @@
//! Node context, prone to change with every incoming requests
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::debug;
@@ -27,7 +28,8 @@ use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
-use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP};
+use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
+use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP};

/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -67,18 +69,20 @@ pub struct FlownodeContext {
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
-send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
-send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
+send_buf_tx: mpsc::Sender<Vec<DiffRow>>,
+send_buf_rx: RwLock<mpsc::Receiver<Vec<DiffRow>>>,
+send_buf_row_cnt: AtomicUsize,
}

impl Default for SourceSender {
fn default() -> Self {
-let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
+let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP);
Self {
// TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
+send_buf_row_cnt: AtomicUsize::new(0),
}
}
}
@@ -94,15 +98,18 @@ impl SourceSender {
/// until send buf is empty or broadchannel is full
pub async fn try_flush(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
-let mut iterations = 0;
-while iterations < Self::MAX_ITERATIONS {
+loop {
let mut send_buf = self.send_buf_rx.write().await;
// if inner sender channel is empty or send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
// TODO(discord9): send rows instead so it's just moving a point
if let Some(rows) = send_buf.recv().await {
+let len = rows.len();
+self.send_buf_row_cnt
+.fetch_sub(len, std::sync::atomic::Ordering::SeqCst);
for row in rows {
self.sender
.send(row)
@@ -116,10 +123,10 @@
row_cnt += 1;
}
}
-iterations += 1;
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
+METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf_rx.read().await.len()
@@ -131,13 +138,12 @@

/// return number of rows it actual send(including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
-self.send_buf_tx.send(rows).map_err(|e| {
+self.send_buf_tx.send(rows).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;

Ok(0)
}
}
@@ -153,7 +159,8 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
-// debug!("FlownodeContext::send: trying to send {} rows", rows.len());

+debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows).await
}

@@ -169,6 +176,7 @@
}

/// Return the sum number of rows in all send buf
/// TODO(discord9): remove this since we can't get correct row cnt anyway
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
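The switch from `unbounded_channel` to `mpsc::channel(SEND_BUF_CAP)` is what gives the sender backpressure: `send(...).await` suspends the producer once the buffer is full instead of letting it grow without bound. A self-contained sketch of that behavior with a tiny illustrative capacity (not the real `SEND_BUF_CAP`):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity of 2 queued batches, purely for illustration.
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(2);

    let producer = tokio::spawn(async move {
        for batch in [vec![1, 2], vec![3], vec![4, 5, 6]] {
            // Once two batches are in flight, this await parks the
            // producer until the consumer drains one: the backpressure
            // an unbounded channel cannot provide.
            tx.send(batch).await.expect("receiver alive");
        }
    });

    while let Some(batch) = rx.recv().await {
        println!("got a batch of {} rows", batch.len());
    }
    producer.await.unwrap();
}
```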
6 changes: 2 additions & 4 deletions src/flow/src/expr/func.rs
@@ -278,14 +278,12 @@ impl UnaryFunc {
}
Self::Cast(to) => {
let arg_ty = arg.data_type();
-let res = cast(arg, to).context({
+cast(arg, to).context({
CastValueSnafu {
from: arg_ty,
to: to.clone(),
}
-});
-debug!("Cast to type: {to:?}, result: {:?}", res);
-res
+})
}
Self::TumbleWindowFloor {
window_size,
1 change: 1 addition & 0 deletions src/flow/src/lib.rs
@@ -28,6 +28,7 @@ mod compute;
pub mod error;
mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod repr;
mod server;
33 changes: 33 additions & 0 deletions src/flow/src/metrics.rs
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Some of the metrics used in the flow module.
use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
pub static ref METRIC_FLOW_TASK_COUNT: IntGauge =
register_int_gauge!("greptime_flow_task_count", "flow task count").unwrap();
pub static ref METRIC_FLOW_INPUT_BUF_SIZE: IntGauge =
register_int_gauge!("greptime_flow_input_buf_size", "flow input buf size").unwrap();
pub static ref METRIC_FLOW_INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_flow_insert_elapsed",
"flow insert elapsed",
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
}
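Since the `register_*` macros place these metrics in the prometheus crate's default registry, any scrape handler in the process can render them without extra wiring. A minimal sketch of what that exposure looks like; this is context, not code from this commit:

```rust
use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> String {
    // gather() snapshots every metric registered in the default
    // registry, including the gauges and histogram declared above.
    let families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&families, &mut buf)
        .expect("encode metrics");
    String::from_utf8(buf).expect("metrics text is utf8")
}

fn main() {
    println!("{}", render_metrics());
}
```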
3 changes: 3 additions & 0 deletions src/flow/src/repr.rs
@@ -56,6 +56,9 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
pub const BROADCAST_CAP: usize = 65535;

/// The maximum capacity of the send buffer, to prevent the buffer from growing too large
pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2;

pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;

/// Convert a value that is or can be converted to Datetime to internal timestamp
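Spelled out, with `BROADCAST_CAP` at 65535: `SEND_BUF_CAP` allows 131070 queued messages and `BATCH_SIZE` is 32767 rows. Note that the mpsc capacity counts queued `Vec<DiffRow>` batches, not individual rows, since each channel message is a whole batch:

```rust
const BROADCAST_CAP: usize = 65535;
const SEND_BUF_CAP: usize = BROADCAST_CAP * 2; // 131070 queued batches
const BATCH_SIZE: usize = BROADCAST_CAP / 2; // 32767 rows per batch

fn main() {
    assert_eq!(SEND_BUF_CAP, 131_070);
    assert_eq!(BATCH_SIZE, 32_767);
}
```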
16 changes: 9 additions & 7 deletions src/flow/src/server.rs
@@ -398,13 +398,15 @@ impl FlownodeBuilder {
let (tx, rx) = oneshot::channel();

let node_id = self.opts.node_id.map(|id| id as u32);
-let _handle = std::thread::spawn(move || {
-let (flow_node_manager, mut worker) =
-FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
-let _ = tx.send(flow_node_manager);
-info!("Flow Worker started in new thread");
-worker.run();
-});
+let _handle = std::thread::Builder::new()
+.name("flow-worker".to_string())
+.spawn(move || {
+let (flow_node_manager, mut worker) =
+FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
+let _ = tx.send(flow_node_manager);
+info!("Flow Worker started in new thread");
+worker.run();
+});
let man = rx.await.map_err(|_e| {
UnexpectedSnafu {
reason: "sender is dropped, failed to create flow node manager",
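Beyond giving the worker thread a searchable name, `thread::Builder::spawn` differs from `std::thread::spawn` in returning `io::Result<JoinHandle<T>>`; the code above binds that result to `_handle`, so a spawn failure would be silently dropped. A small sketch of the pattern, with a stand-in closure body:

```rust
use std::thread;

fn main() {
    let handle = thread::Builder::new()
        .name("flow-worker".to_string())
        .spawn(|| {
            // The name shows up in panic messages, debuggers and profilers.
            println!("running on {:?}", thread::current().name());
        })
        .expect("failed to spawn flow-worker thread");
    handle.join().unwrap();
}
```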
