From 0951fbdf485537057738651aa6c6be37ca113f5e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 5 Oct 2023 09:46:24 -0400 Subject: [PATCH] fix rust lints --- .github/workflows/rust-lint.yml | 2 +- nexus/parser/src/lib.rs | 7 ++-- nexus/peer-bigquery/src/ast.rs | 24 +++++++------ nexus/peer-bigquery/src/cursor.rs | 2 -- nexus/peer-snowflake/src/auth.rs | 22 ++++++------ nexus/peer-snowflake/src/cursor.rs | 2 -- nexus/peer-snowflake/src/lib.rs | 16 ++++----- nexus/peer-snowflake/src/stream.rs | 55 +++++++++++++++++------------- nexus/server/src/cursor.rs | 4 +-- nexus/server/src/main.rs | 35 +++++++++---------- nexus/value/src/array.rs | 3 +- 11 files changed, 86 insertions(+), 86 deletions(-) diff --git a/.github/workflows/rust-lint.yml b/.github/workflows/rust-lint.yml index 2c623af99..01c3dde17 100644 --- a/.github/workflows/rust-lint.yml +++ b/.github/workflows/rust-lint.yml @@ -26,5 +26,5 @@ jobs: components: clippy - name: clippy - run: cargo clippy --all-targets --all-features -- -D warnings + run: cargo clippy -- -D warnings working-directory: ./nexus diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index 18716b66e..f5b2aac34 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -23,7 +23,7 @@ pub struct NexusQueryParser { pub enum NexusStatement { PeerDDL { stmt: Statement, - ddl: PeerDDL, + ddl: Box, }, PeerQuery { stmt: Statement, @@ -55,7 +55,7 @@ impl NexusStatement { if let Some(ddl) = ddl { return Ok(NexusStatement::PeerDDL { stmt: stmt.clone(), - ddl, + ddl: Box::new(ddl), }); } @@ -100,8 +100,7 @@ impl NexusQueryParser { let peers = tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { let catalog = self.catalog.lock().await; - let peers = catalog.get_peers().await; - peers + catalog.get_peers().await }) }); diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index dd020a668..075bcc09c 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -78,17 +78,19 @@ impl BigqueryAst { visit_function_arg_mut(query, |node| { if let FunctionArgExpr::Expr(arg_expr) = node { - if let Expr::Cast { expr: _, data_type } = arg_expr { - if let DataType::Array(_) = data_type { - let list = self - .flatten_expr_to_in_list(&arg_expr) - .expect("failed to flatten in function"); - let rewritten_array = Array { - elem: list, - named: true, - }; - *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); - } + if let Expr::Cast { + expr: _, + data_type: DataType::Array(_), + } = arg_expr + { + let list = self + .flatten_expr_to_in_list(arg_expr) + .expect("failed to flatten in function"); + let rewritten_array = Array { + elem: list, + named: true, + }; + *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); } } diff --git a/nexus/peer-bigquery/src/cursor.rs b/nexus/peer-bigquery/src/cursor.rs index ab591dcc3..23812a382 100644 --- a/nexus/peer-bigquery/src/cursor.rs +++ b/nexus/peer-bigquery/src/cursor.rs @@ -9,7 +9,6 @@ use sqlparser::ast::Statement; use crate::BigQueryQueryExecutor; pub struct BigQueryCursor { - stmt: Statement, position: usize, stream: Mutex, schema: SchemaRef, @@ -42,7 +41,6 @@ impl BigQueryCursorManager { // Create a new cursor let cursor = BigQueryCursor { - stmt: stmt.clone(), position: 0, stream: Mutex::new(stream), schema, diff --git a/nexus/peer-snowflake/src/auth.rs b/nexus/peer-snowflake/src/auth.rs index 568c09643..10eb2e32e 100644 --- a/nexus/peer-snowflake/src/auth.rs +++ b/nexus/peer-snowflake/src/auth.rs @@ -47,9 +47,8 @@ impl SnowflakeAuth { expiry_threshold: u64, ) -> anyhow::Result { let pkey = match password { - Some(pw) => { - DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw).context("Invalid private key or decryption failed")? - }, + Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw) + .context("Invalid private key or decryption failed")?, None => { DecodePrivateKey::from_pkcs8_pem(&private_key).context("Invalid private key")? } @@ -77,16 +76,15 @@ impl SnowflakeAuth { // Normalize the account identifer to a form that is embedded into the JWT. // Logic adapted from Snowflake's example Python code for key-pair authentication "sql-api-generate-jwt.py". fn normalize_account_identifier(raw_account: &str) -> String { - let split_index: usize; - if !raw_account.contains(".global") { - split_index = *raw_account - .find(".") - .get_or_insert(raw_account.chars().count()); + let split_index = if !raw_account.contains(".global") { + *raw_account + .find('.') + .get_or_insert(raw_account.chars().count()) } else { - split_index = *raw_account - .find("-") - .get_or_insert(raw_account.chars().count()); - } + *raw_account + .find('-') + .get_or_insert(raw_account.chars().count()) + }; raw_account .to_uppercase() .chars() diff --git a/nexus/peer-snowflake/src/cursor.rs b/nexus/peer-snowflake/src/cursor.rs index b1a0ecc9a..475a2d7f3 100644 --- a/nexus/peer-snowflake/src/cursor.rs +++ b/nexus/peer-snowflake/src/cursor.rs @@ -7,7 +7,6 @@ use sqlparser::ast::Statement; use tokio::sync::Mutex; pub struct SnowflakeCursor { - stmt: Statement, position: usize, stream: Mutex, schema: SchemaRef, @@ -39,7 +38,6 @@ impl SnowflakeCursorManager { // Create a new cursor let cursor = SnowflakeCursor { - stmt: stmt.clone(), position: 0, stream: Mutex::new(stream), schema, diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index c2e4fbd9d..86f7b5854 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -28,13 +28,13 @@ mod stream; const DEFAULT_REFRESH_THRESHOLD: u64 = 3000; const DEFAULT_EXPIRY_THRESHOLD: u64 = 3600; -const SNOWFLAKE_URL_PREFIX: &'static str = "https://"; -const SNOWFLAKE_URL_SUFFIX: &'static str = ".snowflakecomputing.com/api/v2/statements"; +const SNOWFLAKE_URL_PREFIX: &str = "https://"; +const SNOWFLAKE_URL_SUFFIX: &str = ".snowflakecomputing.com/api/v2/statements"; -const DATE_OUTPUT_FORMAT: &'static str = "YYYY/MM/DD"; -const TIME_OUTPUT_FORMAT: &'static str = "HH:MI:SS.FF"; -const TIMESTAMP_OUTPUT_FORMAT: &'static str = "YYYY-MM-DDTHH24:MI:SS.FF"; -const TIMESTAMP_TZ_OUTPUT_FORMAT: &'static str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; +const DATE_OUTPUT_FORMAT: &str = "YYYY/MM/DD"; +const TIME_OUTPUT_FORMAT: &str = "HH:MI:SS.FF"; +const TIMESTAMP_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FF"; +const TIMESTAMP_TZ_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; #[derive(Debug, Serialize)] struct SQLStatement<'a> { @@ -59,7 +59,7 @@ pub(crate) struct ResultSetRowType { r#type: SnowflakeDataType, } -#[allow(non_snake_case)] +#[allow(non_snake_case, dead_code)] #[derive(Deserialize, Debug)] struct ResultSetPartitionInfo { rowCount: u64, @@ -207,7 +207,7 @@ impl SnowflakeQueryExecutor { }) } - pub async fn query(&self, query: &Box) -> PgWireResult { + pub async fn query(&self, query: &Query) -> PgWireResult { let mut query = query.clone(); let ast = ast::SnowflakeAst::default(); diff --git a/nexus/peer-snowflake/src/stream.rs b/nexus/peer-snowflake/src/stream.rs index b4290707a..3434b70df 100644 --- a/nexus/peer-snowflake/src/stream.rs +++ b/nexus/peer-snowflake/src/stream.rs @@ -13,7 +13,6 @@ use pgwire::{ }; use secrecy::ExposeSecret; use serde::Deserialize; -use serde_json; use std::{ pin::Pin, task::{Context, Poll}, @@ -146,17 +145,21 @@ impl SnowflakeRecordStream { // really hacky workaround for parsing the UTC timezone specifically. SnowflakeDataType::TimestampLtz => { match DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT) { - Ok(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + Ok(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + .naive_utc(), + ), + ), + Err(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str( + &elem.replace('Z', "+0000"), + TIMESTAMP_TZ_PARSE_FORMAT, + )? .naive_utc(), - )), - Err(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str( - &elem.replace("Z", "+0000"), - TIMESTAMP_TZ_PARSE_FORMAT, - )? - .naive_utc(), - )), + ), + ), } } SnowflakeDataType::TimestampNtz => PostgresTimestamp( @@ -164,21 +167,25 @@ impl SnowflakeRecordStream { ), SnowflakeDataType::TimestampTz => { match DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT) { - Ok(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + Ok(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + .naive_utc(), + ), + ), + Err(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str( + &elem.replace('Z', "+0000"), + TIMESTAMP_TZ_PARSE_FORMAT, + )? .naive_utc(), - )), - Err(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str( - &elem.replace("Z", "+0000"), - TIMESTAMP_TZ_PARSE_FORMAT, - )? - .naive_utc(), - )), + ), + ), } } SnowflakeDataType::Variant => { - let jsonb: serde_json::Value = serde_json::from_str(&elem)?; + let jsonb: serde_json::Value = serde_json::from_str(elem)?; Value::JsonB(jsonb) } }, @@ -188,7 +195,7 @@ impl SnowflakeRecordStream { row_values.push(row_value.unwrap_or(Value::Null)); } - self.partition_index = self.partition_index + 1; + self.partition_index += 1; Ok(Record { values: row_values, @@ -200,7 +207,7 @@ impl SnowflakeRecordStream { if (self.partition_number + 1) == self.result_set.resultSetMetaData.partitionInfo.len() { return Ok(false); } - self.partition_number = self.partition_number + 1; + self.partition_number += 1; self.partition_index = 0; let partition_number = self.partition_number; let secret = self.auth.get_jwt()?.expose_secret().clone(); diff --git a/nexus/server/src/cursor.rs b/nexus/server/src/cursor.rs index 025fdbcf5..36fee27c3 100644 --- a/nexus/server/src/cursor.rs +++ b/nexus/server/src/cursor.rs @@ -24,7 +24,7 @@ impl PeerCursors { self.cursors.remove(&name); } - pub fn get_peer(&self, name: &str) -> Option<&Box> { - self.cursors.get(name) + pub fn get_peer(&self, name: &str) -> Option<&Peer> { + self.cursors.get(name).map(|peer| peer.as_ref()) } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index bb7ea02f7..cafe2084f 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -160,10 +160,10 @@ impl NexusBackend { async fn check_for_mirror( catalog: &MutexGuard<'_, Catalog>, - flow_name: String, + flow_name: &str, ) -> PgWireResult> { let workflow_details = catalog - .get_workflow_details_for_flow_job(&flow_name) + .get_workflow_details_for_flow_job(flow_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -257,14 +257,14 @@ impl NexusBackend { ) -> PgWireResult>> { let mut peer_holder: Option> = None; match nexus_stmt { - NexusStatement::PeerDDL { stmt: _, ddl } => match ddl { + NexusStatement::PeerDDL { stmt: _, ddl } => match ddl.as_ref() { PeerDDL::CreatePeer { peer, if_not_exists: _, } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - self.validate_peer(peer_type, &peer).await.map_err(|e| { + self.validate_peer(peer_type, peer).await.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(), @@ -295,11 +295,10 @@ impl NexusBackend { }))); } let catalog = self.catalog.lock().await; - let mirror_details = - Self::check_for_mirror(&catalog, flow_job.name.clone()).await?; + let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { catalog - .create_flow_job_entry(&flow_job) + .create_flow_job_entry(flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -321,7 +320,7 @@ impl NexusBackend { // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; let workflow_id = flow_handler - .start_peer_flow_job(&flow_job, src_peer, dst_peer) + .start_peer_flow_job(flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -344,7 +343,7 @@ impl NexusBackend { None, ))]) } else { - Self::handle_mirror_existence(if_not_exists, flow_job.name) + Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone()) } } PeerDDL::CreateMirrorForSelect { @@ -360,13 +359,13 @@ impl NexusBackend { { let catalog = self.catalog.lock().await; mirror_details = - Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?; + Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?; } if mirror_details.is_none() { { let catalog = self.catalog.lock().await; catalog - .create_qrep_flow_job_entry(&qrep_flow_job) + .create_qrep_flow_job_entry(qrep_flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -387,14 +386,14 @@ impl NexusBackend { ))]); } - let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; + let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); Ok(vec![Response::Execution(Tag::new_for_execution( &create_mirror_success, None, ))]) } else { - Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name) + Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone()) } } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { @@ -407,7 +406,7 @@ impl NexusBackend { if let Some(job) = { let catalog = self.catalog.lock().await; catalog - .get_qrep_flow_job_by_name(&flow_job_name) + .get_qrep_flow_job_by_name(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -442,7 +441,7 @@ impl NexusBackend { let catalog = self.catalog.lock().await; tracing::info!("mirror_name: {}, if_exists: {}", flow_job_name, if_exists); let workflow_details = catalog - .get_workflow_details_for_flow_job(&flow_job_name) + .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -460,7 +459,7 @@ impl NexusBackend { let workflow_details = workflow_details.unwrap(); let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .shutdown_flow_job(&flow_job_name, workflow_details) + .shutdown_flow_job(flow_job_name, workflow_details) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -468,7 +467,7 @@ impl NexusBackend { })) })?; catalog - .delete_flow_job_entry(&flow_job_name) + .delete_flow_job_entry(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -480,7 +479,7 @@ impl NexusBackend { &drop_mirror_success, None, ))]) - } else if if_exists { + } else if *if_exists { let no_mirror_success = "NO SUCH MIRROR"; Ok(vec![Response::Execution(Tag::new_for_execution( no_mirror_success, diff --git a/nexus/value/src/array.rs b/nexus/value/src/array.rs index 9b50c8c67..2fa299bb3 100644 --- a/nexus/value/src/array.rs +++ b/nexus/value/src/array.rs @@ -128,7 +128,7 @@ impl ArrayValue { } } -impl<'a> ToSql for ArrayValue { +impl ToSql for ArrayValue { fn to_sql( &self, ty: &Type, @@ -235,7 +235,6 @@ impl ToSqlText for ArrayValue { ArrayValue::Timestamp(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::TimestampWithTimeZone(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::Empty => {} - _ => todo!(), } // remove trailing comma