From 072d7c2022771082983e010e76fae8bb973d4057 Mon Sep 17 00:00:00 2001 From: JohnsonLee <53596783+J0HN50N133@users.noreply.github.com> Date: Tue, 16 Jul 2024 01:41:08 +0800 Subject: [PATCH] feat: introduce 'pg_catalog.pg_type' (#4332) * WIP: pg_catalog * refactor: move memory_table to crate public level to reuse it in pgcatalog * refactor: new system_schema mod to manage implementation of information_schema and pg_catalog * feat: pg_catalog.pg_type * fix: remove unused code to avoid warning * test: add pg_catalog sqlness test * feat: pg_catalog_cache in system_catalog * fix: integration test * test: rollback unit test * refactor: mix pg_catalog table_id with old ones * fix: add todo information * tests: rerun sqlness --------- Co-authored-by: johnsonlee --- src/catalog/src/kvbackend/manager.rs | 49 +++- src/catalog/src/lib.rs | 8 +- src/catalog/src/memory/manager.rs | 12 +- src/catalog/src/system_schema.rs | 164 +++++++++++ .../{ => system_schema}/information_schema.rs | 276 +++++++----------- .../information_schema/cluster_info.rs | 2 +- .../information_schema/columns.rs | 0 .../information_memory_table.rs} | 59 +--- .../information_schema/key_column_usage.rs | 2 +- .../information_schema/partitions.rs | 2 +- .../information_schema/predicate.rs | 0 .../information_schema/region_peers.rs | 2 +- .../information_schema/runtime_metrics.rs | 0 .../information_schema/schemata.rs | 2 +- .../information_schema/table_constraints.rs | 0 .../information_schema/table_names.rs | 0 .../information_schema/tables.rs | 2 +- .../information_schema/utils.rs | 0 .../information_schema/views.rs | 2 +- .../memory_table.rs | 100 +++---- .../memory_table/table_columns.rs | 50 ++++ .../src/system_schema/memory_table/tables.rs | 79 +++++ src/catalog/src/system_schema/pg_catalog.rs | 115 ++++++++ .../pg_catalog/pg_catalog_memory_table.rs | 69 +++++ .../system_schema/pg_catalog/table_names.rs | 18 ++ src/common/catalog/src/consts.rs | 6 + tests-integration/src/tests/instance_test.rs | 3 + 
.../common/create/create_database.result | 1 + .../common/create/create_database_opts.result | 3 + .../common/information_schema/tables.result | 1 + .../common/show/show_databases_tables.result | 2 + .../common/system/information_schema.result | 9 +- .../common/system/information_schema.sql | 4 +- .../common/system/pg_catalog.result | 32 ++ .../standalone/common/system/pg_catalog.sql | 4 + .../standalone/common/view/create.result | 1 + 36 files changed, 782 insertions(+), 297 deletions(-) create mode 100644 src/catalog/src/system_schema.rs rename src/catalog/src/{ => system_schema}/information_schema.rs (63%) rename src/catalog/src/{ => system_schema}/information_schema/cluster_info.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/columns.rs (100%) rename src/catalog/src/{information_schema/memory_table/tables.rs => system_schema/information_schema/information_memory_table.rs} (91%) rename src/catalog/src/{ => system_schema}/information_schema/key_column_usage.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/partitions.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/predicate.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/region_peers.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/runtime_metrics.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/schemata.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/table_constraints.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/table_names.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/tables.rs (99%) rename src/catalog/src/{ => system_schema}/information_schema/utils.rs (100%) rename src/catalog/src/{ => system_schema}/information_schema/views.rs (99%) rename src/catalog/src/{information_schema => system_schema}/memory_table.rs (89%) create mode 100644 src/catalog/src/system_schema/memory_table/table_columns.rs create 
mode 100644 src/catalog/src/system_schema/memory_table/tables.rs create mode 100644 src/catalog/src/system_schema/pg_catalog.rs create mode 100644 src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs create mode 100644 src/catalog/src/system_schema/pg_catalog/table_names.rs create mode 100644 tests/cases/standalone/common/system/pg_catalog.result create mode 100644 tests/cases/standalone/common/system/pg_catalog.sql diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 0bf51643b1b1..c583786988b8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, Weak}; use async_stream::try_stream; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, + PG_CATALOG_NAME, }; use common_config::Mode; use common_error::ext::BoxedError; @@ -46,6 +47,8 @@ use crate::error::{ }; use crate::information_schema::InformationSchemaProvider; use crate::kvbackend::TableCacheRef; +use crate::system_schema::pg_catalog::PGCatalogProvider; +use crate::system_schema::SystemSchemaProvider; use crate::CatalogManager; /// Access all existing catalog, schema and tables. 
@@ -86,10 +89,15 @@ impl KvBackendCatalogManager { system_catalog: SystemCatalog { catalog_manager: me.clone(), catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), + pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), information_schema_provider: Arc::new(InformationSchemaProvider::new( DEFAULT_CATALOG_NAME.to_string(), me.clone(), )), + pg_catalog_provider: Arc::new(PGCatalogProvider::new( + DEFAULT_CATALOG_NAME.to_string(), + me.clone(), + )), }, cache_registry, }) @@ -295,30 +303,40 @@ fn build_table(table_info_value: TableInfoValue) -> Result { /// Existing system tables: /// - public.numbers /// - information_schema.{tables} +/// - pg_catalog.{tables} #[derive(Clone)] struct SystemCatalog { catalog_manager: Weak, catalog_cache: Cache>, + pg_catalog_cache: Cache>, + + // system_schema_provider for default catalog information_schema_provider: Arc, + pg_catalog_provider: Arc, } impl SystemCatalog { + // TODO(j0hn50n133): remove the duplicated hard-coded table names logic fn schema_names(&self) -> Vec { - vec![INFORMATION_SCHEMA_NAME.to_string()] + vec![ + INFORMATION_SCHEMA_NAME.to_string(), + PG_CATALOG_NAME.to_string(), + ] } fn table_names(&self, schema: &str) -> Vec { - if schema == INFORMATION_SCHEMA_NAME { - self.information_schema_provider.table_names() - } else if schema == DEFAULT_SCHEMA_NAME { - vec![NUMBERS_TABLE_NAME.to_string()] - } else { - vec![] + match schema { + INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(), + PG_CATALOG_NAME => self.pg_catalog_provider.table_names(), + DEFAULT_SCHEMA_NAME => { + vec![NUMBERS_TABLE_NAME.to_string()] + } + _ => vec![], } } fn schema_exists(&self, schema: &str) -> bool { - schema == INFORMATION_SCHEMA_NAME + schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME } fn table_exists(&self, schema: &str, table: &str) -> bool { @@ -326,6 +344,8 @@ impl SystemCatalog { self.information_schema_provider.table(table).is_some() } else if schema == DEFAULT_SCHEMA_NAME { table == 
NUMBERS_TABLE_NAME + } else if schema == PG_CATALOG_NAME { + self.pg_catalog_provider.table(table).is_some() } else { false } @@ -341,6 +361,19 @@ impl SystemCatalog { )) }); information_schema_provider.table(table_name) + } else if schema == PG_CATALOG_NAME { + if catalog == DEFAULT_CATALOG_NAME { + self.pg_catalog_provider.table(table_name) + } else { + let pg_catalog_provider = + self.pg_catalog_cache.get_with_by_ref(catalog, move || { + Arc::new(PGCatalogProvider::new( + catalog.to_string(), + self.catalog_manager.clone(), + )) + }); + pg_catalog_provider.table(table_name) + } } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME { Some(NumbersTable::table(NUMBERS_TABLE_ID)) } else { diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 494a94df2699..394500bb757e 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -28,12 +28,16 @@ use table::TableRef; use crate::error::Result; pub mod error; -pub mod information_schema; pub mod kvbackend; pub mod memory; mod metrics; -pub mod table_source; +pub mod system_schema; +pub mod information_schema { + // TODO(j0hn50n133): re-export to make it compatible with the legacy code, migrate to the new path later + pub use crate::system_schema::information_schema::*; +} +pub mod table_source; #[async_trait::async_trait] pub trait CatalogManager: Send + Sync { fn as_any(&self) -> &dyn Any; diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index a5513b89048e..a236a6b4b79f 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -20,7 +20,8 @@ use std::sync::{Arc, RwLock, Weak}; use async_stream::{stream, try_stream}; use common_catalog::build_db_string; use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, + DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME, }; use 
futures_util::stream::BoxStream; use snafu::OptionExt; @@ -28,6 +29,7 @@ use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::information_schema::InformationSchemaProvider; +use crate::system_schema::SystemSchemaProvider; use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest}; type SchemaEntries = HashMap>; @@ -173,6 +175,12 @@ impl MemoryCatalogManager { schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), }) .unwrap(); + manager + .register_schema_sync(RegisterSchemaRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: PG_CATALOG_NAME.to_string(), + }) + .unwrap(); manager .register_schema_sync(RegisterSchemaRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -196,7 +204,7 @@ impl MemoryCatalogManager { } fn catalog_exist_sync(&self, catalog: &str) -> Result { - Ok(self.catalogs.read().unwrap().get(catalog).is_some()) + Ok(self.catalogs.read().unwrap().contains_key(catalog)) } /// Registers a catalog if it does not exist and returns false if the schema exists. diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs new file mode 100644 index 000000000000..d804d9c97ca4 --- /dev/null +++ b/src/catalog/src/system_schema.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod information_schema; +mod memory_table; +pub mod pg_catalog; + +use std::collections::HashMap; +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use datatypes::schema::SchemaRef; +use futures_util::StreamExt; +use snafu::ResultExt; +use store_api::data_source::DataSource; +use store_api::storage::ScanRequest; +use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; +use table::metadata::{ + FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, +}; +use table::{Table, TableRef}; + +use crate::error::Result; + +pub trait SystemSchemaProvider { + /// Returns a map of [TableRef] in this system schema. + fn tables(&self) -> &HashMap; + + /// Returns the [TableRef] by table name. + fn table(&self, name: &str) -> Option { + self.tables().get(name).cloned() + } + + /// Returns table names in the order of table id. + fn table_names(&self) -> Vec { + let mut tables = self.tables().values().clone().collect::>(); + + tables.sort_by(|t1, t2| { + t1.table_info() + .table_id() + .partial_cmp(&t2.table_info().table_id()) + .unwrap() + }); + tables + .into_iter() + .map(|t| t.table_info().name.clone()) + .collect() + } +} + +trait SystemSchemaProviderInner { + fn catalog_name(&self) -> &str; + fn schema_name() -> &'static str; + fn build_table(&self, name: &str) -> Option { + self.system_table(name).map(|table| { + let table_info = Self::table_info(self.catalog_name().to_string(), &table); + let filter_pushdown = FilterPushDownType::Inexact; + let data_source = Arc::new(SystemTableDataSource::new(table)); + let table = Table::new(table_info, filter_pushdown, data_source); + Arc::new(table) + }) + } + fn system_table(&self, name: &str) -> Option; + + fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef { + let table_meta = TableMetaBuilder::default() + .schema(table.schema()) + .primary_key_indices(vec![]) + 
.next_column_id(0) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .table_id(table.table_id()) + .name(table.table_name().to_string()) + .catalog_name(catalog_name) + .schema_name(Self::schema_name().to_string()) + .meta(table_meta) + .table_type(table.table_type()) + .build() + .unwrap(); + Arc::new(table_info) + } +} + +pub(crate) trait SystemTable { + fn table_id(&self) -> TableId; + + fn table_name(&self) -> &'static str; + + fn schema(&self) -> SchemaRef; + + fn to_stream(&self, request: ScanRequest) -> Result; + + fn table_type(&self) -> TableType { + TableType::Temporary + } +} + +pub(crate) type SystemTableRef = Arc; + +struct SystemTableDataSource { + table: SystemTableRef, +} + +impl SystemTableDataSource { + fn new(table: SystemTableRef) -> Self { + Self { table } + } + + fn try_project(&self, projection: &[usize]) -> std::result::Result { + let schema = self + .table + .schema() + .try_project(projection) + .context(SchemaConversionSnafu) + .map_err(BoxedError::new)?; + Ok(Arc::new(schema)) + } +} + +impl DataSource for SystemTableDataSource { + fn get_stream( + &self, + request: ScanRequest, + ) -> std::result::Result { + let projection = request.projection.clone(); + let projected_schema = match &projection { + Some(projection) => self.try_project(projection)?, + None => self.table.schema(), + }; + + let stream = self + .table + .to_stream(request) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu) + .map_err(BoxedError::new)? 
+ .map(move |batch| match &projection { + Some(p) => batch.and_then(|b| b.try_project(p)), + None => batch, + }); + + let stream = RecordBatchStreamWrapper { + schema: projected_schema, + stream: Box::pin(stream), + output_ordering: None, + metrics: Default::default(), + }; + + Ok(Box::pin(stream)) + } +} diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs similarity index 63% rename from src/catalog/src/information_schema.rs rename to src/catalog/src/system_schema/information_schema.rs index 3521ccf1359b..cf2607271eb7 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -14,8 +14,8 @@ mod cluster_info; pub mod columns; +mod information_memory_table; pub mod key_column_usage; -mod memory_table; mod partitions; mod predicate; mod region_peers; @@ -31,35 +31,31 @@ use std::collections::HashMap; use std::sync::{Arc, Weak}; use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME}; -use common_error::ext::BoxedError; -use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; -use futures_util::StreamExt; use lazy_static::lazy_static; use paste::paste; pub(crate) use predicate::Predicates; -use snafu::ResultExt; -use store_api::data_source::DataSource; use store_api::storage::{ScanRequest, TableId}; -use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; -use table::metadata::{ - FilterPushDownType, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, -}; -use table::{Table, TableRef}; +use table::metadata::TableType; +use table::TableRef; pub use table_names::*; use views::InformationSchemaViews; use self::columns::InformationSchemaColumns; +use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef}; use crate::error::Result; -use crate::information_schema::cluster_info::InformationSchemaClusterInfo; 
-use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; -use crate::information_schema::memory_table::{get_schema_columns, MemoryTable}; -use crate::information_schema::partitions::InformationSchemaPartitions; -use crate::information_schema::region_peers::InformationSchemaRegionPeers; -use crate::information_schema::runtime_metrics::InformationSchemaMetrics; -use crate::information_schema::schemata::InformationSchemaSchemata; -use crate::information_schema::table_constraints::InformationSchemaTableConstraints; -use crate::information_schema::tables::InformationSchemaTables; +use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo; +use crate::system_schema::information_schema::information_memory_table::get_schema_columns; +use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; +use crate::system_schema::information_schema::partitions::InformationSchemaPartitions; +use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers; +use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics; +use crate::system_schema::information_schema::schemata::InformationSchemaSchemata; +use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints; +use crate::system_schema::information_schema::tables::InformationSchemaTables; +use crate::system_schema::memory_table::MemoryTable; +use crate::system_schema::SystemSchemaProvider; use crate::CatalogManager; lazy_static! { @@ -111,105 +107,22 @@ pub struct InformationSchemaProvider { tables: HashMap, } -impl InformationSchemaProvider { - pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { - let mut provider = Self { - catalog_name, - catalog_manager, - tables: HashMap::new(), - }; - - provider.build_tables(); - - provider - } - - /// Returns table names in the order of table id. 
- pub fn table_names(&self) -> Vec { - let mut tables = self.tables.values().clone().collect::>(); - - tables.sort_by(|t1, t2| { - t1.table_info() - .table_id() - .partial_cmp(&t2.table_info().table_id()) - .unwrap() - }); - tables - .into_iter() - .map(|t| t.table_info().name.clone()) - .collect() - } - - /// Returns a map of [TableRef] in information schema. - pub fn tables(&self) -> &HashMap { +impl SystemSchemaProvider for InformationSchemaProvider { + fn tables(&self) -> &HashMap { assert!(!self.tables.is_empty()); &self.tables } - - /// Returns the [TableRef] by table name. - pub fn table(&self, name: &str) -> Option { - self.tables.get(name).cloned() - } - - fn build_tables(&mut self) { - let mut tables = HashMap::new(); - - // SECURITY NOTE: - // Carefully consider the tables that may expose sensitive cluster configurations, - // authentication details, and other critical information. - // Only put these tables under `greptime` catalog to prevent info leak. - if self.catalog_name == DEFAULT_CATALOG_NAME { - tables.insert( - RUNTIME_METRICS.to_string(), - self.build_table(RUNTIME_METRICS).unwrap(), - ); - tables.insert( - BUILD_INFO.to_string(), - self.build_table(BUILD_INFO).unwrap(), - ); - tables.insert( - REGION_PEERS.to_string(), - self.build_table(REGION_PEERS).unwrap(), - ); - tables.insert( - CLUSTER_INFO.to_string(), - self.build_table(CLUSTER_INFO).unwrap(), - ); - } - - tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); - tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap()); - tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap()); - tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); - tables.insert( - KEY_COLUMN_USAGE.to_string(), - self.build_table(KEY_COLUMN_USAGE).unwrap(), - ); - tables.insert( - TABLE_CONSTRAINTS.to_string(), - self.build_table(TABLE_CONSTRAINTS).unwrap(), - ); - - // Add memory tables - for name in MEMORY_TABLES.iter() { - 
tables.insert((*name).to_string(), self.build_table(name).expect(name)); - } - - self.tables = tables; +} +impl SystemSchemaProviderInner for InformationSchemaProvider { + fn catalog_name(&self) -> &str { + &self.catalog_name } - - fn build_table(&self, name: &str) -> Option { - self.information_table(name).map(|table| { - let table_info = Self::table_info(self.catalog_name.clone(), &table); - let filter_pushdown = FilterPushDownType::Inexact; - let data_source = Arc::new(InformationTableDataSource::new(table)); - let table = Table::new(table_info, filter_pushdown, data_source); - Arc::new(table) - }) + fn schema_name() -> &'static str { + INFORMATION_SCHEMA_NAME } - fn information_table(&self, name: &str) -> Option { + fn system_table(&self, name: &str) -> Option { match name.to_ascii_lowercase().as_str() { TABLES => Some(Arc::new(InformationSchemaTables::new( self.catalog_name.clone(), @@ -272,24 +185,66 @@ impl InformationSchemaProvider { _ => None, } } +} - fn table_info(catalog_name: String, table: &InformationTableRef) -> TableInfoRef { - let table_meta = TableMetaBuilder::default() - .schema(table.schema()) - .primary_key_indices(vec![]) - .next_column_id(0) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .table_id(table.table_id()) - .name(table.table_name().to_string()) - .catalog_name(catalog_name) - .schema_name(INFORMATION_SCHEMA_NAME.to_string()) - .meta(table_meta) - .table_type(table.table_type()) - .build() - .unwrap(); - Arc::new(table_info) +impl InformationSchemaProvider { + pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { + let mut provider = Self { + catalog_name, + catalog_manager, + tables: HashMap::new(), + }; + + provider.build_tables(); + + provider + } + + fn build_tables(&mut self) { + let mut tables = HashMap::new(); + + // SECURITY NOTE: + // Carefully consider the tables that may expose sensitive cluster configurations, + // authentication details, and other critical information. 
+ // Only put these tables under `greptime` catalog to prevent info leak. + if self.catalog_name == DEFAULT_CATALOG_NAME { + tables.insert( + RUNTIME_METRICS.to_string(), + self.build_table(RUNTIME_METRICS).unwrap(), + ); + tables.insert( + BUILD_INFO.to_string(), + self.build_table(BUILD_INFO).unwrap(), + ); + tables.insert( + REGION_PEERS.to_string(), + self.build_table(REGION_PEERS).unwrap(), + ); + tables.insert( + CLUSTER_INFO.to_string(), + self.build_table(CLUSTER_INFO).unwrap(), + ); + } + + tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); + tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap()); + tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap()); + tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); + tables.insert( + KEY_COLUMN_USAGE.to_string(), + self.build_table(KEY_COLUMN_USAGE).unwrap(), + ); + tables.insert( + TABLE_CONSTRAINTS.to_string(), + self.build_table(TABLE_CONSTRAINTS).unwrap(), + ); + + // Add memory tables + for name in MEMORY_TABLES.iter() { + tables.insert((*name).to_string(), self.build_table(name).expect(name)); + } + + self.tables = tables; } } @@ -307,57 +262,28 @@ trait InformationTable { } } -type InformationTableRef = Arc; - -struct InformationTableDataSource { - table: InformationTableRef, -} - -impl InformationTableDataSource { - fn new(table: InformationTableRef) -> Self { - Self { table } +// Provide compatibility for legacy `information_schema` code. 
+impl SystemTable for T +where + T: InformationTable, +{ + fn table_id(&self) -> TableId { + InformationTable::table_id(self) } - fn try_project(&self, projection: &[usize]) -> std::result::Result { - let schema = self - .table - .schema() - .try_project(projection) - .context(SchemaConversionSnafu) - .map_err(BoxedError::new)?; - Ok(Arc::new(schema)) + fn table_name(&self) -> &'static str { + InformationTable::table_name(self) } -} - -impl DataSource for InformationTableDataSource { - fn get_stream( - &self, - request: ScanRequest, - ) -> std::result::Result { - let projection = request.projection.clone(); - let projected_schema = match &projection { - Some(projection) => self.try_project(projection)?, - None => self.table.schema(), - }; - let stream = self - .table - .to_stream(request) - .map_err(BoxedError::new) - .context(TablesRecordBatchSnafu) - .map_err(BoxedError::new)? - .map(move |batch| match &projection { - Some(p) => batch.and_then(|b| b.try_project(p)), - None => batch, - }); + fn schema(&self) -> SchemaRef { + InformationTable::schema(self) + } - let stream = RecordBatchStreamWrapper { - schema: projected_schema, - stream: Box::pin(stream), - output_ordering: None, - metrics: Default::default(), - }; + fn table_type(&self) -> TableType { + InformationTable::table_type(self) + } - Ok(Box::pin(stream)) + fn to_stream(&self, request: ScanRequest) -> Result { + InformationTable::to_stream(self, request) } } diff --git a/src/catalog/src/information_schema/cluster_info.rs b/src/catalog/src/system_schema/information_schema/cluster_info.rs similarity index 99% rename from src/catalog/src/information_schema/cluster_info.rs rename to src/catalog/src/system_schema/information_schema/cluster_info.rs index 6959ad1bd595..e878b94baf4a 100644 --- a/src/catalog/src/information_schema/cluster_info.rs +++ b/src/catalog/src/system_schema/information_schema/cluster_info.rs @@ -41,7 +41,7 @@ use store_api::storage::{ScanRequest, TableId}; use super::CLUSTER_INFO; use 
crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result}; -use crate::information_schema::{utils, InformationTable, Predicates}; +use crate::system_schema::information_schema::{utils, InformationTable, Predicates}; use crate::CatalogManager; const PEER_ID: &str = "peer_id"; diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/system_schema/information_schema/columns.rs similarity index 100% rename from src/catalog/src/information_schema/columns.rs rename to src/catalog/src/system_schema/information_schema/columns.rs diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/system_schema/information_schema/information_memory_table.rs similarity index 91% rename from src/catalog/src/information_schema/memory_table/tables.rs rename to src/catalog/src/system_schema/information_schema/information_memory_table.rs index ecbab36cf8e4..e6459fbe8ab5 100644 --- a/src/catalog/src/information_schema/memory_table/tables.rs +++ b/src/catalog/src/system_schema/information_schema/information_memory_table.rs @@ -15,17 +15,19 @@ use std::sync::Arc; use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE}; -use datatypes::prelude::{ConcreteDataType, VectorRef}; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::vectors::{Int64Vector, StringVector}; +use datatypes::schema::{Schema, SchemaRef}; +use datatypes::vectors::{Int64Vector, StringVector, VectorRef}; -use crate::information_schema::table_names::*; +use super::table_names::*; +use crate::system_schema::memory_table::tables::{ + bigint_column, datetime_column, string_column, string_columns, +}; const NO_VALUE: &str = "NO"; /// Find the schema and columns by the table_name, only valid for memory tables. /// Safety: the user MUST ensure the table schema exists, panic otherwise. 
-pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { +pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { let (column_schemas, columns): (_, Vec) = match table_name { COLUMN_PRIVILEGES => ( string_columns(&[ @@ -414,50 +416,3 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { (Arc::new(Schema::new(column_schemas)), columns) } - -fn string_columns(names: &[&'static str]) -> Vec { - names.iter().map(|name| string_column(name)).collect() -} - -fn string_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::string_datatype(), - false, - ) -} - -fn bigint_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::int64_datatype(), - false, - ) -} - -fn datetime_column(name: &str) -> ColumnSchema { - ColumnSchema::new( - str::to_lowercase(name), - ConcreteDataType::datetime_datatype(), - false, - ) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_string_columns() { - let columns = ["a", "b", "c"]; - let column_schemas = string_columns(&columns); - - assert_eq!(3, column_schemas.len()); - for (i, name) in columns.iter().enumerate() { - let cs = column_schemas.get(i).unwrap(); - - assert_eq!(*name, cs.name); - assert_eq!(ConcreteDataType::string_datatype(), cs.data_type); - } - } -} diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/system_schema/information_schema/key_column_usage.rs similarity index 99% rename from src/catalog/src/information_schema/key_column_usage.rs rename to src/catalog/src/system_schema/information_schema/key_column_usage.rs index bc4c04ccb8da..f7cedfee2a2e 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/system_schema/information_schema/key_column_usage.rs @@ -35,7 +35,7 @@ use super::KEY_COLUMN_USAGE; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; 
-use crate::information_schema::{InformationTable, Predicates}; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; pub const CONSTRAINT_SCHEMA: &str = "constraint_schema"; diff --git a/src/catalog/src/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs similarity index 99% rename from src/catalog/src/information_schema/partitions.rs rename to src/catalog/src/system_schema/information_schema/partitions.rs index ed918442d1b5..3e49a2ddbd3b 100644 --- a/src/catalog/src/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -44,8 +44,8 @@ use crate::error::{ CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; use crate::kvbackend::KvBackendCatalogManager; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; const TABLE_CATALOG: &str = "table_catalog"; diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/system_schema/information_schema/predicate.rs similarity index 100% rename from src/catalog/src/information_schema/predicate.rs rename to src/catalog/src/system_schema/information_schema/predicate.rs diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/system_schema/information_schema/region_peers.rs similarity index 99% rename from src/catalog/src/information_schema/region_peers.rs rename to src/catalog/src/system_schema/information_schema/region_peers.rs index 06b55ebf438f..4bcc28144701 100644 --- a/src/catalog/src/information_schema/region_peers.rs +++ b/src/catalog/src/system_schema/information_schema/region_peers.rs @@ -39,8 +39,8 @@ use crate::error::{ CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use 
crate::information_schema::{InformationTable, Predicates}; use crate::kvbackend::KvBackendCatalogManager; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; const REGION_ID: &str = "region_id"; diff --git a/src/catalog/src/information_schema/runtime_metrics.rs b/src/catalog/src/system_schema/information_schema/runtime_metrics.rs similarity index 100% rename from src/catalog/src/information_schema/runtime_metrics.rs rename to src/catalog/src/system_schema/information_schema/runtime_metrics.rs diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/system_schema/information_schema/schemata.rs similarity index 99% rename from src/catalog/src/information_schema/schemata.rs rename to src/catalog/src/system_schema/information_schema/schemata.rs index e33efea89dc6..de94914fe603 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/system_schema/information_schema/schemata.rs @@ -36,7 +36,7 @@ use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{utils, InformationTable, Predicates}; +use crate::system_schema::information_schema::{utils, InformationTable, Predicates}; use crate::CatalogManager; pub const CATALOG_NAME: &str = "catalog_name"; diff --git a/src/catalog/src/information_schema/table_constraints.rs b/src/catalog/src/system_schema/information_schema/table_constraints.rs similarity index 100% rename from src/catalog/src/information_schema/table_constraints.rs rename to src/catalog/src/system_schema/information_schema/table_constraints.rs diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs similarity index 100% rename from src/catalog/src/information_schema/table_names.rs rename to src/catalog/src/system_schema/information_schema/table_names.rs diff --git 
a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs similarity index 99% rename from src/catalog/src/information_schema/tables.rs rename to src/catalog/src/system_schema/information_schema/tables.rs index 949138ea6ab4..3a0d1d303b64 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -38,7 +38,7 @@ use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; pub const TABLE_CATALOG: &str = "table_catalog"; diff --git a/src/catalog/src/information_schema/utils.rs b/src/catalog/src/system_schema/information_schema/utils.rs similarity index 100% rename from src/catalog/src/information_schema/utils.rs rename to src/catalog/src/system_schema/information_schema/utils.rs diff --git a/src/catalog/src/information_schema/views.rs b/src/catalog/src/system_schema/information_schema/views.rs similarity index 99% rename from src/catalog/src/information_schema/views.rs rename to src/catalog/src/system_schema/information_schema/views.rs index 985fd27850ab..74fcf03e3f28 100644 --- a/src/catalog/src/information_schema/views.rs +++ b/src/catalog/src/system_schema/information_schema/views.rs @@ -37,8 +37,8 @@ use crate::error::{ CastManagerSnafu, CreateRecordBatchSnafu, GetViewCacheSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu, }; -use crate::information_schema::{InformationTable, Predicates}; use crate::kvbackend::KvBackendCatalogManager; +use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; const INIT_CAPACITY: usize = 42; diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/system_schema/memory_table.rs 
similarity index 89% rename from src/catalog/src/information_schema/memory_table.rs rename to src/catalog/src/system_schema/memory_table.rs index 7f016f665409..92caa641a9d0 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/system_schema/memory_table.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod tables; +mod table_columns; +pub mod tables; + use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -27,22 +29,21 @@ use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use snafu::ResultExt; use store_api::storage::{ScanRequest, TableId}; -pub use tables::get_schema_columns; +use super::SystemTable; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; -use crate::information_schema::InformationTable; /// A memory table with specified schema and columns. -pub(super) struct MemoryTable { - table_id: TableId, - table_name: &'static str, - schema: SchemaRef, - columns: Vec, +pub(crate) struct MemoryTable { + pub(crate) table_id: TableId, + pub(crate) table_name: &'static str, + pub(crate) schema: SchemaRef, + pub(crate) columns: Vec, } impl MemoryTable { /// Creates a memory table with table id, name, schema and columns. 
- pub(super) fn new( + pub fn new( table_id: TableId, table_name: &'static str, schema: SchemaRef, @@ -56,46 +57,12 @@ impl MemoryTable { } } - fn builder(&self) -> MemoryTableBuilder { + pub fn builder(&self) -> MemoryTableBuilder { MemoryTableBuilder::new(self.schema.clone(), self.columns.clone()) } } -impl InformationTable for MemoryTable { - fn table_id(&self) -> TableId { - self.table_id - } - - fn table_name(&self) -> &'static str { - self.table_name - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn to_stream(&self, _request: ScanRequest) -> Result { - let schema = self.schema.arrow_schema().clone(); - let mut builder = self.builder(); - let stream = Box::pin(DfRecordBatchStreamAdapter::new( - schema, - futures::stream::once(async move { - builder - .memory_records() - .await - .map(|x| x.into_df_record_batch()) - .map_err(Into::into) - }), - )); - Ok(Box::pin( - RecordBatchStreamAdapter::try_new(stream) - .map_err(BoxedError::new) - .context(InternalSnafu)?, - )) - } -} - -struct MemoryTableBuilder { +pub(crate) struct MemoryTableBuilder { schema: SchemaRef, columns: Vec, } @@ -106,7 +73,7 @@ impl MemoryTableBuilder { } /// Construct the `information_schema.{table_name}` virtual table - async fn memory_records(&mut self) -> Result { + pub async fn memory_records(&mut self) -> Result { if self.columns.is_empty() { RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu) } else { @@ -137,6 +104,40 @@ impl DfPartitionStream for MemoryTable { } } +impl SystemTable for MemoryTable { + fn table_id(&self) -> TableId { + self.table_id + } + + fn table_name(&self) -> &'static str { + self.table_name + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, _request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + 
.memory_records() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -147,6 +148,7 @@ mod tests { use datatypes::vectors::StringVector; use super::*; + use crate::system_schema::SystemTable; #[tokio::test] async fn test_memory_table() { @@ -166,8 +168,8 @@ mod tests { ); assert_eq!(42, table.table_id()); - assert_eq!("test", table.table_name()); - assert_eq!(schema, InformationTable::schema(&table)); + assert_eq!("test", table.table_name); + assert_eq!(schema, SystemTable::schema(&table)); let stream = table.to_stream(ScanRequest::default()).unwrap(); @@ -196,7 +198,7 @@ mod tests { assert_eq!(42, table.table_id()); assert_eq!("test", table.table_name()); - assert_eq!(schema, InformationTable::schema(&table)); + assert_eq!(schema, SystemTable::schema(&table)); let stream = table.to_stream(ScanRequest::default()).unwrap(); diff --git a/src/catalog/src/system_schema/memory_table/table_columns.rs b/src/catalog/src/system_schema/memory_table/table_columns.rs new file mode 100644 index 000000000000..75c2baf6334d --- /dev/null +++ b/src/catalog/src/system_schema/memory_table/table_columns.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[macro_export] +macro_rules! 
memory_table_cols{ + ([$($colname:ident),*], $t:expr) => { + let t = &$t; + $( + let mut $colname = Vec::with_capacity(t.len()); + )* + paste::paste!{ + for &($([]),*) in t { + $( + $colname.push([]); + )* + } + } + }; +} + +#[cfg(test)] +mod tests { + + #[test] + fn test_memory_table_columns() { + memory_table_cols!( + [oid, typname, typlen], + [ + (1, "String", -1), + (2, "Binary", -1), + (3, "Time", 8), + (4, "Datetime", 8) + ] + ); + assert_eq!(&oid[..], &[1, 2, 3, 4]); + assert_eq!(&typname[..], &["String", "Binary", "Time", "Datetime"]); + assert_eq!(&typlen[..], &[-1, -1, 8, 8]); + } +} diff --git a/src/catalog/src/system_schema/memory_table/tables.rs b/src/catalog/src/system_schema/memory_table/tables.rs new file mode 100644 index 000000000000..c6459717769c --- /dev/null +++ b/src/catalog/src/system_schema/memory_table/tables.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; + +pub fn string_columns(names: &[&'static str]) -> Vec { + names.iter().map(|name| string_column(name)).collect() +} + +pub fn string_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::string_datatype(), + false, + ) +} + +pub fn u32_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::uint32_datatype(), + false, + ) +} + +pub fn i16_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::int16_datatype(), + false, + ) +} + +pub fn bigint_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::int64_datatype(), + false, + ) +} + +pub fn datetime_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::datetime_datatype(), + false, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_columns() { + let columns = ["a", "b", "c"]; + let column_schemas = string_columns(&columns); + + assert_eq!(3, column_schemas.len()); + for (i, name) in columns.iter().enumerate() { + let cs = column_schemas.get(i).unwrap(); + + assert_eq!(*name, cs.name); + assert_eq!(ConcreteDataType::string_datatype(), cs.data_type); + } + } +} diff --git a/src/catalog/src/system_schema/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs new file mode 100644 index 000000000000..2b2ca876642f --- /dev/null +++ b/src/catalog/src/system_schema/pg_catalog.rs @@ -0,0 +1,115 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod pg_catalog_memory_table; +mod table_names; + +use std::collections::HashMap; +use std::sync::{Arc, Weak}; + +use common_catalog::consts::{self, PG_CATALOG_NAME}; +use datatypes::schema::ColumnSchema; +use lazy_static::lazy_static; +use paste::paste; +use pg_catalog_memory_table::get_schema_columns; +use table::TableRef; +pub use table_names::*; + +use super::memory_table::tables::u32_column; +use super::memory_table::MemoryTable; +use super::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef}; +use crate::CatalogManager; + +lazy_static! { + static ref MEMORY_TABLES: &'static [&'static str] = &[table_names::PG_TYPE]; +} + +/// The column name for the OID column. +/// The OID column is a unique identifier of type u32 for each object in the database. +const OID_COLUMN_NAME: &str = "oid"; + +fn oid_column() -> ColumnSchema { + u32_column(OID_COLUMN_NAME) +} + +/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog. +pub struct PGCatalogProvider { + catalog_name: String, + _catalog_manager: Weak, + tables: HashMap, +} + +impl SystemSchemaProvider for PGCatalogProvider { + fn tables(&self) -> &HashMap { + assert!(!self.tables.is_empty()); + + &self.tables + } +} + +// TODO(j0hn50n133): Not sure whether to avoid duplication with `information_schema` or not. +macro_rules! setup_memory_table { + ($name: expr) => { + paste! 
{ + { + let (schema, columns) = get_schema_columns($name); + Some(Arc::new(MemoryTable::new( + consts::[], + $name, + schema, + columns + )) as _) + } + } + }; +} + +impl PGCatalogProvider { + pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { + let mut provider = Self { + catalog_name, + _catalog_manager: catalog_manager, + tables: HashMap::new(), + }; + provider.build_tables(); + provider + } + + fn build_tables(&mut self) { + // SECURITY NOTE: + // Must follow the same security rules as [`InformationSchemaProvider::build_tables`]. + let mut tables = HashMap::new(); + for name in MEMORY_TABLES.iter() { + tables.insert(name.to_string(), self.build_table(name).expect(name)); + } + self.tables = tables; + } +} + +impl SystemSchemaProviderInner for PGCatalogProvider { + fn schema_name() -> &'static str { + PG_CATALOG_NAME + } + + fn system_table(&self, name: &str) -> Option { + match name { + table_names::PG_TYPE => setup_memory_table!(PG_TYPE), + _ => None, + } + } + + fn catalog_name(&self) -> &str { + &self.catalog_name + } +} diff --git a/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs new file mode 100644 index 000000000000..53be0c50dd16 --- /dev/null +++ b/src/catalog/src/system_schema/pg_catalog/pg_catalog_memory_table.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::vectors::{Int16Vector, StringVector, UInt32Vector, VectorRef}; + +use super::oid_column; +use super::table_names::PG_TYPE; +use crate::memory_table_cols; +use crate::system_schema::memory_table::tables::{i16_column, string_column}; + +fn pg_type_schema_columns() -> (Vec, Vec) { + // TODO(j0hn50n133): acquire this information from `DataType` instead of hardcoding it to avoid regression. + memory_table_cols!( + [oid, typname, typlen], + [ + (1, "String", -1), + (2, "Binary", -1), + (3, "Int8", 1), + (4, "Int16", 2), + (5, "Int32", 4), + (6, "Int64", 8), + (7, "UInt8", 1), + (8, "UInt16", 2), + (9, "UInt32", 4), + (10, "UInt64", 8), + (11, "Float32", 4), + (12, "Float64", 8), + (13, "Decimal", 16), + (14, "Date", 4), + (15, "DateTime", 8), + (16, "Timestamp", 8), + (17, "Time", 8), + (18, "Duration", 8), + (19, "Interval", 16), + (20, "List", -1), + ] + ); + ( + // not quiet identical with pg, we only follow the definition in pg + vec![oid_column(), string_column("typname"), i16_column("typlen")], + vec![ + Arc::new(UInt32Vector::from_vec(oid)), // oid + Arc::new(StringVector::from(typname)), + Arc::new(Int16Vector::from_vec(typlen)), // typlen in bytes + ], + ) +} + +pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { + let (column_schemas, columns): (_, Vec) = match table_name { + PG_TYPE => pg_type_schema_columns(), + _ => unreachable!("Unknown table in pg_catalog: {}", table_name), + }; + (Arc::new(Schema::new(column_schemas)), columns) +} diff --git a/src/catalog/src/system_schema/pg_catalog/table_names.rs b/src/catalog/src/system_schema/pg_catalog/table_names.rs new file mode 100644 index 000000000000..c5cb720f3bb9 --- /dev/null +++ b/src/catalog/src/system_schema/pg_catalog/table_names.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub const PG_DATABASE: &str = "pg_databases"; +pub const PG_NAMESPACE: &str = "pg_namespace"; +pub const PG_CLASS: &str = "pg_class"; +pub const PG_TYPE: &str = "pg_type"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 2a5e05fad2f4..fbd31b8efd28 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -14,6 +14,7 @@ pub const SYSTEM_CATALOG_NAME: &str = "system"; pub const INFORMATION_SCHEMA_NAME: &str = "information_schema"; +pub const PG_CATALOG_NAME: &str = "pg_catalog"; pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog"; pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; @@ -97,6 +98,11 @@ pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32; /// ----- End of information_schema tables ----- +/// ----- Begin of pg_catalog tables ----- +pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256; +pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257; + +/// ----- End of pg_catalog tables ----- pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; pub const METRIC_ENGINE: &str = "metric"; diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index b0bc7f4c881f..aefa437532f0 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -462,6 +462,7 @@ async fn test_execute_show_databases_tables(instance: Arc) 
{ +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+\ "; @@ -1899,6 +1900,7 @@ async fn test_show_databases(instance: Arc) { +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+"; check_output_stream(output, expected).await; @@ -1912,6 +1914,7 @@ async fn test_show_databases(instance: Arc) { | Database | +--------------------+ | information_schema | +| pg_catalog | +--------------------+"; check_output_stream(output, expected).await; } diff --git a/tests/cases/standalone/common/create/create_database.result b/tests/cases/standalone/common/create/create_database.result index 9612b3115ec5..8273385f4e48 100644 --- a/tests/cases/standalone/common/create/create_database.result +++ b/tests/cases/standalone/common/create/create_database.result @@ -18,6 +18,7 @@ show databases; | greptime_private | | illegal-database | | information_schema | +| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/create/create_database_opts.result b/tests/cases/standalone/common/create/create_database_opts.result index c824290d715a..3b8e420885fc 100644 --- a/tests/cases/standalone/common/create/create_database_opts.result +++ b/tests/cases/standalone/common/create/create_database_opts.result @@ -10,6 +10,7 @@ SHOW DATABASES; | greptime_private | | information_schema | | mydb | +| pg_catalog | | public | +--------------------+ @@ -21,6 +22,7 @@ SHOW FULL DATABASES; | greptime_private | | | information_schema | | | mydb | ttl='1h' | +| pg_catalog | | | public | | +--------------------+----------+ @@ -65,6 +67,7 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+ diff --git a/tests/cases/standalone/common/information_schema/tables.result b/tests/cases/standalone/common/information_schema/tables.result index 93a93a9c9805..f48e65630282 100644 --- 
a/tests/cases/standalone/common/information_schema/tables.result +++ b/tests/cases/standalone/common/information_schema/tables.result @@ -29,6 +29,7 @@ select table_catalog, table_schema, table_name from information_schema.tables wh +---------------+--------------+------------+ | greptime | abc | t | | greptime | abcde | t | +| greptime | pg_catalog | pg_type | | greptime | public | numbers | +---------------+--------------+------------+ diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index 918a85a1838c..3f7524eb291a 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -5,6 +5,7 @@ SHOW DATABASES; +--------------------+ | greptime_private | | information_schema | +| pg_catalog | | public | +--------------------+ @@ -15,6 +16,7 @@ SHOW FULL DATABASES; +--------------------+---------+ | greptime_private | | | information_schema | | +| pg_catalog | | | public | | +--------------------+---------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 2202b25d9e29..2d7cb914ed8f 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -44,6 +44,7 @@ order by table_schema, table_name; |greptime|information_schema|tables|LOCALTEMPORARY|3|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|information_schema|triggers|LOCALTEMPORARY|24|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|information_schema|views|LOCALTEMPORARY|32|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| +|greptime|pg_catalog|pg_type|LOCALTEMPORARY|257|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y| |greptime|public|numbers|LOCALTEMPORARY|2|0|0|0|0|0|test_engine|11|Fixed|0|0|0|DATETIME||||0|||Y| +++++++++++++++++++++++++ @@ -399,6 
+400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | views | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | views | view_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | pg_catalog | pg_type | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | pg_catalog | pg_type | typlen | 3 | | | 5 | 0 | | | | | | select,insert | | Int16 | smallint | FIELD | | No | smallint | | | +| greptime | pg_catalog | pg_type | typname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | public | numbers | number | 1 | | | 10 | 0 | | | | PRI | | select,insert | | UInt32 | int unsigned | TAG | | No | int unsigned | | | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ @@ -454,7 +458,7 @@ order by table_name; select table_name from information_schema.tables -where table_schema not in ('my_db', 'information_schema') +where table_schema not in ('my_db', 'information_schema', 'pg_catalog') order by table_name; +------------+ @@ -468,6 +472,7 @@ from 
information_schema.tables where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name; +---------------+--------------+------------+------------+--------+ @@ -481,6 +486,7 @@ from information_schema.columns where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name, column_name; +---------------+--------------+------------+-------------+--------------+---------------+ @@ -570,6 +576,7 @@ select * from schemata where catalog_name = 'greptime' and schema_name != 'publi +--------------+--------------------+----------------------------+------------------------+----------+---------+ | greptime | greptime_private | utf8 | utf8_bin | | | | greptime | information_schema | utf8 | utf8_bin | | | +| greptime | pg_catalog | utf8 | utf8_bin | | | +--------------+--------------------+----------------------------+------------------------+----------+---------+ -- test engines diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index 13cbe5dc9c9b..49007cb44b69 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -38,7 +38,7 @@ order by table_name; select table_name from information_schema.tables -where table_schema not in ('my_db', 'information_schema') +where table_schema not in ('my_db', 'information_schema', 'pg_catalog') order by table_name; select table_catalog, table_schema, table_name, table_type, engine @@ -46,6 +46,7 @@ from information_schema.tables where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name; select table_catalog, table_schema, table_name, column_name, 
data_type, semantic_type @@ -53,6 +54,7 @@ from information_schema.columns where table_catalog = 'greptime' and table_schema != 'public' and table_schema != 'information_schema' + and table_schema != 'pg_catalog' order by table_schema, table_name, column_name; -- test query filter for columns -- diff --git a/tests/cases/standalone/common/system/pg_catalog.result b/tests/cases/standalone/common/system/pg_catalog.result new file mode 100644 index 000000000000..f2b9677feb99 --- /dev/null +++ b/tests/cases/standalone/common/system/pg_catalog.result @@ -0,0 +1,32 @@ +-- should not able to create pg_catalog +create database pg_catalog; + +Error: 1004(InvalidArguments), Schema pg_catalog already exists + +select * from pg_catalog.pg_type order by oid; + ++-----+-----------+--------+ +| oid | typname | typlen | ++-----+-----------+--------+ +| 1 | String | -1 | +| 2 | Binary | -1 | +| 3 | Int8 | 1 | +| 4 | Int16 | 2 | +| 5 | Int32 | 4 | +| 6 | Int64 | 8 | +| 7 | UInt8 | 1 | +| 8 | UInt16 | 2 | +| 9 | UInt32 | 4 | +| 10 | UInt64 | 8 | +| 11 | Float32 | 4 | +| 12 | Float64 | 8 | +| 13 | Decimal | 16 | +| 14 | Date | 4 | +| 15 | DateTime | 8 | +| 16 | Timestamp | 8 | +| 17 | Time | 8 | +| 18 | Duration | 8 | +| 19 | Interval | 16 | +| 20 | List | -1 | ++-----+-----------+--------+ + diff --git a/tests/cases/standalone/common/system/pg_catalog.sql b/tests/cases/standalone/common/system/pg_catalog.sql new file mode 100644 index 000000000000..b958b4c3d6c2 --- /dev/null +++ b/tests/cases/standalone/common/system/pg_catalog.sql @@ -0,0 +1,4 @@ +-- should not able to create pg_catalog +create database pg_catalog; + +select * from pg_catalog.pg_type order by oid; \ No newline at end of file diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index c1cb872111cd..675da6b6ba59 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -89,6 +89,7 @@ SELECT * FROM 
INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|parameters|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|partitions|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| +|greptime|pg_catalog|pg_type|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|