Skip to content

Commit

Permalink
Add missing rules for LISTAGG & UNNEST (#118)
Browse files Browse the repository at this point in the history
* Add missing rules for LISTAGG & UNNEST

* attempt to fix test flakiness
  • Loading branch information
jogrogan authored Feb 27, 2025
1 parent 3f1a571 commit 279b3d4
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 31 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ integration-tests: deploy-dev-environment
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
./gradlew intTest || kill `cat port-forward.pid port-forward-2.pid, port-forward-3.pid`
./gradlew intTest --no-parallel || kill `cat port-forward.pid port-forward-2.pid, port-forward-3.pid`
kill `cat port-forward.pid`
kill `cat port-forward-2.pid`
kill `cat port-forward-3.pid`

# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
integration-tests-kind: deploy-dev-environment
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
./gradlew intTest -i
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
./gradlew intTest -i --no-parallel

generate-models:
./generate-models.sh
Expand Down
87 changes: 65 additions & 22 deletions hoptimator-k8s/src/test/resources/k8s-ddl-function.id
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
!set outputformat mysql
!use k8s

create or replace view ADS."case" AS SELECT CASE WHEN "FIRST_NAME" = 'Bob' THEN ARRAY['a'] ELSE ARRAY['b'] END || CASE WHEN "FIRST_NAME" = 'Alice' THEN ARRAY['c'] ELSE ARRAY['d'] END AS arr from profile.members;
create or replace materialized view ADS."case" AS SELECT CASE WHEN "FIRST_NAME" = 'Bob' THEN ARRAY['a'] ELSE ARRAY['b'] END || CASE WHEN "FIRST_NAME" = 'Alice' THEN ARRAY['c'] ELSE ARRAY['d'] END AS arr from profile.members;
(0 rows modified)

!update
Expand All @@ -18,24 +18,41 @@ select * from ADS."case";

!ok

create or replace view ADS."json" AS SELECT JSON_VALUE('{"a": 1}', '$.a') AS json from profile.members;
create or replace materialized view ADS."createJson" AS SELECT JSON_OBJECT('name':"FIRST_NAME", 'type':4) AS json from profile.members;
(0 rows modified)

!update

select * from ADS."json";
+------+
| JSON |
+------+
| 1 |
| 1 |
| 1 |
+------+
select * from ADS."createJson";
+-----------------------------+
| JSON |
+-----------------------------+
| {"name":"Alice","type":4} |
| {"name":"Bob","type":4} |
| {"name":"Charlie","type":4} |
+-----------------------------+
(3 rows)

!ok

create or replace view ADS."regex" AS SELECT REGEXP_REPLACE("FIRST_NAME", '(B)ob', '$1ill') AS name from profile.members;
create or replace materialized view ADS."extractJson" AS SELECT JSON_VALUE("JSON", '$.name') AS name from ads."createJson";
(0 rows modified)

!update

select * from ADS."extractJson";
+---------+
| NAME |
+---------+
| Alice |
| Bob |
| Charlie |
+---------+
(3 rows)

!ok

create or replace materialized view ADS."regex" AS SELECT REGEXP_REPLACE("FIRST_NAME", '(B)ob', '$1ill') AS name from profile.members;
(0 rows modified)

!update
Expand All @@ -52,7 +69,7 @@ select * from ads."regex";

!ok

create or replace view ADS."concat" AS SELECT CONCAT('_', "FIRST_NAME", '_') AS name from profile.members;
create or replace materialized view ADS."concat" AS SELECT CONCAT('_', "FIRST_NAME", '_') AS name from profile.members;
(0 rows modified)

!update
Expand All @@ -69,8 +86,7 @@ select * from ads."concat";

!ok


create or replace view ADS."listagg" AS SELECT LISTAGG("FIRST_NAME") AS agg FROM profile.members;
create or replace materialized view ADS."listagg" AS SELECT LISTAGG("FIRST_NAME") AS agg FROM profile.members;
(0 rows modified)

!update
Expand All @@ -85,7 +101,24 @@ select * from ads."listagg";

!ok

create or replace view ADS."unnested" AS SELECT * FROM UNNEST(ARRAY(SELECT "FIRST_NAME" FROM profile.members)) AS name;
create or replace materialized view ADS."arr" AS SELECT ARRAY["FIRST_NAME"] AS arr FROM profile.members;
(0 rows modified)

!update

select * from ADS."arr";
+-----------+
| ARR |
+-----------+
| [Alice] |
| [Bob] |
| [Charlie] |
+-----------+
(3 rows)

!ok

create or replace materialized view ADS."unnested" AS SELECT * FROM UNNEST(SELECT ARR FROM ADS."arr") AS name;
(0 rows modified)

!update
Expand All @@ -102,32 +135,42 @@ select * from ADS."unnested";

!ok

drop view ads."case";
drop materialized view ads."unnested";
(0 rows modified)

!update

drop view ads."json";
drop materialized view ads."arr";
(0 rows modified)

!update

drop view ads."regex";
drop materialized view ads."listagg";
(0 rows modified)

!update

drop view ads."concat";
drop materialized view ads."concat";
(0 rows modified)

!update

drop view ads."listagg";
drop materialized view ads."regex";
(0 rows modified)

!update

drop view ads."unnested";
drop materialized view ads."extractJson";
(0 rows modified)

!update
!update

drop materialized view ads."createJson";
(0 rows modified)

!update

drop materialized view ads."case";
(0 rows modified)

!update
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Filter;
Expand All @@ -26,6 +28,9 @@
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
Expand All @@ -38,6 +43,7 @@
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableSet;

Expand All @@ -51,7 +57,7 @@ private PipelineRules() {

public static Collection<RelOptRule> rules() {
return Arrays.asList(PipelineFilterRule.INSTANCE, PipelineProjectRule.INSTANCE, PipelineJoinRule.INSTANCE,
PipelineCalcRule.INSTANCE);
PipelineCalcRule.INSTANCE, PipelineAggregateRule.INSTANCE, PipelineUncollectRule.INSTANCE);
}

public static class PipelineTableScanRule extends ConverterRule {
Expand All @@ -72,9 +78,8 @@ protected PipelineTableScanRule(Config config, String database) {
@Override
public RelNode convert(RelNode rel) {
TableScan scan = (TableScan) rel;
RelOptTable table = scan.getTable();
RelTraitSet traitSet = scan.getTraitSet().replace(PipelineRel.CONVENTION);
return new PipelineTableScan(rel.getCluster(), traitSet, database, table);
return new PipelineTableScan(rel.getCluster(), traitSet, database, scan.getTable());
}
}

Expand Down Expand Up @@ -115,9 +120,8 @@ protected PipelineTableModifyRule(Config config, String database) {
@Override
public RelNode convert(RelNode rel) {
TableModify mod = (TableModify) rel;
RelOptTable table = mod.getTable();
RelTraitSet traitSet = mod.getTraitSet().replace(PipelineRel.CONVENTION);
return new PipelineTableModify(database, rel.getCluster(), traitSet, table, mod.getCatalogReader(),
return new PipelineTableModify(database, rel.getCluster(), traitSet, mod.getTable(), mod.getCatalogReader(),
convert(mod.getInput(), traitSet), mod.getOperation(), mod.getUpdateColumnList(),
mod.getSourceExpressionList(), mod.isFlattened());
}
Expand Down Expand Up @@ -307,6 +311,80 @@ public void implement(Implementor implementor) {
}
}

static class PipelineAggregateRule extends ConverterRule {
static final PipelineAggregateRule INSTANCE =
Config.INSTANCE.withConversion(LogicalAggregate.class, Convention.NONE, PipelineRel.CONVENTION, "PipelineAggregateRule")
.withRuleFactory(PipelineAggregateRule::new)
.as(Config.class)
.toRule(PipelineAggregateRule.class);

protected PipelineAggregateRule(Config config) {
super(config);
}

@Override
public RelNode convert(RelNode rel) {
Aggregate agg = (Aggregate) rel;
RelTraitSet traitSet = agg.getTraitSet().replace(PipelineRel.CONVENTION);
return new PipelineAggregate(rel.getCluster(), traitSet, agg.getHints(), convert(agg.getInput(), PipelineRel.CONVENTION),
agg.getGroupSet(), agg.getGroupSets(), agg.getAggCallList());
}
}

static class PipelineAggregate extends Aggregate implements PipelineRel {

PipelineAggregate(RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, RelNode input,
ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
super(cluster, traitSet, hints, input, groupSet, groupSets, aggCalls);
}

@Override
public PipelineAggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet,
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
return new PipelineAggregate(getCluster(), traitSet, Collections.emptyList(), input, groupSet, groupSets, aggCalls);
}

@Override
public void implement(Implementor implementor) {
}
}

static class PipelineUncollectRule extends ConverterRule {
static final PipelineUncollectRule INSTANCE =
Config.INSTANCE.withConversion(Uncollect.class, Convention.NONE, PipelineRel.CONVENTION, "PipelineUncollectRule")
.withRuleFactory(PipelineUncollectRule::new)
.as(Config.class)
.toRule(PipelineUncollectRule.class);

protected PipelineUncollectRule(Config config) {
super(config);
}

@Override
public RelNode convert(RelNode rel) {
Uncollect uncollect = (Uncollect) rel;
RelTraitSet traitSet = uncollect.getTraitSet().replace(PipelineRel.CONVENTION);
return new PipelineUncollect(rel.getCluster(), traitSet, convert(uncollect.getInput(), PipelineRel.CONVENTION),
uncollect.withOrdinality, Collections.emptyList());
}
}

static class PipelineUncollect extends Uncollect implements PipelineRel {
PipelineUncollect(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, boolean withOrdinality,
List<String> itemAliases) {
super(cluster, traitSet, input, withOrdinality, itemAliases);
}

@Override
public PipelineUncollect copy(RelTraitSet traitSet, RelNode input) {
return new PipelineUncollect(getCluster(), traitSet, input, withOrdinality, Collections.emptyList());
}

@Override
public void implement(Implementor implementor) {
}
}

static Table findTable(CalciteSchema schema, List<String> qualifiedName) {
if (qualifiedName.size() == 0) {
throw new IllegalArgumentException("Empty qualified name.");
Expand Down

0 comments on commit 279b3d4

Please sign in to comment.