diff --git a/Cargo.lock b/Cargo.lock index fa8ba34d1a3b..c23acf60636d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,6 +730,36 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attribute-derive" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1800e974930e9079c965b9ffbcb6667a40401063a26396c7b4f15edc92da690" +dependencies = [ + "attribute-derive-macro", + "derive-where", + "manyhow", + "proc-macro2", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "attribute-derive-macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d908eb786ef94296bff86f90130b3b748b49401dc81fd2bb8b3dccd44cfacbd" +dependencies = [ + "collection_literals", + "interpolator", + "manyhow", + "proc-macro-utils", + "proc-macro2", + "quote", + "quote-use", + "syn 2.0.90", +] + [[package]] name = "atty" version = "0.2.14" @@ -1845,6 +1875,12 @@ dependencies = [ "tracing-appender", ] +[[package]] +name = "collection_literals" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186dce98367766de751c42c4f03970fc60fc012296e706ccbb9d5df9b6c1e271" + [[package]] name = "colorchoice" version = "1.0.2" @@ -3346,6 +3382,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "derive-where" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "derive_arbitrary" version = "1.3.2" @@ -4011,6 +4058,8 @@ dependencies = [ "enum-as-inner", "enum_dispatch", "futures", + "get-size-derive2", + "get-size2", "greptime-proto", "hydroflow", "itertools 0.10.5", @@ -4415,6 +4464,23 @@ dependencies = [ "libm", ] +[[package]] +name = "get-size-derive2" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd26d3a97ea14d289c8b54180243ecfe465f3fa9c279a6336d7a003698fc39d" +dependencies = [ + "attribute-derive", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "get-size2" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "159c430715e540d2198fa981d39cd45563ccc60900de187f5b152b33b1cb408e" + [[package]] name = "gethostname" version = "0.2.3" @@ -5346,6 +5412,12 @@ version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" +[[package]] +name = "interpolator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71dd52191aae121e8611f1e8dc3e324dd0dd1dee1e6dd91d10ee07a3cfb4d9d8" + [[package]] name = "inventory" version = "0.3.15" @@ -6244,6 +6316,29 @@ dependencies = [ "libc", ] +[[package]] +name = "manyhow" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b33efb3ca6d3b07393750d4030418d594ab1139cee518f0dc88db70fec873587" +dependencies = [ + "manyhow-macros", + "proc-macro2", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "manyhow-macros" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46fce34d199b78b6e6073abf984c9cf5fd3e9330145a93ee0738a7443e371495" +dependencies = [ + "proc-macro-utils", + "proc-macro2", + "quote", +] + [[package]] name = "maplit" version = "1.0.2" @@ -8528,6 +8623,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-utils" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeaf08a13de400bc215877b5bdc088f241b12eb42f0a548d3390dc1c56bb7071" +dependencies = [ + "proc-macro2", + "quote", + "smallvec", +] + [[package]] name = "proc-macro2" version = "1.0.92" @@ -9107,6 +9213,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "quote-use" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9619db1197b497a36178cfc736dc96b271fe918875fbf1344c436a7e93d0321e" +dependencies = [ + "quote", + "quote-use-macros", +] + +[[package]] +name = "quote-use-macros" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82ebfb7faafadc06a7ab141a6f67bcfb24cb8beb158c6fe933f2f035afa99f35" +dependencies = [ + "proc-macro-utils", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "radium" version = "0.7.0" diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index c7e6f8b55c01..4980a8178eb6 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -64,6 +64,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to list flow stats"))] + ListFlowStats { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to list flows in catalog {catalog}"))] ListFlows { #[snafu(implicit)] @@ -326,6 +333,7 @@ impl ErrorExt for Error { | Error::ListSchemas { source, .. } | Error::ListTables { source, .. } | Error::ListFlows { source, .. } + | Error::ListFlowStats { source, .. } | Error::ListProcedures { source, .. } | Error::ListRegionStats { source, .. } | Error::ConvertProtoData { source, .. } => source.status_code(), diff --git a/src/catalog/src/information_extension.rs b/src/catalog/src/information_extension.rs index 55764557a326..4d829ae01ae3 100644 --- a/src/catalog/src/information_extension.rs +++ b/src/catalog/src/information_extension.rs @@ -17,6 +17,7 @@ use common_error::ext::BoxedError; use common_meta::cluster::{ClusterInfo, NodeInfo}; use common_meta::datanode::RegionStat; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::rpc::procedure; use common_procedure::{ProcedureInfo, ProcedureState}; use meta_client::MetaClientRef; @@ -89,4 +90,12 @@ impl InformationExtension for DistributedInformationExtension { .map_err(BoxedError::new) .context(error::ListRegionStatsSnafu) } + + async fn flow_stats(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_flow_stats() + .await + .map_err(BoxedError::new) + .context(crate::error::ListFlowStatsSnafu) + } } diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 4101887cb443..6b3231cc080a 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -35,6 +35,7 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME use common_error::ext::ErrorExt; use common_meta::cluster::NodeInfo; use common_meta::datanode::RegionStat; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::FlowMetadataManager; use common_procedure::ProcedureInfo; use common_recordbatch::SendableRecordBatchStream; @@ -192,6 +193,7 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { )) as _), FLOWS => Some(Arc::new(InformationSchemaFlows::new( self.catalog_name.clone(), + self.catalog_manager.clone(), self.flow_metadata_manager.clone(), )) as _), PROCEDURE_INFO => Some( @@ -338,6 +340,9 @@ pub trait InformationExtension { /// Gets the region statistics. async fn region_stats(&self) -> std::result::Result, Self::Error>; + + /// Get the flow statistics. If no flownode is available, return `None`. + async fn flow_stats(&self) -> std::result::Result, Self::Error>; } pub struct NoopInformationExtension; @@ -357,4 +362,8 @@ impl InformationExtension for NoopInformationExtension { async fn region_stats(&self) -> std::result::Result, Self::Error> { Ok(vec![]) } + + async fn flow_stats(&self) -> std::result::Result, Self::Error> { + Ok(None) + } } diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 15a4205ae2af..5d35cfbbe431 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, Weak}; use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID; use common_error::ext::BoxedError; use common_meta::key::flow::flow_info::FlowInfoValue; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::FlowId; use common_recordbatch::adapter::RecordBatchStreamAdapter; @@ -28,7 +29,9 @@ use datatypes::prelude::ConcreteDataType as CDT; use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; -use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef}; +use datatypes::vectors::{ + Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef, +}; use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -38,6 +41,8 @@ use crate::error::{ }; use crate::information_schema::{Predicates, FLOWS}; use crate::system_schema::information_schema::InformationTable; +use crate::system_schema::utils; +use crate::CatalogManager; const INIT_CAPACITY: usize = 42; @@ -45,6 +50,7 @@ const INIT_CAPACITY: usize = 42; // pk is (flow_name, flow_id, table_catalog) pub const FLOW_NAME: &str = "flow_name"; pub const FLOW_ID: &str = "flow_id"; +pub const STATE_SIZE: &str = "state_size"; pub const TABLE_CATALOG: &str = "table_catalog"; pub const FLOW_DEFINITION: &str = "flow_definition"; pub const COMMENT: &str = "comment"; @@ -55,20 +61,24 @@ pub const FLOWNODE_IDS: &str = "flownode_ids"; pub const OPTIONS: &str = "options"; /// The `information_schema.flows` to provides information about flows in databases. +/// pub(super) struct InformationSchemaFlows { schema: SchemaRef, catalog_name: String, + catalog_manager: Weak, flow_metadata_manager: Arc, } impl InformationSchemaFlows { pub(super) fn new( catalog_name: String, + catalog_manager: Weak, flow_metadata_manager: Arc, ) -> Self { Self { schema: Self::schema(), catalog_name, + catalog_manager, flow_metadata_manager, } } @@ -80,6 +90,7 @@ impl InformationSchemaFlows { vec![ (FLOW_NAME, CDT::string_datatype(), false), (FLOW_ID, CDT::uint32_datatype(), false), + (STATE_SIZE, CDT::uint64_datatype(), true), (TABLE_CATALOG, CDT::string_datatype(), false), (FLOW_DEFINITION, CDT::string_datatype(), false), (COMMENT, CDT::string_datatype(), true), @@ -99,6 +110,7 @@ impl InformationSchemaFlows { InformationSchemaFlowsBuilder::new( self.schema.clone(), self.catalog_name.clone(), + self.catalog_manager.clone(), &self.flow_metadata_manager, ) } @@ -144,10 +156,12 @@ impl InformationTable for InformationSchemaFlows { struct InformationSchemaFlowsBuilder { schema: SchemaRef, catalog_name: String, + catalog_manager: Weak, flow_metadata_manager: Arc, flow_names: StringVectorBuilder, flow_ids: UInt32VectorBuilder, + state_sizes: UInt64VectorBuilder, table_catalogs: StringVectorBuilder, raw_sqls: StringVectorBuilder, comments: StringVectorBuilder, @@ -162,15 +176,18 @@ impl InformationSchemaFlowsBuilder { fn new( schema: SchemaRef, catalog_name: String, + catalog_manager: Weak, flow_metadata_manager: &Arc, ) -> Self { Self { schema, catalog_name, + catalog_manager, flow_metadata_manager: flow_metadata_manager.clone(), flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), + state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY), raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY), comments: StringVectorBuilder::with_capacity(INIT_CAPACITY), @@ -195,6 +212,11 @@ impl InformationSchemaFlowsBuilder { .flow_names(&catalog_name) .await; + let flow_stat = { + let information_extension = utils::information_extension(&self.catalog_manager)?; + information_extension.flow_stats().await? + }; + while let Some((flow_name, flow_id)) = stream .try_next() .await @@ -213,7 +235,7 @@ impl InformationSchemaFlowsBuilder { catalog_name: catalog_name.to_string(), flow_name: flow_name.to_string(), })?; - self.add_flow(&predicates, flow_id.flow_id(), flow_info)?; + self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?; } self.finish() @@ -224,6 +246,7 @@ impl InformationSchemaFlowsBuilder { predicates: &Predicates, flow_id: FlowId, flow_info: FlowInfoValue, + flow_stat: &Option, ) -> Result<()> { let row = [ (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())), @@ -238,6 +261,11 @@ impl InformationSchemaFlowsBuilder { } self.flow_names.push(Some(flow_info.flow_name())); self.flow_ids.push(Some(flow_id)); + self.state_sizes.push( + flow_stat + .as_ref() + .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)), + ); self.table_catalogs.push(Some(flow_info.catalog_name())); self.raw_sqls.push(Some(flow_info.raw_sql())); self.comments.push(Some(flow_info.comment())); @@ -270,6 +298,7 @@ impl InformationSchemaFlowsBuilder { let columns: Vec = vec![ Arc::new(self.flow_names.finish()), Arc::new(self.flow_ids.finish()), + Arc::new(self.state_sizes.finish()), Arc::new(self.table_catalogs.finish()), Arc::new(self.raw_sqls.finish()), Arc::new(self.comments.finish()), diff --git a/src/cli/src/repl.rs b/src/cli/src/repl.rs index 4c2ef8ffe396..8b5e3aa389a2 100644 --- a/src/cli/src/repl.rs +++ b/src/cli/src/repl.rs @@ -34,7 +34,7 @@ use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::debug; use either::Either; -use meta_client::client::MetaClientBuilder; +use meta_client::client::{ClusterKvBackend, MetaClientBuilder}; use query::datafusion::DatafusionQueryEngine; use query::parser::QueryLanguageParser; use query::query_engine::{DefaultSerializer, QueryEngineState}; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d7e816166b03..8490e14147b2 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -34,6 +34,7 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -70,7 +71,7 @@ use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, RwLock}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ @@ -507,7 +508,7 @@ impl StartCommand { procedure_manager.clone(), )); let catalog_manager = KvBackendCatalogManager::new( - information_extension, + information_extension.clone(), kv_backend.clone(), layered_cache_registry.clone(), Some(procedure_manager.clone()), @@ -532,6 +533,14 @@ impl StartCommand { .context(OtherSnafu)?, ); + // set the ref to query for the local flow state + { + let flow_worker_manager = flownode.flow_worker_manager(); + information_extension + .set_flow_worker_manager(flow_worker_manager.clone()) + .await; + } + let node_manager = Arc::new(StandaloneDatanodeManager { region_server: datanode.region_server(), flow_server: flownode.flow_worker_manager(), @@ -669,6 +678,7 @@ pub struct StandaloneInformationExtension { region_server: RegionServer, procedure_manager: ProcedureManagerRef, start_time_ms: u64, + flow_worker_manager: RwLock>>, } impl StandaloneInformationExtension { @@ -677,8 +687,15 @@ impl StandaloneInformationExtension { region_server, procedure_manager, start_time_ms: common_time::util::current_time_millis() as u64, + flow_worker_manager: RwLock::new(None), } } + + /// Set the flow worker manager for the standalone instance. + pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc) { + let mut guard = self.flow_worker_manager.write().await; + *guard = Some(flow_worker_manager); + } } #[async_trait::async_trait] @@ -750,6 +767,18 @@ impl InformationExtension for StandaloneInformationExtension { .collect::>(); Ok(stats) } + + async fn flow_stats(&self) -> std::result::Result, Self::Error> { + Ok(Some( + self.flow_worker_manager + .read() + .await + .as_ref() + .unwrap() + .gen_state_report() + .await, + )) + } } #[cfg(test)] diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 90b96f32dc9e..b6aa57d497cc 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -137,6 +137,7 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; use crate::error::{self, Result, SerdeJsonSnafu}; +use crate::key::flow::flow_state::FlowStateValue; use crate::key::node_address::NodeAddressValue; use crate::key::table_route::TableRouteKey; use crate::key::txn_helper::TxnOpGetResponseSet; @@ -1262,7 +1263,8 @@ impl_metadata_value! { FlowRouteValue, TableFlowValue, NodeAddressValue, - SchemaNameValue + SchemaNameValue, + FlowStateValue } impl_optional_metadata_value! { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 9715aab1fde1..9023ca2ef83d 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -15,6 +15,7 @@ pub mod flow_info; pub(crate) mod flow_name; pub(crate) mod flow_route; +pub mod flow_state; pub(crate) mod flownode_flow; pub(crate) mod table_flow; @@ -35,6 +36,7 @@ use crate::ensure_values; use crate::error::{self, Result}; use crate::key::flow::flow_info::FlowInfoManager; use crate::key::flow::flow_name::FlowNameManager; +use crate::key::flow::flow_state::FlowStateManager; use crate::key::flow::flownode_flow::FlownodeFlowManager; pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef}; use crate::key::txn_helper::TxnOpGetResponseSet; @@ -102,6 +104,8 @@ pub struct FlowMetadataManager { flownode_flow_manager: FlownodeFlowManager, table_flow_manager: TableFlowManager, flow_name_manager: FlowNameManager, + /// only metasrv have access to itself's memory backend, so for other case it should be None + flow_state_manager: Option, kv_backend: KvBackendRef, } @@ -114,6 +118,7 @@ impl FlowMetadataManager { flow_name_manager: FlowNameManager::new(kv_backend.clone()), flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()), table_flow_manager: TableFlowManager::new(kv_backend.clone()), + flow_state_manager: None, kv_backend, } } @@ -123,6 +128,10 @@ impl FlowMetadataManager { &self.flow_name_manager } + pub fn flow_state_manager(&self) -> Option<&FlowStateManager> { + self.flow_state_manager.as_ref() + } + /// Returns the [`FlowInfoManager`]. pub fn flow_info_manager(&self) -> &FlowInfoManager { &self.flow_info_manager diff --git a/src/common/meta/src/key/flow/flow_state.rs b/src/common/meta/src/key/flow/flow_state.rs new file mode 100644 index 000000000000..eeb4b06f0132 --- /dev/null +++ b/src/common/meta/src/key/flow/flow_state.rs @@ -0,0 +1,162 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::{FlowId, MetadataKey, MetadataValue}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::PutRequest; + +/// The entire FlowId to Flow Size's Map is stored directly in the value part of the key. +const FLOW_STATE_KEY: &str = "state"; + +/// The key of flow state. +#[derive(Debug, Clone, Copy, PartialEq)] +struct FlowStateKeyInner; + +impl FlowStateKeyInner { + pub fn new() -> Self { + Self + } +} + +impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner { + fn to_bytes(&self) -> Vec { + FLOW_STATE_KEY.as_bytes().to_vec() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidMetadataSnafu { + err_msg: format!( + "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + if key != FLOW_STATE_KEY { + return Err(error::InvalidMetadataSnafu { + err_msg: format!("Invalid FlowStateKeyInner '{key}'"), + } + .build()); + } + Ok(FlowStateKeyInner::new()) + } +} + +/// The key stores the state size of the flow. +/// +/// The layout: `__flow/state`. +pub struct FlowStateKey(FlowScoped); + +impl FlowStateKey { + /// Returns the [FlowStateKey]. + pub fn new() -> FlowStateKey { + let inner = FlowStateKeyInner::new(); + FlowStateKey(FlowScoped::new(inner)) + } +} + +impl Default for FlowStateKey { + fn default() -> Self { + Self::new() + } +} + +impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + Ok(FlowStateKey(FlowScoped::::from_bytes( + bytes, + )?)) + } +} + +/// The value of flow state size +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FlowStateValue { + /// For each key, the bytes of the state in memory + pub state_size: BTreeMap, +} + +impl FlowStateValue { + pub fn new(state_size: BTreeMap) -> Self { + Self { state_size } + } +} + +pub type FlowStateManagerRef = Arc; + +/// The manager of [FlowStateKey]. Since state size changes frequently, we store it in memory. +/// +/// This is only used in distributed mode. When meta-srv use heartbeat to update the flow stat report +/// and frontned use get to get the latest flow stat report. +pub struct FlowStateManager { + in_memory: KvBackendRef, +} + +impl FlowStateManager { + pub fn new(in_memory: KvBackendRef) -> Self { + Self { in_memory } + } + + pub async fn get(&self) -> Result> { + let key = FlowStateKey::new().to_bytes(); + self.in_memory + .get(&key) + .await? + .map(|x| FlowStateValue::try_from_raw_value(&x.value)) + .transpose() + } + + pub async fn put(&self, value: FlowStateValue) -> Result<()> { + let key = FlowStateKey::new().to_bytes(); + let value = value.try_as_raw_value()?; + let req = PutRequest::new().with_key(key).with_value(value); + self.in_memory.put(req).await?; + Ok(()) + } +} + +/// Flow's state report, send regularly through heartbeat message +#[derive(Debug, Clone)] +pub struct FlowStat { + /// For each key, the bytes of the state in memory + pub state_size: BTreeMap, +} + +impl From for FlowStat { + fn from(value: FlowStateValue) -> Self { + Self { + state_size: value.state_size, + } + } +} + +impl From for FlowStateValue { + fn from(value: FlowStat) -> Self { + Self { + state_size: value.state_size, + } + } +} diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index ba9db2ec2a44..d4b2cf2ef381 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -36,7 +36,7 @@ pub mod postgres; pub mod test; pub mod txn; -pub type KvBackendRef = Arc + Send + Sync>; +pub type KvBackendRef = Arc + Send + Sync>; #[async_trait] pub trait KvBackend: TxnService @@ -161,6 +161,9 @@ where Self::Error: ErrorExt, { fn reset(&self); + + /// Upcast as `KvBackendRef`. Since https://github.com/rust-lang/rust/issues/65991 is not yet stable. + fn as_kv_backend_ref(self: Arc) -> KvBackendRef; } -pub type ResettableKvBackendRef = Arc + Send + Sync>; +pub type ResettableKvBackendRef = Arc + Send + Sync>; diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 256e31f93ed3..9475a30001ce 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -16,13 +16,13 @@ use std::any::Any; use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; use std::marker::PhantomData; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::Serializer; -use super::ResettableKvBackend; +use super::{KvBackendRef, ResettableKvBackend}; use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse}; use crate::kv_backend::{KvBackend, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; @@ -311,6 +311,10 @@ impl ResettableKvBackend for MemoryKvBacken fn reset(&self) { self.clear(); } + + fn as_kv_backend_ref(self: Arc) -> KvBackendRef { + self + } } #[cfg(test)] diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index ffba0618daaf..08867d342a74 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -40,6 +40,8 @@ datatypes.workspace = true enum-as-inner = "0.6.0" enum_dispatch = "0.3" futures = "0.3" +get-size-derive2 = "0.1.2" +get-size2 = "0.1.2" greptime-proto.workspace = true # This fork of hydroflow is simply for keeping our dependency in our org, and pin the version # otherwise it is the same with upstream repo diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 7d9ae5e422d2..586eaa8e586a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -60,6 +60,7 @@ use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; mod flownode_impl; mod parse_expr; +mod stat; #[cfg(test)] mod tests; mod util; @@ -69,6 +70,7 @@ pub(crate) mod node_context; mod table_source; use crate::error::Error; +use crate::utils::StateReportHandler; use crate::FrontendInvoker; // `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow @@ -137,6 +139,8 @@ pub struct FlowWorkerManager { /// /// So that a series of event like `inserts -> flush` can be handled correctly flush_lock: RwLock<()>, + /// receive a oneshot sender to send state size report + state_report_handler: RwLock>, } /// Building FlownodeManager @@ -170,9 +174,15 @@ impl FlowWorkerManager { tick_manager, node_id, flush_lock: RwLock::new(()), + state_report_handler: RwLock::new(None), } } + pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self { + *self.state_report_handler.write().await = Some(handler); + self + } + /// Create a flownode manager with one worker pub fn new_with_worker<'s>( node_id: Option, @@ -500,6 +510,27 @@ impl FlowWorkerManager { /// Flow Runtime related methods impl FlowWorkerManager { + /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back + /// + /// if heartbeat task is shutdown, this future will exit too + async fn start_state_report_handler(self: Arc) -> Option> { + let state_report_handler = self.state_report_handler.write().await.take(); + if let Some(mut handler) = state_report_handler { + let zelf = self.clone(); + let handler = common_runtime::spawn_global(async move { + while let Some(ret_handler) = handler.recv().await { + let state_report = zelf.gen_state_report().await; + ret_handler.send(state_report).unwrap_or_else(|err| { + common_telemetry::error!(err; "Send state size report error"); + }); + } + }); + Some(handler) + } else { + None + } + } + /// run in common_runtime background runtime pub fn run_background( self: Arc, @@ -507,6 +538,7 @@ impl FlowWorkerManager { ) -> JoinHandle<()> { info!("Starting flownode manager's background task"); common_runtime::spawn_global(async move { + let _state_report_handler = self.clone().start_state_report_handler().await; self.run(shutdown).await; }) } diff --git a/src/flow/src/adapter/stat.rs b/src/flow/src/adapter/stat.rs new file mode 100644 index 000000000000..c719e35f3ca9 --- /dev/null +++ b/src/flow/src/adapter/stat.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use common_meta::key::flow::flow_state::FlowStat; + +use crate::FlowWorkerManager; + +impl FlowWorkerManager { + pub async fn gen_state_report(&self) -> FlowStat { + let mut full_report = BTreeMap::new(); + for worker in self.worker_handles.iter() { + let worker = worker.lock().await; + match worker.get_state_size().await { + Ok(state_size) => { + full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v))) + } + Err(err) => { + common_telemetry::error!(err; "Get flow stat size error"); + } + } + } + + FlowStat { + state_size: full_report, + } + } +} diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 978d3c608cec..4a6b0ba963d9 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -197,6 +197,21 @@ impl WorkerHandle { .fail() } } + + pub async fn get_state_size(&self) -> Result, Error> { + let ret = self + .itc_client + .call_with_resp(Request::QueryStateSize) + .await?; + ret.into_query_state_size().map_err(|ret| { + InternalSnafu { + reason: format!( + "Flow Node/Worker itc failed, expect Response::QueryStateSize, found {ret:?}" + ), + } + .build() + }) + } } impl Drop for WorkerHandle { @@ -361,6 +376,13 @@ impl<'s> Worker<'s> { Some(Response::ContainTask { result: ret }) } Request::Shutdown => return Err(()), + Request::QueryStateSize => { + let mut ret = BTreeMap::new(); + for (flow_id, task_state) in self.task_states.iter() { + ret.insert(*flow_id, task_state.state.get_state_size()); + } + Some(Response::QueryStateSize { result: ret }) + } }; Ok(ret) } @@ -391,6 +413,7 @@ pub enum Request { flow_id: FlowId, }, Shutdown, + QueryStateSize, } #[derive(Debug, EnumAsInner)] @@ -406,6 +429,10 @@ enum Response { result: bool, }, RunAvail, + QueryStateSize { + /// each flow tasks' state size + result: BTreeMap, + }, } fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) { @@ -423,10 +450,12 @@ struct InterThreadCallClient { } impl InterThreadCallClient { + /// call without response fn call_no_resp(&self, req: Request) -> Result<(), Error> { self.arg_sender.send((req, None)).map_err(from_send_error) } + /// call with response async fn call_with_resp(&self, req: Request) -> Result { let (tx, rx) = oneshot::channel(); self.arg_sender @@ -527,6 +556,7 @@ mod test { ); tx.send(Batch::empty()).unwrap(); handle.run_available(0, true).await.unwrap(); + assert_eq!(handle.get_state_size().await.unwrap().len(), 1); assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty()); drop(handle); worker_thread_handle.join().unwrap(); diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index d34b4a311d15..ee2c7628a28c 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -16,6 +16,7 @@ use std::cell::RefCell; use std::collections::{BTreeMap, VecDeque}; use std::rc::Rc; +use get_size2::GetSize; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::SubgraphId; @@ -109,6 +110,10 @@ impl DataflowState { pub fn expire_after(&self) -> Option { self.expire_after } + + pub fn get_state_size(&self) -> usize { + self.arrange_used.iter().map(|x| x.read().get_size()).sum() + } } #[derive(Debug, Clone)] diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 96635e350dde..69159d1d2a01 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -24,6 +24,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; +use common_meta::key::flow::flow_state::FlowStat; use common_telemetry::{debug, error, info, warn}; use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; @@ -34,8 +35,27 @@ use tokio::sync::mpsc; use tokio::time::Duration; use crate::error::ExternalSnafu; +use crate::utils::SizeReportSender; use crate::{Error, FlownodeOptions}; +async fn query_flow_state( + query_stat_size: &Option, + timeout: Duration, +) -> Option { + if let Some(report_requester) = query_stat_size.as_ref() { + let ret = report_requester.query(timeout).await; + match ret { + Ok(latest) => Some(latest), + Err(err) => { + error!(err; "Failed to get query stat size"); + None + } + } + } else { + None + } +} + /// The flownode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. #[derive(Clone)] pub struct HeartbeatTask { @@ -47,9 +67,14 @@ pub struct HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, running: Arc, + query_stat_size: Option, } impl HeartbeatTask { + pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self { + self.query_stat_size = Some(query_stat_size); + self + } pub fn new( opts: &FlownodeOptions, meta_client: Arc, @@ -65,6 +90,7 @@ impl HeartbeatTask { resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, running: Arc::new(AtomicBool::new(false)), + query_stat_size: None, } } @@ -112,6 +138,7 @@ impl HeartbeatTask { message: Option, peer: Option, start_time_ms: u64, + latest_report: &Option, ) -> Option { let mailbox_message = match message.map(outgoing_message_to_mailbox_message) { Some(Ok(message)) => Some(message), @@ -121,11 +148,22 @@ impl HeartbeatTask { } None => None, }; + let flow_stat = latest_report + .as_ref() + .map(|report| { + report + .state_size + .iter() + .map(|(k, v)| (*k, *v as u64)) + .collect() + }) + .map(|f| api::v1::meta::FlowStat { flow_stat_size: f }); Some(HeartbeatRequest { mailbox_message, peer, info: Self::build_node_info(start_time_ms), + flow_stat, ..Default::default() }) } @@ -151,24 +189,27 @@ impl HeartbeatTask { addr: self.peer_addr.clone(), }); + let query_stat_size = self.query_stat_size.clone(); + common_runtime::spawn_hb(async move { // note that using interval will cause it to first immediately send // a heartbeat let mut interval = tokio::time::interval(report_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut latest_report = None; loop { let req = tokio::select! { message = outgoing_rx.recv() => { if let Some(message) = message { - Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms) + Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms, &latest_report) } else { // Receives None that means Sender was dropped, we need to break the current loop break } } _ = interval.tick() => { - Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms) + Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms, &latest_report) } }; @@ -180,6 +221,10 @@ impl HeartbeatTask { debug!("Send a heartbeat request to metasrv, content: {:?}", req); } } + // after sending heartbeat, try to get the latest report + // TODO(discord9): consider a better place to update the size report + // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong + latest_report = query_flow_state(&query_stat_size, report_interval / 2).await; } }); } diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 1c257c8f3c6c..50f2a78ef8f9 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -22,12 +22,14 @@ use api::v1::Row as ProtoRow; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; use datatypes::value::Value; +use get_size2::GetSize; use itertools::Itertools; pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu}; +use crate::utils::get_value_heap_size; /// System-wide Record count difference type. Useful for capture data change /// @@ -105,6 +107,12 @@ pub struct Row { pub inner: Vec, } +impl GetSize for Row { + fn get_heap_size(&self) -> usize { + self.inner.iter().map(get_value_heap_size).sum() + } +} + impl Row { /// Create an empty row pub fn empty() -> Self { diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 87b6bbdc09ed..1259c1175510 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -55,6 +55,7 @@ use crate::error::{ }; use crate::heartbeat::HeartbeatTask; use crate::transform::register_function_to_query_engine; +use crate::utils::{SizeReportSender, StateReportHandler}; use crate::{Error, FlowWorkerManager, FlownodeOptions}; pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; @@ -236,6 +237,8 @@ pub struct FlownodeBuilder { catalog_manager: CatalogManagerRef, flow_metadata_manager: FlowMetadataManagerRef, heartbeat_task: Option, + /// receive a oneshot sender to send state size report + state_report_handler: Option, } impl FlownodeBuilder { @@ -254,17 +257,20 @@ impl FlownodeBuilder { catalog_manager, flow_metadata_manager, heartbeat_task: None, + state_report_handler: None, } } pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self { + let (sender, receiver) = SizeReportSender::new(); Self { - heartbeat_task: Some(heartbeat_task), + heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)), + state_report_handler: Some(receiver), ..self } } - pub async fn build(self) -> Result { + pub async fn build(mut self) -> Result { // TODO(discord9): does this query engine need those? let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in flownode is only used for translate plan with resolved table source. @@ -383,7 +389,7 @@ impl FlownodeBuilder { /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`, /// nor does it actually start running the worker. async fn build_manager( - &self, + &mut self, query_engine: Arc, ) -> Result { let table_meta = self.table_meta.clone(); @@ -402,12 +408,15 @@ impl FlownodeBuilder { info!("Flow Worker started in new thread"); worker.run(); }); - let man = rx.await.map_err(|_e| { + let mut man = rx.await.map_err(|_e| { UnexpectedSnafu { reason: "sender is dropped, failed to create flow node manager", } .build() })?; + if let Some(handler) = self.state_report_handler.take() { + man = man.with_state_report_handler(handler).await; + } info!("Flow Node Manager started"); Ok(man) } diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 1cd5b3ba5c1c..5e01d0bfa423 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -18,16 +18,73 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; use std::sync::Arc; +use common_meta::key::flow::flow_state::FlowStat; use common_telemetry::trace; +use datatypes::value::Value; +use get_size2::GetSize; use smallvec::{smallvec, SmallVec}; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::time::Instant; +use crate::error::InternalSnafu; use crate::expr::{EvalError, ScalarExpr}; use crate::repr::{value_to_internal_ts, DiffRow, Duration, KeyValDiffRow, Row, Timestamp}; /// A batch of updates, arranged by key pub type Batch = BTreeMap>; +/// Get a estimate of heap size of a value +pub fn get_value_heap_size(v: &Value) -> usize { + match v { + Value::Binary(bin) => bin.len(), + Value::String(s) => s.len(), + Value::List(list) => list.items().iter().map(get_value_heap_size).sum(), + _ => 0, + } +} + +#[derive(Clone)] +pub struct SizeReportSender { + inner: mpsc::Sender>, +} + +impl SizeReportSender { + pub fn new() -> (Self, StateReportHandler) { + let (tx, rx) = mpsc::channel(1); + let zelf = Self { inner: tx }; + (zelf, rx) + } + + /// Query the size report, will timeout after one second if no response + pub async fn query(&self, timeout: std::time::Duration) -> crate::Result { + let (tx, rx) = oneshot::channel(); + self.inner.send(tx).await.map_err(|_| { + InternalSnafu { + reason: "failed to send size report request due to receiver dropped", + } + .build() + })?; + let timeout = tokio::time::timeout(timeout, rx); + timeout + .await + .map_err(|_elapsed| { + InternalSnafu { + reason: "failed to receive size report after one second timeout", + } + .build() + })? + .map_err(|_| { + InternalSnafu { + reason: "failed to receive size report due to sender dropped", + } + .build() + }) + } +} + +/// Handle the size report request, and send the report back +pub type StateReportHandler = mpsc::Receiver>; + /// A spine of batches, arranged by timestamp /// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup pub type Spine = BTreeMap; @@ -49,6 +106,24 @@ pub struct KeyExpiryManager { event_timestamp_from_row: Option, } +impl GetSize for KeyExpiryManager { + fn get_heap_size(&self) -> usize { + let row_size = if let Some(row_size) = &self + .event_ts_to_key + .first_key_value() + .map(|(_, v)| v.first().get_heap_size()) + { + *row_size + } else { + 0 + }; + self.event_ts_to_key + .values() + .map(|v| v.len() * row_size + std::mem::size_of::()) + .sum::() + } +} + impl KeyExpiryManager { pub fn new( key_expiration_duration: Option, @@ -154,7 +229,7 @@ impl KeyExpiryManager { /// /// Note the two way arrow between reduce operator and arrange, it's because reduce operator need to query existing state /// and also need to update existing state. -#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] pub struct Arrangement { /// A name or identifier for the arrangement which can be used for debugging or logging purposes. /// This field is not critical to the functionality but aids in monitoring and management of arrangements. @@ -196,6 +271,61 @@ pub struct Arrangement { /// The time that the last compaction happened, also known as the current time. last_compaction_time: Option, + + /// Estimated size of the arrangement in heap size. + estimated_size: usize, + last_size_update: Instant, + size_update_interval: tokio::time::Duration, +} + +impl Arrangement { + fn compute_size(&self) -> usize { + self.spine + .values() + .map(|v| { + let per_entry_size = v + .first_key_value() + .map(|(k, v)| { + k.get_heap_size() + + v.len() * v.first().map(|r| r.get_heap_size()).unwrap_or(0) + }) + .unwrap_or(0); + std::mem::size_of::() + v.len() * per_entry_size + }) + .sum::() + + self.expire_state.get_heap_size() + + self.name.get_heap_size() + } + + fn update_and_fetch_size(&mut self) -> usize { + if self.last_size_update.elapsed() > self.size_update_interval { + self.estimated_size = self.compute_size(); + self.last_size_update = Instant::now(); + } + self.estimated_size + } +} + +impl GetSize for Arrangement { + fn get_heap_size(&self) -> usize { + self.estimated_size + } +} + +impl Default for Arrangement { + fn default() -> Self { + Self { + spine: Default::default(), + full_arrangement: false, + is_written: false, + expire_state: None, + last_compaction_time: None, + name: Vec::new(), + estimated_size: 0, + last_size_update: Instant::now(), + size_update_interval: tokio::time::Duration::from_secs(3), + } + } } impl Arrangement { @@ -207,6 +337,9 @@ impl Arrangement { expire_state: None, last_compaction_time: None, name, + estimated_size: 0, + last_size_update: Instant::now(), + size_update_interval: tokio::time::Duration::from_secs(3), } } @@ -269,6 +402,7 @@ impl Arrangement { // without changing the order of updates within same tick key_updates.sort_by_key(|(_val, ts, _diff)| *ts); } + self.update_and_fetch_size(); Ok(max_expired_by) } @@ -390,6 +524,7 @@ impl Arrangement { // insert the compacted batch into spine with key being `now` self.spine.insert(now, compacting_batch); + self.update_and_fetch_size(); Ok(max_expired_by) } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index d0008a7e81b1..ebe0e94e4861 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use api::v1::meta::{ProcedureDetailResponse, Role}; use cluster::Client as ClusterClient; +pub use cluster::ClusterKvBackend; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cluster::{ @@ -33,6 +34,8 @@ use common_meta::cluster::{ use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult}; +use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager}; +use common_meta::kv_backend::KvBackendRef; use common_meta::range_stream::PaginationStream; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ @@ -54,7 +57,8 @@ use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; use crate::error::{ - ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, NotStartedSnafu, Result, + ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu, + Result, }; pub type Id = (u64, u64); @@ -347,6 +351,15 @@ fn decode_stats(kv: KeyValue) -> MetaResult { } impl MetaClient { + pub async fn list_flow_stats(&self) -> Result> { + let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?)); + let cluster_backend = Arc::new(cluster_backend) as KvBackendRef; + let flow_state_manager = FlowStateManager::new(cluster_backend); + let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?; + + Ok(res.map(|r| r.into())) + } + pub fn new(id: Id) -> Self { Self { id, diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index c7edbcc8d39e..d50ac9717cc8 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -40,8 +40,8 @@ use tonic::Status; use crate::client::ask_leader::AskLeader; use crate::client::{util, Id}; use crate::error::{ - ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, Result, - RetryTimesExceededSnafu, + ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, + ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu, }; #[derive(Clone, Debug)] @@ -308,3 +308,75 @@ impl Inner { .map(|res| (res.leader, res.followers)) } } + +/// A client for the cluster info. Read only and corresponding to +/// `in_memory` kvbackend in the meta-srv. +#[derive(Clone, Debug)] +pub struct ClusterKvBackend { + inner: Arc, +} + +impl ClusterKvBackend { + pub fn new(client: Arc) -> Self { + Self { inner: client } + } + + fn unimpl(&self) -> common_meta::error::Error { + let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu { + name: self.name().to_string(), + } + .fail() + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu); + ret.unwrap_err() + } +} + +impl TxnService for ClusterKvBackend { + type Error = common_meta::error::Error; +} + +#[async_trait::async_trait] +impl KvBackend for ClusterKvBackend { + fn name(&self) -> &str { + "ClusterKvBackend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> common_meta::error::Result { + self.inner + .range(req) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } + + async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result { + Err(self.unimpl()) + } + + async fn put(&self, _: PutRequest) -> common_meta::error::Result { + Err(self.unimpl()) + } + + async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result { + Err(self.unimpl()) + } + + async fn delete_range( + &self, + _: DeleteRangeRequest, + ) -> common_meta::error::Result { + Err(self.unimpl()) + } + + async fn batch_delete( + &self, + _: BatchDeleteRequest, + ) -> common_meta::error::Result { + Err(self.unimpl()) + } +} diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index a4f8663368b4..be1cf150da0f 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -99,8 +99,22 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to get flow stat"))] + GetFlowStat { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Retry exceeded max times({}), message: {}", times, msg))] RetryTimesExceeded { times: usize, msg: String }, + + #[snafu(display("Trying to write to a read-only kv backend: {}", name))] + ReadOnlyKvBackend { + name: String, + #[snafu(implicit)] + location: Location, + }, } #[allow(dead_code)] @@ -120,13 +134,15 @@ impl ErrorExt for Error { | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } | Error::CreateChannel { .. } - | Error::RetryTimesExceeded { .. } => StatusCode::Internal, + | Error::RetryTimesExceeded { .. } + | Error::ReadOnlyKvBackend { .. } => StatusCode::Internal, Error::MetaServer { code, .. } => *code, Error::InvalidResponseHeader { source, .. } | Error::ConvertMetaRequest { source, .. } - | Error::ConvertMetaResponse { source, .. } => source.status_code(), + | Error::ConvertMetaResponse { source, .. } + | Error::GetFlowStat { source, .. } => source.status_code(), } } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 705f31ac49f4..1c529f06d606 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -716,6 +716,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Flow state handler error"))] + FlowStateHandler { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, } impl Error { @@ -761,7 +768,8 @@ impl ErrorExt for Error { | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. } - | Error::ChooseItems { .. } => StatusCode::Internal, + | Error::ChooseItems { .. } + | Error::FlowStateHandler { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 3b4eb9a27935..ad1492cd7cdc 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -51,6 +51,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Notify, RwLock}; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; +use crate::handler::flow_state_handler::FlowStateHandler; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; use crate::pubsub::PublisherRef; @@ -64,6 +65,7 @@ pub mod collect_stats_handler; pub mod extract_stat_handler; pub mod failure_handler; pub mod filter_inactive_region_stats; +pub mod flow_state_handler; pub mod keep_lease_handler; pub mod mailbox_handler; pub mod on_leader_start_handler; @@ -482,6 +484,8 @@ pub struct HeartbeatHandlerGroupBuilder { /// based on the number of received heartbeats. When the number of heartbeats /// reaches this factor, a flush operation is triggered. flush_stats_factor: Option, + /// A simple handler for flow internal state report + flow_state_handler: Option, /// The plugins. plugins: Option, @@ -499,12 +503,18 @@ impl HeartbeatHandlerGroupBuilder { region_failure_handler: None, region_lease_handler: None, flush_stats_factor: None, + flow_state_handler: None, plugins: None, pushers, handlers: vec![], } } + pub fn with_flow_state_handler(mut self, handler: Option) -> Self { + self.flow_state_handler = handler; + self + } + pub fn with_region_lease_handler(mut self, handler: Option) -> Self { self.region_lease_handler = handler; self @@ -564,6 +574,10 @@ impl HeartbeatHandlerGroupBuilder { } self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor)); + if let Some(flow_state_handler) = self.flow_state_handler.take() { + self.add_handler_last(flow_state_handler); + } + self } diff --git a/src/meta-srv/src/handler/flow_state_handler.rs b/src/meta-srv/src/handler/flow_state_handler.rs new file mode 100644 index 000000000000..ab387eb29185 --- /dev/null +++ b/src/meta-srv/src/handler/flow_state_handler.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{FlowStat, HeartbeatRequest, Role}; +use common_meta::key::flow::flow_state::{FlowStateManager, FlowStateValue}; +use snafu::ResultExt; + +use crate::error::{FlowStateHandlerSnafu, Result}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +pub struct FlowStateHandler { + flow_state_manager: FlowStateManager, +} + +impl FlowStateHandler { + pub fn new(flow_state_manager: FlowStateManager) -> Self { + Self { flow_state_manager } + } +} + +#[async_trait::async_trait] +impl HeartbeatHandler for FlowStateHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Flownode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + _ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result { + if let Some(FlowStat { flow_stat_size }) = &req.flow_stat { + let state_size = flow_stat_size + .iter() + .map(|(k, v)| (*k, *v as usize)) + .collect(); + let value = FlowStateValue::new(state_size); + self.flow_state_manager + .put(value) + .await + .context(FlowStateHandlerSnafu)?; + } + Ok(HandleControl::Continue) + } +} diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 05344b482b06..0afaf004933d 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -26,6 +26,7 @@ use common_meta::ddl::{ }; use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; +use common_meta::key::flow::flow_state::FlowStateManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::maintenance::MaintenanceModeManager; use common_meta::key::TableMetadataManager; @@ -47,6 +48,7 @@ use crate::error::{self, Result}; use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; +use crate::handler::flow_state_handler::FlowStateHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers}; use crate::lease::MetaPeerLookupService; @@ -228,6 +230,7 @@ impl MetasrvBuilder { peer_allocator, )) }); + let flow_metadata_allocator = { // for now flownode just use round-robin selector let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode); @@ -248,6 +251,9 @@ impl MetasrvBuilder { peer_allocator, )) }; + let flow_state_handler = + FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref())); + let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); let node_manager = node_manager.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() @@ -350,6 +356,7 @@ impl MetasrvBuilder { .with_region_failure_handler(region_failover_handler) .with_region_lease_handler(Some(region_lease_handler)) .with_flush_stats_factor(Some(options.flush_stats_factor)) + .with_flow_state_handler(Some(flow_state_handler)) .add_default_handlers() } }; diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index d4b6f84f5802..0f90ecddea33 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -386,6 +386,10 @@ impl ResettableKvBackend for LeaderCachedKvBackend { fn reset(&self) { self.cache.reset() } + + fn as_kv_backend_ref(self: Arc) -> KvBackendRef { + self + } } #[cfg(test)] diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 8ee6a90c83bf..fa360a6de684 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -1045,6 +1045,18 @@ FROM | svc1 | 200 | 2024-10-18T19:01:31 | +--------------+-----+---------------------+ +-- Test if FLOWS table works, but don't care about the result since it vary from runs +SELECT + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, +FROM + INFORMATION_SCHEMA.FLOWS; + ++--------------+ +| active_flows | ++--------------+ +| 1 | ++--------------+ + DROP FLOW requests_long_term; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 43a42de4dd5f..8946c014be36 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -569,6 +569,12 @@ SELECT FROM requests_without_ip; +-- Test if FLOWS table works, but don't care about the result since it vary from runs +SELECT + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, +FROM + INFORMATION_SCHEMA.FLOWS; + DROP FLOW requests_long_term; DROP TABLE requests_without_ip; diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index b1c8c9329514..a0dce0152296 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -187,16 +187,17 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | | | greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | | -| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | flows | comment | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | flows | expire_after | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | | +| greptime | information_schema | flows | flow_definition | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | | greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | flows | flownode_ids | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | flows | options | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | flows | sink_table_name | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | flows | source_table_ids | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | flows | state_size | 3 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | flows | table_catalog | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |