Skip to content

Commit

Permalink
[Feature][Mongodb-CDC] support multi-table read
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Nov 21, 2024
1 parent ad9f8b2 commit b5661da
Showing 1 changed file with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason =
"This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.")
public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container)
type = {EngineType.SPARK, EngineType.SEATUNNEL},
disabledReason = "")
public void testMongodbCdcMultiTableToMysqlByFlinkDataE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable();
CompletableFuture.supplyAsync(
Expand All @@ -203,14 +202,51 @@ public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container)
TimeUnit.SECONDS.sleep(10);
// insert update delete
upsertDeleteSourceTable();
TimeUnit.SECONDS.sleep(30);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);

cleanSourceTable();
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "")
public void testMongodbCdcMultiTableToMysqlByZetaE2E(TestContainer container)
throws InterruptedException {
cleanSourceTable();
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob(
"/mongodb_multi_table_cdc_to_mysql.conf", String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException();
}
return null;
});
TimeUnit.SECONDS.sleep(10);
// insert update delete
upsertDeleteSourceTable();
TimeUnit.SECONDS.sleep(20);
// cancel job
try {
Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId));
Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);

cleanSourceTable();
}

@TestTemplate
Expand Down

0 comments on commit b5661da

Please sign in to comment.