Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SQL layer fixes #2235

Merged
merged 32 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cf29f30
Ignore transactions in sql layer
Jesse-Bakker Nov 17, 2023
844cc25
Fix response message for transaction start/end & implement some system
Jesse-Bakker Nov 20, 2023
f626771
Implement describe. Parameter type inference is a bit WIP
Jesse-Bakker Nov 21, 2023
84b29e5
Add pg_types dep to dozer-api
Jesse-Bakker Nov 21, 2023
d366cfb
Add pg_proc catalog
Jesse-Bakker Nov 21, 2023
90d4766
Add pg_class
Jesse-Bakker Nov 21, 2023
781c445
Add pg_attribute
Jesse-Bakker Nov 21, 2023
1bb6029
Don't share QueryProcessor between sessions
Jesse-Bakker Nov 21, 2023
0de29eb
fix: support some system variables with SHOW
Nov 21, 2023
b9cd577
chore: improve schema macro syntax
Nov 21, 2023
4ee90a8
fix: add some catalog tables and functions
Nov 22, 2023
1bfb8e2
fix: add more information schema tables
Nov 22, 2023
9b0ca8a
Update to patched datafusion
Jesse-Bakker Nov 23, 2023
4233f5f
Support ODBC (at least unixodbc)
Jesse-Bakker Nov 23, 2023
1b7d818
Support more of postgres for superset
Jesse-Bakker Nov 24, 2023
cddc2de
pg_catalog wip
Nov 28, 2023
15c7f9d
fix: SQL AST rewrite for SUM('1') as SUM(1)
Nov 29, 2023
c2a03dc
Adjust type names
Jesse-Bakker Nov 29, 2023
1031c22
fix: add more pg_catalog tables and rewrite format_type function
Nov 29, 2023
6d24eb5
Update datafusion fork for boolean coercion
Jesse-Bakker Nov 29, 2023
3786ff0
Make some UDFs more lenient with types, because array types aren't
Jesse-Bakker Nov 29, 2023
835bd93
fix: rewrite a = ANY (b) to a in (b)
Nov 30, 2023
2899da9
fix: use patched datafusion with disabled field-name ambiguity check
Nov 30, 2023
ed4b741
chore: optimize: reuse as much state as possible between queries
Dec 1, 2023
78a0214
Correctness fixes for bugs exposed by PG JDBC
Jesse-Bakker Dec 1, 2023
b1d12c2
chore: update datafusion repo url
Dec 12, 2023
874ce9a
chore: clippy fixes
Dec 12, 2023
751fe2d
chore: remove println
Dec 12, 2023
1c98ac2
chore: add information_schema.character_sets
Dec 13, 2023
c306ada
chore: clippy fix
Dec 13, 2023
f4eaa9b
chore: add enabled flag in pgwire config
Dec 13, 2023
e9c04f1
chore: add enable_sql flag in rest config
Dec 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,171 changes: 1,483 additions & 1,688 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ resolver = "2"

[workspace.dependencies]
bincode = { version = "2.0.0-rc.3", features = ["derive"]}
datafusion = { version = "33.0.0"}
datafusion-expr = { version = "33.0.0"}

[patch.crates-io]
postgres = { git = "https://github.com/getdozer/rust-postgres" }
postgres-protocol = { git = "https://github.com/getdozer/rust-postgres" }
postgres-types = { git = "https://github.com/getdozer/rust-postgres" }
tokio-postgres = { git = "https://github.com/getdozer/rust-postgres" }


datafusion = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-expr = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-physical-expr = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-physical-plan = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-sql = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-proto = { git = "https://github.com/getdozer/arrow-datafusion" }
datafusion-common = { git = "https://github.com/getdozer/arrow-datafusion" }
8 changes: 6 additions & 2 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ pin-project = "1.1.3"
async-stream = "0.3.5"
uuid = "1.6.1"
chrono = "0.4.31"
datafusion = "32.0.0"
datafusion-expr = "32.0.0"
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
serde_json = { version = "1.0.108", features = ["arbitrary_precision"] }
pgwire = "0.16.1"
tempdir = "0.3.7"
postgres-types = "0.2"
futures-sink = "0.3.29"
async-once-cell = "0.5.3"
genawaiter = "0.99.1"
2 changes: 2 additions & 0 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub enum ApiInitError {
CacheNotFound(Labels),
#[error("Failed to bind to address {0}: {1}")]
FailedToBindToAddress(String, #[source] std::io::Error),
#[error("Failed to initialize SQL engine: {0}")]
SQLEngineError(#[source] DataFusionError),
}

#[derive(Error, Debug)]
Expand Down
28 changes: 22 additions & 6 deletions dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::sync::Arc;

use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
use datafusion::common::plan_datafusion_err;
use datafusion::error::DataFusionError;
use dozer_cache::cache::expression::{QueryExpression, Skip};
use dozer_cache::cache::CacheRecord;
use dozer_cache::{CacheReader, Phase};
Expand All @@ -14,7 +16,7 @@ use openapiv3::OpenAPI;
use crate::api_helper::{get_record, get_records, get_records_count};
use crate::generator::oapi::generator::OpenApiGenerator;
use crate::sql::datafusion::json::record_batches_to_json_rows;
use crate::sql::datafusion::SQLExecutor;
use crate::sql::datafusion::{PlannedStatement, SQLExecutor};
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
Expand Down Expand Up @@ -177,16 +179,30 @@ pub async fn get_phase(
Ok(web::Json(phase))
}

pub async fn sql(
pub(crate) async fn sql(
// access: Option<ReqData<Access>>, // TODO:
cache_endpoints: web::Data<Vec<Arc<CacheEndpoint>>>,
sql_executor: web::Data<Option<Arc<SQLExecutor>>>,
sql: extractor::SQLQueryExtractor,
) -> Result<actix_web::HttpResponse, crate::errors::ApiError> {
let cache_endpoints = (*cache_endpoints.into_inner()).clone();
let sql_executor = SQLExecutor::new(cache_endpoints);
let Some(sql_executor) = sql_executor.as_deref() else {
return Ok(HttpResponse::NotFound().json(json!({ "error": "SQL endpoint is disabled" })));
};
let query = sql.0 .0;
let planned = sql_executor
.parse(&query)
.await
.map_err(ApiError::SQLQueryFailed)?;
if planned.len() > 1 {
return Err(ApiError::SQLQueryFailed(plan_datafusion_err!(
"More than one query supplied"
)));
}
let Some(PlannedStatement::Query(plan)) = planned.first().cloned() else {
// This was a transaction statement, which doesn't require a result
return Ok(HttpResponse::Ok().json(json!({})));
};
let record_batches = sql_executor
.execute(&query)
.execute(plan)
.await
.map_err(ApiError::SQLQueryFailed)?
.collect()
Expand Down
18 changes: 17 additions & 1 deletion dozer-api/src/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::api_helper::get_api_security;
// Exports
use crate::errors::ApiInitError;
use crate::rest::api_generator::health_route;
use crate::sql::datafusion::SQLExecutor;
use crate::{
auth::api::{auth_route, validate},
CacheEndpoint,
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct ApiServer {
security: Option<ApiSecurity>,
host: String,
default_max_num_records: usize,
enable_sql: bool,
}

impl Default for ApiServer {
Expand All @@ -60,6 +62,7 @@ impl Default for ApiServer {
security: None,
host: "0.0.0.0".to_owned(),
default_max_num_records: 50,
enable_sql: true,
}
}
}
Expand All @@ -77,8 +80,10 @@ impl ApiServer {
security,
host: rest_config.host.unwrap_or_else(default_host),
default_max_num_records,
enable_sql: rest_config.enable_sql.unwrap_or(true),
}
}

fn get_cors(cors: CorsOptions) -> Cors {
match cors {
CorsOptions::Permissive => Cors::permissive(),
Expand All @@ -95,6 +100,7 @@ impl ApiServer {
mut cache_endpoints: Vec<Arc<CacheEndpoint>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
sql_executor: Option<Arc<SQLExecutor>>,
) -> App<
impl ServiceFactory<
ServiceRequest,
Expand All @@ -113,6 +119,7 @@ impl ApiServer {
.app_data(web::Data::new(endpoint_paths))
.app_data(web::Data::new(default_max_num_records))
.app_data(web::Data::new(cache_endpoints.clone()))
.app_data(web::Data::new(sql_executor))
.app_data(cfg)
.wrap(Logger::default())
.wrap(TracingLogger::default())
Expand Down Expand Up @@ -171,7 +178,7 @@ impl ApiServer {
.wrap(cors_middleware)
}

pub fn run(
pub async fn run(
self,
cache_endpoints: Vec<Arc<CacheEndpoint>>,
shutdown: impl Future<Output = ()> + Send + 'static,
Expand All @@ -190,13 +197,22 @@ impl ApiServer {

let address = format!("{}:{}", self.host, self.port);
let default_max_num_records = self.default_max_num_records;
let sql_executor = if self.enable_sql {
let sql_executor = SQLExecutor::try_new(&cache_endpoints)
.await
.map_err(ApiInitError::SQLEngineError)?;
Some(Arc::new(sql_executor))
} else {
None
};
let server = HttpServer::new(move || {
ApiServer::create_app_entry(
security.clone(),
cors.clone(),
cache_endpoints.clone(),
labels.clone(),
default_max_num_records,
sql_executor.clone(),
)
})
.bind(&address)
Expand Down
2 changes: 2 additions & 0 deletions dozer-api/src/rest/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async fn check_status(
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -104,6 +105,7 @@ async fn _call_auth_token_api(
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down
3 changes: 3 additions & 0 deletions dozer-api/src/rest/tests/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn list_route() {
)],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -230,6 +231,7 @@ async fn path_collision_test() {
],
Default::default(),
50,
None,
);
let app = actix_web::test::init_service(api_server).await;

Expand Down Expand Up @@ -257,6 +259,7 @@ async fn setup_service() -> (
)],
Default::default(),
50,
None,
);
(actix_web::test::init_service(api_server).await, endpoint)
}
Expand Down
Loading
Loading