Skip to content

Commit

Permalink
More fixes for MySQL peer (#1655)
Browse files Browse the repository at this point in the history
Support rewrites for IN
  • Loading branch information
iskakaushik authored and serprex committed Apr 30, 2024
1 parent 221dedd commit 329c121
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 119 deletions.
16 changes: 14 additions & 2 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"catalog",
"flow-rs",
"parser",
"peer-ast",
"peer-bigquery",
"peer-connections",
"peer-cursor",
Expand All @@ -19,9 +20,16 @@ members = [
resolver = "2"

[workspace.dependencies]
chrono = { version = "0.4", default-features = false, features = ["serde", "std"] }
chrono = { version = "0.4", default-features = false, features = [
"serde",
"std",
] }
dashmap = "5.0"
rust_decimal = { version = "1", default-features = false, features = ["tokio-pg"] }
rust_decimal = { version = "1", default-features = false, features = [
"tokio-pg",
] }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" }
tracing = "0.1"
pgwire = { version = "0.22", default-features = false, features = ["server-api-ring"] }
pgwire = { version = "0.22", default-features = false, features = [
"server-api-ring",
] }
12 changes: 12 additions & 0 deletions nexus/peer-ast/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "peer-ast"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
pt = { path = "../pt" }
rust_decimal.workspace = true
sqlparser.workspace = true
101 changes: 101 additions & 0 deletions nexus/peer-ast/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use sqlparser::ast::{Array, ArrayElemTypeDef, DataType, Expr};

/// Flatten Cast EXPR to List with right value type
/// For example Value(SingleQuotedString("{hash1,hash2}") must return
/// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2")))
pub fn flatten_expr_to_in_list(expr: &Expr) -> anyhow::Result<Vec<Expr>> {
let mut list = vec![];
// check if expr is of type Cast
if let Expr::Cast {
expr, data_type, ..
} = expr
{
// assert that expr is of type SingleQuotedString
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() {
// trim the starting and ending curly braces
let s = s.trim_start_matches('{').trim_end_matches('}');
// split string by comma
let split = s.split(',');
// match on data type, and create a vector of Expr::Value
match data_type {
DataType::Array(ArrayElemTypeDef::AngleBracket(inner))
| DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString(
s.to_string(),
)));
}
}
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
},
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
data_type
))
}
}
} else if let Expr::Array(arr) = expr.as_ref() {
list = pour_array_into_list(arr, list).expect("Failed to transfer array to list");
}
} else if let Expr::Array(arr) = expr {
list = pour_array_into_list(arr, list).expect("Failed to transfer array to list");
}

Ok(list)
}

fn pour_array_into_list(arr: &Array, mut list: Vec<Expr>) -> anyhow::Result<Vec<Expr>> {
for element in &arr.elem {
match &element {
Expr::Value(val) => match val {
sqlparser::ast::Value::Number(_, _) => {
list.push(Expr::Value(sqlparser::ast::Value::Number(
element.to_string(),
false,
)));
}
sqlparser::ast::Value::SingleQuotedString(_) => {
list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString(
element.to_string(),
)));
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
val
))
}
},
_ => {
return Err(anyhow::anyhow!(
"Unsupported element for IN list: {:?}",
element
))
}
}
}
Ok(list)
}
1 change: 1 addition & 0 deletions nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ anyhow = "1.0"
async-trait = "0.1"
chrono.workspace = true
futures = { version = "0.3.28", features = ["executor"] }
peer-ast = { path = "../peer-ast" }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgwire.workspace = true
Expand Down
119 changes: 6 additions & 113 deletions nexus/peer-bigquery/src/ast.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::ops::ControlFlow;

use peer_ast::flatten_expr_to_in_list;
use sqlparser::ast::Value::Number;

use sqlparser::ast::{
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array,
ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, FunctionArgExpr, Ident,
ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
};

pub struct BigqueryAst;
Expand Down Expand Up @@ -81,9 +82,8 @@ impl BigqueryAst {
..
} = arg_expr
{
let list = self
.flatten_expr_to_in_list(arg_expr)
.expect("failed to flatten in function");
let list =
flatten_expr_to_in_list(arg_expr).expect("failed to flatten in function");
let rewritten_array = Array {
elem: list,
named: true,
Expand Down Expand Up @@ -241,9 +241,7 @@ impl BigqueryAst {
} = node
{
if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) {
let list = self
.flatten_expr_to_in_list(right)
.expect("failed to flatten");
let list = flatten_expr_to_in_list(right).expect("failed to flatten");
*node = Expr::InList {
expr: left.clone(),
list,
Expand All @@ -257,109 +255,4 @@ impl BigqueryAst {

Ok(())
}

fn pour_array_into_list(&self, arr: &Array, mut list: Vec<Expr>) -> anyhow::Result<Vec<Expr>> {
for element in &arr.elem {
match &element {
Expr::Value(val) => match val {
sqlparser::ast::Value::Number(_, _) => {
list.push(Expr::Value(sqlparser::ast::Value::Number(
element.to_string(),
false,
)));
}
sqlparser::ast::Value::SingleQuotedString(_) => {
list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString(
element.to_string(),
)));
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
val
))
}
},
_ => {
return Err(anyhow::anyhow!(
"Unsupported element for IN list: {:?}",
element
))
}
}
}
Ok(list)
}
/// Flatten Cast EXPR to List with right value type
/// For example Value(SingleQuotedString("{hash1,hash2}") must return
/// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2")))
fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result<Vec<Expr>> {
let mut list = vec![];
// check if expr is of type Cast
if let Expr::Cast {
expr, data_type, ..
} = expr
{
// assert that expr is of type SingleQuotedString
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() {
// trim the starting and ending curly braces
let s = s.trim_start_matches('{').trim_end_matches('}');
// split string by comma
let split = s.split(',');
// match on data type, and create a vector of Expr::Value
match data_type {
DataType::Array(ArrayElemTypeDef::AngleBracket(inner))
| DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => {
match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(
sqlparser::ast::Value::SingleQuotedString(s.to_string()),
));
}
}
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
data_type
))
}
}
} else if let Expr::Array(arr) = expr.as_ref() {
list = self
.pour_array_into_list(arr, list)
.expect("Failed to transfer array to list");
}
} else if let Expr::Array(arr) = expr {
list = self
.pour_array_into_list(arr, list)
.expect("Failed to transfer array to list");
}

Ok(list)
}
}
1 change: 1 addition & 0 deletions nexus/peer-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait = "0.1"
chrono.workspace = true
futures = { version = "0.3.28", features = ["executor"] }
mysql_async = { version = "0.34", default-features = false, features = ["minimal-rust", "rust_decimal", "chrono", "rustls-tls"] }
peer-ast = { path = "../peer-ast" }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgwire.workspace = true
Expand Down
Loading

0 comments on commit 329c121

Please sign in to comment.