fix: add more information schema tables
Solomon committed Nov 22, 2023
1 parent ffabea7 commit 52019df
Showing 4 changed files with 116 additions and 9 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
@@ -59,3 +59,4 @@ serde_json = { version = "1.0.107", features = ["arbitrary_precision"] }
 pgwire = "0.16.1"
 tempdir = "0.3.7"
 postgres-types = "0.2"
+futures-sink = "0.3.29"
114 changes: 108 additions & 6 deletions dozer-api/src/sql/datafusion/mod.rs
@@ -6,15 +6,17 @@ use std::collections::HashMap;
 use std::{any::Any, sync::Arc};
 
 use async_trait::async_trait;
-use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::catalog::information_schema::InformationSchemaProvider;
 use datafusion::catalog::schema::SchemaProvider;
 use datafusion::config::ConfigOptions;
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::{DefaultTableSource, TableProvider, TableType};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::physical_expr::var_provider::is_system_variables;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
@@ -120,7 +122,10 @@ impl ContextProvider for ContextResolver {
             return Some(Arc::new(ScalarUDF {
                 name: "pg_catalog.format_type".to_owned(),
                 signature: datafusion_expr::Signature {
-                    type_signature: datafusion_expr::TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
+                    type_signature: datafusion_expr::TypeSignature::Exact(vec![
+                        DataType::UInt32,
+                        DataType::Int32,
+                    ]),
                     volatility: datafusion_expr::Volatility::Immutable,
                 },
                 return_type: Arc::new(|_| Ok(Arc::new(DataType::Utf8))),
@@ -221,9 +226,9 @@ impl SQLExecutor {
         resolved_ref: ResolvedTableReference<'_>,
     ) -> Result<Arc<dyn SchemaProvider>> {
         if state.config().information_schema() && resolved_ref.schema == "information_schema" {
-            return Ok(Arc::new(InformationSchemaProvider::new(
-                state.catalog_list().clone(),
-            )));
+            return Ok(Arc::new(InformationSchemaProviderWrapper {
+                inner: InformationSchemaProvider::new(state.catalog_list().clone()),
+            }));
         }
 
         state
@@ -241,8 +246,15 @@
             })
     }
 
-    pub async fn parse(&self, sql: &str) -> Result<Option<LogicalPlan>, DataFusionError> {
+    pub async fn parse(&self, mut sql: &str) -> Result<Option<LogicalPlan>, DataFusionError> {
         println!("@@ query: {sql}");
+        if sql
+            .to_ascii_lowercase()
+            .trim_start()
+            .starts_with("select character_set_name")
+        {
+            sql = "select 'UTF8'"
+        }
         let mut statement = self.ctx.state().sql_to_statement(sql, "postgres")?;
         let rewrite = if let datafusion::sql::parser::Statement::Statement(ref stmt) = statement {
             match stmt.as_ref() {
@@ -544,3 +556,93 @@ fn normalize_ident(id: ast::Ident) -> String {
         None => id.value.to_ascii_lowercase(),
     }
 }
+
+macro_rules! nullable_helper {
+    (nullable) => {
+        true
+    };
+    () => {
+        false
+    };
+}
+
+macro_rules! schema {
+    ({$($name:literal: $type:path $(: $nullable:ident)?),* $(,)?}) => {{
+        let v = vec![$(Field::new($name, $type, nullable_helper!($($nullable)?))),*];
+
+        Arc::new(Schema::new(v))
+    }};
+}
+
+struct InformationSchemaProviderWrapper {
+    pub inner: InformationSchemaProvider,
+}
+
+#[async_trait]
+impl SchemaProvider for InformationSchemaProviderWrapper {
+    fn as_any(&self) -> &(dyn Any + 'static) {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        let mut names = self.inner.table_names();
+        names.extend(["referential_constraints".to_string()]);
+        names
+    }
+
+    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        if name.eq_ignore_ascii_case("referential_constraints") {
+            Some(Arc::new(InformationSchemaReferentialConstraints))
+        } else {
+            self.inner.table(name).await
+        }
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        name.to_ascii_lowercase().as_str() == "referential_constraints"
+            || self.inner.table_exist(name)
+    }
+}
+
+#[derive(Debug)]
+struct InformationSchemaReferentialConstraints;
+
+#[async_trait]
+impl TableProvider for InformationSchemaReferentialConstraints {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        schema!({
+            "constraint_catalog" : DataType::Utf8,
+            "constraint_schema" : DataType::Utf8,
+            "constraint_name" : DataType::Utf8,
+            "unique_constraint_catalog" : DataType::Utf8,
+            "unique_constraint_schema" : DataType::Utf8,
+            "unique_constraint_name" : DataType::Utf8,
+            "match_option" : DataType::Utf8,
+            "update_rule" : DataType::Utf8,
+            "delete_rule" : DataType::Utf8,
+        })
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryExec::try_new(
+            &[],
+            self.schema(),
+            projection.cloned(),
+        )?))
+    }
+}
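
Note: the schema! and nullable_helper! macros added above form a tiny declarative DSL for Arrow schemas: each "name": Type entry becomes a non-nullable Field, and an optional trailing `: nullable` marker flips the nullability flag. A minimal sketch of what an invocation expands to, using a hypothetical two-column schema purely for illustration:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Hypothetical invocation:
//     schema!({
//         "constraint_name": DataType::Utf8,
//         "ordinal_position": DataType::UInt32: nullable,
//     })
// expands, per the macro rules in this commit, to roughly:
fn expanded() -> SchemaRef {
    let v = vec![
        // no `: nullable` suffix -> nullable_helper!() -> false
        Field::new("constraint_name", DataType::Utf8, false),
        // `: nullable` suffix -> nullable_helper!(nullable) -> true
        Field::new("ordinal_position", DataType::UInt32, true),
    ];
    Arc::new(Schema::new(v))
}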
5 changes: 4 additions & 1 deletion dozer-api/src/sql/pgwire.rs
@@ -20,12 +20,15 @@ use dozer_types::arrow::datatypes::{TimeUnit, DECIMAL_DEFAULT_SCALE};
 use dozer_types::log::{debug, info};
 use dozer_types::models::api_config::{default_host, default_sql_port, SqlOptions};
 use dozer_types::rust_decimal::Decimal;
+use futures_sink::Sink;
 use futures_util::stream::BoxStream;
-use futures_util::{stream, StreamExt};
+use futures_util::{stream, SinkExt, StreamExt};
 use pgwire::api::portal::Portal;
 use pgwire::api::stmt::QueryParser;
 use pgwire::api::store::MemPortalStore;
 use pgwire::messages::data::DataRow;
+use pgwire::messages::extendedquery::{Bind, BindComplete, ParseComplete};
+use pgwire::messages::PgWireBackendMessage;
 use tokio::net::TcpListener;
 
 use pgwire::api::auth::noop::NoopStartupHandler;
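
Note: the new futures_sink / SinkExt / PgWireBackendMessage imports match the pattern where a pgwire handler writes protocol messages directly to the client, which pgwire models as a Sink<PgWireBackendMessage>. A minimal sketch of that pattern, assuming pgwire 0.16's usual handler bounds; the ack_bind helper is hypothetical and not part of this commit:

use futures_sink::Sink;
use futures_util::SinkExt;
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::extendedquery::BindComplete;
use pgwire::messages::PgWireBackendMessage;

// Hypothetical helper: acknowledge a Bind message by pushing a
// BindComplete into the client sink, the way an extended-query
// handler responds after binding a portal.
async fn ack_bind<C>(client: &mut C) -> PgWireResult<()>
where
    C: Sink<PgWireBackendMessage> + Unpin + Send,
    PgWireError: From<C::Error>,
{
    client
        .send(PgWireBackendMessage::BindComplete(BindComplete::new()))
        .await?;
    Ok(())
}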
