nexus cleanup (#702)
1. fix typo: QueryAssocation to QueryAssociation
2. remove unnecessary allocations for two-item keys_to_ignore
3. make NexusQueryParser get_peers_bridge/parse_simple_sql async; all callers are async
4. use with_capacity whenever straightforward to do so
5. change the peer-snowflake SQLStatement parameters field from a HashMap to a struct, since it serializes to a constant object (see the sketch below)
serprex authored Nov 22, 2023
1 parent c2117e2 commit 2de985b
Showing 8 changed files with 45 additions and 61 deletions.
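
Item 5 above, illustrated: deriving Serialize on a struct of borrowed &str fields yields the same constant JSON object the old HashMap<String, String> produced, without allocating owned Strings per query. A minimal sketch, assuming the serde (with the derive feature) and serde_json crates; the struct is abridged to two fields and the date format value is illustrative rather than the actual PeerDB constant:

use serde::Serialize;

// Abridged sketch; the real struct has five output-format fields.
#[derive(Serialize)]
struct SQLStatementParameters<'a> {
    date_output_format: &'a str,
    time_output_format: &'a str,
}

fn main() {
    let params = SQLStatementParameters {
        date_output_format: "YYYY-MM-DD",  // illustrative value, not the PeerDB constant
        time_output_format: "HH:MI:SS.FF", // value taken from the diff below
    };
    // Prints {"date_output_format":"YYYY-MM-DD","time_output_format":"HH:MI:SS.FF"}
    println!("{}", serde_json::to_string(&params).unwrap());
}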
15 changes: 5 additions & 10 deletions nexus/analyzer/src/lib.rs
@@ -40,13 +40,13 @@ impl<'a> PeerExistanceAnalyzer<'a> {
}

#[derive(Debug, Clone)]
-pub enum QueryAssocation {
+pub enum QueryAssociation {
Peer(Box<Peer>),
Catalog,
}

impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
-type Output = QueryAssocation;
+type Output = QueryAssociation;

fn analyze(&self, statement: &Statement) -> anyhow::Result<Self::Output> {
let mut peers_touched: HashSet<String> = HashSet::new();
@@ -78,9 +78,9 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
anyhow::bail!("queries touching multiple peers are not supported")
} else if let Some(peer_name) = peers_touched.iter().next() {
let peer = self.peers.get(peer_name).unwrap();
-Ok(QueryAssocation::Peer(Box::new(peer.clone())))
+Ok(QueryAssociation::Peer(Box::new(peer.clone())))
} else {
-Ok(QueryAssocation::Catalog)
+Ok(QueryAssociation::Catalog)
}
}
}
@@ -785,14 +785,9 @@ fn parse_db_options(
})
.unwrap_or_default();

-let keys_to_ignore: HashSet<String> = vec!["metadata_db", "unnest_columns"]
-.into_iter()
-.map(|s| s.to_string())
-.collect();
-
let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
-if keys_to_ignore.contains(key) {
+if matches!(key, "metadata_db" | "unnest_columns") {
continue;
}

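On the keys_to_ignore change above: matching the key against literal patterns does the same two-item membership test without building a HashSet<String> on every call. A minimal standalone sketch; the helper function name is illustrative, not part of the PeerDB code:

// Same set of keys as before, but no per-call HashSet<String> allocation.
fn is_ignored(key: &str) -> bool {
    matches!(key, "metadata_db" | "unnest_columns")
}

fn main() {
    assert!(is_ignored("metadata_db"));
    assert!(is_ignored("unnest_columns"));
    assert!(!is_ignored("some_other_option"));
}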
2 changes: 1 addition & 1 deletion nexus/catalog/src/lib.rs
@@ -211,7 +211,7 @@ impl Catalog {

let rows = self.pg.query(&stmt, &[]).await?;

-let mut peers = HashMap::new();
+let mut peers = HashMap::with_capacity(rows.len());

for row in rows {
let name: &str = row.get(1);
20 changes: 8 additions & 12 deletions nexus/parser/src/lib.rs
@@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};

use analyzer::{
CursorEvent, PeerCursorAnalyzer, PeerDDL, PeerDDLAnalyzer, PeerExistanceAnalyzer,
-QueryAssocation, StatementAnalyzer,
+QueryAssociation, StatementAnalyzer,
};
use async_trait::async_trait;
use catalog::Catalog;
@@ -27,7 +27,7 @@ pub enum NexusStatement {
},
PeerQuery {
stmt: Statement,
-assoc: QueryAssocation,
+assoc: QueryAssociation,
},
PeerCursor {
stmt: Statement,
@@ -96,13 +96,9 @@ impl NexusQueryParser {
Self { catalog }
}

-pub fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
-let peers = tokio::task::block_in_place(move || {
-tokio::runtime::Handle::current().block_on(async move {
-let catalog = self.catalog.lock().await;
-catalog.get_peers().await
-})
-});
+pub async fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
+let catalog = self.catalog.lock().await;
+let peers = catalog.get_peers().await;

peers.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
@@ -113,7 +109,7 @@
})
}

-pub fn parse_simple_sql(&self, sql: &str) -> PgWireResult<NexusParsedStatement> {
+pub async fn parse_simple_sql(&self, sql: &str) -> PgWireResult<NexusParsedStatement> {
let mut stmts =
Parser::parse_sql(&DIALECT, sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?;
if stmts.len() > 1 {
@@ -131,7 +127,7 @@
})
} else {
let stmt = stmts.remove(0);
-let peers = self.get_peers_bridge()?;
+let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
@@ -162,7 +158,7 @@ impl QueryParser for NexusQueryParser {
})
} else {
let stmt = stmts.remove(0);
-let peers = self.get_peers_bridge()?;
+let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
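On the async change above: block_in_place plus Handle::block_on is only needed to call async code from a synchronous function, so once get_peers_bridge and parse_simple_sql are themselves async, the catalog lock and query can simply be awaited by their already-async callers. A minimal sketch of the pattern, using illustrative stand-in types (not the real Catalog or NexusQueryParser) and assuming the tokio crate with the rt, macros, and sync features:

use std::sync::Arc;
use tokio::sync::Mutex;

// Illustrative stand-in for the catalog; not the PeerDB type.
struct Catalog;

impl Catalog {
    async fn get_peers(&self) -> Vec<String> {
        vec!["peer_a".into()]
    }
}

struct Parser {
    catalog: Arc<Mutex<Catalog>>,
}

impl Parser {
    // Async all the way down: no block_in_place / block_on bridging needed,
    // because every caller is itself async and can simply .await this.
    async fn get_peers_bridge(&self) -> Vec<String> {
        let catalog = self.catalog.lock().await;
        catalog.get_peers().await
    }
}

#[tokio::main]
async fn main() {
    let parser = Parser {
        catalog: Arc::new(Mutex::new(Catalog)),
    };
    println!("{:?}", parser.get_peers_bridge().await);
}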
2 changes: 1 addition & 1 deletion nexus/peer-bigquery/src/stream.rs
@@ -101,7 +101,7 @@ impl BqRecordStream {
}

pub fn convert_result_set_item(&self, result_set: &ResultSet) -> anyhow::Result<Record> {
-let mut values = Vec::new();
+let mut values = Vec::with_capacity(self.schema.fields.len());
for field in &self.schema.fields {
let field_type = &field.r#type;
let field_name = &field.name;
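On the with_capacity changes in this and the catalog/value files: when the final length is known up front, pre-sizing the collection avoids repeated reallocation while it is filled. A tiny illustration with made-up values:

fn main() {
    let n = 16;
    // One allocation up front instead of repeated growth while pushing.
    let mut values: Vec<u32> = Vec::with_capacity(n);
    values.extend(0..n as u32);
    assert!(values.capacity() >= n);
    assert_eq!(values.len(), n);
}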
43 changes: 18 additions & 25 deletions nexus/peer-snowflake/src/lib.rs
@@ -7,7 +7,7 @@ use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser;
use std::cmp::min;
-use std::{collections::HashMap, time::Duration};
+use std::time::Duration;
use stream::SnowflakeDataType;

use auth::SnowflakeAuth;
@@ -36,14 +36,23 @@ const TIME_OUTPUT_FORMAT: &str = "HH:MI:SS.FF";
const TIMESTAMP_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FF";
const TIMESTAMP_TZ_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM";

+#[derive(Debug, Serialize)]
+struct SQLStatementParameters<'a> {
+pub date_output_format: &'a str,
+pub time_output_format: &'a str,
+pub timestamp_ltz_output_format: &'a str,
+pub timestamp_ntz_output_format: &'a str,
+pub timestamp_tz_output_format: &'a str,
+}
+
#[derive(Debug, Serialize)]
struct SQLStatement<'a> {
statement: &'a str,
timeout: u64,
database: &'a str,
warehouse: &'a str,
role: &'a str,
-parameters: HashMap<String, String>,
+parameters: SQLStatementParameters<'a>,
}

#[allow(non_snake_case)]
@@ -147,28 +156,6 @@ impl SnowflakeQueryExecutor {
#[async_recursion]
#[tracing::instrument(name = "peer_sflake::process_query", skip_all)]
async fn process_query(&self, query_str: &str) -> anyhow::Result<ResultSet> {
-let mut parameters = HashMap::new();
-parameters.insert(
-"date_output_format".to_string(),
-DATE_OUTPUT_FORMAT.to_string(),
-);
-parameters.insert(
-"time_output_format".to_string(),
-TIME_OUTPUT_FORMAT.to_string(),
-);
-parameters.insert(
-"timestamp_ltz_output_format".to_string(),
-TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(),
-);
-parameters.insert(
-"timestamp_ntz_output_format".to_string(),
-TIMESTAMP_OUTPUT_FORMAT.to_string(),
-);
-parameters.insert(
-"timestamp_tz_output_format".to_string(),
-TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(),
-);
-
let mut auth = self.auth.clone();
let jwt = auth.get_jwt()?;
let secret = jwt.expose_secret().clone();
@@ -186,7 +173,13 @@
database: &self.config.database,
warehouse: &self.config.warehouse,
role: &self.config.role,
-parameters,
+parameters: SQLStatementParameters {
+date_output_format: DATE_OUTPUT_FORMAT,
+time_output_format: TIME_OUTPUT_FORMAT,
+timestamp_ltz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT,
+timestamp_ntz_output_format: TIMESTAMP_OUTPUT_FORMAT,
+timestamp_tz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT,
+},
})
.send()
.await
4 changes: 2 additions & 2 deletions nexus/server/src/cursor.rs
@@ -20,8 +20,8 @@ impl PeerCursors {
self.cursors.insert(name, peer);
}

-pub fn remove_cursor(&mut self, name: String) {
-self.cursors.remove(&name);
+pub fn remove_cursor(&mut self, name: &str) {
+self.cursors.remove(name);
}

pub fn get_peer(&self, name: &str) -> Option<&Peer> {
16 changes: 8 additions & 8 deletions nexus/server/src/main.rs
@@ -4,7 +4,7 @@ use std::{
time::Duration,
};

-use analyzer::{PeerDDL, QueryAssocation};
+use analyzer::{PeerDDL, QueryAssociation};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use catalog::{Catalog, CatalogConfig, WorkflowDetails};
@@ -141,7 +141,7 @@ impl NexusBackend {
}
peer_cursor::CursorModification::Closed(cursors) => {
for cursor_name in cursors {
-peer_cursors.remove_cursor(cursor_name);
+peer_cursors.remove_cursor(&cursor_name);
}
Ok(vec![Response::Execution(Tag::new_for_execution(
"CLOSE CURSOR",
@@ -826,7 +826,7 @@ impl NexusBackend {
NexusStatement::PeerQuery { stmt, assoc } => {
// get the query executor
let executor = match assoc {
-QueryAssocation::Peer(peer) => {
+QueryAssociation::Peer(peer) => {
tracing::info!("handling peer[{}] query: {}", peer.name, stmt);
peer_holder = Some(peer.clone());
self.get_peer_executor(&peer).await.map_err(|err| {
@@ -835,7 +835,7 @@
}))
})?
}
-QueryAssocation::Catalog => {
+QueryAssociation::Catalog => {
tracing::info!("handling catalog query: {}", stmt);
let catalog = self.catalog.lock().await;
catalog.get_executor()
@@ -961,7 +961,7 @@ impl SimpleQueryHandler for NexusBackend {
where
C: ClientInfo + Unpin + Send + Sync,
{
-let parsed = self.query_parser.parse_simple_sql(sql)?;
+let parsed = self.query_parser.parse_simple_sql(sql).await?;
let nexus_stmt = parsed.statement;
self.handle_query(nexus_stmt).await
}
@@ -1039,7 +1039,7 @@ impl ExtendedQueryHandler for NexusBackend {
sql = sql.replace(&format!("${}", i + 1), &parameter_to_string(portal, i)?);
}

-let parsed = self.query_parser.parse_simple_sql(&sql)?;
+let parsed = self.query_parser.parse_simple_sql(&sql).await?;
let nexus_stmt = parsed.statement;
let result = self.handle_query(nexus_stmt).await?;
if result.is_empty() {
@@ -1077,7 +1077,7 @@ impl ExtendedQueryHandler for NexusBackend {
NexusStatement::Empty => Ok(DescribeResponse::no_data()),
NexusStatement::PeerQuery { stmt, assoc } => {
let schema: Option<SchemaRef> = match assoc {
-QueryAssocation::Peer(peer) => {
+QueryAssociation::Peer(peer) => {
// if the peer is of type bigquery, let us route the query to bq.
match &peer.config {
Some(Config::BigqueryConfig(_)) => {
Expand Down Expand Up @@ -1124,7 +1124,7 @@ impl ExtendedQueryHandler for NexusBackend {
}
}
}
-QueryAssocation::Catalog => {
+QueryAssociation::Catalog => {
let catalog = self.catalog.lock().await;
let executor = catalog.get_executor();
executor.describe(stmt).await?
4 changes: 2 additions & 2 deletions nexus/value/src/lib.rs
@@ -203,7 +203,7 @@ impl Value {
}
}
serde_json::Value::Object(map) => {
-let mut hstore = HashMap::new();
+let mut hstore = HashMap::with_capacity(map.len());
for (key, value) in map {
hstore.insert(key.clone(), value.to_string());
}
@@ -253,7 +253,7 @@ impl Value {
Value::Uuid(u) => serde_json::Value::String(u.to_string()),
Value::Enum(s) => serde_json::Value::String(s.clone()),
Value::Hstore(map) => {
-let mut object = serde_json::Map::new();
+let mut object = serde_json::Map::with_capacity(map.len());
for (key, value) in map {
object.insert(key.clone(), serde_json::Value::String(value.clone()));
}
