diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0642966713..83bea8d59c 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -922,11 +922,7 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .context("no port specified")? .parse::() .context("unable to parse port as valid int")?, - user: opts - .get("user") - .cloned() - .unwrap_or_default() - .to_string(), + user: opts.get("user").cloned().unwrap_or_default().to_string(), password: opts .get("password") .cloned() diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 692b093d94..41ab1aa40a 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -196,8 +196,7 @@ pub fn process_options( // If mode is upsert, we need unique key columns if opts.get("mode") == Some(&Value::String(String::from("upsert"))) - && (opts.get("unique_key_columns").is_none() - || opts.get("unique_key_columns") == Some(&Value::Array(vec![]))) + && opts.get("unique_key_columns").map(|ukc| ukc == &Value::Array(Vec::new())).unwrap_or(true) { anyhow::bail!("For upsert mode, unique_key_columns must be specified"); } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 0c3b65d2b8..e8a569f669 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -449,9 +449,9 @@ impl Catalog { ) .await?; - if job.flow_options.get("destination_table_name").is_none() { + let Some(destination_table_name) = job.flow_options.get("destination_table_name") else { return Err(anyhow!("destination_table_name not found in flow options")); - } + }; let _rows = self .pg @@ -462,11 +462,7 @@ impl Catalog { &source_peer_id, &destination_peer_id, &job.description, - &job.flow_options - .get("destination_table_name") - .unwrap() - .as_str() - .unwrap(), + &destination_table_name.as_str().unwrap(), &job.query_string, &serde_json::to_value(job.flow_options.clone()) .context("unable to serialize flow options")?, diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 35382ca287..70c463e44c 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -199,9 +199,9 @@ impl FlowGrpcClient { for (key, value) in &job.flow_options { match value { Value::String(s) => match key.as_str() { - "destination_table_name" => cfg.destination_table_identifier = s.clone(), - "watermark_column" => cfg.watermark_column = s.clone(), - "watermark_table_name" => cfg.watermark_table = s.clone(), + "destination_table_name" => cfg.destination_table_identifier.clone_from(s), + "watermark_column" => cfg.watermark_column.clone_from(s), + "watermark_table_name" => cfg.watermark_table.clone_from(s), "mode" => { let mut wm = QRepWriteMode { write_type: QRepWriteType::QrepWriteModeAppend as i32, @@ -229,7 +229,7 @@ impl FlowGrpcClient { _ => return anyhow::Result::Err(anyhow::anyhow!("invalid mode {}", s)), } } - "staging_path" => cfg.staging_path = s.clone(), + "staging_path" => cfg.staging_path.clone_from(s), _ => return anyhow::Result::Err(anyhow::anyhow!("invalid str option {}", key)), }, Value::Number(n) => match key.as_str() { diff --git a/nexus/peer-mysql/src/ast.rs b/nexus/peer-mysql/src/ast.rs index 00c12b7dbf..e581cdbb88 100644 --- a/nexus/peer-mysql/src/ast.rs +++ b/nexus/peer-mysql/src/ast.rs @@ -1,15 +1,39 @@ use std::ops::ControlFlow; use peer_ast::flatten_expr_to_in_list; +use serde_json::{self, Value as JsonValue}; use sqlparser::ast::{ visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, Array, BinaryOperator, - DataType, Expr, FunctionArgExpr, Query, + DataType, Expr, FunctionArgExpr, Offset, Query, Value, }; +fn json_to_expr(val: JsonValue) -> Expr { + match val { + JsonValue::Null => Expr::Value(Value::Null), + JsonValue::Bool(x) => Expr::Value(Value::Boolean(x)), + JsonValue::Number(x) => Expr::Value(Value::Number(x.to_string(), false)), + JsonValue::String(x) => Expr::Value(Value::SingleQuotedString(x)), + JsonValue::Array(x) => Expr::Array(Array { + elem: x.into_iter().map(json_to_expr).collect::>(), + named: false, + }), + JsonValue::Object(x) => Expr::Cast { + data_type: DataType::JSON, + expr: Box::new(Expr::Value(Value::SingleQuotedString( + JsonValue::Object(x).to_string(), + ))), + format: None, + }, + } +} + pub fn rewrite_query(peername: &str, query: &mut Query) { visit_relations_mut(query, |table| { // if peer name is first part of table name, remove first part - if peername.eq_ignore_ascii_case(&table.0[0].value) { + // remove `public.` to facilitate mysql global function push down + if table.0.len() > 1 + && (peername.eq_ignore_ascii_case(&table.0[0].value) || table.0[0].value == "public") + { table.0.remove(0); } ControlFlow::<()>::Continue(()) @@ -19,6 +43,16 @@ pub fn rewrite_query(peername: &str, query: &mut Query) { if let Some(Expr::Cast { expr, .. }) = &query.limit { query.limit = Some((**expr).clone()); } + if let Some(Offset { + value: Expr::Cast { expr, .. }, + rows, + }) = &query.offset + { + query.offset = Some(Offset { + value: (**expr).clone(), + rows: *rows, + }); + } visit_function_arg_mut(query, |node| { if let FunctionArgExpr::Expr(arg_expr) = node { @@ -34,6 +68,22 @@ pub fn rewrite_query(peername: &str, query: &mut Query) { named: true, }; *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); + } else if let Expr::Cast { + data_type: DataType::JSONB, + expr, + .. + } = arg_expr + { + *node = match **expr { + Expr::Value(Value::SingleQuotedString(ref s)) => { + if let Ok(val) = serde_json::from_str::(s) { + FunctionArgExpr::Expr(json_to_expr(val)) + } else { + FunctionArgExpr::Expr((**expr).clone()) + } + } + _ => FunctionArgExpr::Expr((**expr).clone()), + }; } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 51d2999890..176928a71b 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -572,7 +572,7 @@ impl NexusBackend { match qrep_config { Some(mut qrep_config) => { if query_string.is_some() { - qrep_config.query = query_string.as_ref().unwrap().clone(); + qrep_config.query.clone_from(query_string.as_ref().unwrap()); } qrep_config.dst_table_full_resync = true; diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 49ec206e93..2ca7d02eb7 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -162,8 +162,8 @@ fn server_test() { output_file.flush().expect("Unable to flush output file"); } - let obtained_file = std::fs::read_to_string(&actual_output_path).unwrap(); - let expected_file = std::fs::read_to_string(&expected_output_path).unwrap(); + let obtained_file = std::fs::read_to_string(actual_output_path).unwrap(); + let expected_file = std::fs::read_to_string(expected_output_path).unwrap(); // if there is a mismatch, print the diff, along with the path. if obtained_file != expected_file { tracing::info!("failed: {file}");