diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 22974fa224..72b8ba8b48 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -25,6 +25,7 @@ import org.dinky.utils.JsonUtils; import org.dinky.utils.LineageContext; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ConfigOption; @@ -184,6 +185,11 @@ public Configuration getRootConfiguration() { } } + @Override + public SqlNode parseSql(String sql) { + return getParser().parseSql(sql); + } + private static Executor lookupExecutor( ClassLoader classLoader, String executorIdentifier, StreamExecutionEnvironment executionEnvironment) { try { diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 9ca5fca07c..2d790d2667 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -26,6 +26,7 @@ import org.dinky.parser.CustomParserImpl; import org.dinky.utils.LineageContext; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ConfigOption; @@ -284,6 +285,11 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr return record; } + @Override + public SqlNode parseSql(String sql) { + return getParser().parseSql(sql); + } + @Override public Table fromDataStream(DataStream dataStream, String fields) { List expressions = ExpressionParser.INSTANCE.parseExpressionList(fields); diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index c03380a8ec..ca3b6e9f31 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -23,6 +23,7 @@ import org.dinky.parser.CustomParserImpl; import org.dinky.utils.JsonUtils; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -167,4 +168,9 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr record.setExplain(getPlanner().explain(operations, extraDetails)); return record; } + + @Override + public SqlNode parseSql(String sql) { + return ((ExtendedParser) getParser()).parseSql(sql); + } } diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 8ee3256aa7..3b6e4b1a9e 100644 --- a/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.17/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -21,6 +21,7 @@ import org.dinky.parser.CustomParserImpl; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -144,4 +145,9 @@ public StreamGraph getStreamGraphFromInserts(List statements) { return transOperatoinsToStreamGraph(modifyOperations); } + + @Override + public SqlNode parseSql(String sql) { + return ((ExtendedParser) getParser()).parseSql(sql); + } } diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 530f772c6d..4a63497546 100644 --- a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -21,6 +21,7 @@ import org.dinky.operations.CustomNewParserImpl; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -149,4 +150,9 @@ public StreamGraph getStreamGraphFromInserts(List statements) { public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) { getCatalogManager().createCatalog(catalogName, catalogDescriptor); } + + @Override + public SqlNode parseSql(String sql) { + return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql); + } } diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 74c58c0d64..a36e38fcc3 100644 --- a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -21,6 +21,7 @@ import org.dinky.operations.CustomNewParserImpl; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -157,4 +158,9 @@ public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) { return null; } + + @Override + public SqlNode parseSql(String sql) { + return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql); + } } diff --git a/dinky-client/dinky-client-1.20/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.20/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java index 74c58c0d64..a36e38fcc3 100644 --- a/dinky-client/dinky-client-1.20/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java +++ b/dinky-client/dinky-client-1.20/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java @@ -21,6 +21,7 @@ import org.dinky.operations.CustomNewParserImpl; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -157,4 +158,9 @@ public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) { return null; } + + @Override + public SqlNode parseSql(String sql) { + return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql); + } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java index 794375f761..13bc6b49ad 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java @@ -22,6 +22,7 @@ import org.dinky.data.model.LineageRel; import org.dinky.data.result.SqlExplainResult; +import org.apache.calcite.sql.SqlNode; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -70,6 +71,8 @@ public interface CustomTableEnvironment Configuration getRootConfiguration(); + SqlNode parseSql(String sql); + default JobGraph getJobGraphFromInserts(List statements) { return getStreamGraphFromInserts(statements).getJobGraph(); } diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java index 8f6f7873b9..c6ad11ce17 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -29,22 +29,19 @@ import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.sql.parser.ddl.SqlCreateTable; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -89,22 +86,23 @@ private void mockSink(JobParam jobParam) { // mock insert table ddl List mockedDdl = new ArrayList<>(); for (StatementParam ddl : jobParam.getDdl()) { - List parseOperationList = tableEnv.getParser().parse(ddl.getValue()); - for (Operation operation : parseOperationList) { - if (operation instanceof CreateTableOperation) { - CreateTableOperation createOperation = (CreateTableOperation) operation; - CatalogTable catalogTable = createOperation.getCatalogTable(); - // get table name and check if it should be mocked - String tableName = createOperation.getTableIdentifier().getObjectName(); - if (tablesNeedMock.contains(tableName)) { - // generate mock statement - mockedDdl.add( - new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE)); - } else { - mockedDdl.add(ddl); - } + SqlNode sqlNode = tableEnv.parseSql(ddl.getValue()); + boolean isDdlMocked = false; + if (sqlNode instanceof SqlCreateTable) { + SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; + String tableName = sqlCreateTable.getTableName().toString(); + if (tablesNeedMock.contains(tableName)) { + // generate mock statement + mockedDdl.add(new StatementParam( + getSinkMockDdlStatement( + tableName, sqlCreateTable.getColumnList().toString()), + SqlType.CREATE)); + isDdlMocked = true; } } + if (!isDdlMocked) { + mockedDdl.add(ddl); + } } jobParam.setDdl(mockedDdl); log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam)); @@ -155,19 +153,11 @@ private Set getTableNamesNeedMockAndModifyTrans(JobParam jobParam) { * get mocked ddl statement * * @param tableName table name - * @param catalogTable catalog table + * @param columns columns * @return ddl that connector is changed as well as other options not changed */ - private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) { + private String getSinkMockDdlStatement(String tableName, String columns) { String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'"; - // columns - Schema unresolvedSchema = catalogTable.getUnresolvedSchema(); - String columns = unresolvedSchema.getColumns().stream() - .map(column -> { - Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; - return physicalColumn.getName() + " " + physicalColumn.getDataType(); - }) - .collect(Collectors.joining(", ")); return MessageFormat.format( MOCK_SQL_TEMPLATE, StringUtils.join(generateMockedTableIdentifier(tableName), "."),