From e2a96bc561d8c7b98411437ed701998a83c954ea Mon Sep 17 00:00:00 2001 From: Solomon Date: Tue, 28 Nov 2023 10:22:34 +0100 Subject: [PATCH] pg_catalog wip --- Cargo.lock | 19 +- dozer-api/Cargo.toml | 2 + dozer-api/src/sql/datafusion/mod.rs | 1364 +++++++++++++++++--- dozer-api/src/sql/datafusion/pg_catalog.rs | 465 ------- dozer-api/src/sql/pgwire.rs | 3 + dozer-types/Cargo.toml | 2 +- 6 files changed, 1203 insertions(+), 652 deletions(-) delete mode 100644 dozer-api/src/sql/datafusion/pg_catalog.rs diff --git a/Cargo.lock b/Cargo.lock index d7e6e28f4d..b3131cc980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,6 +831,12 @@ dependencies = [ "zstd-safe 7.0.0", ] +[[package]] +name = "async-once-cell" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9338790e78aa95a416786ec8389546c4b6a1dfc3dc36071ed9518a9413a542eb" + [[package]] name = "async-recursion" version = "1.0.5" @@ -3583,6 +3589,7 @@ dependencies = [ "actix-web", "actix-web-httpauth", "arc-swap", + "async-once-cell", "async-stream", "async-trait", "bytes", @@ -3595,6 +3602,7 @@ dependencies = [ "dozer-types", "futures-sink", "futures-util", + "genawaiter", "gethostname", "handlebars", "http", @@ -4059,7 +4067,7 @@ dependencies = [ "chrono", "geo", "ijson", - "indexmap 1.9.3", + "indexmap 2.1.0", "indicatif", "log", "ordered-float 3.9.2", @@ -5571,7 +5579,6 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", - "serde", ] [[package]] @@ -7081,7 +7088,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75e56d5c441965b6425165b7e3223cc933ca469834f4a8b4786817a1f9dc4f13" dependencies = [ - "indexmap 1.9.3", + "indexmap 2.1.0", "serde", "serde_json", ] @@ -8154,7 +8161,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap 0.8.3", "once_cell", @@ -8188,7 +8195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.39", @@ -11292,7 +11299,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.4.6", + "rand 0.8.5", "static_assertions", ] diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index 7e16ca4aa7..afb9b641a4 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -60,3 +60,5 @@ pgwire = "0.16.1" tempdir = "0.3.7" postgres-types = "0.2" futures-sink = "0.3.29" +async-once-cell = "0.5.3" +genawaiter = "0.99.1" diff --git a/dozer-api/src/sql/datafusion/mod.rs b/dozer-api/src/sql/datafusion/mod.rs index 2d6995d581..1935620225 100644 --- a/dozer-api/src/sql/datafusion/mod.rs +++ b/dozer-api/src/sql/datafusion/mod.rs @@ -1,5 +1,4 @@ pub mod json; -mod pg_catalog; mod predicate_pushdown; use std::any::Any; @@ -9,17 +8,15 @@ use std::sync::Arc; use async_trait::async_trait; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::catalog::information_schema::InformationSchemaProvider; +use datafusion::arrow::datatypes::DataType; use datafusion::catalog::schema::SchemaProvider; use datafusion::common::not_impl_err; use datafusion::config::ConfigOptions; use datafusion::datasource::{DefaultTableSource, TableProvider, TableType}; use datafusion::error::{DataFusionError, Result}; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::execution::context::{SQLOptions, SessionState, TaskContext}; use datafusion::physical_expr::var_provider::is_system_variables; use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, @@ -52,10 +49,11 @@ use crate::CacheEndpoint; use predicate_pushdown::{predicate_pushdown, supports_predicates_pushdown}; -use self::pg_catalog::PgCatalogTable; +// use self::pg_catalog::PgCatalogTable; pub struct SQLExecutor { - pub ctx: Arc, + ctx: Arc, + lazy_init: async_once_cell::OnceCell<()>, } struct ContextResolver { @@ -65,19 +63,20 @@ struct ContextResolver { impl ContextProvider for ContextResolver { fn get_table_source(&self, name: TableReference) -> Result> { - if let Some(table) = PgCatalogTable::from_ref_with_state(&name, self.state.clone()) { - Ok(Arc::new(DefaultTableSource::new(Arc::new(table)))) - } else { - let catalog = &self.state.config_options().catalog; - let name = name - .resolve(&catalog.default_catalog, &catalog.default_schema) - .to_string(); - self.tables - .get(&name) - .ok_or_else(|| DataFusionError::Plan(format!("table '{name}' not found"))) - .cloned() - } + // if let Some(table) = PgCatalogTable::from_ref_with_state(&name, self.state.clone()) { + // Ok(Arc::new(DefaultTableSource::new(Arc::new(table)))) + // } else { + + let catalog = &self.state.config_options().catalog; + let name = + resolve_table_ref(&name, &catalog.default_catalog, &catalog.default_schema).to_string(); + self.tables + .get(&name) + .ok_or_else(|| DataFusionError::Plan(format!("table '{name}' not found"))) + .cloned() + // } } + fn get_function_meta(&self, name: &str) -> Option> { let mut parts = name.splitn(3, '.'); let first = parts.next()?; @@ -105,6 +104,22 @@ impl ContextProvider for ContextResolver { }), })) } + "current_database" => { + let catalog = self.state.config_options().catalog.default_catalog.clone(); + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.current_database".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Exact(vec![]), + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Utf8))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Utf8(Some(catalog.clone())), + )) + }), + })); + } "current_schema" => { let schema = self.state.config_options().catalog.default_schema.clone(); return Some(Arc::new(ScalarUDF { @@ -121,6 +136,143 @@ impl ContextProvider for ContextResolver { }), })); } + "pg_my_temp_schema" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_my_temp_schema".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Exact(vec![]), + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt32))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::UInt32(Some(0)), + )) + }), + })); + } + "pg_is_other_temp_schema" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_is_other_temp_schema".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Exact(vec![ + DataType::UInt32, + ]), + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(false)), + )) + }), + })); + } + "pg_has_role" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_has_role".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(true)), + )) + }), + })); + } + "has_table_privilege" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.has_table_privilege".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(true)), + )) + }), + })); + } + "has_column_privilege" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.has_column_privilege".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(true)), + )) + }), + })); + } + "has_any_column_privilege" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.has_any_column_privilege".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(true)), + )) + }), + })); + } + "pg_column_is_updatable" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_column_is_updatable".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Boolean(Some(false)), + )) + }), + })); + } + "pg_relation_is_updatable" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_relation_is_updatable".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Int32))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Int32(Some(0)), + )) + }), + })); + } + "pg_get_viewdef" => { + return Some(Arc::new(ScalarUDF { + name: "pg_catalog.pg_get_viewdef".to_owned(), + signature: datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::VariadicAny, + volatility: datafusion_expr::Volatility::Immutable, + }, + return_type: Arc::new(|_| Ok(Arc::new(DataType::Utf8))), + fun: Arc::new(move |_| { + Ok(datafusion_expr::ColumnarValue::Scalar( + datafusion::scalar::ScalarValue::Utf8(Some("".into())), + )) + }), + })); + } "format_type" => { return Some(Arc::new(ScalarUDF { name: "pg_catalog.format_type".to_owned(), @@ -271,12 +423,27 @@ impl ContextProvider for ContextResolver { } } +fn resolve_table_ref<'a>( + reference: &'a TableReference<'a>, + default_catalog: &'a str, + default_schema: &'a str, +) -> ResolvedTableReference<'a> { + let reference = { + if matches!(reference.schema(), Some("pg_catalog")) { + TableReference::partial(reference.schema().unwrap(), reference.table()) + } else if reference.table().to_ascii_lowercase().starts_with("pg_") { + TableReference::partial("pg_catalog", reference.table()) + } else { + reference.clone() + } + }; + reference.resolve(default_catalog, default_schema) +} + impl SQLExecutor { pub fn new(cache_endpoints: Vec>) -> Self { let ctx = SessionContext::new_with_config( - SessionConfig::new() - .with_information_schema(true) - .with_default_catalog_and_schema("public", "dozer"), + SessionConfig::new().with_default_catalog_and_schema("dozer", "public"), ); for cache_endpoint in cache_endpoints { let data_source = CacheEndpointDataSource::new(cache_endpoint.clone()); @@ -294,7 +461,12 @@ impl SQLExecutor { ctx.register_variable(VarType::UserDefined, variable_provider.clone()); ctx.register_variable(VarType::System, variable_provider); - Self { ctx: Arc::new(ctx) } + // let _df = ctx.sql("sql").await.unwrap(); + + Self { + ctx: Arc::new(ctx), + lazy_init: Default::default(), + } } pub async fn execute(&self, plan: LogicalPlan) -> Result { @@ -306,12 +478,6 @@ impl SQLExecutor { state: &SessionState, resolved_ref: ResolvedTableReference<'_>, ) -> Result> { - if state.config().information_schema() && resolved_ref.schema == "information_schema" { - return Ok(Arc::new(InformationSchemaProviderWrapper { - inner: InformationSchemaProvider::new(state.catalog_list().clone()), - })); - } - state .catalog_list() .catalog(&resolved_ref.catalog) @@ -327,6 +493,13 @@ impl SQLExecutor { }) } + async fn lazy_init(&self) -> Result<(), DataFusionError> { + self.lazy_init + .get_or_try_init::(self.pg_catalog_sql()) + .await?; + Ok(()) + } + async fn parse_statement( &self, mut statement: Statement, @@ -343,11 +516,13 @@ impl SQLExecutor { ast::Statement::ShowVariable { variable } => { let variable = object_name_to_string(variable); match variable.as_str() { - "transaction.isolation.level" => Some("SELECT \"@@transaction_isolation\""), + "transaction.isolation.level" => { + Some("SELECT \"@@transaction_isolation\"".to_string()) + } "standard_conforming_strings" => { - Some("SELECT \"@@standard_conforming_strings\"") + Some("SELECT \"@@standard_conforming_strings\"".to_string()) } - _ => None, + _ => Some(format!("SELECT \"@@{variable}\"")), } } _ => None, @@ -356,10 +531,35 @@ impl SQLExecutor { None }; if let Some(query) = rewrite { - statement = self.ctx.state().sql_to_statement(query, "postgres")?; + statement = self.ctx.state().sql_to_statement(&query, "postgres")?; } + self.lazy_init().await?; + + let provider = self.context_provider_for_statment(&statement).await?; + + let query = SqlToRel::new_with_options( + &provider, + ParserOptions { + parse_float_as_decimal: false, + enable_ident_normalization: true, + }, + ); + let plan = query.statement_to_plan(statement)?; + let options = SQLOptions::new() + .with_allow_ddl(false) + .with_allow_dml(false); + options.verify_plan(&plan)?; + Ok(Some(plan)) + } + + async fn context_provider_for_statment( + &self, + statement: &Statement, + ) -> Result { let state = self.ctx.state(); - let table_refs = state.resolve_table_references(&statement)?; + let mut table_refs = state.resolve_table_references(statement)?; + table_refs.push(TableReference::partial("information_schema", "tables")); + table_refs.push(TableReference::partial("information_schema", "columns")); let mut provider = ContextResolver { state: Arc::new(state), @@ -371,7 +571,7 @@ impl SQLExecutor { let default_schema = &config.catalog.default_schema; for table_ref in table_refs { let table = table_ref.table(); - let resolved = table_ref.clone().resolve(default_catalog, default_schema); + let resolved = resolve_table_ref(&table_ref, default_catalog, default_schema); if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) { if let Ok(schema) = self.schema_for_ref(&state, resolved) { if let Some(table) = schema.table(table).await { @@ -380,15 +580,7 @@ impl SQLExecutor { } } } - - let query = SqlToRel::new_with_options( - &provider, - ParserOptions { - parse_float_as_decimal: false, - enable_ident_normalization: true, - }, - ); - Some(query.statement_to_plan(statement)).transpose() + Ok(provider) } pub async fn parse(&self, mut sql: &str) -> Result>, DataFusionError> { @@ -411,6 +603,954 @@ impl SQLExecutor { ) .await } + + // async fn sql_with_params( + // &self, + // sql: &str, + // param_values: Vec, + // ) -> Result<(), DataFusionError> { + // let sql = format!("PREPARE my_plan as {sql}"); + // let df = self.ctx.sql(&sql).await?; + // let _r = df.with_param_values(param_values)?.collect().await?; + // Ok(()) + // } + + async fn dml_sql_with_params( + &self, + sql: &str, + param_values: Vec, + ) -> Result<(), DataFusionError> { + self.ctx + .sql(sql) + .await + .map_err(|err| { + eprintln!("error in dml query {sql}: {err}"); + err + })? + .with_param_values(param_values) + .map_err(|err| { + eprintln!("error in dml query {sql}: {err}"); + err + })? + .collect() + .await + .map_err(|err| { + eprintln!("error in dml query {sql}: {err}"); + err + })?; + Ok(()) + } + + async fn pg_catalog_sql(&self) -> Result<(), DataFusionError> { + let schema_query = " + + CREATE SCHEMA pg_catalog; + + CREATE TABLE pg_catalog.pg_authid ( + oid int unsigned NOT NULL, + rolname string NOT NULL, + rolsuper boolean NOT NULL, + rolinherit boolean NOT NULL, + rolcreaterole boolean NOT NULL, + rolcreatedb boolean NOT NULL, + rolcanlogin boolean NOT NULL, + rolreplication boolean NOT NULL, + rolbypassrls boolean NOT NULL, + rolconnlimit integer NOT NULL, + rolpassword text, + rolvaliduntil timestamp with time zone, + PRIMARY KEY (oid), + UNIQUE (rolname), + ); + + CREATE TABLE pg_catalog.pg_tablespace ( + oid int unsigned NOT NULL, + spcname string NOT NULL, + spcowner int unsigned NOT NULL, + spcacl string, + spcoptions text, + PRIMARY KEY (oid), + UNIQUE (spcname), + ); + + CREATE TABLE pg_catalog.pg_database ( + oid int unsigned NOT NULL, + datname string NOT NULL, + datdba int unsigned NOT NULL, + encoding integer NOT NULL, + datlocprovider string NOT NULL, + datistemplate boolean NOT NULL, + datallowconn boolean NOT NULL, + datconnlimit integer NOT NULL, + datfrozenxid int unsigned NOT NULL, + datminmxid int unsigned NOT NULL, + dattablespace int unsigned NOT NULL, + datcollate text NOT NULL, + datctype text NOT NULL, + daticulocale text, + daticurules text, + datcollversion text, + datacl string, + UNIQUE (datname), + PRIMARY KEY (oid), + ); + + CREATE TABLE pg_catalog.pg_namespace ( + oid int unsigned NOT NULL, + nspname string NOT NULL, + nspowner int unsigned NOT NULL, + nspacl string, + PRIMARY KEY (oid), + UNIQUE (nspname), + ); + + CREATE TABLE pg_catalog.pg_type ( + oid int unsigned NOT NULL, + typname string NOT NULL, + typnamespace int unsigned NOT NULL, + typowner int unsigned NOT NULL, + typlen smallint NOT NULL, + typbyval boolean NOT NULL, + typtype char NOT NULL, + typcategory string NOT NULL, + typispreferred boolean NOT NULL, + typisdefined boolean NOT NULL, + typdelim string NOT NULL, + typrelid int unsigned NOT NULL, + typsubscript int unsigned NOT NULL, + typelem int unsigned NOT NULL, + typarray int unsigned NOT NULL, + typinput int unsigned NOT NULL, + typoutput int unsigned NOT NULL, + typreceive int unsigned NOT NULL, + typsend int unsigned NOT NULL, + typmodin int unsigned NOT NULL, + typmodout int unsigned NOT NULL, + typanalyze int unsigned NOT NULL, + typalign string NOT NULL, + typstorage string NOT NULL, + typnotnull boolean NOT NULL, + typbasetype int unsigned NOT NULL, + typtypmod integer NOT NULL, + typndims integer NOT NULL, + typcollation int unsigned NOT NULL, + typdefaultbin string, + typdefault text, + typacl text, + PRIMARY KEY (oid), + UNIQUE (typname, typnamespace), + ); + + CREATE TABLE pg_catalog.pg_am ( + oid int unsigned NOT NULL, + amname string NOT NULL, + amhandler string NOT NULL, + amtype string NOT NULL, + UNIQUE (amname), + PRIMARY KEY (oid), + ); + + CREATE TABLE pg_catalog.pg_attribute ( + attrelid int unsigned NOT NULL, + attname string NOT NULL, + atttypid int unsigned NOT NULL, + attlen smallint NOT NULL, + attnum smallint NOT NULL, + attcacheoff integer NOT NULL, + atttypmod integer NOT NULL, + attndims smallint NOT NULL, + attbyval boolean NOT NULL, + attalign string NOT NULL, + attstorage string NOT NULL, + attcompression string NOT NULL, + attnotnull boolean NOT NULL, + atthasdef boolean NOT NULL, + atthasmissing boolean NOT NULL, + attidentity string NOT NULL, + attgenerated string NOT NULL, + attisdropped boolean NOT NULL, + attislocal boolean NOT NULL, + attinhcount smallint NOT NULL, + attstattarget smallint NOT NULL, + attcollation int unsigned NOT NULL, + attacl string, + attoptions string, + attfdwoptions string, + attmissingval string, + PRIMARY KEY (attrelid, attnum), + UNIQUE (attrelid, attname), + ); + + CREATE TABLE pg_catalog.pg_attrdef ( + oid int unsigned NOT NULL, + adrelid int unsigned NOT NULL, + adnum smallint NOT NULL, + adbin string NOT NULL, + PRIMARY KEY (oid), + UNIQUE (adrelid, adnum), + ); + + CREATE TABLE pg_catalog.pg_proc ( + oid int unsigned NOT NULL, + proname string NOT NULL, + pronamespace int unsigned NOT NULL, + proowner int unsigned NOT NULL, + prolang int unsigned NOT NULL, + procost real NOT NULL, + prorows real NOT NULL, + provariadic int unsigned NOT NULL, + prosupport string NOT NULL, + prokind string NOT NULL, + prosecdef boolean NOT NULL, + proleakproof boolean NOT NULL, + proisstrict boolean NOT NULL, + proretset boolean NOT NULL, + provolatile string NOT NULL, + proparallel string NOT NULL, + pronargs smallint NOT NULL, + pronargdefaults smallint NOT NULL, + prorettype int unsigned NOT NULL, + proargtypes string NOT NULL, + proallargtypes string, + proargmodes string, + proargnames text, + proargdefaults string, + protrftypes string, + prosrc text NOT NULL, + probin text, + prosqlbody string, + proconfig text, + proacl string, + PRIMARY KEY (oid), + UNIQUE (proname, proargtypes, pronamespace), + ); + + CREATE TABLE pg_catalog.pg_language ( + oid int unsigned NOT NULL, + lanname string NOT NULL, + lanowner int unsigned NOT NULL, + lanispl boolean NOT NULL, + lanpltrusted boolean NOT NULL, + lanplcallfoid int unsigned NOT NULL, + laninline int unsigned NOT NULL, + lanvalidator int unsigned NOT NULL, + lanacl string, + PRIMARY KEY (oid), + UNIQUE (lanname), + ); + + CREATE TABLE pg_catalog.pg_collation ( + oid int unsigned NOT NULL, + collname string NOT NULL, + collnamespace int unsigned NOT NULL, + collowner int unsigned NOT NULL, + collprovider string NOT NULL, + collisdeterministic boolean NOT NULL, + collencoding integer NOT NULL, + collcollate text, + collctype text, + colliculocale text, + collicurules text, + collversion text, + PRIMARY KEY (oid), + UNIQUE (collname, collencoding, collnamespace), + ); + + CREATE TABLE pg_catalog.pg_constraint ( + oid int unsigned NOT NULL, + conname string NOT NULL, + connamespace int unsigned NOT NULL, + contype string NOT NULL, + condeferrable boolean NOT NULL, + condeferred boolean NOT NULL, + convalidated boolean NOT NULL, + conrelid int unsigned NOT NULL, + contypid int unsigned NOT NULL, + conindid int unsigned NOT NULL, + conparentid int unsigned NOT NULL, + confrelid int unsigned NOT NULL, + confupdtype string NOT NULL, + confdeltype string NOT NULL, + confmatchtype string NOT NULL, + conislocal boolean NOT NULL, + coninhcount smallint NOT NULL, + connoinherit boolean NOT NULL, + conkey string, + confkey string, + conpfeqop string, + conppeqop string, + conffeqop string, + confdelsetcols string, + conexclop string, + conbin string, + PRIMARY KEY (oid), + UNIQUE (conrelid, contypid, conname), + ); + + CREATE TABLE pg_catalog.pg_rewrite ( + oid int unsigned NOT NULL, + rulename string NOT NULL, + ev_class int unsigned NOT NULL, + ev_type string NOT NULL, + ev_enabled string NOT NULL, + is_instead boolean NOT NULL, + ev_qual string NOT NULL, + ev_action string NOT NULL, + PRIMARY KEY (oid), + UNIQUE (ev_class, rulename), + ); + + CREATE TABLE pg_catalog.pg_trigger ( + oid int unsigned NOT NULL, + tgrelid int unsigned NOT NULL, + tgparentid int unsigned NOT NULL, + tgname string NOT NULL, + tgfoid int unsigned NOT NULL, + tgtype smallint NOT NULL, + tgenabled string NOT NULL, + tgisinternal boolean NOT NULL, + tgconstrrelid int unsigned NOT NULL, + tgconstrindid int unsigned NOT NULL, + tgconstraint int unsigned NOT NULL, + tgdeferrable boolean NOT NULL, + tginitdeferred boolean NOT NULL, + tgnargs smallint NOT NULL, + tgattr string NOT NULL, + tgargs string NOT NULL, + tgqual string, + tgoldtable string, + tgnewtable string, + PRIMARY KEY (oid), + UNIQUE (tgrelid, tgname), + ); + + CREATE TABLE pg_catalog.pg_operator ( + oid int unsigned NOT NULL, + oprname string NOT NULL, + oprnamespace int unsigned NOT NULL, + oprowner int unsigned NOT NULL, + oprkind string NOT NULL, + oprcanmerge boolean NOT NULL, + oprcanhash boolean NOT NULL, + oprleft int unsigned NOT NULL, + oprright int unsigned NOT NULL, + oprresult int unsigned NOT NULL, + oprcom int unsigned NOT NULL, + oprnegate int unsigned NOT NULL, + oprcode string NOT NULL, + oprrest string NOT NULL, + oprjoin string NOT NULL, + PRIMARY KEY (oid), + UNIQUE (oprname, oprleft, oprright, oprnamespace), + ); + + CREATE TABLE pg_catalog.pg_policy ( + oid int unsigned NOT NULL, + polname string NOT NULL, + polrelid int unsigned NOT NULL, + polcmd string NOT NULL, + polpermissive boolean NOT NULL, + polroles string NOT NULL, + polqual string, + polwithcheck string, + PRIMARY KEY (oid), + UNIQUE (polrelid, polname), + ); + + CREATE TABLE pg_catalog.pg_depend ( + classid int unsigned NOT NULL, + objid int unsigned NOT NULL, + objsubid integer NOT NULL, + refclassid int unsigned NOT NULL, + refobjid int unsigned NOT NULL, + refobjsubid integer NOT NULL, + deptype char NOT NULL, + ); + + CREATE TABLE pg_catalog.pg_class ( + oid int unsigned NOT NULL, + relname string NOT NULL, + relnamespace int unsigned NOT NULL, + reltype int unsigned NOT NULL, + reloftype int unsigned NOT NULL, + relowner int unsigned NOT NULL, + relam int unsigned NOT NULL, + relfilenode int unsigned NOT NULL, + reltablespace int unsigned NOT NULL, + relpages int NOT NULL, + reltuples real NOT NULL, + relallvisible int NOT NULL, + reltoastrelid int unsigned NOT NULL, + relhasindex boolean NOT NULL, + relisshared boolean NOT NULL, + relpersistence string NOT NULL, + relkind string NOT NULL, + relnatts smallint NOT NULL, + relchecks smallint NOT NULL, + relhasrules boolean NOT NULL, + relhastriggers boolean NOT NULL, + relhassubclass boolean NOT NULL, + relrowsecurity boolean NOT NULL, + relforcerowsecurity boolean NOT NULL, + relispopulated boolean NOT NULL, + relreplident string NOT NULL, + relispartition boolean NOT NULL, + relrewrite int unsigned NOT NULL, + relfrozenxid int unsigned NOT NULL, + relminmxid int unsigned NOT NULL, + relacl string, + reloptions string, + relpartbound string, + PRIMARY KEY (oid), + UNIQUE (relname, relnamespace), + ); + + CREATE SCHEMA information_schema; + + CREATE VIEW information_schema.tables AS + SELECT (current_database()) AS table_catalog, + (nc.nspname) AS table_schema, + (c.relname) AS table_name, + ( + CASE + WHEN (nc.oid = pg_my_temp_schema()) THEN 'LOCAL TEMPORARY' + WHEN (c.relkind in ('r', 'p')) THEN 'BASE TABLE' + WHEN (c.relkind = 'v') THEN 'VIEW' + WHEN (c.relkind = 'f') THEN 'FOREIGN' + ELSE NULL + END) AS table_type, + (NULL) AS self_referencing_column_name, + (NULL) AS reference_generation, + ( + CASE + WHEN (t.typname IS NOT NULL) THEN current_database() + ELSE NULL + END) AS user_defined_type_catalog, + (nt.nspname) AS user_defined_type_schema, + (t.typname) AS user_defined_type_name, + ('NO') AS is_insertable_into, + ( + CASE + WHEN (t.typname IS NOT NULL) THEN 'YES' + ELSE 'NO' + END) AS is_typed, + (NULL) AS commit_action + FROM ((pg_namespace nc + JOIN pg_class c ON ((nc.oid = c.relnamespace))) + LEFT JOIN (pg_type t + JOIN pg_namespace nt ON ((t.typnamespace = nt.oid))) ON ((c.reloftype = t.oid))) + WHERE ((c.relkind in ('r', 'v', 'f', 'p')) + AND (NOT pg_is_other_temp_schema(nc.oid)) + AND (pg_has_role(c.relowner, 'USAGE') + OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER') + OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'))); + + + CREATE VIEW information_schema.columns AS + SELECT (current_database()) AS table_catalog, + (nc.nspname) AS table_schema, + (c.relname) AS table_name, + (a.attname) AS column_name, + (a.attnum) AS ordinal_position, + ( + CASE + WHEN (a.attgenerated = '') THEN pg_get_expr(ad.adbin, ad.adrelid) + ELSE NULL + END) AS column_default, + ( + CASE + WHEN (a.attnotnull OR ((t.typtype = 'd') AND t.typnotnull)) THEN 'NO' + ELSE 'YES' + END) AS is_nullable, + ( + CASE + WHEN (t.typtype = 'd') THEN + CASE + WHEN ((bt.typelem <> (0)) AND (bt.typlen = '-1')) THEN 'ARRAY' + WHEN (nbt.nspname = 'pg_catalog') THEN (SELECT typname FROM pg_type WHERE oid = t.typbasetype) + ELSE 'USER-DEFINED' + END + ELSE + CASE + WHEN ((t.typelem <> (0)) AND (t.typlen = '-1')) THEN 'ARRAY' + WHEN (nt.nspname = 'pg_catalog') THEN (SELECT typname FROM pg_type WHERE oid = a.atttypid) + ELSE 'USER-DEFINED' + END + END) AS data_type, + (NULL::integer) AS character_maximum_length, + (NULL::integer) AS character_octet_length, + (NULL::integer) AS numeric_precision, + (NULL::integer) AS numeric_precision_radix, + (NULL::integer) AS numeric_scale, + (NULL::integer) AS datetime_precision, + (NULL::integer) AS interval_type, + (NULL::integer) AS interval_precision, + (NULL::string) AS character_set_catalog, + (NULL::string) AS character_set_schema, + (NULL::string) AS character_set_name, + ( + CASE + WHEN (nco.nspname IS NOT NULL) THEN current_database() + ELSE NULL::string + END) AS collation_catalog, + (nco.nspname) AS collation_schema, + (co.collname) AS collation_name, + ( + CASE + WHEN (t.typtype = 'd') THEN current_database() + ELSE NULL::string + END) AS domain_catalog, + ( + CASE + WHEN (t.typtype = 'd') THEN nt.nspname + ELSE NULL::string + END) AS domain_schema, + ( + CASE + WHEN (t.typtype = 'd') THEN t.typname + ELSE NULL::string + END) AS domain_name, + (current_database()) AS udt_catalog, + (COALESCE(nbt.nspname, nt.nspname)) AS udt_schema, + (COALESCE(bt.typname, t.typname)) AS udt_name, + (NULL::string) AS scope_catalog, + (NULL::string) AS scope_schema, + (NULL::string) AS scope_name, + (NULL::integer) AS maximum_cardinality, + (a.attnum) AS dtd_identifier, + ('NO') AS is_self_referencing, + ( + CASE + WHEN (a.attidentity in ('a', 'd')) THEN 'YES' + ELSE 'NO' + END) AS is_identity, + ( + CASE a.attidentity + WHEN 'a' THEN 'ALWAYS' + WHEN 'd' THEN 'BY DEFAULT' + ELSE NULL + END) AS identity_generation, + ( + CASE + WHEN (a.attgenerated <> '') THEN 'ALWAYS' + ELSE 'NEVER' + END) AS is_generated, + ( + CASE + WHEN (a.attgenerated <> '') THEN pg_get_expr(ad.adbin, ad.adrelid) + ELSE NULL + END) AS generation_expression, + ( + CASE + WHEN ((c.relkind in ('r', 'p')) OR ((c.relkind in ('v', 'f')) AND pg_column_is_updatable((c.oid), a.attnum, false))) THEN 'YES' + ELSE 'NO' + END) AS is_updatable + FROM ((((((pg_attribute a + LEFT JOIN pg_attrdef ad ON (((a.attrelid = ad.adrelid) AND (a.attnum = ad.adnum)))) + JOIN (pg_class c + JOIN pg_namespace nc ON ((c.relnamespace = nc.oid))) ON ((a.attrelid = c.oid))) + JOIN (pg_type t + JOIN pg_namespace nt ON ((t.typnamespace = nt.oid))) ON ((a.atttypid = t.oid))) + LEFT JOIN (pg_type bt + JOIN pg_namespace nbt ON ((bt.typnamespace = nbt.oid))) ON (((t.typtype = 'd') AND (t.typbasetype = bt.oid)))) + LEFT JOIN (pg_collation co + JOIN pg_namespace nco ON ((co.collnamespace = nco.oid))) ON (((a.attcollation = co.oid) AND ((nco.nspname <> 'pg_catalog') OR (co.collname <> 'default')))))) + WHERE ((NOT pg_is_other_temp_schema(nc.oid)) + AND (a.attnum > 0) + AND (NOT a.attisdropped) + AND (c.relkind in ('r', 'v', 'f', 'p')) + AND (pg_has_role(c.relowner, 'USAGE') + OR has_column_privilege(c.oid, a.attnum, 'SELECT, INSERT, UPDATE, REFERENCES'))); + + + CREATE VIEW information_schema.views AS + SELECT (current_database()) AS table_catalog, + (nc.nspname) AS table_schema, + (c.relname) AS table_name, + ( + CASE + WHEN pg_has_role(c.relowner, 'USAGE'::text) THEN pg_get_viewdef(c.oid) + ELSE NULL::text + END) AS view_definition, + ( + CASE + WHEN ('check_option=cascaded'::text in (c.reloptions)) THEN 'CASCADED'::text + WHEN ('check_option=local'::text in (c.reloptions)) THEN 'LOCAL'::text + ELSE 'NONE'::text + END) AS check_option, + ( + CASE + WHEN ((pg_relation_is_updatable((c.oid), false) & 20) = 20) THEN 'YES'::text + ELSE 'NO'::text + END) AS is_updatable, + ( + CASE + WHEN ((pg_relation_is_updatable((c.oid), false) & 8) = 8) THEN 'YES'::text + ELSE 'NO'::text + END) AS is_insertable_into, + ('NO'::text) AS is_trigger_updatable, + ('NO'::text) AS is_trigger_deletable, + ('NO'::text) AS is_trigger_insertable_into + FROM pg_namespace nc, + pg_class c + WHERE ((c.relnamespace = nc.oid) + AND (c.relkind = 'v'::char) + AND (NOT pg_is_other_temp_schema(nc.oid)) + AND (pg_has_role(c.relowner, 'USAGE'::text) + OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) + OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'::text))); + + + CREATE VIEW information_schema.referential_constraints AS + SELECT (current_database()) AS constraint_catalog, + (ncon.nspname) AS constraint_schema, + (con.conname) AS constraint_name, + ( + CASE + WHEN (npkc.nspname IS NULL) THEN NULL::string + ELSE current_database() + END) AS unique_constraint_catalog, + (npkc.nspname) AS unique_constraint_schema, + (pkc.conname) AS unique_constraint_name, + ( + CASE con.confmatchtype + WHEN 'f'::char THEN 'FULL'::text + WHEN 'p'::char THEN 'PARTIAL'::text + WHEN 's'::char THEN 'NONE'::text + ELSE NULL::text + END) AS match_option, + ( + CASE con.confupdtype + WHEN 'c'::char THEN 'CASCADE'::text + WHEN 'n'::char THEN 'SET NULL'::text + WHEN 'd'::char THEN 'SET DEFAULT'::text + WHEN 'r'::char THEN 'RESTRICT'::text + WHEN 'a'::char THEN 'NO ACTION'::text + ELSE NULL::text + END) AS update_rule, + ( + CASE con.confdeltype + WHEN 'c'::char THEN 'CASCADE'::text + WHEN 'n'::char THEN 'SET NULL'::text + WHEN 'd'::char THEN 'SET DEFAULT'::text + WHEN 'r'::char THEN 'RESTRICT'::text + WHEN 'a'::char THEN 'NO ACTION'::text + ELSE NULL::text + END) AS delete_rule + FROM ((((((pg_namespace ncon + JOIN pg_constraint con ON ((ncon.oid = con.connamespace))) + JOIN pg_class c ON (((con.conrelid = c.oid) AND (con.contype = 'f'::char)))) + LEFT JOIN pg_depend d1 ON (((d1.objid = con.oid) AND (d1.classid = (SELECT oid FROM pg_class WHERE relname = 'pg_constraint')) AND (d1.refclassid = (SELECT oid FROM pg_class WHERE relname = 'pg_class')) AND (d1.refobjsubid = 0)))) + LEFT JOIN pg_depend d2 ON (((d2.refclassid = (SELECT oid FROM pg_class WHERE relname = 'pg_constraint')) AND (d2.classid = (SELECT oid FROM pg_class WHERE relname = 'pg_class')) AND (d2.objid = d1.refobjid) AND (d2.objsubid = 0) AND (d2.deptype = 'i'::char)))) + LEFT JOIN pg_constraint pkc ON (((pkc.oid = d2.refobjid) AND (pkc.contype in ('p'::char, 'u'::char)) AND (pkc.conrelid = con.confrelid)))) + LEFT JOIN pg_namespace npkc ON ((pkc.connamespace = npkc.oid))) + WHERE (pg_has_role(c.relowner, 'USAGE'::text) + OR has_table_privilege(c.oid, 'INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) + OR has_any_column_privilege(c.oid, 'INSERT, UPDATE, REFERENCES'::text)); + + CREATE TABLE information_schema.key_column_usage( + constraint_catalog string, + constraint_schema string, + constraint_name string, + table_catalog string, + table_schema string, + table_name string, + column_name string, + ordinal_position int unsigned, + position_in_unique_constraint int unsigned, + ); + + CREATE TABLE information_schema.table_constraints( + constraint_catalog string, + constraint_schema string, + constraint_name string, + table_catalog string, + table_schema string, + table_name string, + constraint_type string, + is_deferrable string, + initially_deferred string, + enforced string, + nulls_distinct string, + ); + + "; + + let state = self.ctx.state(); + + { + let statements = DFParser::parse_sql_with_dialect( + schema_query, + &datafusion::sql::sqlparser::dialect::PostgreSqlDialect {}, + )?; + for statement in statements { + let provider = self.context_provider_for_statment(&statement).await?; + let query = SqlToRel::new(&provider); + let plan = query.statement_to_plan(statement)?; + self.ctx.execute_logical_plan(plan).await?; + } + } + + // data + { + struct OIDFactory(u32); + impl OIDFactory { + fn new() -> Self { + Self(1) + } + + fn next(&mut self) -> u32 { + let oid = self.0; + self.0 += 1; + oid + } + } + + let mut oids = OIDFactory::new(); + + let owner_id = oids.next(); + let owner_name = "dozer"; + self.dml_sql_with_params( + "INSERT + INTO pg_catalog.pg_authid + VALUES ($1, $2, true, true, true, true, true, true, true, -1, null, null)", + vec![owner_id.into(), owner_name.into()], + ) + .await?; + + let default_table_space_id = oids.next(); + let default_table_space_name = "pg_default"; + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_tablespace VALUES($1, $2, $3, null, null)", + vec![ + default_table_space_id.into(), + default_table_space_name.into(), + owner_id.into(), + ], + ) + .await?; + + struct SQLType { + pub id: u32, + pub name: &'static str, + pub len: i16, + pub category_code: &'static str, + pub preferred: bool, + } + + impl SQLType { + fn new( + id: u32, + name: &'static str, + len: i16, + category_code: &'static str, + preferred: bool, + ) -> Self { + Self { + id, + name, + len, + category_code, + preferred, + } + } + } + + let sqltypes = [ + SQLType::new(oids.next(), "boolean", 1i16, "B", true), + SQLType::new(oids.next(), "bytea", -1, "A", true), + SQLType::new(oids.next(), "tinyint", 1, "N", false), + SQLType::new(oids.next(), "int2", 2, "N", false), + SQLType::new(oids.next(), "int4", 4, "N", true), + SQLType::new(oids.next(), "int8", 8, "N", true), + SQLType::new(oids.next(), "char", -1, "S", true), + SQLType::new(oids.next(), "varchar", -1, "S", true), + SQLType::new(oids.next(), "text", -1, "S", true), + SQLType::new(oids.next(), "string", -1, "S", true), + SQLType::new(oids.next(), "float4", 4, "N", false), + SQLType::new(oids.next(), "float8", 8, "N", true), + SQLType::new(oids.next(), "date", 4, "D", true), + SQLType::new(oids.next(), "time", 8, "D", true), + SQLType::new(oids.next(), "timestamp", 8, "T", true), + SQLType::new(oids.next(), "timestamptz", 8, "T", true), + SQLType::new(oids.next(), "interval", 8, "T", true), + SQLType::new(oids.next(), "numeric", -1, "N", true), + ]; + + let sqltype_by_name = + |name| -> &SQLType { sqltypes.iter().find(|&t| t.name.eq(name)).unwrap() }; + + let sqltype_for_dftype = |dftype: &DataType| -> &SQLType { + match dftype { + DataType::Null | DataType::Boolean => sqltype_by_name("boolean"), + DataType::Int8 => sqltype_by_name("tinyint"), + DataType::Int16 => sqltype_by_name("int2"), + DataType::Int32 => sqltype_by_name("int4"), + DataType::Int64 => sqltype_by_name("int8"), + DataType::UInt8 => sqltype_by_name("tinyint"), + DataType::UInt16 => sqltype_by_name("int2"), + DataType::UInt32 => sqltype_by_name("int4"), + DataType::UInt64 => sqltype_by_name("int8"), + DataType::Float16 => sqltype_by_name("float4"), + DataType::Float32 => sqltype_by_name("float4"), + DataType::Float64 => sqltype_by_name("float8"), + DataType::Timestamp(_, None) => sqltype_by_name("timestamp"), + DataType::Timestamp(_, Some(_)) => sqltype_by_name("timestamptz"), + DataType::Date32 => sqltype_by_name("date"), + DataType::Date64 => sqltype_by_name("date"), + DataType::Time32(_) => sqltype_by_name("time"), + DataType::Time64(_) => sqltype_by_name("time"), + DataType::Duration(_) => sqltype_by_name("interval"), + DataType::Interval(_) => sqltype_by_name("interval"), + DataType::Binary => sqltype_by_name("bytea"), + DataType::FixedSizeBinary(_) => sqltype_by_name("bytea"), + DataType::LargeBinary => sqltype_by_name("bytea"), + DataType::Utf8 => sqltype_by_name("string"), + DataType::LargeUtf8 => sqltype_by_name("text"), + DataType::Decimal256(_, _) | DataType::Decimal128(_, _) => { + sqltype_by_name("numeric") + } + DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) + | DataType::Struct(_) + | DataType::Union(_, _) + | DataType::Dictionary(_, _) + | DataType::Map(_, _) + | DataType::RunEndEncoded(_, _) => unimplemented!("No SQLType for {dftype}"), + } + }; + + let pg_catalog_schema_id = oids.next(); + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_namespace VALUES($1, 'pg_catalog', $2, null)", + vec![pg_catalog_schema_id.into(), owner_id.into()], + ) + .await?; + + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_proc VALUES(1, 'dummy', $1, $2, 0, 0, 0, 0, '', '', false, false, false, false, '', '', 0, 0, 0, '', null, null, null, null, null, '', null, null, null, null)", + vec![pg_catalog_schema_id.into(), owner_id.into()], + ) + .await?; + + for sqltype in sqltypes.iter() { + let pass_by_val = sqltype.len != -1; + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_type + VALUES($1, /* oid */ + $2, /* typname */ + $3, /* typnamespace */ + $4, /* typowner */ + $5, /* typlen */ + $6, /* typbyval */ + 'b', /* typtype */ + $7, /* typcategory */ + $8, /* typispreferred */ + true, /* typisdefined */ + '', /* typdelim */ + 0, /* typrelid */ + 0, /* typsubscript */ + 0, /* typelem */ + 0, /* typarray */ + 0, /* typinput */ + 0, /* typoutput */ + 1, /* typreceive */ + 0, /* typsend */ + 0, /* typmodin */ + 0, /* typmodout */ + 0, /* typanalyze */ + 'c', /* typalign */ + 'p', /* typstorage */ + false, /* typnotnull */ + 0, /* typbasetype */ + 0, /* typtypmod */ + 0, /* typndims */ + 0, /* typcollation */ + null, /* typdefaultbin */ + null, /* typdefault */ + null /* typacl */ + )", + vec![ + sqltype.id.into(), + sqltype.name.into(), + pg_catalog_schema_id.into(), + owner_id.into(), + sqltype.len.into(), + pass_by_val.into(), + sqltype.category_code.into(), + sqltype.preferred.into(), + ], + ) + .await?; + } + + let catalog_list = state.catalog_list(); + for catalog_name in catalog_list.catalog_names() { + let catalog_id = oids.next(); + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_database + VALUES($1, $2, $3, 0, 'c', false, true, -1, 0, 0, $4, 'C.utf8', 'C.utf8', null, null, null, null)", + vec![catalog_id.into(), catalog_name.as_str().into(), owner_id.into(), default_table_space_id.into()], + ) + .await?; + + let catalog = catalog_list.catalog(&catalog_name).unwrap(); + for schema_name in catalog.schema_names().into_iter() { + let schema_id = oids.next(); + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_namespace VALUES($1, $2, $3, null)", + vec![ + schema_id.into(), + schema_name.as_str().into(), + owner_id.into(), + ], + ) + .await?; + + let schema = catalog.schema(&schema_name).unwrap(); + for table_name in schema.table_names() { + let table = schema.table(&table_name).await.unwrap(); + let table_id = oids.next(); + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_class + VALUES($1, $2, $3, 0, 0, $4, 0, 0, 0, 0, -1, -1, 0, false, false, 'p', 'r', $5, 0, false, false, false, false, false, true, 'd', false, 0, 1, 1, null, null, null)", + vec![ + table_id.into(), + table_name.as_str().into(), + schema_id.into(), + owner_id.into(), + (table.schema().fields().len() as i16).into(), + ], + ) + .await?; + + for (i, field) in table.schema().fields().into_iter().enumerate() { + let sqltype = sqltype_for_dftype(field.data_type()); + let ordinal = i as i16 + 1; + let pass_by_val = sqltype.len != -1; + self.dml_sql_with_params( + "INSERT INTO pg_catalog.pg_attribute + VALUES($1, $2, $3, $4, $5, -1, -1, 0, $6, 'b', 'p', '', $7, false, false, '', '', false, true, 0, 0, 0, null, null, null, null)", + vec![ + table_id.into(), + field.name().as_str().into(), + sqltype.id.into(), + sqltype.len.into(), + ordinal.into(), + pass_by_val.into(), + (!field.is_nullable()).into(), + ], + ) + .await?; + } + } + } + } + } + + Ok(()) + } } /// A custom datasource, used to represent a datastore with a single index @@ -654,139 +1794,3 @@ fn normalize_ident(id: ast::Ident) -> String { None => id.value.to_ascii_lowercase(), } } - -macro_rules! nullable_helper { - (nullable) => { - true - }; - () => { - false - }; -} - -macro_rules! schema { - ({$($name:literal: $type:path $(: $nullable:ident)?),* $(,)?}) => {{ - let v = vec![$(Field::new($name, $type, nullable_helper!($($nullable)?))),*]; - - Arc::new(Schema::new(v)) - }}; -} - -struct InformationSchemaProviderWrapper { - pub inner: InformationSchemaProvider, -} - -#[async_trait] -impl SchemaProvider for InformationSchemaProviderWrapper { - fn as_any(&self) -> &(dyn Any + 'static) { - self - } - - fn table_names(&self) -> Vec { - let mut names = self.inner.table_names(); - names.extend( - InformationSchemaEmptyTable::TABLES - .iter() - .map(ToString::to_string), - ); - names - } - - async fn table(&self, name: &str) -> Option> { - let name_lowercase = name.to_ascii_lowercase(); - if InformationSchemaEmptyTable::TABLES.contains(&name_lowercase.as_str()) { - Some(Arc::new(InformationSchemaEmptyTable::new(name_lowercase))) - } else { - self.inner.table(name).await - } - } - - fn table_exist(&self, name: &str) -> bool { - InformationSchemaEmptyTable::TABLES.contains(&name) || self.inner.table_exist(name) - } -} - -#[derive(Debug)] -struct InformationSchemaEmptyTable { - table: String, -} - -impl InformationSchemaEmptyTable { - const TABLES: [&str; 3] = [ - "referential_constraints", - "key_column_usage", - "table_constraints", - ]; - - fn new(table: String) -> Self { - Self { table } - } -} - -#[async_trait] -impl TableProvider for InformationSchemaEmptyTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - match self.table.as_str() { - "referential_constraints" => schema!({ - "constraint_catalog" : DataType::Utf8, - "constraint_schema" : DataType::Utf8, - "constraint_name" : DataType::Utf8, - "unique_constraint_catalog" : DataType::Utf8, - "unique_constraint_schema" : DataType::Utf8, - "unique_constraint_name" : DataType::Utf8, - "match_option" : DataType::Utf8, - "update_rule" : DataType::Utf8, - "delete_rule" : DataType::Utf8, - }), - "key_column_usage" => schema!({ - "constraint_catalog" : DataType::Utf8, - "constraint_schema" : DataType::Utf8, - "constraint_name" : DataType::Utf8, - "table_catalog" : DataType::Utf8, - "table_schema" : DataType::Utf8, - "table_name" : DataType::Utf8, - "column_name" : DataType::Utf8, - "ordinal_position" : DataType::UInt32, - "position_in_unique_constraint" : DataType::UInt32, - - }), - "table_constraints" => schema!({ - "constraint_catalog" : DataType::Utf8, - "constraint_schema" : DataType::Utf8, - "constraint_name" : DataType::Utf8, - "table_catalog" : DataType::Utf8, - "table_schema" : DataType::Utf8, - "table_name" : DataType::Utf8, - "constraint_type" : DataType::Utf8, - "is_deferrable" : DataType::Utf8, - "initially_deferred" : DataType::Utf8, - "enforced" : DataType::Utf8, - "nulls_distinct" : DataType::Utf8, - }), - _ => unreachable!(), - } - } - - fn table_type(&self) -> TableType { - TableType::View - } - - async fn scan( - &self, - _state: &SessionState, - projection: Option<&Vec>, - // filters and limit can be used here to inject some push-down operations if needed - _filters: &[Expr], - _limit: Option, - ) -> Result> { - Ok(Arc::new(MemoryExec::try_new( - &[], - self.schema(), - projection.cloned(), - )?)) - } -} diff --git a/dozer-api/src/sql/datafusion/pg_catalog.rs b/dozer-api/src/sql/datafusion/pg_catalog.rs deleted file mode 100644 index 3f5c0f124b..0000000000 --- a/dozer-api/src/sql/datafusion/pg_catalog.rs +++ /dev/null @@ -1,465 +0,0 @@ -use std::{any::Any, sync::Arc}; - -use async_trait::async_trait; -use datafusion::{ - arrow::{ - array::{StringBuilder, UInt32Builder}, - datatypes::SchemaRef, - record_batch::RecordBatch, - }, - common::Constraints, - datasource::TableProvider, - error::Result, - execution::context::SessionState, - physical_plan::{memory::MemoryExec, ExecutionPlan}, - sql::TableReference, -}; -use datafusion_expr::{Expr, TableType}; - -#[derive(Debug)] -pub struct PgCatalogTable { - table: String, - schema: SchemaRef, - state: Arc, - constraints: Constraints, -} - -impl PgCatalogTable { - pub fn from_ref_with_state( - reference: &TableReference, - state: Arc, - ) -> Option { - match reference.schema() { - Some("pg_catalog") | None => (), - _ => return None, - } - let table = reference.table(); - let (schema, constraints) = match table { - "pg_type" => schemas::pg_type(), - "pg_namespace" => schemas::pg_namespace(), - "pg_proc" => schemas::pg_proc(), - "pg_class" => schemas::pg_class(), - "pg_attribute" => schemas::pg_attribute(), - "pg_description" => schemas::pg_description(), - "pg_attrdef" => schemas::pg_attrdef(), - "pg_enum" => schemas::pg_enum(), - "pg_index" => schemas::pg_index(), - "pg_constraint" => schemas::pg_constraint(), - _ => return None, - }; - let table = table.to_string(); - Some(Self { - constraints, - table, - schema, - state, - }) - } -} - -#[async_trait] -impl TableProvider for PgCatalogTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::View - } - - fn constraints(&self) -> Option<&Constraints> { - Some(&self.constraints) - } - - async fn scan( - &self, - _state: &SessionState, - projection: Option<&Vec>, - // filters and limit can be used here to inject some push-down operations if needed - _filters: &[Expr], - _limit: Option, - ) -> Result> { - Ok(Arc::new(MemoryExec::try_new( - &[self.records()], - self.schema.clone(), - projection.cloned(), - )?)) - } -} - -impl PgCatalogTable { - fn records(&self) -> Vec { - match self.table.as_str() { - "pg_namespace" => self.pg_namespace(), - "pg_class" => self.pg_class(), - _ => vec![], - } - } - - fn pg_namespace(&self) -> Vec { - let mut oid = UInt32Builder::new(); - let mut nspname = StringBuilder::new(); - let mut nspowner = UInt32Builder::new(); - let mut nspacl = StringBuilder::new(); - - for (i, catalog) in self - .state - .catalog_list() - .catalog_names() - .into_iter() - .enumerate() - { - oid.append_value(i as u32); - nspname.append_value(catalog); - nspowner.append_value(0); - nspacl.append_null(); - } - - let schema = self.schema.clone(); - vec![RecordBatch::try_new( - schema, - vec![ - Arc::new(oid.finish()), - Arc::new(nspname.finish()), - Arc::new(nspowner.finish()), - Arc::new(nspacl.finish()), - ], - ) - .unwrap()] - } - - fn pg_class(&self) -> Vec { - let mut oid = UInt32Builder::new(); - let mut relname = StringBuilder::new(); - let mut relnamespace = UInt32Builder::new(); - let mut relkind = StringBuilder::new(); - - for (i, table) in self - .state - .catalog_list() - .catalog("public") - .unwrap() - .schema("dozer") - .unwrap() - .table_names() - .into_iter() - .enumerate() - { - oid.append_value(i as u32); - relname.append_value(table); - relnamespace.append_value(0); - relkind.append_value("r"); - } - - let schema = self.schema.clone(); - vec![RecordBatch::try_new( - schema, - vec![ - Arc::new(oid.finish()), - Arc::new(relname.finish()), - Arc::new(relnamespace.finish()), - Arc::new(relkind.finish()), - ], - ) - .unwrap()] - } -} - -pub mod schemas { - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::common::{Constraint, Constraints}; - use std::sync::Arc; - - #[derive(PartialEq)] - enum Modifier { - Nullable, - PrimaryKey, - None, - } - - macro_rules! nullable_helper { - (nullable) => { - Modifier::Nullable - }; - (primary_key) => { - Modifier::PrimaryKey - }; - () => { - Modifier::None - }; - } - - macro_rules! schema { - ({$($name:literal: $type:expr $(=> $modifier:ident)?),* $(,)?} $(, unique($($uniq_field:literal),+))*) => {{ - #![allow(unused_assignments)] - #![allow(unused_mut)] - let mut fields = Vec::new(); - let mut primary_key = Vec::new(); - let mut i = 0; - $( - let modifier = nullable_helper!($($modifier)?); - let nullable = modifier == Modifier::Nullable; - if modifier == Modifier::PrimaryKey { - primary_key.push(i); - } - fields.push(Field::new($name, $type, nullable)); - i += 1; - )* - let mut indexes = vec![Constraint::PrimaryKey(primary_key)]; - $( - let mut elems = Vec::new(); - $( - let elem = fields.iter().position(|field| field.name() == $uniq_field).expect("Unique index field not found"); - elems.push(elem); - )+ - indexes.push(Constraint::Unique(elems)); - )* - (Arc::new(Schema::new(fields)), Constraints::new_unverified(indexes)) - }}; - } - - pub fn pg_type() -> (Arc, Constraints) { - schema!({ - "oid" : DataType::UInt32 => primary_key, - "typname" : DataType::Utf8, - "typnamespace" : DataType::Utf8, - "typowner" : DataType::Utf8, - "typlen" : DataType::Int16, - "typbyval" : DataType::Boolean, - "typtype" : DataType::Utf8, - "typcategory" : DataType::Utf8, - "typispreferred" : DataType::Boolean, - "typisdefined" : DataType::Boolean, - "typdelim" : DataType::Utf8, - "typrelid" : DataType::Utf8, - "typelem" : DataType::Utf8, - "typarray" : DataType::Utf8, - "typinput" : DataType::Utf8, - "typoutput" : DataType::Utf8, - "typreceive" : DataType::Utf8, - "typsend" : DataType::Utf8, - "typmodin" : DataType::Utf8, - "typmodout" : DataType::Utf8, - "typanalyze" : DataType::Utf8, - "typalign" : DataType::Utf8, - "typstorage" : DataType::Utf8, - "typnotnull" : DataType::Boolean, - "typbasetype" : DataType::UInt32, - "typtypmod" : DataType::Int32, - "typndims" : DataType::Int32, - "typcollation" : DataType::Utf8, - "typdefaultbin" : DataType::Binary => nullable, - "typdefault" : DataType::Utf8 => nullable, - "typacl" : DataType::Utf8 => nullable, - }, unique("typname", "typnamespace")) - } - pub fn pg_namespace() -> (Arc, Constraints) { - schema!({ - "oid" : DataType::UInt32 => primary_key, - "nspname" : DataType::Utf8, - "nspowner" : DataType::UInt32, - "nspacl" : DataType::Utf8 => nullable, - }, unique("nspname")) - } - - pub fn pg_proc() -> (Arc, Constraints) { - schema!({ - "oid" : DataType::UInt32 => primary_key, - "proname" : DataType::Utf8, - "pronamespace" : DataType::UInt32, - "proowner" : DataType::UInt32, - "prolang" : DataType::UInt32, - "procost" : DataType::Float64, - "prorows" : DataType::Float64, - "provariadic" : DataType::UInt32, - "prosupport" : DataType::UInt32, - "prokind" : DataType::Utf8, - "prosecdef" : DataType::Boolean, - "proleakproof" : DataType::Boolean, - "proisstrict" : DataType::Boolean, - "proretset" : DataType::Boolean, - "provolatile" : DataType::Utf8, - "proparallel" : DataType::Utf8, - "pronargs" : DataType::Int16, - "pronargdefaults" : DataType::Int16, - "prorettype" : DataType::UInt32, - "proargtypes" : DataType::Utf8, - "proallargtypes" : DataType::Utf8 => nullable, - "proargmodes" : DataType::Utf8 => nullable, - "proargnames" : DataType::Utf8 => nullable, - "proargdefaults" : DataType::Utf8 => nullable, - "protrftypes" : DataType::Utf8 => nullable, - "prosrc" : DataType::Utf8, - "probin" : DataType::Utf8 => nullable, - "prosqlbody" : DataType::Utf8 => nullable, - "proconfig" : DataType::Utf8 => nullable, - "proacl" : DataType::Utf8 => nullable, - }, unique("proname", "proargtypes", "pronamespace")) - } - - pub fn pg_attribute() -> (Arc, Constraints) { - schema!({ - "attrelid" : DataType::UInt32 => primary_key, - "attname" : DataType::Utf8, - "atttypid" : DataType::UInt32, - "attlen" : DataType::Int16, - "attnum" : DataType::Int16 => primary_key, - "attcacheoff" : DataType::Int32, - "atttypmod" : DataType::Int32, - "attndims" : DataType::Int16, - "attbyval" : DataType::Boolean, - "attalign" : DataType::Utf8, - "attstorage" : DataType::Utf8, - "attcompression" : DataType::Utf8, - "attnotnull" : DataType::Boolean, - "atthasdef" : DataType::Boolean, - "atthasmissing" : DataType::Boolean, - "attidentity" : DataType::Utf8, - "attgenerated" : DataType::Utf8, - "attisdropped" : DataType::Boolean, - "attislocal" : DataType::Boolean, - "attinhcount" : DataType::UInt16, - "attstattarget" : DataType::UInt16, - "attcollation" : DataType::UInt32, - "attacl" : DataType::Utf8 => nullable, - "attoptions" : DataType::Utf8 => nullable, - "attfdwoptions" : DataType::Utf8 => nullable, - "attmissingval" : DataType::Utf8 => nullable, - }, unique("attrelid", "attname")) - } - - pub fn pg_class() -> (Arc, Constraints) { - schema! ({ - "oid" : DataType::UInt32 => primary_key, - "relname" : DataType::Utf8, - "relnamespace" : DataType::UInt32, - // "reltype" : DataType::UInt32, - // "reloftype" : DataType::UInt32, - // "relowner" : DataType::UInt32, - // "relam" : DataType::UInt32, - // "relfilenode" : DataType::UInt32, - // "reltablespace" : DataType::UInt32, - // "relpages" : DataType::Int32, - // "reltuples" : DataType::Float64, - // "relallvisible" : DataType::Int32, - // "reltoastrelid" : DataType::UInt32, - // "relhasindex" : DataType::Boolean, - // "relisshared" : DataType::Boolean, - // "relpersistence" : DataType::Utf8, - "relkind" : DataType::Utf8, - // "relnatts" : DataType::Int16, - // "relchecks" : DataType::Int16, - // "relhasrules" : DataType::Boolean, - // "relhastriggers" : DataType::Boolean, - // "relhassubclass" : DataType::Boolean, - // "relrowsecurity" : DataType::Boolean, - // "relforcerowsecurity" : DataType::Boolean, - // "relispopulated" : DataType::Boolean, - // "relreplident" : DataType::Utf8, - // "relispartition" : DataType::Boolean, - // "relrewrite" : DataType::UInt32, - // "relfrozenxid" : DataType::UInt32, - // "relminmxid" : DataType::UInt32, - // "relacl" : DataType::Utf8 : nullable, - // "reloptions" : DataType::Utf8 : nullable, - // "relpartbound" : DataType::Utf8 : nullable, - }, unique("relname", "relnamespace")) - } - - pub fn pg_description() -> (Arc, Constraints) { - schema!({ - "objoid" : DataType::UInt32 => primary_key, - "classoid" : DataType::UInt32 => primary_key, - "objsubid" : DataType::Int32 => primary_key, - "description" : DataType::Utf8, - }) - } - - pub fn pg_attrdef() -> (Arc, Constraints) { - schema!({ - "oid" : DataType::UInt32 => primary_key, - "adrelid" : DataType::UInt32, - "adnum" : DataType::Int16, - "adbin" : DataType::Utf8, - }, unique("adrelid", "adnum")) - } - - pub fn pg_enum() -> (Arc, Constraints) { - schema!({ - "oid" : DataType::UInt32 => primary_key, - "enumtypid" : DataType::UInt32, - "enumsortorder" : DataType::Float64, - "enumlabel" : DataType::Utf8, - }, - unique("enumtypid", "enumlabel"), unique("enumtypid", "enumsortorder") - ) - } - - pub(crate) fn pg_index() -> (Arc, Constraints) { - let int2list = DataType::List(Arc::new(Field::new("indkey", DataType::Int16, false))); - schema!({ - "indexrelid" : DataType::UInt32 => primary_key, - "indrelid" : DataType::UInt32, - "indnatts" : DataType::Int16, - "indnkeyatts" : DataType::Int16, - "indisunique" : DataType::Boolean, - "indnullsnotdistinct" : DataType::Boolean, - "indisprimary" : DataType::Boolean, - "indisexclusion" : DataType::Boolean, - "indimmediate" : DataType::Boolean, - "indisclustered" : DataType::Boolean, - "indisvalid" : DataType::Boolean, - "indcheckxmin" : DataType::Boolean, - "indisready" : DataType::Boolean, - "indislive" : DataType::Boolean, - "indisreplident" : DataType::Boolean, - "indkey" : int2list, - "indcollation" : DataType::Utf8, - "indclass" : DataType::Utf8, - "indoption" : DataType::Utf8, - "indexprs" : DataType::Utf8 => nullable, - "indpred" : DataType::Utf8 => nullable, - }) - } - - pub(crate) fn pg_constraint() -> (Arc, Constraints) { - fn oidlist(name: &'static str) -> DataType { - DataType::List(Arc::new(Field::new(name, DataType::UInt32, false))) - } - fn smallintlist(name: &'static str) -> DataType { - DataType::List(Arc::new(Field::new(name, DataType::Int16, false))) - } - schema!({ - "oid" : DataType::UInt32 => primary_key, - "conname" : DataType::Utf8, - "connamespace" : DataType::UInt32, - "contype" : DataType::UInt8, - "condeferrable" : DataType::Boolean, - "condeferred" : DataType::Boolean, - "convalidated" : DataType::Boolean, - "conrelid" : DataType::UInt32, - "contypid" : DataType::UInt32, - "conindid" : DataType::UInt32, - "conparentid" : DataType::UInt32, - "confrelid" : DataType::UInt32, - "confupdtype" : DataType::UInt8, - "confdeltype" : DataType::UInt8, - "confmatchtype" : DataType::UInt8, - "conislocal" : DataType::Boolean, - "coninhcount" : DataType::Int16, - "connoinherit" : DataType::Boolean, - "conkey" : smallintlist("conkey") => nullable, - "confkey" : smallintlist("confkey") => nullable, - "conpfeqop" : oidlist("conpfeqop") => nullable, - "conppeqop" : oidlist("conppeqop") => nullable, - "conffeqop" : oidlist("conffeqop") => nullable, - "confdelsetcols" : smallintlist("confdelsetcols") => nullable, - "conexclop" : oidlist("conexclop") => nullable, - "conbin" : DataType::Utf8 => nullable, - }, unique("conrelid", "contypid", "conname")) - } -} diff --git a/dozer-api/src/sql/pgwire.rs b/dozer-api/src/sql/pgwire.rs index 7f47f5ad6c..6f65192684 100644 --- a/dozer-api/src/sql/pgwire.rs +++ b/dozer-api/src/sql/pgwire.rs @@ -419,6 +419,9 @@ fn encode_field( column_data_type: &DataType, row_index: usize, ) -> Result<(), PgWireError> { + if column_data.is_null(row_index) { + return encoder.encode_field(&None::); + } match column_data_type { DataType::Null => encoder.encode_field(&None::), DataType::Boolean => { diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index 5e23abbce1..60716ca3a6 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -17,7 +17,7 @@ ahash = "0.8.3" thiserror = "1.0.48" parking_lot = "0.12" bytes = "1.4.0" -indexmap = "1.9.3" +indexmap = "2.1.0" ordered-float = { version = "3.9.1", features = ["serde"] } tracing = "0.1.36" log = "0.4.17"