Skip to content

Commit

Permalink
fix: SQL layer fixes (#2235)
Browse files Browse the repository at this point in the history
* Ignore transactions in sql layer

* Fix response message for transaction start/end & implement some system
catalogs

* Implement describe. Parameter type inference is a bit WIP

* Add pg_types dep to dozer-api

* Add pg_proc catalog

* Add pg_class

* Add pg_attribute

* Don't share QueryProcessor between sessions

* fix: support some system variables with SHOW

* chore: improve schema macro syntax

* fix: add some catalog tables and functions

* fix: add more information schema tables

* Update to patched datafusion

* Support ODBC (at least unixodbc)

* Support more of postgres for superset

* pg_catalog wip

* fix: SQL AST rewrite for SUM('1') as SUM(1)

* Adjust type names

* fix: add more pg_catalog tables and rewrite format_type function

* Update datafusion fork for boolean coercion

* Make some UDFs more lenient with types, because array types aren't
supported by datafusion sql

* fix: rewrite a = ANY (b) to a in (b)

* fix: use patched datafusion with disabled field-name ambiguity check

* chore: optimize: reuse as much state as possible between queries

Reduce the cost of starting a new session and running a query over REST
by building pg_catalog and information schema tables only once when dozer starts.

* Correctness fixes for bugs exposed by PG JDBC

* chore: update datafusion repo url

* chore: clippy fixes

* chore: remove println

* chore: add information_schema.character_sets

* chore: clippy fix

* chore: add enabled flag in pgwire config

* chore: add enable_sql flag in rest config

---------

Co-authored-by: Jesse Bakker <[email protected]>
  • Loading branch information
abcpro1 and Jesse-Bakker authored Dec 13, 2023
1 parent c932fb0 commit c9df9af
Show file tree
Hide file tree
Showing 30 changed files with 3,560 additions and 1,911 deletions.
3,171 changes: 1,483 additions & 1,688 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ resolver = "2"

[workspace.dependencies]
bincode = { version = "2.0.0-rc.3", features = ["derive"]}
datafusion = { version = "33.0.0"}
datafusion-expr = { version = "33.0.0"}

[patch.crates-io]
postgres = { git = "https://github.com/getdozer/rust-postgres" }
postgres-protocol = { git = "https://github.com/getdozer/rust-postgres" }
postgres-types = { git = "https://github.com/getdozer/rust-postgres" }
tokio-postgres = { git = "https://github.com/getdozer/rust-postgres" }


datafusion = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-expr = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-physical-expr = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-physical-plan = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-sql = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-proto = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-common = { git = "https://github.com/getdozer/arrow-datafusion" }
8 changes: 6 additions & 2 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ pin-project = "1.1.3"
async-stream = "0.3.5"
uuid = "1.6.1"
chrono = "0.4.31"
datafusion = "32.0.0"
datafusion-expr = "32.0.0"
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
serde_json = { version = "1.0.108", features = ["arbitrary_precision"] }
pgwire = "0.16.1"
tempdir = "0.3.7"
postgres-types = "0.2"
futures-sink = "0.3.29"
async-once-cell = "0.5.3"
genawaiter = "0.99.1"
2 changes: 2 additions & 0 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub enum ApiInitError {
CacheNotFound(Labels),
#[error("Failed to bind to address {0}: {1}")]
FailedToBindToAddress(String, #[source] std::io::Error),
#[error("Failed to initialize SQL engine: {0}")]
SQLEngineError(#[source] DataFusionError),
}

#[derive(Error, Debug)]
Expand Down
28 changes: 22 additions & 6 deletions dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::sync::Arc;

use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
use datafusion::common::plan_datafusion_err;
use datafusion::error::DataFusionError;
use dozer_cache::cache::expression::{QueryExpression, Skip};
use dozer_cache::cache::CacheRecord;
use dozer_cache::{CacheReader, Phase};
Expand All @@ -14,7 +16,7 @@ use openapiv3::OpenAPI;
use crate::api_helper::{get_record, get_records, get_records_count};
use crate::generator::oapi::generator::OpenApiGenerator;
use crate::sql::datafusion::json::record_batches_to_json_rows;
use crate::sql::datafusion::SQLExecutor;
use crate::sql::datafusion::{PlannedStatement, SQLExecutor};
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
Expand Down Expand Up @@ -177,16 +179,30 @@ pub async fn get_phase(
Ok(web::Json(phase))
}

pub async fn sql(
pub(crate) async fn sql(
// access: Option<ReqData<Access>>, // TODO:
cache_endpoints: web::Data<Vec<Arc<CacheEndpoint>>>,
sql_executor: web::Data<Option<Arc<SQLExecutor>>>,
sql: extractor::SQLQueryExtractor,
) -> Result<actix_web::HttpResponse, crate::errors::ApiError> {
let cache_endpoints = (*cache_endpoints.into_inner()).clone();
let sql_executor = SQLExecutor::new(cache_endpoints);
let Some(sql_executor) = sql_executor.as_deref() else {
return Ok(HttpResponse::NotFound().json(json!({ "error": "SQL endpoint is disabled" })));
};
let query = sql.0 .0;
let planned = sql_executor
.parse(&query)
.await
.map_err(ApiError::SQLQueryFailed)?;
if planned.len() > 1 {
return Err(ApiError::SQLQueryFailed(plan_datafusion_err!(
"More than one query supplied"
)));
}
let Some(PlannedStatement::Query(plan)) = planned.first().cloned() else {
// This was a transaction statement, which doesn't require a result
return Ok(HttpResponse::Ok().json(json!({})));
};
let record_batches = sql_executor
.execute(&query)
.execute(plan)
.await
.map_err(ApiError::SQLQueryFailed)?
.collect()
Expand Down
18 changes: 17 additions & 1 deletion dozer-api/src/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::api_helper::get_api_security;
// Exports
use crate::errors::ApiInitError;
use crate::rest::api_generator::health_route;
use crate::sql::datafusion::SQLExecutor;
use crate::{
auth::api::{auth_route, validate},
CacheEndpoint,
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct ApiServer {
security: Option<ApiSecurity>,
host: String,
default_max_num_records: usize,
enable_sql: bool,
}

impl Default for ApiServer {
Expand All @@ -60,6 +62,7 @@ impl Default for ApiServer {
security: None,
host: "0.0.0.0".to_owned(),
default_max_num_records: 50,
enable_sql: true,
}
}
}
Expand All @@ -77,8 +80,10 @@ impl ApiServer {
security,
host: rest_config.host.unwrap_or_else(default_host),
default_max_num_records,
enable_sql: rest_config.enable_sql.unwrap_or(true),
}
}

fn get_cors(cors: CorsOptions) -> Cors {
match cors {
CorsOptions::Permissive => Cors::permissive(),
Expand All @@ -95,6 +100,7 @@ impl ApiServer {
mut cache_endpoints: Vec<Arc<CacheEndpoint>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
sql_executor: Option<Arc<SQLExecutor>>,
) -> App<
impl ServiceFactory<
ServiceRequest,
Expand All @@ -113,6 +119,7 @@ impl ApiServer {
.app_data(web::Data::new(endpoint_paths))
.app_data(web::Data::new(default_max_num_records))
.app_data(web::Data::new(cache_endpoints.clone()))
.app_data(web::Data::new(sql_executor))
.app_data(cfg)
.wrap(Logger::default())
.wrap(TracingLogger::default())
Expand Down Expand Up @@ -171,7 +178,7 @@ impl ApiServer {
.wrap(cors_middleware)
}

pub fn run(
pub async fn run(
self,
cache_endpoints: Vec<Arc<CacheEndpoint>>,
shutdown: impl Future<Output = ()> + Send + 'static,
Expand All @@ -190,13 +197,22 @@ impl ApiServer {

let address = format!("{}:{}", self.host, self.port);
let default_max_num_records = self.default_max_num_records;
let sql_executor = if self.enable_sql {
let sql_executor = SQLExecutor::try_new(&cache_endpoints)
.await
.map_err(ApiInitError::SQLEngineError)?;
Some(Arc::new(sql_executor))
} else {
None
};
let server = HttpServer::new(move || {
ApiServer::create_app_entry(
security.clone(),
cors.clone(),
cache_endpoints.clone(),
labels.clone(),
default_max_num_records,
sql_executor.clone(),
)
})
.bind(&address)
Expand Down
2 changes: 2 additions & 0 deletions dozer-api/src/rest/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async fn check_status(
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -104,6 +105,7 @@ async fn _call_auth_token_api(
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down
3 changes: 3 additions & 0 deletions dozer-api/src/rest/tests/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn list_route() {
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -230,6 +231,7 @@ async fn path_collision_test() {
],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -257,6 +259,7 @@ async fn setup_service() -> (
)],
Default::default(),
50,
None,
);
(actix_web::test::init_service(api_server).await, endpoint)
}
Expand Down
Loading

0 comments on commit c9df9af

Please sign in to comment.