From 2de985b81c763feee3c518f061ec5061699eee6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 22 Nov 2023 19:13:22 +0000 Subject: [PATCH] nexus cleanup (#702) 1. fix typo: QueryAssocation to QueryAssociation 2. remove unnecessary allocations for two-item keys_to_ignore 3. make NexusQueryParser get_peers_bridge/parse_simple_sql async; all callers are async 4. use with_capacity whenever straightforward to do so 5. change peer-snowflake SQLStatement parameters to struct since it serializes to a constant object --- nexus/analyzer/src/lib.rs | 15 ++++------- nexus/catalog/src/lib.rs | 2 +- nexus/parser/src/lib.rs | 20 ++++++-------- nexus/peer-bigquery/src/stream.rs | 2 +- nexus/peer-snowflake/src/lib.rs | 43 +++++++++++++------------------ nexus/server/src/cursor.rs | 4 +-- nexus/server/src/main.rs | 16 ++++++------ nexus/value/src/lib.rs | 4 +-- 8 files changed, 45 insertions(+), 61 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0824abe2f2..2e64012a35 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -40,13 +40,13 @@ impl<'a> PeerExistanceAnalyzer<'a> { } #[derive(Debug, Clone)] -pub enum QueryAssocation { +pub enum QueryAssociation { Peer(Box), Catalog, } impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { - type Output = QueryAssocation; + type Output = QueryAssociation; fn analyze(&self, statement: &Statement) -> anyhow::Result { let mut peers_touched: HashSet = HashSet::new(); @@ -78,9 +78,9 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { anyhow::bail!("queries touching multiple peers are not supported") } else if let Some(peer_name) = peers_touched.iter().next() { let peer = self.peers.get(peer_name).unwrap(); - Ok(QueryAssocation::Peer(Box::new(peer.clone()))) + Ok(QueryAssociation::Peer(Box::new(peer.clone()))) } else { - Ok(QueryAssocation::Catalog) + Ok(QueryAssociation::Catalog) } } } @@ -785,14 +785,9 @@ fn parse_db_options( }) .unwrap_or_default(); - let keys_to_ignore: HashSet = vec!["metadata_db", "unnest_columns"] - .into_iter() - .map(|s| s.to_string()) - .collect(); - let mut eventhubs: HashMap = HashMap::new(); for (key, _) in opts { - if keys_to_ignore.contains(key) { + if matches!(key, "metadata_db" | "unnest_columns") { continue; } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 15fccf2233..c1851432e0 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -211,7 +211,7 @@ impl Catalog { let rows = self.pg.query(&stmt, &[]).await?; - let mut peers = HashMap::new(); + let mut peers = HashMap::with_capacity(rows.len()); for row in rows { let name: &str = row.get(1); diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index f5b2aac340..02c8cb5a27 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use analyzer::{ CursorEvent, PeerCursorAnalyzer, PeerDDL, PeerDDLAnalyzer, PeerExistanceAnalyzer, - QueryAssocation, StatementAnalyzer, + QueryAssociation, StatementAnalyzer, }; use async_trait::async_trait; use catalog::Catalog; @@ -27,7 +27,7 @@ pub enum NexusStatement { }, PeerQuery { stmt: Statement, - assoc: QueryAssocation, + assoc: QueryAssociation, }, PeerCursor { stmt: Statement, @@ -96,13 +96,9 @@ impl NexusQueryParser { Self { catalog } } - pub fn get_peers_bridge(&self) -> PgWireResult> { - let peers = tokio::task::block_in_place(move || { - tokio::runtime::Handle::current().block_on(async move { - let catalog = self.catalog.lock().await; - catalog.get_peers().await - }) - }); + pub async fn get_peers_bridge(&self) -> PgWireResult> { + let catalog = self.catalog.lock().await; + let peers = catalog.get_peers().await; peers.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( @@ -113,7 +109,7 @@ impl NexusQueryParser { }) } - pub fn parse_simple_sql(&self, sql: &str) -> PgWireResult { + pub async fn parse_simple_sql(&self, sql: &str) -> PgWireResult { let mut stmts = Parser::parse_sql(&DIALECT, sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?; if stmts.len() > 1 { @@ -131,7 +127,7 @@ impl NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge()?; + let peers = self.get_peers_bridge().await?; let nexus_stmt = NexusStatement::new(peers, &stmt)?; Ok(NexusParsedStatement { statement: nexus_stmt, @@ -162,7 +158,7 @@ impl QueryParser for NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge()?; + let peers = self.get_peers_bridge().await?; let nexus_stmt = NexusStatement::new(peers, &stmt)?; Ok(NexusParsedStatement { statement: nexus_stmt, diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index d4acce7fd0..02851ae2cd 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -101,7 +101,7 @@ impl BqRecordStream { } pub fn convert_result_set_item(&self, result_set: &ResultSet) -> anyhow::Result { - let mut values = Vec::new(); + let mut values = Vec::with_capacity(self.schema.fields.len()); for field in &self.schema.fields { let field_type = &field.r#type; let field_name = &field.name; diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 7bb5b18790..a4eeeacb91 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -7,7 +7,7 @@ use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::dialect::GenericDialect; use sqlparser::parser; use std::cmp::min; -use std::{collections::HashMap, time::Duration}; +use std::time::Duration; use stream::SnowflakeDataType; use auth::SnowflakeAuth; @@ -36,6 +36,15 @@ const TIME_OUTPUT_FORMAT: &str = "HH:MI:SS.FF"; const TIMESTAMP_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FF"; const TIMESTAMP_TZ_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; +#[derive(Debug, Serialize)] +struct SQLStatementParameters<'a> { + pub date_output_format: &'a str, + pub time_output_format: &'a str, + pub timestamp_ltz_output_format: &'a str, + pub timestamp_ntz_output_format: &'a str, + pub timestamp_tz_output_format: &'a str, +} + #[derive(Debug, Serialize)] struct SQLStatement<'a> { statement: &'a str, @@ -43,7 +52,7 @@ struct SQLStatement<'a> { database: &'a str, warehouse: &'a str, role: &'a str, - parameters: HashMap, + parameters: SQLStatementParameters<'a>, } #[allow(non_snake_case)] @@ -147,28 +156,6 @@ impl SnowflakeQueryExecutor { #[async_recursion] #[tracing::instrument(name = "peer_sflake::process_query", skip_all)] async fn process_query(&self, query_str: &str) -> anyhow::Result { - let mut parameters = HashMap::new(); - parameters.insert( - "date_output_format".to_string(), - DATE_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "time_output_format".to_string(), - TIME_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_ltz_output_format".to_string(), - TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_ntz_output_format".to_string(), - TIMESTAMP_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_tz_output_format".to_string(), - TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(), - ); - let mut auth = self.auth.clone(); let jwt = auth.get_jwt()?; let secret = jwt.expose_secret().clone(); @@ -186,7 +173,13 @@ impl SnowflakeQueryExecutor { database: &self.config.database, warehouse: &self.config.warehouse, role: &self.config.role, - parameters, + parameters: SQLStatementParameters { + date_output_format: DATE_OUTPUT_FORMAT, + time_output_format: TIME_OUTPUT_FORMAT, + timestamp_ltz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT, + timestamp_ntz_output_format: TIMESTAMP_OUTPUT_FORMAT, + timestamp_tz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT, + }, }) .send() .await diff --git a/nexus/server/src/cursor.rs b/nexus/server/src/cursor.rs index 36fee27c3c..58d0e6a0c0 100644 --- a/nexus/server/src/cursor.rs +++ b/nexus/server/src/cursor.rs @@ -20,8 +20,8 @@ impl PeerCursors { self.cursors.insert(name, peer); } - pub fn remove_cursor(&mut self, name: String) { - self.cursors.remove(&name); + pub fn remove_cursor(&mut self, name: &str) { + self.cursors.remove(name); } pub fn get_peer(&self, name: &str) -> Option<&Peer> { diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 1e38b2e979..c218f9f0c7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use analyzer::{PeerDDL, QueryAssocation}; +use analyzer::{PeerDDL, QueryAssociation}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use catalog::{Catalog, CatalogConfig, WorkflowDetails}; @@ -141,7 +141,7 @@ impl NexusBackend { } peer_cursor::CursorModification::Closed(cursors) => { for cursor_name in cursors { - peer_cursors.remove_cursor(cursor_name); + peer_cursors.remove_cursor(&cursor_name); } Ok(vec![Response::Execution(Tag::new_for_execution( "CLOSE CURSOR", @@ -826,7 +826,7 @@ impl NexusBackend { NexusStatement::PeerQuery { stmt, assoc } => { // get the query executor let executor = match assoc { - QueryAssocation::Peer(peer) => { + QueryAssociation::Peer(peer) => { tracing::info!("handling peer[{}] query: {}", peer.name, stmt); peer_holder = Some(peer.clone()); self.get_peer_executor(&peer).await.map_err(|err| { @@ -835,7 +835,7 @@ impl NexusBackend { })) })? } - QueryAssocation::Catalog => { + QueryAssociation::Catalog => { tracing::info!("handling catalog query: {}", stmt); let catalog = self.catalog.lock().await; catalog.get_executor() @@ -961,7 +961,7 @@ impl SimpleQueryHandler for NexusBackend { where C: ClientInfo + Unpin + Send + Sync, { - let parsed = self.query_parser.parse_simple_sql(sql)?; + let parsed = self.query_parser.parse_simple_sql(sql).await?; let nexus_stmt = parsed.statement; self.handle_query(nexus_stmt).await } @@ -1039,7 +1039,7 @@ impl ExtendedQueryHandler for NexusBackend { sql = sql.replace(&format!("${}", i + 1), ¶meter_to_string(portal, i)?); } - let parsed = self.query_parser.parse_simple_sql(&sql)?; + let parsed = self.query_parser.parse_simple_sql(&sql).await?; let nexus_stmt = parsed.statement; let result = self.handle_query(nexus_stmt).await?; if result.is_empty() { @@ -1077,7 +1077,7 @@ impl ExtendedQueryHandler for NexusBackend { NexusStatement::Empty => Ok(DescribeResponse::no_data()), NexusStatement::PeerQuery { stmt, assoc } => { let schema: Option = match assoc { - QueryAssocation::Peer(peer) => { + QueryAssociation::Peer(peer) => { // if the peer is of type bigquery, let us route the query to bq. match &peer.config { Some(Config::BigqueryConfig(_)) => { @@ -1124,7 +1124,7 @@ impl ExtendedQueryHandler for NexusBackend { } } } - QueryAssocation::Catalog => { + QueryAssociation::Catalog => { let catalog = self.catalog.lock().await; let executor = catalog.get_executor(); executor.describe(stmt).await? diff --git a/nexus/value/src/lib.rs b/nexus/value/src/lib.rs index f6dbe0687b..3e6e0a1cbd 100644 --- a/nexus/value/src/lib.rs +++ b/nexus/value/src/lib.rs @@ -203,7 +203,7 @@ impl Value { } } serde_json::Value::Object(map) => { - let mut hstore = HashMap::new(); + let mut hstore = HashMap::with_capacity(map.len()); for (key, value) in map { hstore.insert(key.clone(), value.to_string()); } @@ -253,7 +253,7 @@ impl Value { Value::Uuid(u) => serde_json::Value::String(u.to_string()), Value::Enum(s) => serde_json::Value::String(s.clone()), Value::Hstore(map) => { - let mut object = serde_json::Map::new(); + let mut object = serde_json::Map::with_capacity(map.len()); for (key, value) in map { object.insert(key.clone(), serde_json::Value::String(value.clone())); }