Skip to content

Commit

Permalink
Support connector hints (#117)
Browse files Browse the repository at this point in the history
* Support connector hints

* Thread through partial view name

* nit

* update README

* partialViewName -> pipelineName

* output template as 'pipeline'

* update README

* Revert back to hints approach

* update README
  • Loading branch information
jogrogan authored Feb 27, 2025
1 parent 8f5f195 commit 3f1a571
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 10 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ While Deployers are extensible, today the primary deployer is to Kubernetes. The
and [K8sJobDeployer](hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java) (for job-templates)
provide a few template defaults that you can choose to include in your templates:

K8sSourceDeployer: `name, database, schema, table`
K8sSourceDeployer: `name, database, schema, table, pipeline`

K8sJobDeployer: `name, database, schema, table, sql, flinksql, flinkconfigs`
K8sJobDeployer: `name, database, schema, table, pipeline, sql, flinksql, flinkconfigs`

However, it is often a case where you want to add additional information to the templates that will be passed through during Source or Job creation.
There are two mechanisms to achieve this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,13 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con

// Support "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
// is "foo-bar" and the sink is just FOO.
String sinkName = viewName.split("\\$", 2)[0];
String[] viewParts = viewName.split("\\$", 2);
String sinkName = viewParts[0];
String pipelineName = database + "-" + sinkName;
if (viewParts.length > 1) {
pipelineName = pipelineName + "-" + viewParts[1];
}
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
List<String> sinkPath = new ArrayList<>();
sinkPath.addAll(schemaPath);
sinkPath.add(sinkName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class K8sConnector implements Connector {
@Override
public Map<String, String> configure() throws SQLException {
Template.Environment env =
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
new Template.SimpleEnvironment()
.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("database", source.database())
.with("table", source.table())
.with(source.options());
Expand Down
2 changes: 1 addition & 1 deletion hoptimator-kafka/src/test/resources/kafka-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ metadata:
strimzi.io/cluster: one
spec:
topicName: existing-topic-2
partitions: 1
partitions: 4
replicas: 1
config:
retention.ms: 7200000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -29,6 +30,8 @@
public final class DeploymentService {

private static final String HINT_OPTION = "hints";
public static final String PIPELINE_OPTION = "pipeline";

private DeploymentService() {
}

Expand Down Expand Up @@ -89,10 +92,16 @@ public static PipelineRel.Implementor plan(RelRoot root, List<RelOptMaterializat
return implementor;
}

// User provided hints will be passed through the "hints" field as KEY=VALUE pairs separated by commas.
// We can also configure additional properties to pass through as hints to the deployer.
public static Map<String, String> parseHints(Properties connectionProperties) {
Map<String, String> hints = new LinkedHashMap<>();
if (connectionProperties.containsKey(HINT_OPTION)) {
return Splitter.on(',').withKeyValueSeparator('=').split(connectionProperties.getProperty(HINT_OPTION));
hints.putAll(Splitter.on(',').withKeyValueSeparator('=').split(connectionProperties.getProperty(HINT_OPTION)));
}
if (connectionProperties.containsKey(PIPELINE_OPTION)) {
hints.put(PIPELINE_OPTION, connectionProperties.getProperty(PIPELINE_OPTION));
}
return Collections.emptyMap();
return hints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,12 +49,13 @@ class Implementor {
private final ImmutablePairList<Integer, String> targetFields;
private final Map<String, String> hints;
private RelNode query;
private Sink sink = new Sink("PIPELINE", Arrays.asList("PIPELINE", "SINK"), Collections.emptyMap());
private Sink sink;
private RelDataType sinkRowType = null;

public Implementor(ImmutablePairList<Integer, String> targetFields, Map<String, String> hints) {
this.targetFields = targetFields;
this.hints = hints;
this.sink = new Sink("PIPELINE", Arrays.asList("PIPELINE", "SINK"), hints);
}

public void visit(RelNode node) throws SQLException {
Expand All @@ -75,7 +75,9 @@ public void visit(RelNode node) throws SQLException {
* a connector. The connector is configured via `CREATE TABLE...WITH(...)`.
*/
public void addSource(String database, List<String> path, RelDataType rowType, Map<String, String> options) {
sources.put(new Source(database, path, addKeysAsOption(options, rowType)), rowType);
Map<String, String> newOptions = addKeysAsOption(options, rowType);
newOptions.putAll(this.hints);
sources.put(new Source(database, path, newOptions), rowType);
}

/**
Expand Down

0 comments on commit 3f1a571

Please sign in to comment.