diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 69f3141821..c275344312 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -166,7 +166,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { destination_table_identifier: table_mapping.destination.to_string(), partition_key: table_mapping .partition_key - .clone() + .as_ref() .map(|s| s.to_string()), }); } @@ -728,9 +728,9 @@ fn parse_db_options( // check if peers contains key and if it does // then add it to the eventhubs hashmap, if not error if let Some(peer) = peers.get(&key) { - let eventhub_config = peer.config.clone().unwrap(); + let eventhub_config = peer.config.as_ref().unwrap(); if let Config::EventhubConfig(eventhub_config) = eventhub_config { - eventhubs.insert(key.to_string(), eventhub_config); + eventhubs.insert(key.to_string(), eventhub_config.clone()); } else { anyhow::bail!("Peer '{}' is not an eventhub", key); } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index fb4c942961..4b691dc56e 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -19,7 +19,7 @@ mod embedded { pub struct Catalog { pg: Box, - executor: Arc>, + executor: Arc, } async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { @@ -86,11 +86,10 @@ impl Catalog { let pt_config = catalog_config.to_postgres_config(); let client = connect_postgres(&pt_config).await?; let executor = PostgresQueryExecutor::new(None, &pt_config).await?; - let boxed_trait = Box::new(executor) as Box; Ok(Self { pg: Box::new(client), - executor: Arc::new(boxed_trait), + executor: Arc::new(executor), }) } @@ -98,7 +97,7 @@ impl Catalog { run_migrations(&mut self.pg).await } - pub fn get_executor(&self) -> Arc> { + pub fn get_executor(&self) -> Arc { self.executor.clone() } @@ -213,18 +212,18 @@ impl Catalog { let mut peers = HashMap::new(); for row in rows { - let name: String = row.get(1); + let name: &str = row.get(1); let peer_type: i32 = row.get(2); - let options: Vec = row.get(3); + let options: &[u8] = row.get(3); let db_type = DbType::from_i32(peer_type); - let config = self.get_config(db_type, &name, options).await?; + let config = self.get_config(db_type, name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; - peers.insert(name, peer); + peers.insert(name.to_string(), peer); } Ok(peers) @@ -242,14 +241,14 @@ impl Catalog { let rows = self.pg.query(&stmt, &[&peer_name]).await?; if let Some(row) = rows.first() { - let name: String = row.get(1); + let name: &str = row.get(1); let peer_type: i32 = row.get(2); - let options: Vec = row.get(3); + let options: &[u8] = row.get(3); let db_type = DbType::from_i32(peer_type); - let config = self.get_config(db_type, &name, options).await?; + let config = self.get_config(db_type, name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; @@ -269,14 +268,14 @@ impl Catalog { let rows = self.pg.query(&stmt, &[&peer_id]).await?; if let Some(row) = rows.first() { - let name: String = row.get(0); + let name: &str = row.get(0); let peer_type: i32 = row.get(1); - let options: Vec = row.get(2); + let options: &[u8] = row.get(2); let db_type = DbType::from_i32(peer_type); - let config = self.get_config(db_type, &name, options).await?; + let config = self.get_config(db_type, name, options).await?; let peer = Peer { - name: name.clone().to_lowercase(), + name: name.to_lowercase(), r#type: peer_type, config, }; @@ -291,49 +290,49 @@ impl Catalog { &self, db_type: Option, name: &str, - options: Vec, + options: &[u8], ) -> anyhow::Result> { match db_type { Some(DbType::Snowflake) => { let err = format!("unable to decode {} options for peer {}", "snowflake", name); let snowflake_config = - pt::peerdb_peers::SnowflakeConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?; Ok(Some(Config::SnowflakeConfig(snowflake_config))) } Some(DbType::Bigquery) => { let err = format!("unable to decode {} options for peer {}", "bigquery", name); let bigquery_config = - pt::peerdb_peers::BigqueryConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?; Ok(Some(Config::BigqueryConfig(bigquery_config))) } Some(DbType::Mongo) => { let err = format!("unable to decode {} options for peer {}", "mongo", name); let mongo_config = - pt::peerdb_peers::MongoConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::MongoConfig::decode(options).context(err)?; Ok(Some(Config::MongoConfig(mongo_config))) } Some(DbType::Eventhub) => { let err = format!("unable to decode {} options for peer {}", "eventhub", name); let eventhub_config = - pt::peerdb_peers::EventHubConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::EventHubConfig::decode(options).context(err)?; Ok(Some(Config::EventhubConfig(eventhub_config))) } Some(DbType::Postgres) => { let err = format!("unable to decode {} options for peer {}", "postgres", name); let postgres_config = - pt::peerdb_peers::PostgresConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::PostgresConfig::decode(options).context(err)?; Ok(Some(Config::PostgresConfig(postgres_config))) } Some(DbType::S3) => { let err = format!("unable to decode {} options for peer {}", "s3", name); let s3_config = - pt::peerdb_peers::S3Config::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::S3Config::decode(options).context(err)?; Ok(Some(Config::S3Config(s3_config))) } Some(DbType::Sqlserver) => { let err = format!("unable to decode {} options for peer {}", "sqlserver", name); let sqlserver_config = - pt::peerdb_peers::SqlServerConfig::decode(options.as_slice()).context(err)?; + pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?; Ok(Some(Config::SqlserverConfig(sqlserver_config))) } Some(DbType::EventhubGroup) => { @@ -342,7 +341,7 @@ impl Catalog { "eventhub_group", name ); let eventhub_group_config = - pt::peerdb_peers::EventHubGroupConfig::decode(options.as_slice()) + pt::peerdb_peers::EventHubGroupConfig::decode(options) .context(err)?; Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) } @@ -528,13 +527,12 @@ impl Catalog { let first_row = rows.get(0).unwrap(); let workflow_id: Option = first_row.get(0); - if workflow_id.is_none() { + let Some(workflow_id) = workflow_id else { return Err(anyhow!( "workflow id not found for existing flow job {}", flow_job_name )); - } - let workflow_id = workflow_id.unwrap(); + }; let source_peer_id: i32 = first_row.get(1); let destination_peer_id: i32 = first_row.get(2); diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index 075bcc09c5..0c922e5ed7 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -12,11 +12,11 @@ use sqlparser::ast::{ pub struct BigqueryAst {} impl BigqueryAst { - pub fn is_timestamp_returning_function(&self, name: String) -> bool { - if name == "now" - || name == "date_trunc" - || name == "make_timestamp" - || name == "current_timestamp" + pub fn is_timestamp_returning_function(&self, name: &str) -> bool { + if name.eq_ignore_ascii_case("now") + || name.eq_ignore_ascii_case("date_trunc") + || name.eq_ignore_ascii_case("make_timestamp") + || name.eq_ignore_ascii_case("current_timestamp") { return true; } @@ -37,7 +37,7 @@ impl BigqueryAst { .. }) = e { - if self.is_timestamp_returning_function(v[0].to_string().to_lowercase()) { + if self.is_timestamp_returning_function(&v[0].value) { return true; } } @@ -49,21 +49,20 @@ impl BigqueryAst { false } - pub fn convert_to_datetimefield(&self, t: String) -> Option { - let t_lower = t.to_lowercase(); - if t_lower == "day" || t_lower == "days" { + pub fn convert_to_datetimefield(&self, t: &str) -> Option { + if t.eq_ignore_ascii_case("day") || t.eq_ignore_ascii_case("days") { return Some(DateTimeField::Day); } - if t_lower == "hour" || t_lower == "hours" { + if t.eq_ignore_ascii_case("hour") || t.eq_ignore_ascii_case("hours") { return Some(DateTimeField::Hour); } - if t_lower == "minute" || t_lower == "minutes" { + if t.eq_ignore_ascii_case("minute") || t.eq_ignore_ascii_case("minutes") { return Some(DateTimeField::Minute); } - if t_lower == "second" || t_lower == "Seconds" { + if t.eq_ignore_ascii_case("second") || t.eq_ignore_ascii_case("seconds") { return Some(DateTimeField::Second); } - if t_lower == "millisecond" || t_lower == "milliseconds" { + if t.eq_ignore_ascii_case("millisecond") || t.eq_ignore_ascii_case("milliseconds") { return Some(DateTimeField::Milliseconds); } None @@ -119,7 +118,7 @@ impl BigqueryAst { }) = node { // now() to CURRENT_TIMESTAMP - if v[0].to_string().to_lowercase() == "now" { + if v[0].value.eq_ignore_ascii_case("now") { v[0].value = "CURRENT_TIMESTAMP".into(); } } @@ -131,7 +130,7 @@ impl BigqueryAst { .. }) = node { - if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = value.as_mut() { + if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = value.as_ref() { /* postgres will have interval '1 Day' rewriting that to interval 1 Day in BQ @@ -139,9 +138,8 @@ impl BigqueryAst { let split = s.split(' '); let vec = split.collect::>(); let val_string: String = vec[0].into(); - let date_time_field_string: String = vec[1].into(); + let date_time_field = self.convert_to_datetimefield(vec[1]); *(value.as_mut()) = Expr::Value(Number(val_string, false)); - let date_time_field = self.convert_to_datetimefield(date_time_field_string); if date_time_field.is_none() { // Error handling - Nexus for BQ only supports Day, Hour, Minute, Second, Millisecond } @@ -164,7 +162,7 @@ impl BigqueryAst { change - to DATE_SUB */ if let Expr::BinaryOp { left, op, right } = node { - if self.is_timestamp_expr(left.as_mut()) || self.is_timestamp_expr(right.as_mut()) { + if self.is_timestamp_expr(left.as_ref()) || self.is_timestamp_expr(right.as_ref()) { if let BinaryOperator::Minus = op { *node = Expr::Function(Function { name: ObjectName(vec![Ident::new("DATE_SUB".to_string())]), @@ -198,7 +196,7 @@ impl BigqueryAst { .. }) = node { - if v[0].to_string().to_lowercase() == "date_trunc" { + if v[0].value.eq_ignore_ascii_case("date_trunc") { let mut date_part = a[0].to_string(); let date_expression = &a[1]; a[0] = date_expression.clone(); diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index fc4867b4f3..e04410f355 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -60,9 +60,9 @@ impl BqSchema { let bq_schema = result_set .query_response() .schema - .clone() + .as_ref() .expect("Schema is not present"); - let fields = bq_schema.fields.expect("Schema fields are not present"); + let fields = bq_schema.fields.as_ref().expect("Schema fields are not present"); let schema = SchemaRef::new(Schema { fields: fields @@ -74,7 +74,7 @@ impl BqSchema { .collect(), }); - Self { schema, fields } + Self { schema, fields: fields.clone() } } pub fn schema(&self) -> SchemaRef { diff --git a/nexus/peer-postgres/src/ast.rs b/nexus/peer-postgres/src/ast.rs index 3e33a535d8..9e76b9eb76 100644 --- a/nexus/peer-postgres/src/ast.rs +++ b/nexus/peer-postgres/src/ast.rs @@ -10,10 +10,11 @@ pub struct PostgresAst { impl PostgresAst { pub fn rewrite_query(&self, query: &mut Query) { visit_relations_mut(query, |table| { - // if the peer name is the first part of the table name, - // remove it. - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + // if peer name is first part of table name, remove first part + if let Some(ref peername) = self.peername { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } } ControlFlow::<()>::Continue(()) }); @@ -29,9 +30,12 @@ impl PostgresAst { } = stmnt { if object_type == &ObjectType::Table { - let table = names.get_mut(0).unwrap(); - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + if let Some(ref peername) = self.peername { + if let Some(table) = names.first_mut() { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } + } } } } @@ -39,10 +43,11 @@ impl PostgresAst { }); visit_relations_mut(stmt, |table| { - // if the peer name is the first part of the table name, - // remove it. - if Some(table.0[0].value.clone().to_lowercase()) == self.peername { - table.0.remove(0); + // if peer name is first part of table name, remove first part + if let Some(ref peername) = self.peername { + if peername.eq_ignore_ascii_case(&table.0[0].value) { + table.0.remove(0); + } } ControlFlow::<()>::Continue(()) }); diff --git a/nexus/peer-snowflake/src/auth.rs b/nexus/peer-snowflake/src/auth.rs index 10eb2e32e8..0ce026e1ac 100644 --- a/nexus/peer-snowflake/src/auth.rs +++ b/nexus/peer-snowflake/src/auth.rs @@ -86,8 +86,8 @@ impl SnowflakeAuth { .get_or_insert(raw_account.chars().count()) }; raw_account - .to_uppercase() .chars() + .flat_map(char::to_uppercase) .take(split_index) .collect() } diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 86f7b58544..7bb5b18790 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -131,10 +131,10 @@ impl SnowflakeQueryExecutor { SNOWFLAKE_URL_PREFIX, config.account_id, SNOWFLAKE_URL_SUFFIX ), auth: SnowflakeAuth::new( - config.clone().account_id, - config.clone().username, - config.clone().private_key, - config.clone().password, + config.account_id.clone(), + config.username.clone(), + config.private_key.clone(), + config.password.clone(), DEFAULT_REFRESH_THRESHOLD, DEFAULT_EXPIRY_THRESHOLD, )?, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 4bcc742cfc..12b25384be 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -76,7 +76,7 @@ pub struct NexusBackend { portal_store: Arc>, query_parser: Arc, peer_cursors: Arc>, - executors: Arc>>>, + executors: Arc>>, flow_handler: Option>>, peerdb_fdw_mode: bool, } @@ -104,7 +104,7 @@ impl NexusBackend { // execute a statement on a peer async fn execute_statement<'a>( &self, - executor: Arc>, + executor: Arc, stmt: &sqlparser::ast::Statement, peer_holder: Option>, ) -> PgWireResult>> { @@ -699,26 +699,26 @@ impl NexusBackend { Ok(workflow_id) } - async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result>> { + async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result> { if let Some(executor) = self.executors.get(&peer.name) { return Ok(Arc::clone(executor.value())); } - let executor = match &peer.config { + let executor: Arc = match &peer.config { Some(Config::BigqueryConfig(ref c)) => { let peer_name = peer.name.clone(); let executor = BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } Some(Config::PostgresConfig(ref c)) => { let peername = Some(peer.name.clone()); let executor = peer_postgres::PostgresQueryExecutor::new(peername, c).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } Some(Config::SnowflakeConfig(ref c)) => { let executor = peer_snowflake::SnowflakeQueryExecutor::new(c).await?; - Arc::new(Box::new(executor) as Box) + Arc::new(executor) } _ => { panic!("peer type not supported: {:?}", peer)