From 28f22bbce17b96e35868b70ae864e78879960c08 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 30 Apr 2024 04:58:02 -0700 Subject: [PATCH] More fixes for MySQL peer (#1655) Support rewrites for IN --- nexus/Cargo.lock | 16 ++++- nexus/Cargo.toml | 14 +++- nexus/peer-ast/Cargo.toml | 12 ++++ nexus/peer-ast/src/lib.rs | 101 ++++++++++++++++++++++++++++ nexus/peer-bigquery/Cargo.toml | 1 + nexus/peer-bigquery/src/ast.rs | 119 ++------------------------------- nexus/peer-mysql/Cargo.toml | 1 + nexus/peer-mysql/src/ast.rs | 47 ++++++++++++- 8 files changed, 192 insertions(+), 119 deletions(-) create mode 100644 nexus/peer-ast/Cargo.toml create mode 100644 nexus/peer-ast/src/lib.rs diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 677c0b2a82..5bcd71b614 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -1592,9 +1592,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" [[package]] name = "libloading" @@ -2002,6 +2002,16 @@ dependencies = [ "hmac", ] +[[package]] +name = "peer-ast" +version = "0.1.0" +dependencies = [ + "anyhow", + "pt", + "rust_decimal", + "sqlparser", +] + [[package]] name = "peer-bigquery" version = "0.1.0" @@ -2011,6 +2021,7 @@ dependencies = [ "chrono", "futures", "gcp-bigquery-client", + "peer-ast", "peer-connections", "peer-cursor", "pgwire", @@ -2063,6 +2074,7 @@ dependencies = [ "chrono", "futures", "mysql_async", + "peer-ast", "peer-connections", "peer-cursor", "pgwire", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index adc48736f6..858243453a 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -4,6 +4,7 @@ members = [ "catalog", "flow-rs", "parser", + "peer-ast", "peer-bigquery", "peer-connections",
"peer-cursor", @@ -19,9 +20,16 @@ members = [ resolver = "2" [workspace.dependencies] -chrono = { version = "0.4", default-features = false, features = ["serde", "std"] } +chrono = { version = "0.4", default-features = false, features = [ + "serde", + "std", +] } dashmap = "5.0" -rust_decimal = { version = "1", default-features = false, features = ["tokio-pg"] } +rust_decimal = { version = "1", default-features = false, features = [ + "tokio-pg", +] } sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" } tracing = "0.1" -pgwire = { version = "0.22", default-features = false, features = ["server-api-ring"] } +pgwire = { version = "0.22", default-features = false, features = [ + "server-api-ring", +] } diff --git a/nexus/peer-ast/Cargo.toml b/nexus/peer-ast/Cargo.toml new file mode 100644 index 0000000000..91264b6a41 --- /dev/null +++ b/nexus/peer-ast/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "peer-ast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +pt = { path = "../pt" } +rust_decimal.workspace = true +sqlparser.workspace = true diff --git a/nexus/peer-ast/src/lib.rs b/nexus/peer-ast/src/lib.rs new file mode 100644 index 0000000000..cab1728999 --- /dev/null +++ b/nexus/peer-ast/src/lib.rs @@ -0,0 +1,101 @@ +use sqlparser::ast::{Array, ArrayElemTypeDef, DataType, Expr}; + +/// Flatten Cast EXPR to List with right value type +/// For example Value(SingleQuotedString("{hash1,hash2}") must return +/// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2"))) +pub fn flatten_expr_to_in_list(expr: &Expr) -> anyhow::Result<Vec<Expr>> { + let mut list = vec![]; + // check if expr is of type Cast + if let Expr::Cast { + expr, data_type, ..
+ } = expr + { + // assert that expr is of type SingleQuotedString + if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() { + // trim the starting and ending curly braces + let s = s.trim_start_matches('{').trim_end_matches('}'); + // split string by comma + let split = s.split(','); + // match on data type, and create a vector of Expr::Value + match data_type { + DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) + | DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => match inner.as_ref() { + DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { + for s in split { + list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString( + s.to_string(), + ))); + } + } + DataType::Integer(_) + | DataType::Float(_) + | DataType::BigInt(_) + | DataType::UnsignedBigInt(_) + | DataType::UnsignedInteger(_) + | DataType::UnsignedSmallInt(_) + | DataType::UnsignedTinyInt(_) + | DataType::TinyInt(_) + | DataType::UnsignedInt(_) => { + for s in split { + list.push(Expr::Value(sqlparser::ast::Value::Number( + s.to_string(), + false, + ))); + } + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported inner data type for IN list: {:?}", + data_type + )) + } + }, + _ => { + return Err(anyhow::anyhow!( + "Unsupported data type for IN list: {:?}", + data_type + )) + } + } + } else if let Expr::Array(arr) = expr.as_ref() { + list = pour_array_into_list(arr, list).expect("Failed to transfer array to list"); + } + } else if let Expr::Array(arr) = expr { + list = pour_array_into_list(arr, list).expect("Failed to transfer array to list"); + } + + Ok(list) +} + +fn pour_array_into_list(arr: &Array, mut list: Vec<Expr>) -> anyhow::Result<Vec<Expr>> { + for element in &arr.elem { + match &element { + Expr::Value(val) => match val { + sqlparser::ast::Value::Number(_, _) => { + list.push(Expr::Value(sqlparser::ast::Value::Number( + element.to_string(), + false, + ))); + } + sqlparser::ast::Value::SingleQuotedString(_) => { +
list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString( + element.to_string(), + ))); + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported data type for IN list: {:?}", + val + )) + } + }, + _ => { + return Err(anyhow::anyhow!( + "Unsupported element for IN list: {:?}", + element + )) + } + } + } + Ok(list) +} diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index fefba5bf25..94670fe4f6 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0" async-trait = "0.1" chrono.workspace = true futures = { version = "0.3.28", features = ["executor"] } +peer-ast = { path = "../peer-ast" } peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } pgwire.workspace = true diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index 15e5efe5a4..fd8153382a 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -1,11 +1,12 @@ use std::ops::ControlFlow; +use peer_ast::flatten_expr_to_in_list; use sqlparser::ast::Value::Number; use sqlparser::ast::{ visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array, - ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, - FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, + BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, FunctionArgExpr, Ident, + ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, }; pub struct BigqueryAst; @@ -81,9 +82,8 @@ impl BigqueryAst { .. 
} = arg_expr { - let list = self - .flatten_expr_to_in_list(arg_expr) - .expect("failed to flatten in function"); + let list = + flatten_expr_to_in_list(arg_expr).expect("failed to flatten in function"); let rewritten_array = Array { elem: list, named: true, @@ -241,9 +241,7 @@ impl BigqueryAst { } = node { if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) { - let list = self - .flatten_expr_to_in_list(right) - .expect("failed to flatten"); + let list = flatten_expr_to_in_list(right).expect("failed to flatten"); *node = Expr::InList { expr: left.clone(), list, @@ -257,109 +255,4 @@ impl BigqueryAst { Ok(()) } - - fn pour_array_into_list(&self, arr: &Array, mut list: Vec<Expr>) -> anyhow::Result<Vec<Expr>> { - for element in &arr.elem { - match &element { - Expr::Value(val) => match val { - sqlparser::ast::Value::Number(_, _) => { - list.push(Expr::Value(sqlparser::ast::Value::Number( - element.to_string(), - false, - ))); - } - sqlparser::ast::Value::SingleQuotedString(_) => { - list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString( - element.to_string(), - ))); - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported data type for IN list: {:?}", - val - )) - } - }, - _ => { - return Err(anyhow::anyhow!( - "Unsupported element for IN list: {:?}", - element - )) - } - } - } - Ok(list) - } - /// Flatten Cast EXPR to List with right value type - /// For example Value(SingleQuotedString("{hash1,hash2}") must return - /// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2"))) - fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result<Vec<Expr>> { - let mut list = vec![]; - // check if expr is of type Cast - if let Expr::Cast { - expr, data_type, ..
- } = expr - { - // assert that expr is of type SingleQuotedString - if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() { - // trim the starting and ending curly braces - let s = s.trim_start_matches('{').trim_end_matches('}'); - // split string by comma - let split = s.split(','); - // match on data type, and create a vector of Expr::Value - match data_type { - DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) - | DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => { - match inner.as_ref() { - DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { - for s in split { - list.push(Expr::Value( - sqlparser::ast::Value::SingleQuotedString(s.to_string()), - )); - } - } - DataType::Integer(_) - | DataType::Float(_) - | DataType::BigInt(_) - | DataType::UnsignedBigInt(_) - | DataType::UnsignedInteger(_) - | DataType::UnsignedSmallInt(_) - | DataType::UnsignedTinyInt(_) - | DataType::TinyInt(_) - | DataType::UnsignedInt(_) => { - for s in split { - list.push(Expr::Value(sqlparser::ast::Value::Number( - s.to_string(), - false, - ))); - } - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported inner data type for IN list: {:?}", - data_type - )) - } - } - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported data type for IN list: {:?}", - data_type - )) - } - } - } else if let Expr::Array(arr) = expr.as_ref() { - list = self - .pour_array_into_list(arr, list) - .expect("Failed to transfer array to list"); - } - } else if let Expr::Array(arr) = expr { - list = self - .pour_array_into_list(arr, list) - .expect("Failed to transfer array to list"); - } - - Ok(list) - } } diff --git a/nexus/peer-mysql/Cargo.toml b/nexus/peer-mysql/Cargo.toml index 9637716249..3b28572be4 100644 --- a/nexus/peer-mysql/Cargo.toml +++ b/nexus/peer-mysql/Cargo.toml @@ -11,6 +11,7 @@ async-trait = "0.1" chrono.workspace = true futures = { version = "0.3.28", features = ["executor"] } mysql_async = { version = "0.34", default-features = false, 
features = ["minimal-rust", "rust_decimal", "chrono", "rustls-tls"] } +peer-ast = { path = "../peer-ast" } peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } pgwire.workspace = true diff --git a/nexus/peer-mysql/src/ast.rs b/nexus/peer-mysql/src/ast.rs index de1b3ae9dd..00c12b7dbf 100644 --- a/nexus/peer-mysql/src/ast.rs +++ b/nexus/peer-mysql/src/ast.rs @@ -1,6 +1,10 @@ use std::ops::ControlFlow; -use sqlparser::ast::{visit_relations_mut, Expr, Query}; +use peer_ast::flatten_expr_to_in_list; +use sqlparser::ast::{ + visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, Array, BinaryOperator, + DataType, Expr, FunctionArgExpr, Query, +}; pub fn rewrite_query(peername: &str, query: &mut Query) { visit_relations_mut(query, |table| { @@ -15,4 +19,45 @@ pub fn rewrite_query(peername: &str, query: &mut Query) { if let Some(Expr::Cast { expr, .. }) = &query.limit { query.limit = Some((**expr).clone()); } + + visit_function_arg_mut(query, |node| { + if let FunctionArgExpr::Expr(arg_expr) = node { + if let Expr::Cast { + data_type: DataType::Array(_), + .. + } = arg_expr + { + let list = + flatten_expr_to_in_list(arg_expr).expect("failed to flatten in function"); + let rewritten_array = Array { + elem: list, + named: true, + }; + *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); + } + } + + ControlFlow::<()>::Continue(()) + }); + + // flatten ANY to IN operation overall. + visit_expressions_mut(query, |node| { + if let Expr::AnyOp { + left, + compare_op, + right, + } = node + { + if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) { + let list = flatten_expr_to_in_list(right).expect("failed to flatten"); + *node = Expr::InList { + expr: left.clone(), + list, + negated: matches!(compare_op, BinaryOperator::NotEq), + }; + } + } + + ControlFlow::<()>::Continue(()) + }); }