diff --git a/deploy/samples/venicedb.yaml b/deploy/samples/venicedb.yaml index 3a993573..c3a46849 100644 --- a/deploy/samples/venicedb.yaml +++ b/deploy/samples/venicedb.yaml @@ -20,6 +20,7 @@ spec: connector = venice storeName = {{table}} partial-update-mode = true - key.fields-prefix = KEY_ - key.fields = {{keys}} + key.fields-prefix = {{keyPrefix:}} + key.fields = {{keys:KEY}} + key.type = {{keyType:PRIMITIVE}} value.fields-include: EXCEPT_KEY diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index 75e91307..3905eccc 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -28,7 +28,7 @@ class K8sConnector implements Connector { @Override public Map configure(Source source) throws SQLException { Template.Environment env = - Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT)) + new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT)) .with("database", source.database()) .with("table", source.table()) .with(source.options()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java index 69ab98ff..3186c2ee 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java @@ -26,7 +26,7 @@ class K8sJobDeployer extends K8sYamlDeployer { @Override public List specify(Job job) throws SQLException { Function sql = job.sql(); - Template.Environment env = Template.Environment.EMPTY.with("name", + Template.Environment env = new Template.SimpleEnvironment().with("name", job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT)) .with("database", job.sink().database()) .with("schema", job.sink().schema()) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java index c64e6215..2923d0d6 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java @@ -24,7 +24,7 @@ class K8sSourceDeployer extends K8sYamlDeployer { @Override public List specify(Source source) throws SQLException { Template.Environment env = - Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT)) + new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT)) .with("database", source.database()) .with("schema", source.schema()) .with("table", source.table()) diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index 26267def..6a0bddfe 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -162,7 +162,7 @@ public SimpleTemplate(String template) { public String render(Environment env) { StringBuffer sb = new StringBuffer(); Pattern p = - Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\s*)*)\\s*\\}\\}"); + Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]*))?\\s*((\\w+\\s*)*)\\s*\\}\\}"); Matcher m = p.matcher(template); while (m.find()) { String prefix = m.group(1); diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 235be0dc..39e0187c 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -19,6 +19,7 @@ import org.apache.calcite.util.Litmus; import org.apache.calcite.util.Pair; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.linkedin.hoptimator.Deployable; @@ -44,6 +45,8 @@ public interface PipelineRel extends RelNode { Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class); String KEY_OPTION = "keys"; + String KEY_PREFIX_OPTION = "keyPrefix"; + String KEY_TYPE_OPTION = "keyType"; String KEY_PREFIX = "KEY_"; void implement(Implementor implementor) throws SQLException; @@ -95,7 +98,8 @@ public void setSink(String database, List path, RelDataType rowType, Map this.sinkOptions = addKeysAsOption(options, rowType); } - private Map addKeysAsOption(Map options, RelDataType rowType) { + @VisibleForTesting + static Map addKeysAsOption(Map options, RelDataType rowType) { Map newOptions = new LinkedHashMap<>(options); RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)); @@ -104,12 +108,15 @@ private Map addKeysAsOption(Map options, RelData if (newOptions.containsKey(KEY_OPTION)) { return newOptions; } + String keyString = flattened.getFieldList().stream() .map(x -> x.getName().replaceAll("\\$", "_")) .filter(name -> name.startsWith(KEY_PREFIX)) .collect(Collectors.joining(";")); if (!keyString.isEmpty()) { newOptions.put(KEY_OPTION, keyString); + newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX); + newOptions.put(KEY_TYPE_OPTION, "RECORD"); } return newOptions; } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestTemplate.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestTemplate.java new file mode 100644 index 00000000..d24e68ae --- /dev/null +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestTemplate.java @@ -0,0 +1,43 @@ +package com.linkedin.hoptimator.util; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestTemplate { + + @Test + public void testRender() { + Template.Environment env = new Template.SimpleEnvironment() + .with("name", "name") + .with("nameUpper", "name") + .with("nameLower", "NAME") + .with("multiline", "1\n2\n3\n") + .with("multilineUpper", "a\nb\nc\n") + .with("other", "test"); + + String template = "{{keys:KEY}}\n" + + "{{keyPrefix:}}\n" + + "{{name:default}}\n" + + "{{nameUpper toUpperCase}}\n" + + "{{nameLower toLowerCase}}\n" + + "{{multiline concat}}\n" + + "{{multilineUpper concat toUpperCase}}\n" + + "{{other unknown}}\n"; + + String renderedTemplate = new Template.SimpleTemplate(template).render(env); + List renderedTemplates = Arrays.asList(renderedTemplate.split("\n")); + assertEquals(8, renderedTemplates.size()); + assertEquals("KEY", renderedTemplates.get(0)); + assertEquals("", renderedTemplates.get(1)); + assertEquals("name", renderedTemplates.get(2)); + assertEquals("NAME", renderedTemplates.get(3)); + assertEquals("name", renderedTemplates.get(4)); + assertEquals("123", renderedTemplates.get(5)); + assertEquals("ABC", renderedTemplates.get(6)); + assertEquals("test", renderedTemplates.get(7)); + } +} diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java new file mode 100644 index 00000000..bdbaa6d9 --- /dev/null +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/TestPipelineRel.java @@ -0,0 +1,39 @@ +package com.linkedin.hoptimator.util.planner; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.Test; + +import static com.linkedin.hoptimator.util.planner.PipelineRel.Implementor.addKeysAsOption; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestPipelineRel { + + @Test + public void testKeyOptions() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder primitiveKeyBuilder = new RelDataTypeFactory.Builder(typeFactory); + primitiveKeyBuilder.add("KEY", SqlTypeName.VARCHAR); + primitiveKeyBuilder.add("intField", SqlTypeName.INTEGER); + Map keyOptions = addKeysAsOption(new HashMap<>(), primitiveKeyBuilder.build()); + assertTrue(keyOptions.isEmpty()); + + RelDataTypeFactory.Builder keyBuilder = new RelDataTypeFactory.Builder(typeFactory); + keyBuilder.add("keyInt", SqlTypeName.INTEGER); + keyBuilder.add("keyString", SqlTypeName.VARCHAR); + RelDataTypeFactory.Builder recordBuilder = new RelDataTypeFactory.Builder(typeFactory); + recordBuilder.add("intField", SqlTypeName.INTEGER); + recordBuilder.add("KEY", keyBuilder.build()); + keyOptions = addKeysAsOption(new HashMap<>(), recordBuilder.build()); + assertEquals(3, keyOptions.size()); + assertEquals("KEY_keyInt;KEY_keyString", keyOptions.get("keys")); + assertEquals("KEY_", keyOptions.get("keyPrefix")); + assertEquals("RECORD", keyOptions.get("keyType")); + } +}