From 7e61d1ae2795263815bf8db275f94a3a1131ae9a Mon Sep 17 00:00:00 2001
From: yihong
Date: Wed, 15 Jan 2025 15:05:34 +0800
Subject: [PATCH] feat: support pg_database for DBeaver. (#5362)

This patch supports pg_database for pg_catalog, and also adds a query
replacement in fixtures.rs, because datafusion does not support SQL
like 'select 1,1;'. See issue #5344 for more details.

Signed-off-by: yihong0618
---
 src/catalog/src/system_schema/pg_catalog.rs   |  11 +
 .../system_schema/pg_catalog/pg_database.rs   | 214 ++++++++++++++++++
 .../system_schema/pg_catalog/table_names.rs   |   6 +-
 src/common/catalog/src/consts.rs              |   1 +
 src/servers/src/postgres/fixtures.rs          |  14 +-
 .../common/system/pg_catalog.result           |  13 ++
 .../standalone/common/system/pg_catalog.sql   |   4 +
 7 files changed, 261 insertions(+), 2 deletions(-)
 create mode 100644 src/catalog/src/system_schema/pg_catalog/pg_database.rs

diff --git a/src/catalog/src/system_schema/pg_catalog.rs b/src/catalog/src/system_schema/pg_catalog.rs
index bc9c246e25fc..822dc7571ed6 100644
--- a/src/catalog/src/system_schema/pg_catalog.rs
+++ b/src/catalog/src/system_schema/pg_catalog.rs
@@ -14,6 +14,7 @@
 
 mod pg_catalog_memory_table;
 mod pg_class;
+mod pg_database;
 mod pg_namespace;
 mod table_names;
 
@@ -26,6 +27,7 @@ use lazy_static::lazy_static;
 use paste::paste;
 use pg_catalog_memory_table::get_schema_columns;
 use pg_class::PGClass;
+use pg_database::PGDatabase;
 use pg_namespace::PGNamespace;
 use session::context::{Channel, QueryContext};
 use table::TableRef;
@@ -113,6 +115,10 @@ impl PGCatalogProvider {
             PG_CLASS.to_string(),
             self.build_table(PG_CLASS).expect(PG_NAMESPACE),
         );
+        tables.insert(
+            PG_DATABASE.to_string(),
+            self.build_table(PG_DATABASE).expect(PG_DATABASE),
+        );
         self.tables = tables;
     }
 }
@@ -135,6 +141,11 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
                 self.catalog_manager.clone(),
                 self.namespace_oid_map.clone(),
             ))),
+            table_names::PG_DATABASE => Some(Arc::new(PGDatabase::new(
+                self.catalog_name.clone(),
+                self.catalog_manager.clone(),
+                self.namespace_oid_map.clone(),
+            ))),
             _ => None,
         }
     }
diff --git a/src/catalog/src/system_schema/pg_catalog/pg_database.rs b/src/catalog/src/system_schema/pg_catalog/pg_database.rs
new file mode 100644
index 000000000000..244be630fdc2
--- /dev/null
+++ b/src/catalog/src/system_schema/pg_catalog/pg_database.rs
@@ -0,0 +1,214 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::{Arc, Weak};
+
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
+use common_error::ext::BoxedError;
+use common_recordbatch::adapter::RecordBatchStreamAdapter;
+use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
+use datatypes::scalars::ScalarVectorBuilder;
+use datatypes::schema::{Schema, SchemaRef};
+use datatypes::value::Value;
+use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::ScanRequest;
+
+use super::pg_namespace::oid_map::PGNamespaceOidMapRef;
+use super::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
+use crate::error::{
+    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
+};
+use crate::information_schema::Predicates;
+use crate::system_schema::utils::tables::{string_column, u32_column};
+use crate::system_schema::SystemTable;
+use crate::CatalogManager;
+
+// === column name ===
+pub const DATNAME: &str = "datname";
+
+/// The initial capacity of the vector builders.
+const INIT_CAPACITY: usize = 42;
+
+/// The `pg_catalog.pg_database` table implementation.
+pub(super) struct PGDatabase {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_manager: Weak<dyn CatalogManager>,
+
+    // Workaround to convert schema_name to a numeric id
+    namespace_oid_map: PGNamespaceOidMapRef,
+}
+
+impl PGDatabase {
+    pub(super) fn new(
+        catalog_name: String,
+        catalog_manager: Weak<dyn CatalogManager>,
+        namespace_oid_map: PGNamespaceOidMapRef,
+    ) -> Self {
+        Self {
+            schema: Self::schema(),
+            catalog_name,
+            catalog_manager,
+            namespace_oid_map,
+        }
+    }
+
+    fn schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            u32_column(OID_COLUMN_NAME),
+            string_column(DATNAME),
+        ]))
+    }
+
+    fn builder(&self) -> PGCDatabaseBuilder {
+        PGCDatabaseBuilder::new(
+            self.schema.clone(),
+            self.catalog_name.clone(),
+            self.catalog_manager.clone(),
+            self.namespace_oid_map.clone(),
+        )
+    }
+}
+
+impl DfPartitionStream for PGDatabase {
+    fn schema(&self) -> &ArrowSchemaRef {
+        self.schema.arrow_schema()
+    }
+
+    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
+        let schema = self.schema.arrow_schema().clone();
+        let mut builder = self.builder();
+        Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_database(None)
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(Into::into)
+            }),
+        ))
+    }
+}
+
+impl SystemTable for PGDatabase {
+    fn table_id(&self) -> table::metadata::TableId {
+        PG_CATALOG_PG_DATABASE_TABLE_ID
+    }
+
+    fn table_name(&self) -> &'static str {
+        PG_DATABASE
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn to_stream(
+        &self,
+        request: ScanRequest,
+    ) -> Result<DfSendableRecordBatchStream> {
+        let schema = self.schema.arrow_schema().clone();
+        let mut builder = self.builder();
+        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_database(Some(request))
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(Into::into)
+            }),
+        ));
+        Ok(Box::pin(
+            RecordBatchStreamAdapter::try_new(stream)
+                .map_err(BoxedError::new)
+                .context(InternalSnafu)?,
+        ))
+    }
+}
+
+/// Builds the `pg_catalog.pg_database` table row by row.
+/// `oid` uses the schema's oid as a workaround since we don't have a numeric database id.
+/// `datname` is the schema name.
+struct PGCDatabaseBuilder {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_manager: Weak<dyn CatalogManager>,
+    namespace_oid_map: PGNamespaceOidMapRef,
+
+    oid: UInt32VectorBuilder,
+    datname: StringVectorBuilder,
+}
+
+impl PGCDatabaseBuilder {
+    fn new(
+        schema: SchemaRef,
+        catalog_name: String,
+        catalog_manager: Weak<dyn CatalogManager>,
+        namespace_oid_map: PGNamespaceOidMapRef,
+    ) -> Self {
+        Self {
+            schema,
+            catalog_name,
+            catalog_manager,
+            namespace_oid_map,
+
+            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
+            datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+        }
+    }
+
+    async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
+        let catalog_name = self.catalog_name.clone();
+        let catalog_manager = self
+            .catalog_manager
+            .upgrade()
+            .context(UpgradeWeakCatalogManagerRefSnafu)?;
+        let predicates = Predicates::from_scan_request(&request);
+        for schema_name in catalog_manager
+            .schema_names(&catalog_name, query_ctx())
+            .await?
+        {
+            self.add_database(&predicates, &schema_name);
+        }
+        self.finish()
+    }
+
+    fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
+        let oid = self.namespace_oid_map.get_oid(schema_name);
+        let row: [(&str, &Value); 2] = [
+            (OID_COLUMN_NAME, &Value::from(oid)),
+            (DATNAME, &Value::from(schema_name)),
+        ];
+
+        if !predicates.eval(&row) {
+            return;
+        }
+
+        self.oid.push(Some(oid));
+        self.datname.push(Some(schema_name));
+    }
+
+    fn finish(&mut self) -> Result<RecordBatch> {
+        let columns: Vec<VectorRef> =
+            vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
+        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
+    }
+}
diff --git a/src/catalog/src/system_schema/pg_catalog/table_names.rs b/src/catalog/src/system_schema/pg_catalog/table_names.rs
index c5cb720f3bb9..afe5d1df41ff 100644
--- a/src/catalog/src/system_schema/pg_catalog/table_names.rs
+++ b/src/catalog/src/system_schema/pg_catalog/table_names.rs
@@ -12,7 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub const PG_DATABASE: &str = "pg_databases";
+// https://www.postgresql.org/docs/current/catalog-pg-database.html
+pub const PG_DATABASE: &str = "pg_database";
+// https://www.postgresql.org/docs/current/catalog-pg-namespace.html
 pub const PG_NAMESPACE: &str = "pg_namespace";
+// https://www.postgresql.org/docs/current/catalog-pg-class.html
 pub const PG_CLASS: &str = "pg_class";
+// https://www.postgresql.org/docs/current/catalog-pg-type.html
 pub const PG_TYPE: &str = "pg_type";
diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs
index 5a73e8ddae36..34c6fa0fdbf3 100644
--- a/src/common/catalog/src/consts.rs
+++ b/src/common/catalog/src/consts.rs
@@ -109,6 +109,7 @@ pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
 pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256;
 pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257;
 pub const PG_CATALOG_PG_NAMESPACE_TABLE_ID: u32 = 258;
+pub const PG_CATALOG_PG_DATABASE_TABLE_ID: u32 = 259;
 
 // ----- End of pg_catalog tables -----
 
diff --git a/src/servers/src/postgres/fixtures.rs b/src/servers/src/postgres/fixtures.rs
index 3132a38d1b5d..34761d5d35ec 100644
--- a/src/servers/src/postgres/fixtures.rs
+++ b/src/servers/src/postgres/fixtures.rs
@@ -119,7 +119,15 @@ static LIMIT_CAST_PATTERN: Lazy<Regex> =
     Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap());
 pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
     //TODO(sunng87): remove this when we upgraded datafusion to 43 or newer
-    LIMIT_CAST_PATTERN.replace_all(query, "$1")
+    let query = LIMIT_CAST_PATTERN.replace_all(query, "$1");
+    // Tricky replacement for DBeaver, since datafusion does not support
+    // this query. TODO: add more replacements here
+    query
+        .replace(
+            "SELECT db.oid,db.* FROM pg_catalog.pg_database db",
+            "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
+        )
+        .into()
 }
 
 #[cfg(test)]
@@ -215,5 +223,9 @@
 
         assert_eq!("SELECT * FROM number LIMIT 1", rewrite_sql(sql));
         assert_eq!("SELECT * FROM number limit 1", rewrite_sql(sql2));
+        assert_eq!(
+            "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
+            rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db")
+        );
     }
 }
diff --git a/tests/cases/standalone/common/system/pg_catalog.result b/tests/cases/standalone/common/system/pg_catalog.result
index d43c707bcc4a..092e9cab06be 100644
--- a/tests/cases/standalone/common/system/pg_catalog.result
+++ b/tests/cases/standalone/common/system/pg_catalog.result
@@ -74,6 +74,10 @@ select * from pg_catalog.pg_type;
 
 Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_type
 
+select * from pg_catalog.pg_database;
+
+Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_database
+
 -- SQLNESS PROTOCOL POSTGRES
 select * from pg_catalog.pg_type order by oid;
 
@@ -102,6 +106,15 @@ select * from pg_catalog.pg_type order by oid;
 | 20  | List      | -1     |
 +-----+-----------+--------+
 
+-- SQLNESS PROTOCOL POSTGRES
+select * from pg_catalog.pg_database where datname = 'public';
+
++------------+---------+
+| oid        | datname |
++------------+---------+
+| 3927743705 | public  |
++------------+---------+
+
 -- \d
 -- SQLNESS PROTOCOL POSTGRES
 SELECT n.nspname as "Schema",
diff --git a/tests/cases/standalone/common/system/pg_catalog.sql b/tests/cases/standalone/common/system/pg_catalog.sql
index 2a8815cd32c0..4a110a8f0718 100644
--- a/tests/cases/standalone/common/system/pg_catalog.sql
+++ b/tests/cases/standalone/common/system/pg_catalog.sql
@@ -28,10 +28,14 @@ select current_schema();
 select * from pg_catalog.pg_class;
 select * from pg_catalog.pg_namespace;
 select * from pg_catalog.pg_type;
+select * from pg_catalog.pg_database;
 
 -- SQLNESS PROTOCOL POSTGRES
 select * from pg_catalog.pg_type order by oid;
 
+-- SQLNESS PROTOCOL POSTGRES
+select * from pg_catalog.pg_database where datname = 'public';
+
 -- \d
 -- SQLNESS PROTOCOL POSTGRES
 SELECT n.nspname as "Schema",