Skip to content

Commit

Permalink
feat: heartbeat_flush_threshold option (#4924)
Browse files Browse the repository at this point in the history
* feat: heartbeat_flush_threshold

* chore: rename to flush_stats_factor

* Update src/meta-srv/src/handler/collect_stats_handler.rs
  • Loading branch information
fengjiachun authored Nov 4, 2024
1 parent 7a4276c commit 9405d1c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 12 deletions.
14 changes: 13 additions & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region lease.
region_lease_handler: Option<RegionLeaseHandler>,

/// The factor that determines how often statistics should be flushed,
/// based on the number of received heartbeats. When the number of heartbeats
/// reaches this factor, a flush operation is triggered.
flush_stats_factor: Option<usize>,

/// The plugins.
plugins: Option<Plugins>,

Expand All @@ -493,6 +498,7 @@ impl HeartbeatHandlerGroupBuilder {
Self {
region_failure_handler: None,
region_lease_handler: None,
flush_stats_factor: None,
plugins: None,
pushers,
handlers: vec![],
Expand All @@ -510,6 +516,12 @@ impl HeartbeatHandlerGroupBuilder {
self
}

/// Stores the flush stats factor on the builder and returns the builder.
///
/// The stored value is later handed to `CollectStatsHandler::new`; passing
/// `None` lets that constructor fall back to its own default.
pub fn with_flush_stats_factor(self, flush_stats_factor: Option<usize>) -> Self {
    let mut builder = self;
    builder.flush_stats_factor = flush_stats_factor;
    builder
}

/// Sets the [`Plugins`].
pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
self.plugins = plugins;
Expand Down Expand Up @@ -550,7 +562,7 @@ impl HeartbeatHandlerGroupBuilder {
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
self.add_handler_last(publish_heartbeat_handler);
}
self.add_handler_last(CollectStatsHandler::default());
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));

self
}
Expand Down
26 changes: 20 additions & 6 deletions src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use crate::error::{self, Result};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

const MAX_CACHED_STATS_PER_KEY: usize = 10;

#[derive(Debug, Default)]
struct EpochStats {
stats: Vec<Stat>,
Expand Down Expand Up @@ -69,9 +67,26 @@ impl EpochStats {
}
}

#[derive(Default)]
/// Fallback flush factor used by `CollectStatsHandler::new` when the caller
/// passes `None`.
const DEFAULT_FLUSH_STATS_FACTOR: usize = 3;

/// Heartbeat handler that caches stats per datanode key and only proceeds
/// past its early return once enough of them have accumulated.
pub struct CollectStatsHandler {
/// Cached `EpochStats`, keyed by `DatanodeStatKey`.
stats_cache: DashMap<DatanodeStatKey, EpochStats>,
/// Threshold compared against the cached stats length: while no refresh is
/// requested and fewer than this many stats are cached, the handler returns
/// `HandleControl::Continue` early.
flush_stats_factor: usize,
}

impl Default for CollectStatsHandler {
fn default() -> Self {
Self::new(None)
}
}

impl CollectStatsHandler {
    /// Creates a handler with an empty stats cache.
    ///
    /// `flush_stats_factor` is the threshold stored on the handler; when it is
    /// `None`, `DEFAULT_FLUSH_STATS_FACTOR` is used instead.
    pub fn new(flush_stats_factor: Option<usize>) -> Self {
        let flush_stats_factor = match flush_stats_factor {
            Some(factor) => factor,
            None => DEFAULT_FLUSH_STATS_FACTOR,
        };
        let stats_cache = DashMap::default();

        Self {
            stats_cache,
            flush_stats_factor,
        }
    }
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -130,7 +145,7 @@ impl HeartbeatHandler for CollectStatsHandler {
rewrite_node_address(ctx, last).await;
}

if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
if !refresh && epoch_stats.len() < self.flush_stats_factor {
return Ok(HandleControl::Continue);
}

Expand Down Expand Up @@ -261,8 +276,7 @@ mod tests {
let res = ctx.in_memory.get(&key).await.unwrap();
let kv = res.unwrap();
let val: DatanodeStatValue = kv.value.try_into().unwrap();
// refresh every 10 stats
assert_eq!(10, val.stats.len());
assert_eq!(handler.flush_stats_factor, val.stats.len());
}

async fn handle_request_many_times(
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ pub struct MetasrvOptions {
/// limit the number of operations in a txn because an infinitely large txn could
/// potentially block other operations.
pub max_txn_ops: usize,
/// The factor that determines how often statistics should be flushed,
/// based on the number of received heartbeats. When the number of heartbeats
/// reaches this factor, a flush operation is triggered.
pub flush_stats_factor: usize,
/// The tracing options.
pub tracing: TracingOptions,
/// The datastore for kv metadata.
Expand Down Expand Up @@ -165,6 +169,7 @@ impl Default for MetasrvOptions {
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
max_txn_ops: 128,
flush_stats_factor: 3,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl MetasrvBuilder {
))
});
let flow_metadata_allocator = {
// for now flownode just use round robin selector
// For now, the flownode just uses a round-robin selector.
let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
let flow_selector_ctx = selector_ctx.clone();
let peer_allocator = Arc::new(FlowPeerAllocator::new(
Expand Down Expand Up @@ -347,6 +347,7 @@ impl MetasrvBuilder {
.with_plugins(plugins.clone())
.with_region_failure_handler(region_failover_handler)
.with_region_lease_handler(Some(region_lease_handler))
.with_flush_stats_factor(Some(options.flush_stats_factor))
.add_default_handlers()
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ INSERT INTO test VALUES

Affected Rows: 3

-- SQLNESS SLEEP 11s
-- FIXME(dennis): we need to wait for the datanode to report stats info to metasrv.
-- SQLNESS SLEEP 3s
SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ INSERT INTO test VALUES
(11, 'b', 11),
(21, 'c', 21);

-- SQLNESS SLEEP 11s
-- FIXME(dennis): we need to wait for the datanode to report stats info to metasrv.
-- SQLNESS SLEEP 3s
SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');
Expand Down
1 change: 1 addition & 0 deletions tests/conf/metasrv-test.toml.template
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
flush_stats_factor = 1
[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
Expand Down

0 comments on commit 9405d1c

Please sign in to comment.