diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index dd7071b09513..0d9e96ab6a44 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -89,9 +89,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to get procedure client in {mode} mode"))] - GetProcedureClient { - mode: String, + #[snafu(display("Failed to get information extension client"))] + GetInformationExtension { #[snafu(implicit)] location: Location, }, @@ -301,7 +300,7 @@ impl ErrorExt for Error { | Error::CacheNotFound { .. } | Error::CastManager { .. } | Error::Json { .. } - | Error::GetProcedureClient { .. } + | Error::GetInformationExtension { .. } | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected, Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments, diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index e3377afbe10a..ca20805d3706 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -21,7 +21,6 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, PG_CATALOG_NAME, }; -use common_config::Mode; use common_error::ext::BoxedError; use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef}; use common_meta::key::catalog_name::CatalogNameKey; @@ -34,7 +33,6 @@ use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; -use meta_client::client::MetaClient; use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use session::context::{Channel, QueryContext}; @@ -50,7 +48,7 @@ use crate::error::{ CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu, }; -use crate::information_schema::InformationSchemaProvider; +use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider}; use crate::kvbackend::TableCacheRef; use crate::system_schema::pg_catalog::PGCatalogProvider; use crate::system_schema::SystemSchemaProvider; @@ -63,9 +61,8 @@ use crate::CatalogManager; /// comes from `SystemCatalog`, which is static and read-only. #[derive(Clone)] pub struct KvBackendCatalogManager { - mode: Mode, - /// Only available in `Distributed` mode. - meta_client: Option>, + /// Provides the extension methods for the `information_schema` tables + information_extension: InformationExtensionRef, /// Manages partition rules. partition_manager: PartitionRuleManagerRef, /// Manages table metadata. @@ -82,15 +79,13 @@ const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; impl KvBackendCatalogManager { pub fn new( - mode: Mode, - meta_client: Option>, + information_extension: InformationExtensionRef, backend: KvBackendRef, cache_registry: LayeredCacheRegistryRef, procedure_manager: Option, ) -> Arc { Arc::new_cyclic(|me| Self { - mode, - meta_client, + information_extension, partition_manager: Arc::new(PartitionRuleManager::new( backend.clone(), cache_registry @@ -118,20 +113,15 @@ impl KvBackendCatalogManager { }) } - /// Returns the server running mode. - pub fn running_mode(&self) -> &Mode { - &self.mode - } - pub fn view_info_cache(&self) -> Result { self.cache_registry.get().context(CacheNotFoundSnafu { name: "view_info_cache", }) } - /// Returns the `[MetaClient]`. - pub fn meta_client(&self) -> Option> { - self.meta_client.clone() + /// Returns the [`InformationExtension`]. + pub fn information_extension(&self) -> InformationExtensionRef { + self.information_extension.clone() } pub fn partition_manager(&self) -> PartitionRuleManagerRef { diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 9fa31b85fdb5..4101887cb443 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -32,7 +32,11 @@ use std::collections::HashMap; use std::sync::{Arc, Weak}; use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME}; +use common_error::ext::ErrorExt; +use common_meta::cluster::NodeInfo; +use common_meta::datanode::RegionStat; use common_meta::key::flow::FlowMetadataManager; +use common_procedure::ProcedureInfo; use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; use lazy_static::lazy_static; @@ -45,7 +49,7 @@ use views::InformationSchemaViews; use self::columns::InformationSchemaColumns; use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef}; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo; use crate::system_schema::information_schema::flows::InformationSchemaFlows; use crate::system_schema::information_schema::information_memory_table::get_schema_columns; @@ -318,3 +322,39 @@ where InformationTable::to_stream(self, request) } } + +pub type InformationExtensionRef = Arc + Send + Sync>; + +/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables. +#[async_trait::async_trait] +pub trait InformationExtension { + type Error: ErrorExt; + + /// Gets the nodes information. + async fn nodes(&self) -> std::result::Result, Self::Error>; + + /// Gets the procedures information. + async fn procedures(&self) -> std::result::Result, Self::Error>; + + /// Gets the region statistics. + async fn region_stats(&self) -> std::result::Result, Self::Error>; +} + +pub struct NoopInformationExtension; + +#[async_trait::async_trait] +impl InformationExtension for NoopInformationExtension { + type Error = Error; + + async fn nodes(&self) -> std::result::Result, Self::Error> { + Ok(vec![]) + } + + async fn procedures(&self) -> std::result::Result, Self::Error> { + Ok(vec![]) + } + + async fn region_stats(&self) -> std::result::Result, Self::Error> { + Ok(vec![]) + } +} diff --git a/src/catalog/src/system_schema/information_schema/cluster_info.rs b/src/catalog/src/system_schema/information_schema/cluster_info.rs index e85e2103043b..1ab700497cc1 100644 --- a/src/catalog/src/system_schema/information_schema/cluster_info.rs +++ b/src/catalog/src/system_schema/information_schema/cluster_info.rs @@ -17,13 +17,10 @@ use std::time::Duration; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID; -use common_config::Mode; use common_error::ext::BoxedError; -use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus}; -use common_meta::peer::Peer; +use common_meta::cluster::NodeInfo; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::warn; use common_time::timestamp::Timestamp; use datafusion::execution::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; @@ -40,7 +37,7 @@ use snafu::ResultExt; use store_api::storage::{ScanRequest, TableId}; use super::CLUSTER_INFO; -use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::system_schema::utils; use crate::CatalogManager; @@ -70,7 +67,6 @@ const INIT_CAPACITY: usize = 42; pub(super) struct InformationSchemaClusterInfo { schema: SchemaRef, catalog_manager: Weak, - start_time_ms: u64, } impl InformationSchemaClusterInfo { @@ -78,7 +74,6 @@ impl InformationSchemaClusterInfo { Self { schema: Self::schema(), catalog_manager, - start_time_ms: common_time::util::current_time_millis() as u64, } } @@ -100,11 +95,7 @@ impl InformationSchemaClusterInfo { } fn builder(&self) -> InformationSchemaClusterInfoBuilder { - InformationSchemaClusterInfoBuilder::new( - self.schema.clone(), - self.catalog_manager.clone(), - self.start_time_ms, - ) + InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone()) } } @@ -144,7 +135,6 @@ impl InformationTable for InformationSchemaClusterInfo { struct InformationSchemaClusterInfoBuilder { schema: SchemaRef, - start_time_ms: u64, catalog_manager: Weak, peer_ids: Int64VectorBuilder, @@ -158,11 +148,7 @@ struct InformationSchemaClusterInfoBuilder { } impl InformationSchemaClusterInfoBuilder { - fn new( - schema: SchemaRef, - catalog_manager: Weak, - start_time_ms: u64, - ) -> Self { + fn new(schema: SchemaRef, catalog_manager: Weak) -> Self { Self { schema, catalog_manager, @@ -174,56 +160,17 @@ impl InformationSchemaClusterInfoBuilder { start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY), uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY), active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY), - start_time_ms, } } /// Construct the `information_schema.cluster_info` virtual table async fn make_cluster_info(&mut self, request: Option) -> Result { let predicates = Predicates::from_scan_request(&request); - let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); - - match mode { - Mode::Standalone => { - let build_info = common_version::build_info(); - - self.add_node_info( - &predicates, - NodeInfo { - // For the standalone: - // - id always 0 - // - empty string for peer_addr - peer: Peer { - id: 0, - addr: "".to_string(), - }, - last_activity_ts: -1, - status: NodeStatus::Standalone, - version: build_info.version.to_string(), - git_commit: build_info.commit_short.to_string(), - // Use `self.start_time_ms` instead. - // It's not precise but enough. - start_time_ms: self.start_time_ms, - }, - ); - } - Mode::Distributed => { - if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { - let node_infos = meta_client - .list_nodes(None) - .await - .map_err(BoxedError::new) - .context(ListNodesSnafu)?; - - for node_info in node_infos { - self.add_node_info(&predicates, node_info); - } - } else { - warn!("Could not find meta client in distributed mode."); - } - } + let information_extension = utils::information_extension(&self.catalog_manager)?; + let node_infos = information_extension.nodes().await?; + for node_info in node_infos { + self.add_node_info(&predicates, node_info); } - self.finish() } diff --git a/src/catalog/src/system_schema/information_schema/procedure_info.rs b/src/catalog/src/system_schema/information_schema/procedure_info.rs index 56c36c22100a..6e3c0b1f464d 100644 --- a/src/catalog/src/system_schema/information_schema/procedure_info.rs +++ b/src/catalog/src/system_schema/information_schema/procedure_info.rs @@ -14,14 +14,10 @@ use std::sync::{Arc, Weak}; -use api::v1::meta::{ProcedureMeta, ProcedureStatus}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID; -use common_config::Mode; use common_error::ext::BoxedError; -use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; -use common_meta::rpc::procedure; -use common_procedure::{ProcedureInfo, ProcedureState}; +use common_procedure::ProcedureInfo; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_time::timestamp::Timestamp; @@ -38,10 +34,7 @@ use snafu::ResultExt; use store_api::storage::{ScanRequest, TableId}; use super::PROCEDURE_INFO; -use crate::error::{ - ConvertProtoDataSnafu, CreateRecordBatchSnafu, GetProcedureClientSnafu, InternalSnafu, - ListProceduresSnafu, ProcedureIdNotFoundSnafu, Result, -}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::system_schema::utils; use crate::CatalogManager; @@ -167,45 +160,11 @@ impl InformationSchemaProcedureInfoBuilder { /// Construct the `information_schema.procedure_info` virtual table async fn make_procedure_info(&mut self, request: Option) -> Result { let predicates = Predicates::from_scan_request(&request); - let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); - match mode { - Mode::Standalone => { - if let Some(procedure_manager) = utils::procedure_manager(&self.catalog_manager)? { - let procedures = procedure_manager - .list_procedures() - .await - .map_err(BoxedError::new) - .context(ListProceduresSnafu)?; - for procedure in procedures { - self.add_procedure( - &predicates, - procedure.state.as_str_name().to_string(), - procedure, - ); - } - } else { - return GetProcedureClientSnafu { mode: "standalone" }.fail(); - } - } - Mode::Distributed => { - if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { - let procedures = meta_client - .list_procedures(&ExecutorContext::default()) - .await - .map_err(BoxedError::new) - .context(ListProceduresSnafu)?; - for procedure in procedures.procedures { - self.add_procedure_info(&predicates, procedure)?; - } - } else { - return GetProcedureClientSnafu { - mode: "distributed", - } - .fail(); - } - } - }; - + let information_extension = utils::information_extension(&self.catalog_manager)?; + let procedures = information_extension.procedures().await?; + for (status, procedure_info) in procedures { + self.add_procedure(&predicates, status, procedure_info); + } self.finish() } @@ -247,34 +206,6 @@ impl InformationSchemaProcedureInfoBuilder { self.lock_keys.push(Some(&lock_keys)); } - fn add_procedure_info( - &mut self, - predicates: &Predicates, - procedure: ProcedureMeta, - ) -> Result<()> { - let pid = match procedure.id { - Some(pid) => pid, - None => return ProcedureIdNotFoundSnafu {}.fail(), - }; - let pid = procedure::pb_pid_to_pid(&pid) - .map_err(BoxedError::new) - .context(ConvertProtoDataSnafu)?; - let status = ProcedureStatus::try_from(procedure.status) - .map(|v| v.as_str_name()) - .unwrap_or("Unknown") - .to_string(); - let procedure_info = ProcedureInfo { - id: pid, - type_name: procedure.type_name, - start_time_ms: procedure.start_time_ms, - end_time_ms: procedure.end_time_ms, - state: ProcedureState::Running, - lock_keys: procedure.lock_keys, - }; - self.add_procedure(predicates, status, procedure_info); - Ok(()) - } - fn finish(&mut self) -> Result { let columns: Vec = vec![ Arc::new(self.procedure_ids.finish()), diff --git a/src/catalog/src/system_schema/information_schema/region_statistics.rs b/src/catalog/src/system_schema/information_schema/region_statistics.rs index 07b94ede542b..e92558acd0d6 100644 --- a/src/catalog/src/system_schema/information_schema/region_statistics.rs +++ b/src/catalog/src/system_schema/information_schema/region_statistics.rs @@ -16,13 +16,10 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID; -use common_config::Mode; use common_error::ext::BoxedError; -use common_meta::cluster::ClusterInfo; use common_meta::datanode::RegionStat; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream}; -use common_telemetry::tracing::warn; use datafusion::execution::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; @@ -34,7 +31,7 @@ use snafu::ResultExt; use store_api::storage::{ScanRequest, TableId}; use super::{InformationTable, REGION_STATISTICS}; -use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListRegionStatsSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::information_schema::Predicates; use crate::system_schema::utils; use crate::CatalogManager; @@ -167,28 +164,11 @@ impl InformationSchemaRegionStatisticsBuilder { request: Option, ) -> Result { let predicates = Predicates::from_scan_request(&request); - let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); - - match mode { - Mode::Standalone => { - // TODO(weny): implement it - } - Mode::Distributed => { - if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { - let region_stats = meta_client - .list_region_stats() - .await - .map_err(BoxedError::new) - .context(ListRegionStatsSnafu)?; - for region_stat in region_stats { - self.add_region_statistic(&predicates, region_stat); - } - } else { - warn!("Meta client is not available"); - } - } + let information_extension = utils::information_extension(&self.catalog_manager)?; + let region_stats = information_extension.region_stats().await?; + for region_stat in region_stats { + self.add_region_statistic(&predicates, region_stat); } - self.finish() } diff --git a/src/catalog/src/system_schema/utils.rs b/src/catalog/src/system_schema/utils.rs index b9786bc2600f..8d8af84bf00b 100644 --- a/src/catalog/src/system_schema/utils.rs +++ b/src/catalog/src/system_schema/utils.rs @@ -12,48 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod tables; - -use std::sync::{Arc, Weak}; +use std::sync::Weak; -use common_config::Mode; use common_meta::key::TableMetadataManagerRef; -use common_procedure::ProcedureManagerRef; -use meta_client::client::MetaClient; use snafu::OptionExt; -use crate::error::{Result, UpgradeWeakCatalogManagerRefSnafu}; +use crate::error::{GetInformationExtensionSnafu, Result, UpgradeWeakCatalogManagerRefSnafu}; +use crate::information_schema::InformationExtensionRef; use crate::kvbackend::KvBackendCatalogManager; use crate::CatalogManager; -/// Try to get the server running mode from `[CatalogManager]` weak reference. -pub fn running_mode(catalog_manager: &Weak) -> Result> { - let catalog_manager = catalog_manager - .upgrade() - .context(UpgradeWeakCatalogManagerRefSnafu)?; - - Ok(catalog_manager - .as_any() - .downcast_ref::() - .map(|manager| manager.running_mode()) - .copied()) -} +pub mod tables; -/// Try to get the `[MetaClient]` from `[CatalogManager]` weak reference. -pub fn meta_client(catalog_manager: &Weak) -> Result>> { +/// Try to get the `[InformationExtension]` from `[CatalogManager]` weak reference. +pub fn information_extension( + catalog_manager: &Weak, +) -> Result { let catalog_manager = catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; - let meta_client = match catalog_manager + let information_extension = catalog_manager .as_any() .downcast_ref::() - { - None => None, - Some(manager) => manager.meta_client(), - }; + .map(|manager| manager.information_extension()) + .context(GetInformationExtensionSnafu)?; - Ok(meta_client) + Ok(information_extension) } /// Try to get the `[TableMetadataManagerRef]` from `[CatalogManager]` weak reference. @@ -69,17 +54,3 @@ pub fn table_meta_manager( .downcast_ref::() .map(|manager| manager.table_metadata_manager_ref().clone())) } - -/// Try to get the `[ProcedureManagerRef]` from `[CatalogManager]` weak reference. -pub fn procedure_manager( - catalog_manager: &Weak, -) -> Result> { - let catalog_manager = catalog_manager - .upgrade() - .context(UpgradeWeakCatalogManagerRefSnafu)?; - - Ok(catalog_manager - .as_any() - .downcast_ref::() - .and_then(|manager| manager.procedure_manager())) -} diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 09d3d9d2fd2e..ca9c5b89d3da 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -259,7 +259,6 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; - use common_config::Mode; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -269,6 +268,8 @@ mod tests { use datafusion::logical_expr::builder::LogicalTableSource; use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; + use crate::information_schema::NoopInformationExtension; + struct MockDecoder; impl MockDecoder { pub fn arc() -> Arc { @@ -323,8 +324,7 @@ mod tests { ); let catalog_manager = KvBackendCatalogManager::new( - Mode::Standalone, - None, + Arc::new(NoopInformationExtension), backend.clone(), layered_cache_registry, None, diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index b55b1c44d652..01384712276b 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -46,12 +46,12 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use crate::cli::cmd::ReplCommand; use crate::cli::helper::RustylineHelper; use crate::cli::AttachCommand; -use crate::error; use crate::error::{ CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, SubstraitEncodeLogicalPlanSnafu, }; +use crate::{error, DistributedInformationExtension}; /// Captures the state of the repl, gathers commands and executes them one by one pub struct Repl { @@ -275,9 +275,9 @@ async fn create_query_engine(meta_addr: &str) -> Result { .build(), ); + let information_extension = Arc::new(DistributedInformationExtension::new(meta_client.clone())); let catalog_manager = KvBackendCatalogManager::new( - Mode::Distributed, - Some(meta_client.clone()), + information_extension, cached_meta_backend.clone(), layered_cache_registry, None, diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 78dfc906077f..d2a84540853b 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -41,7 +41,7 @@ use crate::error::{ MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; -use crate::{log_versions, App}; +use crate::{log_versions, App, DistributedInformationExtension}; pub const APP_NAME: &str = "greptime-flownode"; @@ -269,9 +269,10 @@ impl StartCommand { .build(), ); + let information_extension = + Arc::new(DistributedInformationExtension::new(meta_client.clone())); let catalog_manager = KvBackendCatalogManager::new( - opts.mode, - Some(meta_client.clone()), + information_extension, cached_meta_backend.clone(), layered_cache_registry.clone(), None, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 7678e90c884a..2f10b722c732 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -38,7 +38,6 @@ use frontend::server::Services; use meta_client::{MetaClientOptions, MetaClientType}; use query::stats::StatementStatistics; use servers::tls::{TlsMode, TlsOption}; -use servers::Mode; use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; @@ -47,7 +46,7 @@ use crate::error::{ Result, StartFrontendSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; -use crate::{log_versions, App}; +use crate::{log_versions, App, DistributedInformationExtension}; type FrontendOptions = GreptimeOptions; @@ -316,9 +315,10 @@ impl StartCommand { .build(), ); + let information_extension = + Arc::new(DistributedInformationExtension::new(meta_client.clone())); let catalog_manager = KvBackendCatalogManager::new( - Mode::Distributed, - Some(meta_client.clone()), + information_extension, cached_meta_backend.clone(), layered_cache_registry.clone(), None, diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 5797ef5a85a5..80e1628bc0d6 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -15,7 +15,17 @@ #![feature(assert_matches, let_chains)] use async_trait::async_trait; +use catalog::information_schema::InformationExtension; +use client::api::v1::meta::ProcedureStatus; +use common_error::ext::BoxedError; +use common_meta::cluster::{ClusterInfo, NodeInfo}; +use common_meta::datanode::RegionStat; +use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; +use common_meta::rpc::procedure; +use common_procedure::{ProcedureInfo, ProcedureState}; use common_telemetry::{error, info}; +use meta_client::MetaClientRef; +use snafu::ResultExt; use crate::error::Result; @@ -94,3 +104,69 @@ fn log_env_flags() { info!("argument: {}", argument); } } + +pub struct DistributedInformationExtension { + meta_client: MetaClientRef, +} + +impl DistributedInformationExtension { + pub fn new(meta_client: MetaClientRef) -> Self { + Self { meta_client } + } +} + +#[async_trait::async_trait] +impl InformationExtension for DistributedInformationExtension { + type Error = catalog::error::Error; + + async fn nodes(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_nodes(None) + .await + .map_err(BoxedError::new) + .context(catalog::error::ListNodesSnafu) + } + + async fn procedures(&self) -> std::result::Result, Self::Error> { + let procedures = self + .meta_client + .list_procedures(&ExecutorContext::default()) + .await + .map_err(BoxedError::new) + .context(catalog::error::ListProceduresSnafu)? + .procedures; + let mut result = Vec::with_capacity(procedures.len()); + for procedure in procedures { + let pid = match procedure.id { + Some(pid) => pid, + None => return catalog::error::ProcedureIdNotFoundSnafu {}.fail(), + }; + let pid = procedure::pb_pid_to_pid(&pid) + .map_err(BoxedError::new) + .context(catalog::error::ConvertProtoDataSnafu)?; + let status = ProcedureStatus::try_from(procedure.status) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown") + .to_string(); + let procedure_info = ProcedureInfo { + id: pid, + type_name: procedure.type_name, + start_time_ms: procedure.start_time_ms, + end_time_ms: procedure.end_time_ms, + state: ProcedureState::Running, + lock_keys: procedure.lock_keys, + }; + result.push((status, procedure_info)); + } + + Ok(result) + } + + async fn region_stats(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_region_stats() + .await + .map_err(BoxedError::new) + .context(catalog::error::ListRegionStatsSnafu) + } +} diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 4335bd5447e5..d4a0d823c90f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -17,14 +17,18 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::information_schema::InformationExtension; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; +use client::api::v1::meta::RegionRole; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, Configurable, KvBackendConfig}; use common_error::ext::BoxedError; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::cluster::{NodeInfo, NodeStatus}; +use common_meta::datanode::RegionStat; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; @@ -33,10 +37,11 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; +use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef}; -use common_procedure::ProcedureManagerRef; +use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_telemetry::info; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_time::timezone::set_default_timezone; @@ -44,6 +49,7 @@ use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; +use datanode::region_server::RegionServer; use file_engine::config::EngineConfig as FileEngineConfig; use flow::{FlowWorkerManager, FlownodeBuilder, FrontendInvoker}; use frontend::frontend::FrontendOptions; @@ -478,9 +484,18 @@ impl StartCommand { .build(), ); + let datanode = DatanodeBuilder::new(dn_opts, plugins.clone()) + .with_kv_backend(kv_backend.clone()) + .build() + .await + .context(StartDatanodeSnafu)?; + + let information_extension = Arc::new(StandaloneInformationExtension::new( + datanode.region_server(), + procedure_manager.clone(), + )); let catalog_manager = KvBackendCatalogManager::new( - dn_opts.mode, - None, + information_extension, kv_backend.clone(), layered_cache_registry.clone(), Some(procedure_manager.clone()), @@ -489,12 +504,6 @@ impl StartCommand { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - let datanode = DatanodeBuilder::new(dn_opts, plugins.clone()) - .with_kv_backend(kv_backend.clone()) - .build() - .await - .context(StartDatanodeSnafu)?; - let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_builder = FlownodeBuilder::new( Default::default(), @@ -644,6 +653,91 @@ impl StartCommand { } } +struct StandaloneInformationExtension { + region_server: RegionServer, + procedure_manager: ProcedureManagerRef, + start_time_ms: u64, +} + +impl StandaloneInformationExtension { + pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self { + Self { + region_server, + procedure_manager, + start_time_ms: common_time::util::current_time_millis() as u64, + } + } +} + +#[async_trait::async_trait] +impl InformationExtension for StandaloneInformationExtension { + type Error = catalog::error::Error; + + async fn nodes(&self) -> std::result::Result, Self::Error> { + let build_info = common_version::build_info(); + let node_info = NodeInfo { + // For the standalone: + // - id always 0 + // - empty string for peer_addr + peer: Peer { + id: 0, + addr: "".to_string(), + }, + last_activity_ts: -1, + status: NodeStatus::Standalone, + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + // Use `self.start_time_ms` instead. + // It's not precise but enough. + start_time_ms: self.start_time_ms, + }; + Ok(vec![node_info]) + } + + async fn procedures(&self) -> std::result::Result, Self::Error> { + self.procedure_manager + .list_procedures() + .await + .map_err(BoxedError::new) + .map(|procedures| { + procedures + .into_iter() + .map(|procedure| { + let status = procedure.state.as_str_name().to_string(); + (status, procedure) + }) + .collect::>() + }) + .context(catalog::error::ListProceduresSnafu) + } + + async fn region_stats(&self) -> std::result::Result, Self::Error> { + let stats = self + .region_server + .reportable_regions() + .into_iter() + .map(|stat| { + let region_stat = self + .region_server + .region_statistic(stat.region_id) + .unwrap_or_default(); + RegionStat { + id: stat.region_id, + rcus: 0, + wcus: 0, + approximate_bytes: region_stat.estimated_disk_size() as i64, + engine: stat.engine, + role: RegionRole::from(stat.role).into(), + memtable_size: region_stat.memtable_size, + manifest_size: region_stat.manifest_size, + sst_size: region_stat.sst_size, + } + }) + .collect::>(); + Ok(stats) + } +} + #[cfg(test)] mod tests { use std::default::Default; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index ad2c3e369fe2..44e594676923 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -23,6 +23,7 @@ use cache::{build_fundamental_cache_registry, with_default_composite_cache_regis use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::NodeClients; use client::Client; +use cmd::DistributedInformationExtension; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; @@ -366,9 +367,10 @@ impl GreptimeDbClusterBuilder { .build(), ); + let information_extension = + Arc::new(DistributedInformationExtension::new(meta_client.clone())); let catalog_manager = KvBackendCatalogManager::new( - Mode::Distributed, - Some(meta_client.clone()), + information_extension, cached_meta_backend.clone(), cache_registry.clone(), None, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 123614e43693..47095ecc65b3 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::information_schema::NoopInformationExtension; use catalog::kvbackend::KvBackendCatalogManager; use cmd::error::StartFlownodeSnafu; use cmd::standalone::StandaloneOptions; @@ -146,8 +147,7 @@ impl GreptimeDbStandaloneBuilder { ); let catalog_manager = KvBackendCatalogManager::new( - Mode::Standalone, - None, + Arc::new(NoopInformationExtension), kv_backend.clone(), cache_registry.clone(), Some(procedure_manager.clone()),