From 9405d1c578909feca73cd552c2c37a38255e44f2 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 4 Nov 2024 11:34:50 +0800 Subject: [PATCH] feat: heartbeat_flush_threshold option (#4924) * feat: heartbeat_flush_threshold * chore: rename to flush_stats_factor * Update src/meta-srv/src/handler/collect_stats_handler.rs --- src/meta-srv/src/handler.rs | 14 +++++++++- .../src/handler/collect_stats_handler.rs | 26 ++++++++++++++----- src/meta-srv/src/metasrv.rs | 5 ++++ src/meta-srv/src/metasrv/builder.rs | 3 ++- .../region_statistics.result | 3 +-- .../information_schema/region_statistics.sql | 3 +-- tests/conf/metasrv-test.toml.template | 1 + 7 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 5d382f350602..3b4eb9a27935 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -478,6 +478,11 @@ pub struct HeartbeatHandlerGroupBuilder { /// The handler to handle region lease. region_lease_handler: Option, + /// The factor that determines how often statistics should be flushed, + /// based on the number of received heartbeats. When the number of heartbeats + /// reaches this factor, a flush operation is triggered. + flush_stats_factor: Option, + /// The plugins. plugins: Option, @@ -493,6 +498,7 @@ impl HeartbeatHandlerGroupBuilder { Self { region_failure_handler: None, region_lease_handler: None, + flush_stats_factor: None, plugins: None, pushers, handlers: vec![], @@ -510,6 +516,12 @@ impl HeartbeatHandlerGroupBuilder { self } + /// Sets the flush stats factor. + pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option) -> Self { + self.flush_stats_factor = flush_stats_factor; + self + } + /// Sets the [`Plugins`]. pub fn with_plugins(mut self, plugins: Option) -> Self { self.plugins = plugins; @@ -550,7 +562,7 @@ impl HeartbeatHandlerGroupBuilder { if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { self.add_handler_last(publish_heartbeat_handler); } - self.add_handler_last(CollectStatsHandler::default()); + self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor)); self } diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index b399b961eeaa..4c62f44bcffc 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -29,8 +29,6 @@ use crate::error::{self, Result}; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -const MAX_CACHED_STATS_PER_KEY: usize = 10; - #[derive(Debug, Default)] struct EpochStats { stats: Vec, @@ -69,9 +67,26 @@ impl EpochStats { } } -#[derive(Default)] +const DEFAULT_FLUSH_STATS_FACTOR: usize = 3; + pub struct CollectStatsHandler { stats_cache: DashMap, + flush_stats_factor: usize, +} + +impl Default for CollectStatsHandler { + fn default() -> Self { + Self::new(None) + } +} + +impl CollectStatsHandler { + pub fn new(flush_stats_factor: Option) -> Self { + Self { + flush_stats_factor: flush_stats_factor.unwrap_or(DEFAULT_FLUSH_STATS_FACTOR), + stats_cache: DashMap::default(), + } + } } #[async_trait::async_trait] @@ -130,7 +145,7 @@ impl HeartbeatHandler for CollectStatsHandler { rewrite_node_address(ctx, last).await; } - if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY { + if !refresh && epoch_stats.len() < self.flush_stats_factor { return Ok(HandleControl::Continue); } @@ -261,8 +276,7 @@ mod tests { let res = ctx.in_memory.get(&key).await.unwrap(); let kv = res.unwrap(); let val: DatanodeStatValue = kv.value.try_into().unwrap(); - // refresh every 10 stats - assert_eq!(10, val.stats.len()); + assert_eq!(handler.flush_stats_factor, val.stats.len()); } async fn handle_request_many_times( diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index ee02518fa35a..c9c9522d7bbb 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -130,6 +130,10 @@ pub struct MetasrvOptions { /// limit the number of operations in a txn because an infinitely large txn could /// potentially block other operations. pub max_txn_ops: usize, + /// The factor that determines how often statistics should be flushed, + /// based on the number of received heartbeats. When the number of heartbeats + /// reaches this factor, a flush operation is triggered. + pub flush_stats_factor: usize, /// The tracing options. pub tracing: TracingOptions, /// The datastore for kv metadata. @@ -165,6 +169,7 @@ impl Default for MetasrvOptions { export_metrics: ExportMetricsOption::default(), store_key_prefix: String::new(), max_txn_ops: 128, + flush_stats_factor: 3, tracing: TracingOptions::default(), backend: BackendImpl::EtcdStore, } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 781839a193a8..b21978a3977b 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -227,7 +227,7 @@ impl MetasrvBuilder { )) }); let flow_metadata_allocator = { - // for now flownode just use round robin selector + // for now flownode just use round-robin selector let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode); let flow_selector_ctx = selector_ctx.clone(); let peer_allocator = Arc::new(FlowPeerAllocator::new( @@ -347,6 +347,7 @@ impl MetasrvBuilder { .with_plugins(plugins.clone()) .with_region_failure_handler(region_failover_handler) .with_region_lease_handler(Some(region_lease_handler)) + .with_flush_stats_factor(Some(options.flush_stats_factor)) .add_default_handlers() } }; diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result index 0c62b4ad6cd0..6e49679cf172 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.result +++ b/tests/cases/standalone/common/information_schema/region_statistics.result @@ -21,8 +21,7 @@ INSERT INTO test VALUES Affected Rows: 3 --- SQLNESS SLEEP 11s --- FIXME(dennis): we need to wait the datanode reporting stats info to metasrv. +-- SQLNESS SLEEP 3s SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); diff --git a/tests/cases/standalone/common/information_schema/region_statistics.sql b/tests/cases/standalone/common/information_schema/region_statistics.sql index cbc4424683a6..9b6e64890405 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.sql +++ b/tests/cases/standalone/common/information_schema/region_statistics.sql @@ -16,8 +16,7 @@ INSERT INTO test VALUES (11, 'b', 11), (21, 'c', 21); --- SQLNESS SLEEP 11s --- FIXME(dennis): we need to wait the datanode reporting stats info to metasrv. +-- SQLNESS SLEEP 3s SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index df7f66aab73e..8d27aad3c4b2 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -1,3 +1,4 @@ +flush_stats_factor = 1 [wal] {{ if is_raft_engine }} provider = "raft_engine"