From fc7a659e383ae475cab0e6e938a12eb4d4a387b9 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Fri, 24 Nov 2023 15:29:36 +0000 Subject: [PATCH] nexus: more cloning reduction BigQueryQueryExecutor clones most of config into client, only needing 2 strings out of it afterwards SnowflakeAuth says private_key must not be copied while the caller was calling clone to pass String While going over String parameters to functions, noticed get_peer_of_mirror/handle_mirror_existence not needing ownership --- nexus/peer-bigquery/src/lib.rs | 18 ++++++++++-------- nexus/peer-snowflake/src/auth.rs | 9 ++++----- nexus/peer-snowflake/src/lib.rs | 4 ++-- nexus/server/src/main.rs | 20 +++++++++----------- 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index 902155247a..d90f2f6c1d 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -20,13 +20,14 @@ mod stream; pub struct BigQueryQueryExecutor { peer_name: String, - config: BigqueryConfig, + project_id: String, + dataset_id: String, peer_connections: Arc, client: Box, cursor_manager: BigQueryCursorManager, } -pub async fn bq_client_from_config(config: BigqueryConfig) -> anyhow::Result { +pub async fn bq_client_from_config(config: &BigqueryConfig) -> anyhow::Result { let sa_key = yup_oauth2::ServiceAccountKey { key_type: Some(config.auth_type.clone()), project_id: Some(config.project_id.clone()), @@ -52,12 +53,13 @@ impl BigQueryQueryExecutor { config: &BigqueryConfig, peer_connections: Arc, ) -> anyhow::Result { - let client = bq_client_from_config(config.clone()).await?; + let client = bq_client_from_config(config).await?; let client = Box::new(client); let cursor_manager = BigQueryCursorManager::new(); Ok(Self { peer_name, - config: config.clone(), + project_id: config.project_id.clone(), + dataset_id: config.dataset_id.clone(), peer_connections, client, cursor_manager, @@ -82,7 +84,7 @@ impl BigQueryQueryExecutor { let result_set = self .client .job() - .query(&self.config.project_id, query_req) + .query(&self.project_id, query_req) .await .map_err(|err| { tracing::error!("error running query: {}", err); @@ -112,7 +114,7 @@ impl QueryExecutor for BigQueryQueryExecutor { let mut query = query.clone(); let bq_ast = ast::BigqueryAst::default(); bq_ast - .rewrite(&self.config.dataset_id, &mut query) + .rewrite(&self.dataset_id, &mut query) .context("unable to rewrite query") .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -222,7 +224,7 @@ impl QueryExecutor for BigQueryQueryExecutor { let mut query = query.clone(); let bq_ast = ast::BigqueryAst::default(); bq_ast - .rewrite(&self.config.dataset_id, &mut query) + .rewrite(&self.dataset_id, &mut query) .context("unable to rewrite query") .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -260,7 +262,7 @@ impl QueryExecutor for BigQueryQueryExecutor { let _result_set = self .client .job() - .query(&self.config.project_id, QueryRequest::new(sql)) + .query(&self.project_id, QueryRequest::new(sql)) .await?; Ok(true) } diff --git a/nexus/peer-snowflake/src/auth.rs b/nexus/peer-snowflake/src/auth.rs index 0ce026e1ac..e379b0f7fd 100644 --- a/nexus/peer-snowflake/src/auth.rs +++ b/nexus/peer-snowflake/src/auth.rs @@ -36,21 +36,20 @@ pub struct SnowflakeAuth { } impl SnowflakeAuth { - // When initializing, private_key must not be copied, to improve security of credentials. #[tracing::instrument(name = "peer_sflake::init_client_auth", skip_all)] pub fn new( account_id: String, username: String, - private_key: String, - password: Option, + private_key: &str, + password: Option<&str>, refresh_threshold: u64, expiry_threshold: u64, ) -> anyhow::Result { let pkey = match password { - Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw) + Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(private_key, pw) .context("Invalid private key or decryption failed")?, None => { - DecodePrivateKey::from_pkcs8_pem(&private_key).context("Invalid private key")? + DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")? } }; let mut snowflake_auth: SnowflakeAuth = SnowflakeAuth { diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index a4eeeacb91..9fe3a8536c 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -142,8 +142,8 @@ impl SnowflakeQueryExecutor { auth: SnowflakeAuth::new( config.account_id.clone(), config.username.clone(), - config.private_key.clone(), - config.password.clone(), + &config.private_key, + config.password.as_deref(), DEFAULT_REFRESH_THRESHOLD, DEFAULT_EXPIRY_THRESHOLD, )?, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 7c9d909c80..8537fb9020 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -180,9 +180,9 @@ impl NexusBackend { async fn get_peer_of_mirror( catalog: &MutexGuard<'_, Catalog>, - peer_name: String, + peer_name: &str, ) -> PgWireResult { - let peer = catalog.get_peer(&peer_name).await.map_err(|err| { + let peer = catalog.get_peer(peer_name).await.map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err), })) @@ -192,7 +192,7 @@ impl NexusBackend { fn handle_mirror_existence( if_not_exists: bool, - flow_name: String, + flow_name: &str, ) -> PgWireResult>> { if if_not_exists { let existing_mirror_success = "MIRROR ALREADY EXISTS"; @@ -389,7 +389,7 @@ impl NexusBackend { None, ))]) } else { - Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone()) + Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name) } } _ => unreachable!(), @@ -487,11 +487,9 @@ impl NexusBackend { // get source and destination peers let src_peer = - Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone()) - .await?; + Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?; let dst_peer = - Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone()) - .await?; + Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?; // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; @@ -519,7 +517,7 @@ impl NexusBackend { None, ))]) } else { - Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone()) + Self::handle_mirror_existence(*if_not_exists, &flow_job.name) } } PeerDDL::CreateMirrorForSelect { .. } => { @@ -931,9 +929,9 @@ impl NexusBackend { let executor: Arc = match &peer.config { Some(Config::BigqueryConfig(ref c)) => { - let peer_name = peer.name.clone(); let executor = - BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?; + BigQueryQueryExecutor::new(peer.name.clone(), c, self.peer_connections.clone()) + .await?; Arc::new(executor) } Some(Config::PostgresConfig(ref c)) => {