diff --git a/src/catalog/src/information_extension.rs b/src/catalog/src/information_extension.rs index 55764557a326..4d829ae01ae3 100644 --- a/src/catalog/src/information_extension.rs +++ b/src/catalog/src/information_extension.rs @@ -17,6 +17,7 @@ use common_error::ext::BoxedError; use common_meta::cluster::{ClusterInfo, NodeInfo}; use common_meta::datanode::RegionStat; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::rpc::procedure; use common_procedure::{ProcedureInfo, ProcedureState}; use meta_client::MetaClientRef; @@ -89,4 +90,12 @@ impl InformationExtension for DistributedInformationExtension { .map_err(BoxedError::new) .context(error::ListRegionStatsSnafu) } + + async fn flow_stats(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_flow_stats() + .await + .map_err(BoxedError::new) + .context(crate::error::ListFlowStatsSnafu) + } } diff --git a/src/cli/src/repl.rs b/src/cli/src/repl.rs index 6b394e30c4d5..8b5e3aa389a2 100644 --- a/src/cli/src/repl.rs +++ b/src/cli/src/repl.rs @@ -47,9 +47,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use crate::cmd::ReplCommand; use crate::error::{ - CollectRecordBatchesSnafu, MetaClusterClientSnafu, ParseSqlSnafu, PlanStatementSnafu, - PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, - StartMetaClientSnafu, SubstraitEncodeLogicalPlanSnafu, + CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, + ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, + SubstraitEncodeLogicalPlanSnafu, }; use crate::helper::RustylineHelper; use crate::{error, AttachCommand}; diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 7f23c4d3fc84..acd27f46d731 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -15,15 +15,6 @@ #![feature(assert_matches, let_chains)] use async_trait::async_trait; -use catalog::information_schema::InformationExtension; -use client::api::v1::meta::ProcedureStatus; -use common_error::ext::BoxedError; -use common_meta::cluster::{ClusterInfo, NodeInfo}; -use common_meta::datanode::RegionStat; -use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; -use common_meta::key::flow::flow_state::FlowStat; -use common_meta::rpc::procedure; -use common_procedure::{ProcedureInfo, ProcedureState}; use common_telemetry::{error, info}; use crate::error::Result; @@ -129,77 +120,3 @@ fn log_env_flags() { info!("argument: {}", argument); } } - -pub struct DistributedInformationExtension { - meta_client: MetaClientRef, -} - -impl DistributedInformationExtension { - pub fn new(meta_client: MetaClientRef) -> Self { - Self { meta_client } - } -} - -#[async_trait::async_trait] -impl InformationExtension for DistributedInformationExtension { - type Error = catalog::error::Error; - - async fn nodes(&self) -> std::result::Result, Self::Error> { - self.meta_client - .list_nodes(None) - .await - .map_err(BoxedError::new) - .context(catalog::error::ListNodesSnafu) - } - - async fn procedures(&self) -> std::result::Result, Self::Error> { - let procedures = self - .meta_client - .list_procedures(&ExecutorContext::default()) - .await - .map_err(BoxedError::new) - .context(catalog::error::ListProceduresSnafu)? - .procedures; - let mut result = Vec::with_capacity(procedures.len()); - for procedure in procedures { - let pid = match procedure.id { - Some(pid) => pid, - None => return catalog::error::ProcedureIdNotFoundSnafu {}.fail(), - }; - let pid = procedure::pb_pid_to_pid(&pid) - .map_err(BoxedError::new) - .context(catalog::error::ConvertProtoDataSnafu)?; - let status = ProcedureStatus::try_from(procedure.status) - .map(|v| v.as_str_name()) - .unwrap_or("Unknown") - .to_string(); - let procedure_info = ProcedureInfo { - id: pid, - type_name: procedure.type_name, - start_time_ms: procedure.start_time_ms, - end_time_ms: procedure.end_time_ms, - state: ProcedureState::Running, - lock_keys: procedure.lock_keys, - }; - result.push((status, procedure_info)); - } - - Ok(result) - } - - async fn region_stats(&self) -> std::result::Result, Self::Error> { - self.meta_client - .list_region_stats() - .await - .map_err(BoxedError::new) - .context(catalog::error::ListRegionStatsSnafu) - } - - async fn flow_stats(&self) -> std::result::Result, Self::Error> { - self.meta_client - .list_flow_stats() - .await - .map_err(BoxedError::new) - .context(catalog::error::ListFlowStatsSnafu) - } -} diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 2240a1a3471f..b6aa57d497cc 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1263,8 +1263,8 @@ impl_metadata_value! { FlowRouteValue, TableFlowValue, NodeAddressValue, - SchemaNameValue - FlowStateValue, + SchemaNameValue, + FlowStateValue } impl_optional_metadata_value! {