Skip to content

Commit

Permalink
nexus: impl QueryExecutor for Catalog (#1150)
Browse files Browse the repository at this point in the history
To share code, Catalog was storing a postgres query executor
Unfortunately, that caused two connections for every nexus connection

Remove is_connection_valid since it isn't used, & for pg was wrong
(see deadpool-postgres for how they check if connection is valid)

Alternative fix would've been to use more connection pools,
that could be added in the future if necessary
  • Loading branch information
serprex authored Jan 25, 2024
1 parent c1a6562 commit aefdeaa
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 145 deletions.
57 changes: 17 additions & 40 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ members = [
resolver = "2"

[workspace.dependencies]
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
pgwire = "0.19"
2 changes: 1 addition & 1 deletion nexus/analyzer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ catalog = { path = "../catalog" }
flow-rs = { path = "../flow-rs" }
pem = "3.0"
pt = { path = "../pt" }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", features = ["visitor"] }
sqlparser.workspace = true
serde_json = "1.0"
5 changes: 4 additions & 1 deletion nexus/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ edition = "2021"

[dependencies]
anyhow = "1"
async-trait = "0.1"
chrono = { version = "0.4.22", default-features = false }
prost = "0.12"
peer-cursor = { path = "../peer-cursor" }
peer-postgres = { path = "../peer-postgres" }
pgwire.workspace = true
pt = { path = "../pt" }
refinery = { version = "0.8", features = ["tokio-postgres"] }
serde_json = "1.0"
sqlparser.workspace = true
tokio = { version = "1.13.0", features = ["full"] }
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
"with-uuid-1",
] }
tracing = "0.1.29"
serde_json = "1.0"
postgres-connection = { path = "../postgres-connection" }
36 changes: 21 additions & 15 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use anyhow::{anyhow, Context};
use peer_cursor::QueryExecutor;
use peer_postgres::PostgresQueryExecutor;
use peer_cursor::{QueryExecutor, QueryOutput, Schema};
use peer_postgres::{self, ast};
use pgwire::error::{PgWireResult};
use postgres_connection::{connect_postgres, get_pg_connection_string};
use prost::Message;
use pt::{
Expand All @@ -11,6 +12,7 @@ use pt::{
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use sqlparser::ast::Statement;
use tokio_postgres::{types, Client};

mod embedded {
Expand All @@ -19,8 +21,7 @@ mod embedded {
}

pub struct Catalog {
pg: Box<Client>,
executor: Arc<dyn QueryExecutor>,
pg: Client,
}

async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
Expand Down Expand Up @@ -77,22 +78,13 @@ impl<'a> CatalogConfig<'a> {
impl Catalog {
pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result<Self> {
let client = connect_postgres(&pt_config).await?;
let executor = PostgresQueryExecutor::new(None, &pt_config).await?;

Ok(Self {
pg: Box::new(client),
executor: Arc::new(executor),
})
Ok(Self { pg: client })
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
run_migrations(&mut self.pg).await
}

pub fn get_executor(&self) -> &Arc<dyn QueryExecutor> {
&self.executor
}

pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result<i64> {
let config_blob = {
let config = peer.config.clone().context("invalid peer config")?;
Expand Down Expand Up @@ -602,3 +594,17 @@ impl Catalog {
})
}
}

#[async_trait::async_trait]
impl QueryExecutor for Catalog {
#[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
peer_postgres::pg_execute(&self.pg, ast::PostgresAst {
peername: None,
}, stmt).await
}

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
peer_postgres::pg_describe(&self.pg, stmt).await
}
}
2 changes: 1 addition & 1 deletion nexus/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ futures = { version = "0.3.28", features = ["executor"] }
pgwire.workspace = true
pt = { path = "../pt" }
rand = "0.8"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
sqlparser.workspace = true
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
2 changes: 1 addition & 1 deletion nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "0.11"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
sqlparser.workspace = true
tracing = "0.1"
tokio = { version = "1.0", features = ["full"] }
gcp-bigquery-client = "0.18"
Expand Down
15 changes: 1 addition & 14 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ impl BigQueryQueryExecutor {
PgWireError::ApiError(err.into())
})?;

let result_set = self
.client
.job()
.query(&self.project_id, query_req)
.await;
let result_set = self.client.job().query(&self.project_id, query_req).await;

token.end().await.map_err(|err| {
tracing::error!("error closing tracking token: {}", err);
Expand Down Expand Up @@ -237,13 +233,4 @@ impl QueryExecutor for BigQueryQueryExecutor {
)))),
}
}
async fn is_connection_valid(&self) -> anyhow::Result<bool> {
let sql = "SELECT 1;";
let _result_set = self
.client
.job()
.query(&self.project_id, QueryRequest::new(sql))
.await?;
Ok(true)
}
}
2 changes: 1 addition & 1 deletion nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
pgwire.workspace = true
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
value = { path = "../value" }
3 changes: 0 additions & 3 deletions nexus/peer-cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,5 @@ pub enum QueryOutput {
#[async_trait::async_trait]
pub trait QueryExecutor: Send + Sync {
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;

async fn is_connection_valid(&self) -> anyhow::Result<bool>;
}
2 changes: 1 addition & 1 deletion nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "0.11"
postgres-inet = "0.19.0"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
Expand Down
Loading

0 comments on commit aefdeaa

Please sign in to comment.