diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index cbdf35954f8e..d848d36a00f2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -184,10 +184,9 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) @TestTemplate @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = - "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") - public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) + type = {EngineType.SPARK, EngineType.SEATUNNEL}, + disabledReason = "") + public void testMongodbCdcMultiTableToMysqlByFlinkDataE2e(TestContainer container) throws InterruptedException { cleanSourceTable(); CompletableFuture.supplyAsync( @@ -203,14 +202,51 @@ public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) TimeUnit.SECONDS.sleep(10); // insert update delete upsertDeleteSourceTable(); + TimeUnit.SECONDS.sleep(30); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); + assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + + cleanSourceTable(); TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "") + public void testMongodbCdcMultiTableToMysqlByZetaE2E(TestContainer container) + throws InterruptedException { cleanSourceTable(); + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/mongodb_multi_table_cdc_to_mysql.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(); TimeUnit.SECONDS.sleep(20); + // cancel job + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + + cleanSourceTable(); } @TestTemplate