Skip to content

Commit

Permalink
nexus cleanup
Browse files Browse the repository at this point in the history
1. fix typo: QueryAssocation to QueryAssociation
2. remove unnecessary allocations for two-item keys_to_ignore
3. make NexusQueryPArser get_peers_bridge/parse_simple_sql async; all callers are async
  • Loading branch information
serprex committed Nov 22, 2023
1 parent 62e2e5c commit 0ef50fd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 29 deletions.
15 changes: 5 additions & 10 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ impl<'a> PeerExistanceAnalyzer<'a> {
}

#[derive(Debug, Clone)]
pub enum QueryAssocation {
pub enum QueryAssociation {
Peer(Box<Peer>),
Catalog,
}

impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
type Output = QueryAssocation;
type Output = QueryAssociation;

fn analyze(&self, statement: &Statement) -> anyhow::Result<Self::Output> {
let mut peers_touched: HashSet<String> = HashSet::new();
Expand Down Expand Up @@ -78,9 +78,9 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
anyhow::bail!("queries touching multiple peers are not supported")
} else if let Some(peer_name) = peers_touched.iter().next() {
let peer = self.peers.get(peer_name).unwrap();
Ok(QueryAssocation::Peer(Box::new(peer.clone())))
Ok(QueryAssociation::Peer(Box::new(peer.clone())))
} else {
Ok(QueryAssocation::Catalog)
Ok(QueryAssociation::Catalog)
}
}
}
Expand Down Expand Up @@ -785,14 +785,9 @@ fn parse_db_options(
})
.unwrap_or_default();

let keys_to_ignore: HashSet<String> = vec!["metadata_db", "unnest_columns"]
.into_iter()
.map(|s| s.to_string())
.collect();

let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
if keys_to_ignore.contains(key) {
if matches!(key, "metadata_db" | "unnest_columns") {
continue;
}

Expand Down
20 changes: 8 additions & 12 deletions nexus/parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};

use analyzer::{
CursorEvent, PeerCursorAnalyzer, PeerDDL, PeerDDLAnalyzer, PeerExistanceAnalyzer,
QueryAssocation, StatementAnalyzer,
QueryAssociation, StatementAnalyzer,
};
use async_trait::async_trait;
use catalog::Catalog;
Expand All @@ -27,7 +27,7 @@ pub enum NexusStatement {
},
PeerQuery {
stmt: Statement,
assoc: QueryAssocation,
assoc: QueryAssociation,
},
PeerCursor {
stmt: Statement,
Expand Down Expand Up @@ -96,13 +96,9 @@ impl NexusQueryParser {
Self { catalog }
}

pub fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
let peers = tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async move {
let catalog = self.catalog.lock().await;
catalog.get_peers().await
})
});
pub async fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
let catalog = self.catalog.lock().await;
let peers = catalog.get_peers().await;

peers.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
Expand All @@ -113,7 +109,7 @@ impl NexusQueryParser {
})
}

pub fn parse_simple_sql(&self, sql: &str) -> PgWireResult<NexusParsedStatement> {
pub async fn parse_simple_sql(&self, sql: &str) -> PgWireResult<NexusParsedStatement> {
let mut stmts =
Parser::parse_sql(&DIALECT, sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?;
if stmts.len() > 1 {
Expand All @@ -131,7 +127,7 @@ impl NexusQueryParser {
})
} else {
let stmt = stmts.remove(0);
let peers = self.get_peers_bridge()?;
let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
Expand Down Expand Up @@ -162,7 +158,7 @@ impl QueryParser for NexusQueryParser {
})
} else {
let stmt = stmts.remove(0);
let peers = self.get_peers_bridge()?;
let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
Expand Down
14 changes: 7 additions & 7 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
time::Duration,
};

use analyzer::{PeerDDL, QueryAssocation};
use analyzer::{PeerDDL, QueryAssociation};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use catalog::{Catalog, CatalogConfig, WorkflowDetails};
Expand Down Expand Up @@ -826,7 +826,7 @@ impl NexusBackend {
NexusStatement::PeerQuery { stmt, assoc } => {
// get the query executor
let executor = match assoc {
QueryAssocation::Peer(peer) => {
QueryAssociation::Peer(peer) => {
tracing::info!("handling peer[{}] query: {}", peer.name, stmt);
peer_holder = Some(peer.clone());
self.get_peer_executor(&peer).await.map_err(|err| {
Expand All @@ -835,7 +835,7 @@ impl NexusBackend {
}))
})?
}
QueryAssocation::Catalog => {
QueryAssociation::Catalog => {
tracing::info!("handling catalog query: {}", stmt);
let catalog = self.catalog.lock().await;
catalog.get_executor()
Expand Down Expand Up @@ -961,7 +961,7 @@ impl SimpleQueryHandler for NexusBackend {
where
C: ClientInfo + Unpin + Send + Sync,
{
let parsed = self.query_parser.parse_simple_sql(sql)?;
let parsed = self.query_parser.parse_simple_sql(sql).await?;
let nexus_stmt = parsed.statement;
self.handle_query(nexus_stmt).await
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@ impl ExtendedQueryHandler for NexusBackend {
sql = sql.replace(&format!("${}", i + 1), &parameter_to_string(portal, i)?);
}

let parsed = self.query_parser.parse_simple_sql(&sql)?;
let parsed = self.query_parser.parse_simple_sql(&sql).await?;
let nexus_stmt = parsed.statement;
let result = self.handle_query(nexus_stmt).await?;
if result.is_empty() {
Expand Down Expand Up @@ -1077,7 +1077,7 @@ impl ExtendedQueryHandler for NexusBackend {
NexusStatement::Empty => Ok(DescribeResponse::no_data()),
NexusStatement::PeerQuery { stmt, assoc } => {
let schema: Option<SchemaRef> = match assoc {
QueryAssocation::Peer(peer) => {
QueryAssociation::Peer(peer) => {
// if the peer is of type bigquery, let us route the query to bq.
match &peer.config {
Some(Config::BigqueryConfig(_)) => {
Expand Down Expand Up @@ -1124,7 +1124,7 @@ impl ExtendedQueryHandler for NexusBackend {
}
}
}
QueryAssocation::Catalog => {
QueryAssociation::Catalog => {
let catalog = self.catalog.lock().await;
let executor = catalog.get_executor();
executor.describe(stmt).await?
Expand Down

0 comments on commit 0ef50fd

Please sign in to comment.