Skip to content

Commit

Permalink
feat: information extension (#4811)
Browse files Browse the repository at this point in the history
* feat: information extension

* Update manager.rs

Co-authored-by: Weny Xu <[email protected]>

* chore: by comment

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
fengjiachun and WenyXu authored Oct 11, 2024
1 parent 695ff1e commit 4b34f61
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 252 deletions.
7 changes: 3 additions & 4 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to get procedure client in {mode} mode"))]
GetProcedureClient {
mode: String,
#[snafu(display("Failed to get information extension client"))]
GetInformationExtension {
#[snafu(implicit)]
location: Location,
},
Expand Down Expand Up @@ -301,7 +300,7 @@ impl ErrorExt for Error {
| Error::CacheNotFound { .. }
| Error::CastManager { .. }
| Error::Json { .. }
| Error::GetProcedureClient { .. }
| Error::GetInformationExtension { .. }
| Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,

Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
Expand Down
26 changes: 8 additions & 18 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
};
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
use common_meta::key::catalog_name::CatalogNameKey;
Expand All @@ -34,7 +33,6 @@ use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use meta_client::client::MetaClient;
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
Expand All @@ -50,7 +48,7 @@ use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
Expand All @@ -63,9 +61,8 @@ use crate::CatalogManager;
/// comes from `SystemCatalog`, which is static and read-only.
#[derive(Clone)]
pub struct KvBackendCatalogManager {
mode: Mode,
/// Only available in `Distributed` mode.
meta_client: Option<Arc<MetaClient>>,
/// Provides the extension methods for the `information_schema` tables
information_extension: InformationExtensionRef,
/// Manages partition rules.
partition_manager: PartitionRuleManagerRef,
/// Manages table metadata.
Expand All @@ -82,15 +79,13 @@ const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
pub fn new(
mode: Mode,
meta_client: Option<Arc<MetaClient>>,
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
mode,
meta_client,
information_extension,
partition_manager: Arc::new(PartitionRuleManager::new(
backend.clone(),
cache_registry
Expand Down Expand Up @@ -118,20 +113,15 @@ impl KvBackendCatalogManager {
})
}

/// Returns the server running mode.
pub fn running_mode(&self) -> &Mode {
&self.mode
}

pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "view_info_cache",
})
}

/// Returns the `[MetaClient]`.
pub fn meta_client(&self) -> Option<Arc<MetaClient>> {
self.meta_client.clone()
/// Returns the [`InformationExtension`].
pub fn information_extension(&self) -> InformationExtensionRef {
self.information_extension.clone()
}

pub fn partition_manager(&self) -> PartitionRuleManagerRef {
Expand Down
42 changes: 41 additions & 1 deletion src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};

use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::FlowMetadataManager;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
Expand All @@ -45,7 +49,7 @@ use views::InformationSchemaViews;

use self::columns::InformationSchemaColumns;
use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
use crate::error::Result;
use crate::error::{Error, Result};
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
Expand Down Expand Up @@ -318,3 +322,39 @@ where
InformationTable::to_stream(self, request)
}
}

pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;

/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
#[async_trait::async_trait]
pub trait InformationExtension {
type Error: ErrorExt;

/// Gets the nodes information.
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

/// Gets the procedures information.
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

/// Gets the region statistics.
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
}

pub struct NoopInformationExtension;

#[async_trait::async_trait]
impl InformationExtension for NoopInformationExtension {
type Error = Error;

async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
Ok(vec![])
}

async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
Ok(vec![])
}

async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
Ok(vec![])
}
}
69 changes: 8 additions & 61 deletions src/catalog/src/system_schema/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use std::time::Duration;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_meta::cluster::NodeInfo;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
Expand All @@ -40,7 +37,7 @@ use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use super::CLUSTER_INFO;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result};
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::system_schema::utils;
use crate::CatalogManager;
Expand Down Expand Up @@ -70,15 +67,13 @@ const INIT_CAPACITY: usize = 42;
pub(super) struct InformationSchemaClusterInfo {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
start_time_ms: u64,
}

impl InformationSchemaClusterInfo {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
}
}

Expand All @@ -100,11 +95,7 @@ impl InformationSchemaClusterInfo {
}

fn builder(&self) -> InformationSchemaClusterInfoBuilder {
InformationSchemaClusterInfoBuilder::new(
self.schema.clone(),
self.catalog_manager.clone(),
self.start_time_ms,
)
InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
}
}

Expand Down Expand Up @@ -144,7 +135,6 @@ impl InformationTable for InformationSchemaClusterInfo {

struct InformationSchemaClusterInfoBuilder {
schema: SchemaRef,
start_time_ms: u64,
catalog_manager: Weak<dyn CatalogManager>,

peer_ids: Int64VectorBuilder,
Expand All @@ -158,11 +148,7 @@ struct InformationSchemaClusterInfoBuilder {
}

impl InformationSchemaClusterInfoBuilder {
fn new(
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
start_time_ms: u64,
) -> Self {
fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema,
catalog_manager,
Expand All @@ -174,56 +160,17 @@ impl InformationSchemaClusterInfoBuilder {
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_time_ms,
}
}

/// Construct the `information_schema.cluster_info` virtual table
async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let predicates = Predicates::from_scan_request(&request);
let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);

match mode {
Mode::Standalone => {
let build_info = common_version::build_info();

self.add_node_info(
&predicates,
NodeInfo {
// For the standalone:
// - id always 0
// - empty string for peer_addr
peer: Peer {
id: 0,
addr: "".to_string(),
},
last_activity_ts: -1,
status: NodeStatus::Standalone,
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// Use `self.start_time_ms` instead.
// It's not precise but enough.
start_time_ms: self.start_time_ms,
},
);
}
Mode::Distributed => {
if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? {
let node_infos = meta_client
.list_nodes(None)
.await
.map_err(BoxedError::new)
.context(ListNodesSnafu)?;

for node_info in node_infos {
self.add_node_info(&predicates, node_info);
}
} else {
warn!("Could not find meta client in distributed mode.");
}
}
let information_extension = utils::information_extension(&self.catalog_manager)?;
let node_infos = information_extension.nodes().await?;
for node_info in node_infos {
self.add_node_info(&predicates, node_info);
}

self.finish()
}

Expand Down
Loading

0 comments on commit 4b34f61

Please sign in to comment.