Skip to content

Commit

Permalink
fix test case
Browse files Browse the repository at this point in the history
  • Loading branch information
jw-itq committed Nov 21, 2024
1 parent 0693041 commit c586910
Showing 1 changed file with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -41,6 +42,7 @@
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
Expand Down Expand Up @@ -171,22 +173,72 @@ private void initializeStarRocksServer() {

@TestTemplate
public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
throws InterruptedException {
throws InterruptedException, IOException {
String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf";
CompletableFuture.runAsync(
() -> {
try {
container.executeJob("/mysqlcdc_to_starrocks_with_schema_change.conf");
container.executeJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
TimeUnit.SECONDS.sleep(30);
assertSchemaEvolution(
TimeUnit.SECONDS.sleep(20);
// waiting for case1 completed
assertSchemaEvolutionForAddColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

// savepoint 1
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// case2 drop columns with cdc data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();

// restore 1
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});

// waiting for case2 completed
assertTableStructureAndData(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

// savepoint 2
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// case3 change column name with cdc data at same time
shopDatabase.setTemplateName("change_columns").createAndInitialize();

// case4 modify column data type with cdc data at same time
shopDatabase.setTemplateName("modify_columns").createAndInitialize();

// restore 2
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});

// waiting for case3/case4 completed
assertTableStructureAndData(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);
}

private void assertSchemaEvolution(
private void assertSchemaEvolutionForAddColumns(
String database,
String sourceTable,
String sinkTable,
Expand Down Expand Up @@ -248,22 +300,6 @@ private void assertSchemaEvolution(
String.format(PROJECTION_QUERY, database, sinkTable),
sinkConnection));
});

// case2 drop columns with cdc data at same time
assertCaseByDdlName("drop_columns", database, sourceTable, sinkTable);

// case3 change column name with cdc data at same time
assertCaseByDdlName("change_columns", database, sourceTable, sinkTable);

// case4 modify column data type with cdc data at same time
assertCaseByDdlName("modify_columns", database, sourceTable, sinkTable);
}

private void assertCaseByDdlName(
String drop_columns, String database, String sourceTable, String sinkTable) {
shopDatabase.setTemplateName(drop_columns).createAndInitialize();
assertTableStructureAndData(
database, sourceTable, sinkTable, mysqlConnection, starRocksConnection);
}

private void assertTableStructureAndData(
Expand Down

0 comments on commit c586910

Please sign in to comment.