Skip to content

Commit

Permalink
More MySQL rewrite rules (#1661)
Browse files Browse the repository at this point in the history
1. also remove CAST from OFFSET
2. remove explicit `public` schema. Needed for UDF passthrough
3. `jsonb` isn't in MySQL, so rewrite '...'::jsonb literals, in
particular mapping [1,2,3] to array for StarRocks

This can be leveraged to use array functions in StarRocks:

```sql
CREATE OR REPLACE FUNCTION public.cardinality(jsonb)
RETURNS int AS $$
  BEGIN
    RETURN jsonb_array_length($1);
  END
$$
LANGUAGE PLPGSQL IMMUTABLE;

CREATE OR REPLACE FUNCTION public.array_intersect(jsonb, jsonb)
RETURNS jsonb AS $$
  BEGIN
        RETURN (select jsonb_agg(a) from jsonb_array_elements($1) a join jsonb_array_elements($2) b on a = b);
  END
$$
LANGUAGE PLPGSQL IMMUTABLE;

-- where postgres_fdw is specified in `extensions` option of fdw server
ALTER EXTENSION postgres_fdw ADD FUNCTION public.cardinality(jsonb);
ALTER EXTENSION postgres_fdw ADD FUNCTION public.array_intersect(jsonb, jsonb);
```

After which `select array_intersect(fdwtbl.col, '[1,2,3]') from fdwtbl`
can be pushed down,
but function will also work if postgres for some reason can't push down
  • Loading branch information
serprex authored May 2, 2024
1 parent 5a1734f commit 5b6ca38
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 23 deletions.
6 changes: 1 addition & 5 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,11 +922,7 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
.context("no port specified")?
.parse::<u32>()
.context("unable to parse port as valid int")?,
user: opts
.get("user")
.cloned()
.unwrap_or_default()
.to_string(),
user: opts.get("user").cloned().unwrap_or_default().to_string(),
password: opts
.get("password")
.cloned()
Expand Down
3 changes: 1 addition & 2 deletions nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ pub fn process_options(

// If mode is upsert, we need unique key columns
if opts.get("mode") == Some(&Value::String(String::from("upsert")))
&& (opts.get("unique_key_columns").is_none()
|| opts.get("unique_key_columns") == Some(&Value::Array(vec![])))
&& opts.get("unique_key_columns").map(|ukc| ukc == &Value::Array(Vec::new())).unwrap_or(true)
{
anyhow::bail!("For upsert mode, unique_key_columns must be specified");
}
Expand Down
10 changes: 3 additions & 7 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ impl Catalog {
)
.await?;

if job.flow_options.get("destination_table_name").is_none() {
let Some(destination_table_name) = job.flow_options.get("destination_table_name") else {
return Err(anyhow!("destination_table_name not found in flow options"));
}
};

let _rows = self
.pg
Expand All @@ -462,11 +462,7 @@ impl Catalog {
&source_peer_id,
&destination_peer_id,
&job.description,
&job.flow_options
.get("destination_table_name")
.unwrap()
.as_str()
.unwrap(),
&destination_table_name.as_str().unwrap(),
&job.query_string,
&serde_json::to_value(job.flow_options.clone())
.context("unable to serialize flow options")?,
Expand Down
8 changes: 4 additions & 4 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ impl FlowGrpcClient {
for (key, value) in &job.flow_options {
match value {
Value::String(s) => match key.as_str() {
"destination_table_name" => cfg.destination_table_identifier = s.clone(),
"watermark_column" => cfg.watermark_column = s.clone(),
"watermark_table_name" => cfg.watermark_table = s.clone(),
"destination_table_name" => cfg.destination_table_identifier.clone_from(s),
"watermark_column" => cfg.watermark_column.clone_from(s),
"watermark_table_name" => cfg.watermark_table.clone_from(s),
"mode" => {
let mut wm = QRepWriteMode {
write_type: QRepWriteType::QrepWriteModeAppend as i32,
Expand Down Expand Up @@ -229,7 +229,7 @@ impl FlowGrpcClient {
_ => return anyhow::Result::Err(anyhow::anyhow!("invalid mode {}", s)),
}
}
"staging_path" => cfg.staging_path = s.clone(),
"staging_path" => cfg.staging_path.clone_from(s),
_ => return anyhow::Result::Err(anyhow::anyhow!("invalid str option {}", key)),
},
Value::Number(n) => match key.as_str() {
Expand Down
54 changes: 52 additions & 2 deletions nexus/peer-mysql/src/ast.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
use std::ops::ControlFlow;

use peer_ast::flatten_expr_to_in_list;
use serde_json::{self, Value as JsonValue};
use sqlparser::ast::{
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, Array, BinaryOperator,
DataType, Expr, FunctionArgExpr, Query,
DataType, Expr, FunctionArgExpr, Offset, Query, Value,
};

fn json_to_expr(val: JsonValue) -> Expr {
match val {
JsonValue::Null => Expr::Value(Value::Null),
JsonValue::Bool(x) => Expr::Value(Value::Boolean(x)),
JsonValue::Number(x) => Expr::Value(Value::Number(x.to_string(), false)),
JsonValue::String(x) => Expr::Value(Value::SingleQuotedString(x)),
JsonValue::Array(x) => Expr::Array(Array {
elem: x.into_iter().map(json_to_expr).collect::<Vec<_>>(),
named: false,
}),
JsonValue::Object(x) => Expr::Cast {
data_type: DataType::JSON,
expr: Box::new(Expr::Value(Value::SingleQuotedString(
JsonValue::Object(x).to_string(),
))),
format: None,
},
}
}

pub fn rewrite_query(peername: &str, query: &mut Query) {
visit_relations_mut(query, |table| {
// if peer name is first part of table name, remove first part
if peername.eq_ignore_ascii_case(&table.0[0].value) {
// remove `public.` to facilitate mysql global function push down
if table.0.len() > 1
&& (peername.eq_ignore_ascii_case(&table.0[0].value) || table.0[0].value == "public")
{
table.0.remove(0);
}
ControlFlow::<()>::Continue(())
Expand All @@ -19,6 +43,16 @@ pub fn rewrite_query(peername: &str, query: &mut Query) {
if let Some(Expr::Cast { expr, .. }) = &query.limit {
query.limit = Some((**expr).clone());
}
if let Some(Offset {
value: Expr::Cast { expr, .. },
rows,
}) = &query.offset
{
query.offset = Some(Offset {
value: (**expr).clone(),
rows: *rows,
});
}

visit_function_arg_mut(query, |node| {
if let FunctionArgExpr::Expr(arg_expr) = node {
Expand All @@ -34,6 +68,22 @@ pub fn rewrite_query(peername: &str, query: &mut Query) {
named: true,
};
*node = FunctionArgExpr::Expr(Expr::Array(rewritten_array));
} else if let Expr::Cast {
data_type: DataType::JSONB,
expr,
..
} = arg_expr
{
*node = match **expr {
Expr::Value(Value::SingleQuotedString(ref s)) => {
if let Ok(val) = serde_json::from_str::<JsonValue>(s) {
FunctionArgExpr::Expr(json_to_expr(val))
} else {
FunctionArgExpr::Expr((**expr).clone())
}
}
_ => FunctionArgExpr::Expr((**expr).clone()),
};
}
}

Expand Down
2 changes: 1 addition & 1 deletion nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ impl NexusBackend {
match qrep_config {
Some(mut qrep_config) => {
if query_string.is_some() {
qrep_config.query = query_string.as_ref().unwrap().clone();
qrep_config.query.clone_from(query_string.as_ref().unwrap());
}
qrep_config.dst_table_full_resync = true;

Expand Down
4 changes: 2 additions & 2 deletions nexus/server/tests/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ fn server_test() {
output_file.flush().expect("Unable to flush output file");
}

let obtained_file = std::fs::read_to_string(&actual_output_path).unwrap();
let expected_file = std::fs::read_to_string(&expected_output_path).unwrap();
let obtained_file = std::fs::read_to_string(actual_output_path).unwrap();
let expected_file = std::fs::read_to_string(expected_output_path).unwrap();
// if there is a mismatch, print the diff, along with the path.
if obtained_file != expected_file {
tracing::info!("failed: {file}");
Expand Down

0 comments on commit 5b6ca38

Please sign in to comment.