diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java index 168479fc1e8..6207f673aa9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -41,6 +42,7 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.sql.Connection; @@ -171,22 +173,72 @@ private void initializeStarRocksServer() { @TestTemplate public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) - throws InterruptedException { + throws InterruptedException, IOException { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf"; CompletableFuture.runAsync( () -> { try { - container.executeJob("/mysqlcdc_to_starrocks_with_schema_change.conf"); + container.executeJob(jobConfigFile, jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); } }); - TimeUnit.SECONDS.sleep(30); - assertSchemaEvolution( + TimeUnit.SECONDS.sleep(20); + // waiting for case1 completed + assertSchemaEvolutionForAddColumns( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + + // savepoint 1 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case2 drop columns with cdc data at same time + shopDatabase.setTemplateName("drop_columns").createAndInitialize(); + + // restore 1 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case2 completed + assertTableStructureAndData( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + + // savepoint 2 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case3 change column name with cdc data at same time + shopDatabase.setTemplateName("change_columns").createAndInitialize(); + + // case4 modify column data type with cdc data at same time + shopDatabase.setTemplateName("modify_columns").createAndInitialize(); + + // restore 2 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case3/case4 completed + assertTableStructureAndData( DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); } - private void assertSchemaEvolution( + private void assertSchemaEvolutionForAddColumns( String database, String sourceTable, String sinkTable, @@ -248,22 +300,6 @@ private void assertSchemaEvolution( String.format(PROJECTION_QUERY, database, sinkTable), sinkConnection)); }); - - // case2 drop columns with cdc data at same time - assertCaseByDdlName("drop_columns", database, sourceTable, sinkTable); - - // case3 change column name with cdc data at same time - assertCaseByDdlName("change_columns", database, sourceTable, sinkTable); - - // case4 modify column data type with cdc data at same time - assertCaseByDdlName("modify_columns", database, sourceTable, sinkTable); - } - - private void assertCaseByDdlName( - String drop_columns, String database, String sourceTable, String sinkTable) { - shopDatabase.setTemplateName(drop_columns).createAndInitialize(); - assertTableStructureAndData( - database, sourceTable, sinkTable, mysqlConnection, starRocksConnection); } private void assertTableStructureAndData(