feat: support pg_database for DBeaver. (GreptimeTeam#5362)
This patch supports pg_database for pg_catalog. It also adds a query rewrite
in fixtures.rs, because datafusion cannot plan SQL with duplicate projection
column names like 'select 1,1;'; see issue GreptimeTeam#5344 for details.
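For context, a minimal sketch of the rewrite this patch performs so DBeaver's `pg_database` query plans under DataFusion. It mirrors the fixture added below; `rewrite_dbeaver_query` is an illustrative name, not the crate's API:

```rust
// DataFusion (before 43) rejects projections that repeat a column name, e.g.
// `SELECT 1,1;` or DBeaver's `SELECT db.oid,db.* FROM pg_catalog.pg_database db`
// (db.* already expands to a column named `oid`). Aliasing the explicit
// column sidesteps the duplicate.
fn rewrite_dbeaver_query(query: &str) -> String {
    query.replace(
        "SELECT db.oid,db.* FROM pg_catalog.pg_database db",
        "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
    )
}

fn main() {
    assert_eq!(
        rewrite_dbeaver_query("SELECT db.oid,db.* FROM pg_catalog.pg_database db"),
        "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db"
    );
}
```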

Signed-off-by: yihong0618 <[email protected]>
yihong0618 authored Jan 15, 2025
1 parent e56dd20 commit 7e61d1a
Showing 7 changed files with 261 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/catalog/src/system_schema/pg_catalog.rs
@@ -14,6 +14,7 @@

mod pg_catalog_memory_table;
mod pg_class;
mod pg_database;
mod pg_namespace;
mod table_names;

@@ -26,6 +27,7 @@ use lazy_static::lazy_static;
use paste::paste;
use pg_catalog_memory_table::get_schema_columns;
use pg_class::PGClass;
use pg_database::PGDatabase;
use pg_namespace::PGNamespace;
use session::context::{Channel, QueryContext};
use table::TableRef;
@@ -113,6 +115,10 @@ impl PGCatalogProvider {
PG_CLASS.to_string(),
self.build_table(PG_CLASS).expect(PG_CLASS),
);
tables.insert(
PG_DATABASE.to_string(),
self.build_table(PG_DATABASE).expect(PG_DATABASE),
);
self.tables = tables;
}
}
@@ -135,6 +141,11 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_DATABASE => Some(Arc::new(PGDatabase::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
_ => None,
}
}
214 changes: 214 additions & 0 deletions src/catalog/src/system_schema/pg_catalog/pg_database.rs
@@ -0,0 +1,214 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use super::pg_namespace::oid_map::PGNamespaceOidMapRef;
use super::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

// === column name ===
pub const DATNAME: &str = "datname";

/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;

/// The `pg_catalog.pg_database` table implementation.
pub(super) struct PGDatabase {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}

impl PGDatabase {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(DATNAME),
]))
}

fn builder(&self) -> PGCDatabaseBuilder {
PGCDatabaseBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}

impl DfPartitionStream for PGDatabase {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

impl SystemTable for PGDatabase {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_DATABASE_TABLE_ID
}

fn table_name(&self) -> &'static str {
PG_DATABASE
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(
&self,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `pg_catalog.pg_database` table row by row.
/// `oid` is derived from the schema name as a workaround, since we don't have a numeric schema id.
/// `datname` is the schema name.
struct PGCDatabaseBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,

oid: UInt32VectorBuilder,
datname: StringVectorBuilder,
}

impl PGCDatabaseBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,

oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager
.schema_names(&catalog_name, query_ctx())
.await?
{
self.add_database(&predicates, &schema_name);
}
self.finish()
}

fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row: [(&str, &Value); 2] = [
(OID_COLUMN_NAME, &Value::from(oid)),
(DATNAME, &Value::from(schema_name)),
];

if !predicates.eval(&row) {
return;
}

self.oid.push(Some(oid));
self.datname.push(Some(schema_name));
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> =
vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
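The builder above filters each row through `Predicates::eval` before pushing it into the column builders. A self-contained sketch of that row-filter pattern, using toy stand-in types rather than the crate's actual `Predicates`/`Value` (assumptions for illustration only):

```rust
// Toy stand-ins for datatypes::value::Value and information_schema::Predicates.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    UInt32(u32),
    String(String),
}

// Simplified predicate: keep rows where `column` equals `expected`.
struct Predicates {
    column: &'static str,
    expected: Value,
}

impl Predicates {
    fn eval(&self, row: &[(&str, &Value)]) -> bool {
        row.iter()
            .find(|(name, _)| *name == self.column)
            .map(|(_, value)| **value == self.expected)
            // A row without the predicate column is not filtered out.
            .unwrap_or(true)
    }
}

fn main() {
    // Mirrors `where datname = 'public'` pushed down into the table scan.
    let pred = Predicates {
        column: "datname",
        expected: Value::String("public".to_string()),
    };
    let oid = Value::UInt32(42);
    let datname = Value::String("public".to_string());
    let row: [(&str, &Value); 2] = [("oid", &oid), ("datname", &datname)];
    assert!(pred.eval(&row)); // this row survives the filter
}
```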
6 changes: 5 additions & 1 deletion src/catalog/src/system_schema/pg_catalog/table_names.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub const PG_DATABASE: &str = "pg_databases";
// https://www.postgresql.org/docs/current/catalog-pg-database.html
pub const PG_DATABASE: &str = "pg_database";
// https://www.postgresql.org/docs/current/catalog-pg-namespace.html
pub const PG_NAMESPACE: &str = "pg_namespace";
// https://www.postgresql.org/docs/current/catalog-pg-class.html
pub const PG_CLASS: &str = "pg_class";
// https://www.postgresql.org/docs/current/catalog-pg-type.html
pub const PG_TYPE: &str = "pg_type";
1 change: 1 addition & 0 deletions src/common/catalog/src/consts.rs
@@ -109,6 +109,7 @@ pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256;
pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257;
pub const PG_CATALOG_PG_NAMESPACE_TABLE_ID: u32 = 258;
pub const PG_CATALOG_PG_DATABASE_TABLE_ID: u32 = 259;

// ----- End of pg_catalog tables -----

14 changes: 13 additions & 1 deletion src/servers/src/postgres/fixtures.rs
@@ -119,7 +119,15 @@ static LIMIT_CAST_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap());
pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
//TODO(sunng87): remove this when we upgrade datafusion to 43 or newer
LIMIT_CAST_PATTERN.replace_all(query, "$1")
let query = LIMIT_CAST_PATTERN.replace_all(query, "$1");
// Rewrite queries DBeaver issues but DataFusion cannot plan: the projection
// repeats the `oid` column (see GreptimeTeam#5344). TODO: add more rewrites here.
query
.replace(
"SELECT db.oid,db.* FROM pg_catalog.pg_database db",
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
)
.into()
}

#[cfg(test)]
@@ -215,5 +223,9 @@ mod test {

assert_eq!("SELECT * FROM number LIMIT 1", rewrite_sql(sql));
assert_eq!("SELECT * FROM number limit 1", rewrite_sql(sql2));
assert_eq!(
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db")
);
}
}
13 changes: 13 additions & 0 deletions tests/cases/standalone/common/system/pg_catalog.result
@@ -74,6 +74,10 @@ select * from pg_catalog.pg_type;

Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_type

select * from pg_catalog.pg_database;

Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_database

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_type order by oid;

@@ -102,6 +106,15 @@ select * from pg_catalog.pg_type order by oid;
| 20 | List | -1 |
+-----+-----------+--------+

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_database where datname = 'public';

+------------+---------+
| oid | datname |
+------------+---------+
| 3927743705 | public |
+------------+---------+
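The `oid` above is not a real PostgreSQL database oid: it is derived from the schema name by `PGNamespaceOidMap`. One plausible derivation (purely an assumption for illustration; the actual mapping may differ) is hashing the name down to a stable `u32`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical name-to-oid derivation: hash the schema name and truncate
// to u32 so the same name always yields the same oid within a process.
fn name_to_oid(schema_name: &str) -> u32 {
    let mut hasher = DefaultHasher::new();
    schema_name.hash(&mut hasher);
    hasher.finish() as u32
}

fn main() {
    // Stable within a run; e.g. "public" always maps to the same oid.
    assert_eq!(name_to_oid("public"), name_to_oid("public"));
}
```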

-- \d
-- SQLNESS PROTOCOL POSTGRES
SELECT n.nspname as "Schema",
4 changes: 4 additions & 0 deletions tests/cases/standalone/common/system/pg_catalog.sql
@@ -28,10 +28,14 @@ select current_schema();
select * from pg_catalog.pg_class;
select * from pg_catalog.pg_namespace;
select * from pg_catalog.pg_type;
select * from pg_catalog.pg_database;

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_type order by oid;

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_database where datname = 'public';

-- \d
-- SQLNESS PROTOCOL POSTGRES
SELECT n.nspname as "Schema",
