From 3fce83fb714033b0b15cd8586dd6f6a127b82c6d Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Thu, 2 Nov 2023 17:11:20 +0000 Subject: [PATCH] Basic optimizations Prefer `as_ref` over `clone` when possible Prefer `eq_ignore_ascii_case` over calling `to_lower_case` before comparison Executor: convert `Arc>` to `Arc` Prefer passing `&[u8]` instead of `Vec` to get_config. The postgres library is able to lend out slices from rows, avoiding a copy --- nexus/analyzer/src/lib.rs | 6 ++-- nexus/catalog/src/lib.rs | 50 +++++++++++++++---------------- nexus/peer-bigquery/src/stream.rs | 6 ++-- nexus/peer-postgres/src/ast.rs | 27 ++++++++++------- nexus/peer-snowflake/src/lib.rs | 8 ++--- nexus/server/src/main.rs | 14 ++++----- 6 files changed, 57 insertions(+), 54 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index a06abb34ac..6939b70809 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -162,7 +162,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { destination_table_identifier: table_mapping.destination.to_string(), partition_key: table_mapping .partition_key - .clone() + .as_ref() .map(|s| s.to_string()), }); } @@ -717,9 +717,9 @@ fn parse_db_options( // check if peers contains key and if it does // then add it to the eventhubs hashmap, if not error if let Some(peer) = peers.get(&key) { - let eventhub_config = peer.config.clone().unwrap(); + let eventhub_config = peer.config.as_ref().unwrap(); if let Config::EventhubConfig(eventhub_config) = eventhub_config { - eventhubs.insert(key.to_string(), eventhub_config); + eventhubs.insert(key.to_string(), eventhub_config.clone()); } else { anyhow::bail!("Peer '{}' is not an eventhub", key); } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index fb4c942961..53788692a7 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -19,7 +19,7 @@ mod embedded { pub struct Catalog { pg: Box, - executor: Arc>, + executor: Arc, } async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { @@ -86,11 +86,10 @@ impl Catalog { let pt_config = catalog_config.to_postgres_config(); let client = connect_postgres(&pt_config).await?; let executor = PostgresQueryExecutor::new(None, &pt_config).await?; - let boxed_trait = Box::new(executor) as Box; Ok(Self { pg: Box::new(client), - executor: Arc::new(boxed_trait), + executor: Arc::new(executor), }) } @@ -98,7 +97,7 @@ impl Catalog { run_migrations(&mut self.pg).await } - pub fn get_executor(&self) -> Arc> { + pub fn get_executor(&self) -> Arc { self.executor.clone() } @@ -213,18 +212,18 @@ impl Catalog { let mut peers = HashMap::new(); for row in rows { - let name: String = row.get(1); + let name: &str = row.get(1); let peer_type: i32 = row.get(2); - let options: Vec = row.get(3); + let options: &[u8] = row.get(3); let db_type = DbType::from_i32(peer_type); let config = self.get_config(db_type, &name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; - peers.insert(name, peer); + peers.insert(name.to_string(), peer); } Ok(peers) @@ -242,14 +241,14 @@ impl Catalog { let rows = self.pg.query(&stmt, &[&peer_name]).await?; if let Some(row) = rows.first() { - let name: String = row.get(1); + let name: &str = row.get(1); let peer_type: i32 = row.get(2); - let options: Vec = row.get(3); + let options: &[u8] = row.get(3); let db_type = DbType::from_i32(peer_type); let config = self.get_config(db_type, &name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; @@ -269,14 +268,14 @@ impl Catalog { let rows = self.pg.query(&stmt, &[&peer_id]).await?; if let Some(row) = rows.first() { - let name: String = row.get(0); + let name: &str = row.get(0); let peer_type: i32 = row.get(1); - let options: Vec = row.get(2); + let options: &[u8] = row.get(2); let db_type = DbType::from_i32(peer_type); let config = self.get_config(db_type, &name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; @@ -291,49 +290,49 @@ impl Catalog { &self, db_type: Option, name: &str, - options: Vec, + options: &[u8], ) -> anyhow::Result> { match db_type { Some(DbType::Snowflake) => { let err = format!("unable to decode {} options for peer {}", "snowflake", name); let snowflake_config = - pt::peerdb_peers::SnowflakeConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?; Ok(Some(Config::SnowflakeConfig(snowflake_config))) } Some(DbType::Bigquery) => { let err = format!("unable to decode {} options for peer {}", "bigquery", name); let bigquery_config = - pt::peerdb_peers::BigqueryConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?; Ok(Some(Config::BigqueryConfig(bigquery_config))) } Some(DbType::Mongo) => { let err = format!("unable to decode {} options for peer {}", "mongo", name); let mongo_config = - pt::peerdb_peers::MongoConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::MongoConfig::decode(options).context(err)?; Ok(Some(Config::MongoConfig(mongo_config))) } Some(DbType::Eventhub) => { let err = format!("unable to decode {} options for peer {}", "eventhub", name); let eventhub_config = - pt::peerdb_peers::EventHubConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::EventHubConfig::decode(options).context(err)?; Ok(Some(Config::EventhubConfig(eventhub_config))) } Some(DbType::Postgres) => { let err = format!("unable to decode {} options for peer {}", "postgres", name); let postgres_config = - pt::peerdb_peers::PostgresConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::PostgresConfig::decode(options).context(err)?; Ok(Some(Config::PostgresConfig(postgres_config))) } Some(DbType::S3) => { let err = format!("unable to decode {} options for peer {}", "s3", name); let s3_config = - pt::peerdb_peers::S3Config::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::S3Config::decode(options).context(err)?; Ok(Some(Config::S3Config(s3_config))) } Some(DbType::Sqlserver) => { let err = format!("unable to decode {} options for peer {}", "sqlserver", name); let sqlserver_config = - pt::peerdb_peers::SqlServerConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?; Ok(Some(Config::SqlserverConfig(sqlserver_config))) } Some(DbType::EventhubGroup) => { @@ -342,7 +341,7 @@ impl Catalog { "eventhub_group", name ); let eventhub_group_config = - pt::peerdb_peers::EventHubGroupConfig::decode(options.as_slice()) + pt::peerdb_peers::EventHubGroupConfig::decode(options) .context(err)?; Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) } @@ -528,13 +527,12 @@ impl Catalog { let first_row = rows.get(0).unwrap(); let workflow_id: Option = first_row.get(0); - if workflow_id.is_none() { + let Some(workflow_id) = workflow_id else { return Err(anyhow!( "workflow id not found for existing flow job {}", flow_job_name )); - } - let workflow_id = workflow_id.unwrap(); + }; let source_peer_id: i32 = first_row.get(1); let destination_peer_id: i32 = first_row.get(2); diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index fc4867b4f3..e04410f355 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -60,9 +60,9 @@ impl BqSchema { let bq_schema = result_set .query_response() .schema - .clone() + .as_ref() .expect("Schema is not present"); - let fields = bq_schema.fields.expect("Schema fields are not present"); + let fields = bq_schema.fields.as_ref().expect("Schema fields are not present"); let schema = SchemaRef::new(Schema { fields: fields @@ -74,7 +74,7 @@ impl BqSchema { .collect(), }); - Self { schema, fields } + Self { schema, fields: fields.clone() } } pub fn schema(&self) -> SchemaRef { diff --git a/nexus/peer-postgres/src/ast.rs b/nexus/peer-postgres/src/ast.rs index 3e33a535d8..9e76b9eb76 100644 --- a/nexus/peer-postgres/src/ast.rs +++ b/nexus/peer-postgres/src/ast.rs @@ -10,10 +10,11 @@ pub struct PostgresAst { impl PostgresAst { pub fn rewrite_query(&self, query: &mut Query) { visit_relations_mut(query, |table| { - // if the peer name is the first part of the table name, - // remove it. - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + // if peer name is first part of table name, remove first part + if let Some(ref peername) = self.peername { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } } ControlFlow::<()>::Continue(()) }); @@ -29,9 +30,12 @@ impl PostgresAst { } = stmnt { if object_type == &ObjectType::Table { - let table = names.get_mut(0).unwrap(); - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + if let Some(ref peername) = self.peername { + if let Some(table) = names.first_mut() { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } + } } } } @@ -39,10 +43,11 @@ impl PostgresAst { }); visit_relations_mut(stmt, |table| { - // if the peer name is the first part of the table name, - // remove it. - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + // if peer name is first part of table name, remove first part + if let Some(ref peername) = self.peername { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } } ControlFlow::<()>::Continue(()) }); diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 86f7b58544..7bb5b18790 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -131,10 +131,10 @@ impl SnowflakeQueryExecutor { SNOWFLAKE_URL_PREFIX, config.account_id, SNOWFLAKE_URL_SUFFIX ), auth: SnowflakeAuth::new( - config.clone().account_id, - config.clone().username, - config.clone().private_key, - config.clone().password, + config.account_id.clone(), + config.username.clone(), + config.private_key.clone(), + config.password.clone(), DEFAULT_REFRESH_THRESHOLD, DEFAULT_EXPIRY_THRESHOLD, )?, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index c2a9032a24..84e9cdfb05 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -76,7 +76,7 @@ pub struct NexusBackend { portal_store: Arc>, query_parser: Arc, peer_cursors: Arc>, - executors: Arc>>>, + executors: Arc>>, flow_handler: Option>>, peerdb_fdw_mode: bool, } @@ -104,7 +104,7 @@ impl NexusBackend { // execute a statement on a peer async fn execute_statement<'a>( &self, - executor: Arc>, + executor: Arc, stmt: &sqlparser::ast::Statement, peer_holder: Option>, ) -> PgWireResult>> { @@ -641,26 +641,26 @@ impl NexusBackend { Ok(workflow_id) } - async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result>> { + async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result> { if let Some(executor) = self.executors.get(&peer.name) { return Ok(Arc::clone(executor.value())); } - let executor = match &peer.config { + let executor: Arc = match &peer.config { Some(Config::BigqueryConfig(ref c)) => { let peer_name = peer.name.clone(); let executor = BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } Some(Config::PostgresConfig(ref c)) => { let peername = Some(peer.name.clone()); let executor = peer_postgres::PostgresQueryExecutor::new(peername, c).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } Some(Config::SnowflakeConfig(ref c)) => { let executor = peer_snowflake::SnowflakeQueryExecutor::new(c).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } _ => { panic!("peer type not supported: {:?}", peer)