Skip to content

Commit

Permalink
feat(metrics): add recent filter metrics (#17559)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 4, 2024
1 parent c22c426 commit 1faa045
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 62 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2881,6 +2881,26 @@ def section_hummock_tiered_cache(outer_panels):
),
],
),
panels.timeseries_count(
"Recent Filter Size",
"Item numbers of the recent filter.",
[
panels.target(
f"sum({metric('recent_filter_items')}) by ({NODE_LABEL})",
"items @ {{%s}}" % NODE_LABEL,
),
],
),
panels.timeseries_ops(
"Recent Filter Ops",
"",
[
panels.target(
f"sum(rate({metric('recent_filter_ops')}[$__rate_interval])) by (op, {NODE_LABEL})",
"recent filter {{op}} @ {{%s}}" % NODE_LABEL,
),
],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ use tracing_subscriber::Layer;

mod error_metrics;
mod guarded_metrics;
mod metrics;
pub mod monitor;
mod relabeled_metric;

pub use error_metrics::*;
pub use guarded_metrics::*;
pub use metrics::*;
pub use relabeled_metric::*;

#[derive(Debug)]
Expand Down
43 changes: 43 additions & 0 deletions src/common/metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{AtomicU64, GenericGaugeVec};

pub type UintGaugeVec = GenericGaugeVec<AtomicU64>;

#[macro_export]
macro_rules! register_gauge_vec {
($TYPE:ident, $OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let gauge_vec = $TYPE::new($OPTS, $LABELS_NAMES).unwrap();
$REGISTRY
.register(Box::new(gauge_vec.clone()))
.map(|_| gauge_vec)
}};
}

#[macro_export]
macro_rules! register_uint_gauge_vec_with_registry {
($OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
use $crate::UintGaugeVec;
$crate::register_gauge_vec!(UintGaugeVec, $OPTS, $LABELS_NAMES, $REGISTRY)
}};

($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
register_uint_gauge_vec_with_registry!(
prometheus::opts!($NAME, $HELP),
$LABELS_NAMES,
$REGISTRY
)
}};
}
24 changes: 2 additions & 22 deletions src/connector/src/source/kafka/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,10 @@
// limitations under the License.

use prometheus::core::{AtomicU64, GenericGaugeVec};
use prometheus::{opts, register_int_gauge_vec_with_registry, IntGaugeVec, Registry};
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};
use rdkafka::statistics::{Broker, ConsumerGroup, Partition, Topic, Window};
use rdkafka::Statistics;

type UintGaugeVec = GenericGaugeVec<AtomicU64>;

macro_rules! register_gauge_vec {
($TYPE:ident, $OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let gauge_vec = $TYPE::new($OPTS, $LABELS_NAMES).unwrap();
$REGISTRY
.register(Box::new(gauge_vec.clone()))
.map(|_| gauge_vec)
}};
}

macro_rules! register_uint_gauge_vec_with_registry {
($OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
register_gauge_vec!(UintGaugeVec, $OPTS, $LABELS_NAMES, $REGISTRY)
}};

($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
register_uint_gauge_vec_with_registry!(opts!($NAME, $HELP), $LABELS_NAMES, $REGISTRY)
}};
}
use risingwave_common::metrics::register_uint_gauge_vec_with_registry;

#[derive(Debug, Clone)]
pub struct RdKafkaStats {
Expand Down
151 changes: 115 additions & 36 deletions src/storage/src/hummock/recent_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,100 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::collections::{HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::LazyLock;
use std::time::{Duration, Instant};

use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntGauge, Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

pub trait RecentFilterKey = Eq + Hash + Send + Sync + Debug + Clone + 'static;
pub static GLOBAL_RECENT_FILTER_METRICS: LazyLock<RecentFilterMetrics> =
LazyLock::new(|| RecentFilterMetrics::new(&GLOBAL_METRICS_REGISTRY));

pub struct RecentFilterMetrics {
pub recent_filter_items: IntGauge,
pub recent_filter_ops: GenericCounterVec<AtomicU64>,
pub recent_filter_inserts: GenericCounter<AtomicU64>,
pub recent_filter_queries: GenericCounter<AtomicU64>,
}

impl RecentFilterMetrics {
fn new(registry: &Registry) -> Self {
let recent_filter_items = register_int_gauge_with_registry!(
"recent_filter_items",
"Item numbers of the recent filter.",
registry
)
.unwrap();

let recent_filter_ops = register_int_counter_vec_with_registry!(
"recent_filter_ops",
"Ops of the recent filter.",
&["op"],
registry
)
.unwrap();

let recent_filter_inserts = recent_filter_ops.with_label_values(&["insert"]);
let recent_filter_queries = recent_filter_ops.with_label_values(&["query"]);

Self {
recent_filter_items,
recent_filter_ops,
recent_filter_inserts,
recent_filter_queries,
}
}
}

#[derive(Debug)]
pub struct RecentFilter<K>
where
K: RecentFilterKey,
{
pub struct RecentFilter<K> {
refresh_interval: Duration,
inner: RwLock<CacheRefillFilterInner<K>>,
inner: RwLock<RecentFilterInner<K>>,
}

#[derive(Debug)]
struct CacheRefillFilterInner<K>
where
K: RecentFilterKey,
{
struct RecentFilterInner<K> {
last_refresh: Instant,
layers: VecDeque<RwLock<HashSet<K>>>,
}

impl<K> RecentFilter<K>
where
K: RecentFilterKey,
{
impl<K> RecentFilterInner<K> {
fn try_rotate(&mut self, refresh_interval: &Duration) {
if &self.last_refresh.elapsed() > refresh_interval {
if let Some(removed) = self.layers.pop_front() {
GLOBAL_RECENT_FILTER_METRICS
.recent_filter_items
.sub(removed.read().len() as i64);
}

if let Some(latest) = self.layers.back() {
GLOBAL_RECENT_FILTER_METRICS
.recent_filter_items
.add(latest.read().len() as i64);
}

self.layers.push_back(RwLock::new(HashSet::new()));
self.last_refresh = Instant::now();
}
}
}

impl<K> RecentFilter<K> {
pub fn new(layers: usize, refresh_interval: Duration) -> Self {
assert!(layers > 0);
let layers = (0..layers)
.map(|_| HashSet::new())
.map(RwLock::new)
.collect();
let inner = CacheRefillFilterInner {
let inner = RecentFilterInner {
last_refresh: Instant::now(),
layers,
};
Expand All @@ -60,36 +116,53 @@ where
}
}

pub fn insert(&self, key: K) {
pub fn insert(&self, key: K)
where
K: Hash + Eq,
{
if let Some(mut inner) = self.inner.try_write() {
if inner.last_refresh.elapsed() > self.refresh_interval {
inner.layers.pop_front();
inner.layers.push_back(RwLock::new(HashSet::new()));
inner.last_refresh = Instant::now();
}
inner.try_rotate(&self.refresh_interval);
}

let inner = self.inner.read();
inner.layers.back().unwrap().write().insert(key);
{
let inner = self.inner.read();
inner.layers.back().unwrap().write().insert(key);
}

GLOBAL_RECENT_FILTER_METRICS.recent_filter_inserts.inc();
}

pub fn extend(&self, keys: impl IntoIterator<Item = K>) {
pub fn extend(&self, keys: impl IntoIterator<Item = K>)
where
K: Hash + Eq,
{
if let Some(mut inner) = self.inner.try_write() {
if inner.last_refresh.elapsed() > self.refresh_interval {
inner.layers.pop_front();
inner.layers.push_back(RwLock::new(HashSet::new()));
inner.last_refresh = Instant::now();
}
inner.try_rotate(&self.refresh_interval);
}

let inner = self.inner.read();
let mut guard = inner.layers.back().unwrap().write();
for key in keys {
guard.insert(key);
}
let len = {
let mut len = 0;
let inner = self.inner.read();
let mut guard = inner.layers.back().unwrap().write();
for key in keys {
guard.insert(key);
len += 1;
}
len
};

GLOBAL_RECENT_FILTER_METRICS
.recent_filter_inserts
.inc_by(len);
}

pub fn contains(&self, key: &K) -> bool {
pub fn contains<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q> + Hash + Eq,
Q: Hash + Eq,
{
GLOBAL_RECENT_FILTER_METRICS.recent_filter_queries.inc();

let inner = self.inner.read();
for layer in inner.layers.iter().rev() {
if layer.read().contains(key) {
Expand All @@ -99,7 +172,13 @@ where
false
}

pub fn contains_one<'a>(&self, keys: impl Iterator<Item = &'a K> + Clone) -> bool {
pub fn contains_one<'a, Q>(&self, keys: impl Iterator<Item = &'a Q> + Clone) -> bool
where
K: Borrow<Q> + Hash + Eq,
Q: Hash + Eq + 'a,
{
GLOBAL_RECENT_FILTER_METRICS.recent_filter_queries.inc();

let inner = self.inner.read();
for layer in inner.layers.iter().rev() {
for key in keys.clone() {
Expand Down

0 comments on commit 1faa045

Please sign in to comment.